본문 바로가기
프로그래밍/java

[Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 2편

by 뜨끔쓰 2022. 9. 9.
728x90
728x90

스프링 배치 예, 적금 데이터 동기화 시리즈

2022.09.07 - [프로그래밍/java] - [Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 1편

[현재글]2022.09.09 - [프로그래밍/java] - [Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 2편

2022.09.10 - [프로그래밍/java] - [Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 3편

2022.09.12 - [프로그래밍/java] - [Srping Batch] 스프링 배치를 이용하여 예,적금 데이터 동기화하기 4편


 

안녕하세요!

 

스프링 배치를 이용하여 예,적금 데이터 동기화하기 2편입니다.

 

이번편에서는 직접 SpringBatch를 이용하여 금융감독원 OPEN API에서 받아온 데이터를 스프링배치를 이용하여 데이터베이스에 저장하는 것을 글로 정리해보도록합시다.

 

우선 은행 테이블은 다음과 같이 구성하였습니다.

 

BankEntity를 통해 자동생성된 테이블 구조입니다.

tb_bank 테이블 명세서

일단 스프링배치 프로젝트를 생성후 잡은 다음과 같이 구성하였습니다.

bankSyncJob

    @Bean
    public Job bankSyncJob(){
        return jobBuilderFactory.get("bankSyncJob")
                .incrementer(new RunIdIncrementer())    /* 여러번 호출할 수 있도록 RunIdIncrementer 메서드를 사용하여 중복되지 않게 실행 */
                .start(bankInitStep())                  /* 기존 은행목록들의 사용여부를 0으로 바꿔주는 Step */
                .next(bankSyncStep())                   /* 금융감독원 OPEN API를 이용하여 동기화하는 Step */
                .build();
    }

bankSyncJob은 다음 순서대로 실행됩니다. 보다시피 따로 복잡한 로직이 없기때문에 간단히 구성하였습니다.

 

bankInitStep: tb_bank 테이블의 enable값을 0으로 만들어주는 Step

 

bankSyncStep: tb_bank테이블에 OpenAPI를 통하여 금융회사 정보를 동기화 시켜주는 Step

 

그럼 각 Step의 내부구조를 보도록 합시다.

bankInitStep

    @Bean
    /* 기존 은행목록들의 사용여부를 0으로 바꿔주는 Step */
    public Step bankInitStep(){
        return stepBuilderFactory.get("bankInitStep")
                .tasklet((contribution, chunkContext) -> {
                    bankRepository.updateBankAllEnable(0);
                    return RepeatStatus.FINISHED;
                }).build();

    }

별다른 로직이 필요하지 않아 tasklet으로 간단히 구현하였습니다.

bankRepository의 updateBankAllEnable메서드를 호출하여 기존에 저장되어 있는 모든 금융회사 row들의 enable값을 0으로 변경시켜주고 있습니다.

return 값으로 RepeatStatus.FINISHED를 지정하였습니다. 

updateBankAllEnable

public interface BankRepository extends JpaRepository<Bank, String> {

    @Modifying(clearAutomatically = true)
    @Query("update Bank set enable = :enable")
    void updateBankAllEnable(@Param("enable") int enable);
}

JPQL을 이용하여 간단히 구현하였습니다.

 

@Modifying(clearAutomatically = true를 적용한 것은 벌크 연산 직후 자동으로 영속성 컨택스트를 Clear 해주도록 하기위해 추가하였습니다.

 

그다음 Step으로 bankSyncStep은 다음과 같이 구성되어있습니다.

bankSyncStep

    @Bean
    public Step bankSyncStep(){
        return stepBuilderFactory.get("bankSyncStep")
                .<List<BankDto>, List<Bank>>chunk(1)
                .reader(bankItemReader())
                .processor(bankItemProcessor())
                .writer(bankItemWriterList())
                .build();
    }

Chunk기반의 STEP을 구성하였는데요. 단위는 1로 지정하였습니다.

코드상에서 chunk앞의 <input, output>이며 호출 API의 page단위로 구성했다고 생각하시면 될 것 같습니다.

Chunk기반의 STEP은 3가지 단계를 거친다고 보시면됩니다. processor는 생략가능

reader: 말 그대로 배치작업할 대상을 읽는 단계

processor: 읽어온 데이터를 가공하는 단계 ( 생략가능 ) 

writer: 가공한 데이터를 처리(저장)하는 단계

 

그럼 차례대로 코드를 보도록합시다.

bankItemReader

    @Bean
    public ItemReader<List<BankDto>> bankItemReader() {
        return new CustomBankItemReader(webClient, modelMapper);
    }

 

ItemReader의 경우 이미 구현된 다양한 구현체가 있지만 저는 Custom하게 사용하고 싶어서 직접 CustomBankItemReader 이라는 메서드명으로 작성하였습니다.

CustomBankItemReader

 

@Slf4j
public class CustomBankItemReader implements ItemReader<List<BankDto>> {

    public CustomBankItemReader(WebClient webClient, ModelMapper modelMapper){
        this.webClient = webClient;
        this.modelMapper = modelMapper;
    }

    private final WebClient webClient;
    private final ModelMapper modelMapper;
    @Value(value = "${api.fss.host}")
    private String fssHost;     // 금융감독원 OPEN API 호스트정보
    @Value(value = "${api.fss.bank.path}")
    private String bankPath;    // 금융감독원 OPEN API 금융회사 요청 Path
    @Value(value = "${api.fss.authKey}")
    private String authKey;     // API 호출을 위한 키값

    private int currentPage = 1;    //현재 요청 PAGE

    private List<String> topFinGrpNoList = new ArrayList<>(Arrays.asList("020000", "030300"));  //권역 코드
    private int currentGrpNo = 0;




    @Override
    public List<BankDto> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

        BankDto.ResponseBankApi result = getBankList(currentPage, topFinGrpNoList.get(currentGrpNo));

        /* 정상 호출이 실패한 경우 break */
        if(!result.requestSuccess()){
            throw new Exception("");
        }

        if(result.isOverLastPage() && currentGrpNo == 0){
            currentGrpNo++;
            currentPage = 0;
        }else if(result.isOverLastPage() && currentGrpNo == 1){
            /* return 값으로 null을 반환 할 경우 read 종료 */
            return null;
        }



        /* 다음페이지로 셋팅한다. */
        currentPage++;


        return result.getResult().getBaseList().stream().map(bankInfo ->{
            BankDto bankDto = modelMapper.map(bankInfo, BankDto.class);

            if(topFinGrpNoList.get(currentGrpNo).equals("020000")){
                bankDto.setBankType(Bank.BankType.BANK);
            }else if (topFinGrpNoList.get(currentGrpNo).equals("030300")){
                bankDto.setBankType(Bank.BankType.SAVING_BANK);
            }

            return bankDto;
        }).collect(Collectors.toList());
    }


    public BankDto.ResponseBankApi getBankList(int currentPage, String topFinGrpNo) throws Exception {

        return webClient.get()
                    .uri(uriBuilder -> uriBuilder.scheme("https")
                            .host(fssHost)
                            .path(bankPath)
                            .queryParam("auth", authKey)
                            .queryParam("topFinGrpNo", topFinGrpNo)
                            .queryParam("pageNo", currentPage)
                            .build())
                    .retrieve()
                    .onStatus(HttpStatus::isError, clientResponse -> Mono.error(new Exception()))
                    .bodyToMono(BankDto.ResponseBankApi.class)
                    .flux()
                    .toStream()
                    .findFirst().orElse(null);


    }
}

ItemReader interface를 구현하여 read를 오버라이드하여 작성하였습니다.

코드구현이 매끄럽지 않을 수 있지만 은행(020000), 저축은행(030300) 권역코드 두곳을 동기화 할 것이기 때문에 각 권역코드 별로 마지막페이지까지 webclient로 API 호출하여 동기화할 금융회사 정보를 읽습니다.

 

다음으로 읽은 데이터를 가공하기 위한Processor입니다.

bankItemProcessor

    @Bean
    public ItemProcessor<List<BankDto>, List<Bank>> bankItemProcessor() {
        return new CustomBankItemProcessor();
    }

이번에도 마찬가지로 구현체를 사용하지않고 Custom하게 구현하였습니다.

 

CustomBankItemProcessor

public class CustomBankItemProcessor implements ItemProcessor<List<BankDto>, List<Bank>> {

    @Override
    public List<Bank> process(List<BankDto> items)  {
        return items.stream().map(BankDto::toEntity).collect(Collectors.toList());
    }
}

보다시피 대단한 로직은 아니며 BankDto형태로 읽어드린 값들을 DB에 저장하기위해 Entity형태로 변환하는 코드입니다.

 

이제 마지막 단계로 Writer에서 

Entity형태로 가공된 값들을 DB에 저장하도록 합시다.

bankItemWriterList

    public JpaItemListWriter<Bank> bankItemWriterList() {
        JpaItemWriter<Bank> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(em);
        return new JpaItemListWriter<>(writer);
    }

writer의 경우 reader, processor와는 다르게 JpaItemWriter를 이용하여 작성하였습니다.

JpaItemWriter는 JPA를 사용하기 때문에 영속성 관리를 위하여 EntityManager가 필요합니다.

하지만 저희는 chunk의 output이 list형태이기 때문에 List<List<Bank>>형태로 writer에 값이 전달 되기 때문에 바로 사용할 수 없고 List<Bank>형태로 변환하기 위해서 JpaItemWriter의 write메소드를 재구현해주어야 합니다.

 

저의 경우 JpaItemListWriter라는 메서드를 생성하여 처리하였습니다.

JpaItemListWriter

 

public class JpaItemListWriter<T> extends JpaItemWriter<List<T>> {
    private JpaItemWriter<T> jpaItemWriter;

    public JpaItemListWriter(JpaItemWriter<T> jpaItemWriter){
        this.jpaItemWriter = jpaItemWriter;
    }

    @Override
    public void write(List<? extends List<T>> items) {
        jpaItemWriter.write(items.stream().flatMap(innerArray -> innerArray.stream()).collect(Collectors.toList()));
    }
}

코드를 보시면 stream의 flatMap을 이용하여 List<List<Bank>> 형태를 List<Bank>형태로 변환하였습니다.

 

이렇게 writer까지 작성하면 기본적인 금융회사 정보를 동기화하는 배치의 작성은 끝이나게됩니다.

 

이제 실제 배치를 구동하여 DB에 정말 데이터가 들어가는지 확인해보도록 합시다.

 

금융회사 동기화 BATCH 실행

spring:
  batch:
    job:
      enabled: true
      names: ${job.name:NONE}

특정잡만 실행하기 위해 application.yml 설정을 다음과 같이 설정하였습니다.

특정 job실행

저희가 작성한 bankSyncJob을 실행하기위해 실행변수를 추가하여 실행하면 다음과 같이 DB에 데이터가 쌓인 모습을 확인할 수 있습니다.

 

tb_bank 테이블 데이터

tb_bank

 

이런식으로 데이터가 쌓이는것을 확인 할 수 있습니다!

 

이렇게 금융회사 정보 BATCH는 작성 해보았습니다.

 

앞으로 예,적금 동기화 BATCH도 작성하여 토이프로젝트를 진행해보도록 합시다.

 

부족하지만 긴 글 읽어주셔서 감사합니다.

 

해당 API를 이용하여 작성한 ToyProject 사이트

https://cash.sundry.ninja

 

[뜨끔한가계부] 메인

금융/재테크/잡다한것 가득가득한 저장소

cash.sundry.ninja

 

728x90
반응형

댓글