본문 바로가기
프로그래밍/java

[Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 3편

by 뜨끔쓰 2022. 9. 10.
728x90
728x90

스프링 배치 예, 적금 데이터 동기화 시리즈

2022.09.07 - [프로그래밍/java] - [Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 1편

2022.09.09 - [프로그래밍/java] - [Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 2편

[현재글]2022.09.10 - [프로그래밍/java] - [Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 3편

2022.09.12 - [프로그래밍/java] - [Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 4편


 

 

 

안녕하세요!

 

스프링 배치를 이용하여 예,적금 데이터 동기화하기 3편입니다.

 

이번편에서는 정기예금 상품 동기화 JOB을 만들어보도록 하겠습니다.

 

금융회사 JOB과 비슷한 구성으로 진행 예정입니다.

 


금융감독원 OPEN API 요청, 결과값부터 확인해봅시다.

http://finlife.fss.or.kr/PageLink.do?link=openapi/detail02&menuId=2000126 

 

홈|오픈 API|상세 및 테스트|정기예금 API | 금융상품 통합 비교공시 시스템

정기예금 API 상세 요청 URL - http://finlife.fss.or.kr/finlifeapi/depositProductsSearch.{응답방식} 요청변수 요청변수 목록 요청변수 명 요청변수 ID 타입 필수여부 설명 및 예시 서비스 명 - Text 필수 * 각 API의

finlife.fss.or.kr

정기예금 API 요청값

결과값은 다음과 같습니다.

정기예금 API 결과값

기존 금융회사 데이터의 경우 복잡한 부분이 따로 있지 않아 bankEntity하나로 처리하였지만

 

정기예금의 경우 Options에서 일대다 관계가 또 형성 되기 때문에 2개의 entity를 작성하여 처리하였습니다.

 

Deposit, DepositOption 2개의 entity로 나누었는데요

 

각각 예금상품정보(deposit), 각 예금상품의 상세 옵션정보(depositOption)으로 나누었습니다.


 

그럼 차례차례 확인해보도록합시다.

 

Deposit

 

@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Entity
@Table(name = "tb_deposit")
@IdClass(DepositPK.class)
public class Deposit extends BaseTimeEntity {
    
    @Id//금융상품코드
    @Column(length = 50)
    private String finPrdtCd;

    @Id//금융회사코드
    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name ="finCoNo", columnDefinition="VARCHAR(20)", foreignKey = @ForeignKey(ConstraintMode.NO_CONSTRAINT))
    private Bank bank;

    @Column//최고한도
    private Long maxLimit;
    @Column(columnDefinition = "varchar(2000)")//우대조건
    private String spclCnd;
    @Column(columnDefinition = "varchar(2000)")//만기 후 이자율
    private String mtrtInt;
    @Column//가입대상
    private String joinMember;
    @Column//가입방법
    private String joinWay;
    @Column//가입제한 EX) 1:제한없음, 2:서민전용, 3일부제한
    private String joinDeny;
    @Column//금융회사명
    private String korCoNm;
    @Column//금융상품명
    private String finPrdtNm;
    @Column//기타 유의사항
    private String etcNote;
    @Column//공시 제출일[YYYYMM]
    private String dclsMonth;
    @Column
    private String dclsStrtDay;
    @Column
    private String dclsEndDay;
    @Column//금융회사 제출일 [YYYYMMDDHH24MI]
    private String finCoSubmDay;

    @Column
    private int enable;

    @OneToMany(mappedBy = "deposit", fetch = FetchType.LAZY)
    @Builder.Default
    private List<DepositOption> depositOptions = new ArrayList<>();
    
}

deposit entity는 다음과 같이 코드를 작성하였는데, bank 엔티티와 일대다 관계이며 fin_co_no, fin_prdt_cd 2개의 복합키를 PK로 설정하였습니다. 그렇기때문에 @Idclass어노테이션을 설정하였습니다.

 

DepositPK

 

@EqualsAndHashCode(onlyExplicitlyIncluded = true)
@NoArgsConstructor
@AllArgsConstructor
public class DepositPK implements Serializable {

    private static final long serialVersionUID = -2929789292155268166L;

    @EqualsAndHashCode.Include
    private String bank;    //finCoNo
    @EqualsAndHashCode.Include
    private String finPrdtCd;

}

복합키를 사용하기 위해선 Serializable을 상속받아 Equals와 HashCode를 구현해주어야 합니다.

 

일일이 구현하기 귀찮기 떄문에 Lombok에서 제공하는 어노테이션을 이용하여 작성하였습니다.

 

다음으로 각 예금의 옵션정보를 담는 entity인 depositOption entity를 확인해봅시다.

 

DepositOption

 

@Getter
@Builder
@AllArgsConstructor
@Entity
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Table(name = "tb_deposit_option")
public class DepositOption extends BaseTimeEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long depositOptionNo;

    
    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumns(value = {
            @JoinColumn(name = "finPrdtCd",
                    referencedColumnName = "finPrdtCd"),
            @JoinColumn(name = "finCoNo",
                    referencedColumnName = "finCoNo")
    }, foreignKey = @ForeignKey(ConstraintMode.NO_CONSTRAINT))
    private Deposit deposit;
    
    @Column //저축 금리 [소수점 2자리]
    private double intrRate2;
    @Column //최고 우대금리[소수점 2자리]
    private double intrRate;
    @Column(length = 20) //저축 금리 유형명
    private String intrRateTypeNm;
    @Column(length = 10) //저축 기간[단위: 개월]
    private String saveTrm;
    @Column(length = 4) //저축 금리 유형
    private String intrRateType;
    @Column
    private String dclsMonth;
    
}

각 API 결과값에서 필요한 항목들만 추가하여 작성하였습니다.

 

deposit과 일대 다 관계를 맺고 있기 때문에 조인 컬럼으로 finPrdtCd, finCoNo을 함께 작성해 주었습니다.


 

이제 API 결과값을 매핑해줄  각각의 DTO를 작성해보도록합시다.

 

DepositDto

 

@Data
public class DepositDto {

    private String finCoSubmDay;
    private String dclsStrtDay;
    private String dclsEndDay;
    private long maxLimit;
    private String etcNote;
    private String joinMember;
    private String joinDeny;
    private String spclCnd;
    private String mtrtInt;
    private String joinWay;
    private String finPrdtNm;
    private String korCoNm;
    private String finPrdtCd;
    private String finCoNo;
    private String dclsMonth;

    private List<DepositOptionDto> options;

    public Deposit toEntity(Bank bank){
        
        return Deposit.builder()
                .finPrdtCd(finPrdtCd)
                .bank(bank)
                .finCoSubmDay(finCoSubmDay)
                .dclsStrtDay(dclsStrtDay)
                .dclsEndDay(dclsEndDay)
                .maxLimit(maxLimit)
                .etcNote(etcNote)
                .joinMember(joinMember)
                .joinDeny(joinDeny)
                .spclCnd(spclCnd)
                .mtrtInt(mtrtInt)
                .joinWay(joinWay)
                .finPrdtNm(finPrdtNm)
                .korCoNm(korCoNm)
                .dclsMonth(dclsMonth)
                .build();
    }

    @Getter
    @Setter
    public static class Result {
        private List<DepositOptionDto> optionList;
        private List<Baselist> baseList;
        @JsonProperty("err_msg")
        private String errMsg;
        @JsonProperty("err_cd")
        private String errCd;
        @JsonProperty("now_page_no")
        private String nowPageNo;
        @JsonProperty("max_page_no")
        private String maxPageNo;
        @JsonProperty("total_count")
        private String totalCount;
        @JsonProperty("prdt_div")
        private String prdtDiv;
    }


    @Getter
    @Setter
    public static class Baselist {
        @JsonProperty("fin_co_subm_day")
        private String finCoSubmDay;
        @JsonProperty("dcls_strt_day")
        private String dclsStrtDay;
        @JsonProperty("dcls_end_day")
        private String dclsEndDay;
        @JsonProperty("max_limit")
        private long maxLimit;
        @JsonProperty("etc_note")
        private String etcNote;
        @JsonProperty("join_member")
        private String joinMember;
        @JsonProperty("join_deny")
        private String joinDeny;
        @JsonProperty("spcl_cnd")
        private String spclCnd;
        @JsonProperty("mtrt_int")
        private String mtrtInt;
        @JsonProperty("join_way")
        private String joinWay;
        @JsonProperty("fin_prdt_nm")
        private String finPrdtNm;
        @JsonProperty("kor_co_nm")
        private String korCoNm;
        @JsonProperty("fin_prdt_cd")
        private String finPrdtCd;
        @JsonProperty("fin_co_no")
        private String finCoNo;
        @JsonProperty("dcls_month")
        private String dclsMonth;


        public boolean isDepositOption(DepositOptionDto depositOptionDto){
            if(finCoNo.equals(depositOptionDto.getFinCoNo()) && finPrdtCd.equals(depositOptionDto.getFinPrdtCd())){
                return true;
            }
            return false;
        }
    }

    @Data
    public static class ResponseDepositApi {
        private Result result;


        public boolean requestSuccess(){
            if(result != null && result.getErrCd().equals("000")){
                return true;
            }

            return false;
        }

        public boolean isOverLastPage(){
            if(requestSuccess() && Integer.parseInt(result.getMaxPageNo()) < Integer.parseInt(result.getNowPageNo())){
                return true;
            }

            return false;
        }
    }

}

API 결과에 따라 매핑해주고 있으며 나머지 메서드는 API 호출에 사용할 기능들을 작성하였습니다.

 

DepositOptionDto

 

@Setter
@Getter
public class DepositOptionDto {

    @JsonProperty("intr_rate2")
    private double intrRate2;
    @JsonProperty("intr_rate")
    private double intrRate;
    @JsonProperty("save_trm")
    private String saveTrm;
    @JsonProperty("intr_rate_type_nm")
    private String intrRateTypeNm;
    @JsonProperty("intr_rate_type")
    private String intrRateType;
    @JsonProperty("fin_prdt_cd")
    private String finPrdtCd;
    @JsonProperty("fin_co_no")
    private String finCoNo;
    @JsonProperty("dcls_month")
    private String dclsMonth;


    public DepositOption toEntity(Deposit deposit){
        return DepositOption.builder()
                .deposit(deposit)
                .intrRate(intrRate)
                .intrRate2(intrRate2)
                .saveTrm(saveTrm)
                .intrRateTypeNm(intrRateTypeNm)
                .intrRateType(intrRateType)
                .dclsMonth(dclsMonth).build();
    }
}

 

각각의 Dto는 entity로 만들어주는 toEntity 메서드를 이용하여 Entity변환을 해줄 수 있도록 작성하였습니다.


 

이제 본격적으로 JOB을 작성해보도록 합시다.

 

depositSyncJob

 

    @Bean
    public Job depositSyncJob() {
        return jobBuilderFactory.get("depositSyncJob")
                .incrementer(new RunIdIncrementer())
                .start(depositInitStep())                /* 기존 예금목록들의 사용여부를 0으로 바꿔주는 Step */
                .next(depositSyncStep())                 /* 금융감독원 OPEN API를 이용하여 동기화하는 Step */
                .build();
    }

기존 bankSyncJob과 매우 유사합니다.

 

첫번째 Step인 depositInitStep은 DB에 등록되어있는 deposit테이블의 enable 컬럼을 0으로 바꿔주며 deposit_option테이블을 truncate로 비우는 작업이 포함되어 있습니다.

 

depositInitStep

 

    @Bean
    public Step depositInitStep(){
        return stepBuilderFactory.get("depositInitStep")
                .tasklet((contribution, chunkContext) -> {
                    depositRepository.updateAllDepositEnable(0);
                    depositOptionRepository.truncateDepositOption();
                    return RepeatStatus.FINISHED;
                }).build();
    }

updateAllDepositEnable: deposit테이블의 enable 컬럼을 0으로 바꿔주는 메서드

trunacateDepositOption: deposit_option 테이블을 truncate로 초기화 해주는 메서드

 

enable컬럼을 0으로 바꿔주는 메서드는 기존 bank에서 사용한 메서드와 비슷하므로 생략하고 depositOptionRepository.truncateDepositOption() 코드를 확인해보도록 합시다.

 

truncateDepositOption

 

    @Transactional
    @Modifying
    @Query(value = "truncate tb_deposit_option", nativeQuery = true)
    void truncateDepositOption();

JPQL이 아닌 NativeQuery를 이용하였습니다.

 

이제 depositSyncStep입니다.

 

depositSyncStep

 

    @Bean
    public Step depositSyncStep(){
        return stepBuilderFactory.get("depositSyncStep")
                .<List<DepositDto>, List<Deposit>>chunk(1)
                .reader(depositItemReader())
                .writer(compositeDepositItemWriter())
                .build();
    }

저번글에서 작성한 bankSyncStep과 별차이는 없습니다.

 

input으로 List<DepositDto> output으로 List<Deposit>을 하고있습니다. 대신 bank의 경우 JpaWriter 구현체를 사용했지만

 

CompositeItemWriter과 JdbcBatchItemWriter를 함께 사용하여 작성할 예정입니다.

 

우선 Reader부터 보도록 하겠습니다.

 

depositItemReader

 

    @Bean
    public ItemReader<List<DepositDto>> depositItemReader() {
        return new CustomDepositItemReader(webClient, modelMapper);
    }

Redaer의 경우 bank와 그다지 차이가 나지 않습니다. 이번에도 마찬가지로 구현체를 사용하지 않고 직접 구현하도록 작성 하였습니다.

 

CustomDepositItemReader

 

public class CustomDepositItemReader implements ItemReader<List<DepositDto>> {
    public CustomDepositItemReader(WebClient webClient, ModelMapper modelMapper) {
        this.webClient = webClient;
        this.modelMapper = modelMapper;
    }

    private final WebClient webClient;
    private final ModelMapper modelMapper;
    @Value(value = "${api.fss.host}")
    private String fssHost;
    @Value(value = "${api.fss.deposit.path}")
    private String depositPath;
    @Value(value = "${api.fss.authKey}")
    private String authKey;
    private int currentPage = 1;

    private List<String> topFinGrpNoList = new ArrayList<>(Arrays.asList("020000", "030300"));
    private int currentGrpNo = 0;

    @Override
    public List<DepositDto> read() throws Exception {
        DepositDto.ResponseDepositApi result = getDepositList(currentPage, topFinGrpNoList.get(currentGrpNo));

        /* 정상 호출이 실패한 경우 break */
        if (!result.requestSuccess()) {
            throw new Exception("");
        }


        if (result.isOverLastPage() && currentGrpNo == 0) {
            currentGrpNo++;
            currentPage = 0;
        } else if (result.isOverLastPage() && currentGrpNo == 1) {
            return null;
        }


        /* 다음페이지로 셋팅한다. */
        currentPage++;


        return result.getResult().getBaseList().stream().map(depositInfo -> {

            List<DepositOptionDto> depositOptionDtos = new ArrayList<>();

            result.getResult().getOptionList().stream().forEach(depositOptionDto -> {
                /* 옵션이 존재하는 경우 체크 */
                if (depositInfo.isDepositOption(depositOptionDto)) {
                    depositOptionDtos.add(depositOptionDto);
                }
            });

            DepositDto depositDto = modelMapper.map(depositInfo, DepositDto.class);

            depositDto.setOptions(depositOptionDtos);
            return depositDto;
        }).collect(Collectors.toList());
    }


    public DepositDto.ResponseDepositApi getDepositList(int currentPage, String topFinGrpNo) {

        return webClient.get()
                .uri(uriBuilder -> uriBuilder.scheme("https")
                        .host(fssHost)
                        .path(depositPath)
                        .queryParam("auth", authKey)
                        .queryParam("topFinGrpNo", topFinGrpNo)
                        .queryParam("pageNo", currentPage)
                        .build())
                .retrieve()
                .onStatus(HttpStatus::isError, clientResponse -> null)
                .bodyToMono(DepositDto.ResponseDepositApi.class)
                .flux()
                .toStream()
                .findFirst().orElse(null);
    }
}

bank와 마찬가지로 host, path, authKey값은 applicateion.yml 파일에 정의한 값을 불러와 사용하였습니다.

 

한가지 다른점은 각 예금상품마다 옵션이 존재하기 때문에 옵션을 담아주는 부분이 추가되었습니다.

 

bank에선 jpa를 이용하여 데이터를 저장하였지만 이번에는 JdbcBatchItemWriter를 이용하여 저장할 것이기 때문에 Processor를 생략하고 Writer에서 전부 처리하도록 코드를 작성하였습니다.

 


compositeDepositItemWriter

 

    @Bean
    public CompositeItemWriter compositeDepositItemWriter() {
        List<ItemWriter> delegates = new ArrayList<>(2);
        delegates.add(new CustomDepositJdbcItemWriter(dataSource, new JdbcBatchItemWriter()));
        delegates.add(new CustomDepositOptionJdbcItemWriter(dataSource, new JdbcBatchItemWriter()));

        CompositeItemWriter compositeItemWriter = new CompositeItemWriter();
        compositeItemWriter.setDelegates(delegates);
        return compositeItemWriter;
    }

CompositeItemWriter를 사용한 이유는 읽어드린 값들이 Deposit, DepositOption 두가지로 나뉘는데 이것들을 각각 writer를 이용하여 2개를 한꺼번에 저장하기 위해 사용하였습니다.

 

jpa가 아닌 jdbc를 이용한 것은 배치성 insert를 더욱 용이하게 사용하기 위해 채택하였습니다.

 

이제 deposit, depositOption 각각을 구현한 CustomDepositJdbcItemWriter, CustomDepositOptionJdbcItemWriter를 확인해봅시다.

 

CustomDepositJdbcItemWriter

 

public class CustomDepositJdbcItemWriter implements ItemWriter<List<DepositDto>> {

    private final DataSource dataSource;
    private final JdbcBatchItemWriter<DepositDto> jdbcBatchItemWriter;

    public CustomDepositJdbcItemWriter(DataSource dataSource, JdbcBatchItemWriter jdbcBatchItemWriter) {
        this.dataSource = dataSource;
        this.jdbcBatchItemWriter = jdbcBatchItemWriter;
    }
    @Override
    public void write(List<? extends List<DepositDto>> items) throws Exception {

        /* List<List<DepositDto>> -> List<DepositDto> 변환 */
        List<DepositDto> depositDtos = items.stream().flatMap(Collection::stream).collect(Collectors.toList());

        String sql = "INSERT INTO tb_deposit (fin_co_no, fin_prdt_cd, dcls_end_day, dcls_month, dcls_strt_day, enable, etc_note, fin_co_subm_day, fin_prdt_nm, " +
                "join_deny, join_member, join_way, kor_co_nm, max_limit, mtrt_int, spcl_cnd, created_date, last_modified_date) values " +
                "(:finCoNo, :finPrdtCd, :dclsEndDay, :dclsMonth, :dclsStrtDay, 1, :etcNote, :finCoSubmDay, :finPrdtNm, :joinDeny, " +
                ":joinMember, :joinWay, :korCoNm, :maxLimit, :mtrtInt, :spclCnd, now(), now()) " +
                "ON DUPLICATE KEY UPDATE " +
                "dcls_end_day = :dclsEndDay," +
                "dcls_month = :dclsMonth, " +
                "dcls_strt_day = :dclsStrtDay, " +
                "enable = 1, " +
                "etc_note = :etcNote, " +
                "fin_co_subm_day = :finCoSubmDay, " +
                "fin_prdt_nm = :finPrdtNm, " +
                "join_deny = :joinDeny, " +
                "join_member = :joinMember, " +
                "join_way = :joinWay, " +
                "kor_co_nm = :korCoNm, " +
                "max_limit = :maxLimit, " +
                "mtrt_int = :mtrtInt, " +
                "spcl_cnd = :spclCnd, " +
                "last_modified_date = now()";

        jdbcBatchItemWriter.setDataSource(dataSource);
        jdbcBatchItemWriter.setJdbcTemplate(new NamedParameterJdbcTemplate(dataSource));
        jdbcBatchItemWriter.setSql(sql);
        /* DepositDto클래스를 자동으로 파라미터로 생성할 수 있는 설정 */
        jdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        jdbcBatchItemWriter.afterPropertiesSet();
        jdbcBatchItemWriter.write(depositDtos);

    }
}

기본적으론 INSERT를 진행하지만 해당 PK가 존재할 경우 업데이트 하는 형태로 구현하였습니다.

 

다음으로 CustomDepositOptionJdbcItemWriter입니다.

 

CustomDepositOptionJdbcItemWriter

 

public class CustomDepositOptionJdbcItemWriter implements ItemWriter<List<DepositDto>> {


    private final JdbcBatchItemWriter jdbcBatchItemWriter;
    private final DataSource dataSource;

    public CustomDepositOptionJdbcItemWriter(DataSource dataSource, JdbcBatchItemWriter jdbcBatchItemWriter) {

        this.jdbcBatchItemWriter = jdbcBatchItemWriter;
        this.dataSource = dataSource;
    }

    @Override
    public void write(List<? extends List<DepositDto>> items) throws Exception {
        /* List<List<DepositDto>> -> List<DepositOptionDto> */
        List<DepositOptionDto> depositOptionDtos = items.stream().flatMap(Collection::stream).map(DepositDto::getOptions)
                .flatMap(Collection::stream).collect(Collectors.toList());


        String sql = "INSERT INTO tb_deposit_option " +
                "(fin_co_no, fin_prdt_cd, save_trm, intr_rate_type_nm, intr_rate_type, intr_rate2, intr_rate, dcls_month, created_Date, last_modified_date)" +
                " values (:finCoNo, :finPrdtCd, :saveTrm, :intrRateTypeNm, :intrRateType, :intrRate2, :intrRate, :dclsMonth, now(), now())";

        jdbcBatchItemWriter.setDataSource(dataSource);
        jdbcBatchItemWriter.setJdbcTemplate(new NamedParameterJdbcTemplate(dataSource));
        jdbcBatchItemWriter.setSql(sql);
        jdbcBatchItemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        jdbcBatchItemWriter.afterPropertiesSet();
        jdbcBatchItemWriter.write(depositOptionDtos);

    }
}

depositOption의 경우 truncate로 테이블을 비워주기 때문에 따로 업데이트를 진행하고 INSERT로만 진행합니다.

 

이제 예금 batch JOB을 완성하였으니 직접 실행해보도록합시다.

 

depositSyncJob

다음과 같이 파라미터를 넘겨주고 실행하면

 

tb_deposit 테이블에 예금정보가 동기화 된 모습
tb_deposit_option 테이블

동기화 되는 모습을 볼 수 있습니다!


이렇게 지금까지 예금정보를 OPEN API로 호출하여 DB에 동기화하는 JOB을 작성해보았습니다.

 

긴 글 읽어주셔서 감사합니다.

 

문의사항이나 잘 안되는 부분이 있으시면 댓글 남겨주시면 최대한 답변 드리도록 하겠습니다.

 

감사합니다

 

 

 

728x90
반응형

댓글