[Spring Batch] CSV Item reader 구현하기

글쓴이 Engineer Myoa 날짜

1. 들어가기에 앞서

 

현재 파트 내에서 Batch 관련 작업을 많이 하고 있습니다.

SCDF 와 Confluent Kafka 로 전향하는 것을 다음 페이즈로 준비하고 있어,

현재 개발환경은 Spring Batch 프레임워크를 이용하고 있습니다.

따라서 한동안 Spring batch 를 사용하며 겪었던 내용들을 작성하고자 합니다.

 

 

2. FlatFileItemReader

Spring batch 로 다양한 Batch job 을 만들다 보면 곧 마주하는일이 있습니다.

Datasource 가 DB 가 아닌, spread sheets 파일을 line by line 으로 파싱하는 일입니다.

(혹은 custom 한 db item reader ..)

 

Spread sheets 라 함은 크게 2가지 종류가 있습니다.

  • Excel 파일은 Apache POI 를 이용하고,
  • CSV 파일은 Spring Batch 에서 제공하는 FlatFileItemReader 를 이용하면 됩니다.

 

추가로 JSON 이나 XML 같은 key-value based data format 도 File item reader 와 원리는 같습니다.

얼마든지 custom item reader 를 구현하실 수 있습니다.

나중에 기회가 되면 이 부분도 직접 구현해서 공유하고 싶네요.

 

 

3. CSV Item reader 구현하기

 

1) 예제 파일

바로 시작하기 전에 예제를 하나 준비하겠습니다.

 

이름, 나이, 이메일로 구성된 명부 파일입니다.

이름 나이 이메일
홍길동 30 engineer.myoa@gmail.com
김철수 kimV3@naver.com
박민지 모름 minji
은유리 25살 silve_glass@s-glass.com

위 처럼 구성된 CSV 파일이 있다고 가정하겠습니다.

 

이름에는 문자열, 나이에는 숫자, 이메일에는 문자열이 들어가야합니다.

 

2) FlatFileItemReader

CSV 파일은 Plain Text file 에 해당합니다. 따라서 ItemReader 의 구현체 중 하나인 FlatFileItemReader 를 이용하면 line by line 으로 record 를 읽어 올 수 있습니다.

  • FlatFileItemReader
    public class FlatFileItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
    		ResourceAwareItemReaderItemStream<T>, InitializingBean

 

FlatFileItemReader 를 객체를 생성할 때는 Type token 을 필요로 하는데, field 명으로 mapping 할 class 의 Type token 입니다.

 

따라서 아래와 같은 class 를 하나 만듭니다.

public class CsvRawUserDTO {
    String name;
    Long age;
    String email;
}

 

그리고 FlatFileItemReader 객체를 생성합니다.

FlatFileItemReader<CsvRawUserDTO> flatFileItemReader = new FlatFileItemReader<>();

 

또한 Encoding 과 Input file 경로가 있는 Resource 객체, 그리고 Line Mapper 3가지를 설정해줍니다.

flatFileItemReader.setEncoding(StandardCharsets.UTF_8.name());
flatFileItemReader.setResource(getFileSystemResource(csvFilePath));
flatFileItemReader.setLineMapper(lineMapper);

(LineMapper 는 다음 절을 참고해주세요)

 

3) LineMapper

Item Reader 가 읽어온 Line 을 어떻게 mapping 할 것인지 정의해주어야 합니다.

DefaultLineMapper 를 사용하는데, 이 Line Mapper 에 FieldSetMapper 와 LineTokenizer 를 설정해주어야 합니다.

DefaultLineMapper<CsvRawUserDTO> lineMapper = new DefaultLineMapper<>();
lineMapper.setFieldSetMapper(fieldSetMapper);
lineMapper.setLineTokenizer(lineTokenizer);

 

4) LineTokenizer

LineTokenizer 는 new line(“\0”) 를 만나기 전까지(record 를 의미하겠죠?)의 CharSequence 를 어떻게 쪼갤지 정의합니다.

LineTokenizer 에서 설정한 field 명과 column index 가 FieldSetMapper 에서 사용됩니다.

이는 RowMapper 의 동작과 유사합니다.

DelimitedLineTokenizer 를 이용합니다.

        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setStrict(true); // default
        lineTokenizer.setNames(CsvRawUserDTOFields.getFieldNameArrays());
        lineTokenizer.setIncludedFields(CsvRawUserDTOFields.getColumnIndexArrays());

 

이 때 strict mode 는 true 가 default 입니다.

  • strict mode 란?

If true (the default) then number of tokens in line must match the number of tokens defined.

If false then lines with less tokens will be tolerated and padded with empty columns, and lines with more tokens will simply be truncated.

즉, true 일 경우 line tokenizer 에 설정한 names 와 includeFields 가 읽어온 line 의 tokens 와 정확하게 일치해야 한다는 뜻입니다.

하지만 true 로 지정하여도 field 가 더 많이 추출된 line은 truncated 되는 것을 확인했습니다.

e.g.) [이름, 나이, 이메일, 지역] -> [이름, 나이, 이메일] 로 truncated

 

이 때 field 명과 column index 가 포함돼있는, private enum 하나를 생성하면 더욱 안전하게 사용할 수 있습니다.

    private enum CsvRawUserDTOFields {
        EMAIL("email", 0),
        NAME("name", 1),
        AGE("age", 2),
        ;

        private String fieldName;
        private int columnIndex;

        // getter ...
        // constructor ...

        public static String[] getFieldNameArrays() {
            return Arrays.stream(CsvRawUserDTOFields.values())
                         .map(CsvRawUserDTOFields::getFieldName)
                         .toArray(String[]::new);
        }

        public static int[] getColumnIndexArrays() {
            return Arrays.stream(CsvRawUserDTOFields.values())
                         .mapToInt(CsvRawUserDTOFields::getColumnIndex)
                         .toArray();
        }
    }

 

 

 

5) FieldSetMapper

JpaItemReader 나 JdbcItemReader 와 같은 datasource 가 db 인 item reader 들은

한 record 를 읽어왔을 때 ResultSet 객체를 반환합니다.

FlatFileItemReader 에서는 FieldSet 객체를 반환합니다.

따라서 반환된 FieldSet 으로 부터 Destination 에 해당하는 class 에 매핑하는 mapper 가 필요합니다.

BeanWrapperFieldSetMapper 를 이용하면 됩니다.

이 때, Line Tokenizer 에 지정한 fields 를 기반으로 reflection 이 일어나게 됩니다.

BeanWrapperFieldSetMapper<CsvRawUserDTO> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setStrict(true); // default
fieldSetMapper.setTargetType(CsvRawUserDTO.class);

 

strict 모드는 true 가 default 입니다.

  • strict mode 란?

지정한 type token 에서 대응하는 field 에 set 을 하려고 보니, set 을 할 수 없는 상황일 때 exception 을 발생할지, 아니면 mapping 하지 않은 채 item 을 return 할 지 여부입니다.

e.g.) true 로 설정했을 때 : CsvRawUserDTO 의 age 는 Long type 의 field 입니다. 하지만 String 형태의 데이터가 넘어왔을 시? null 등으로 mapping 되지 않고 Exception 이 발생합니다. (FlatFileParseException.class)

우리는 false 로 설정하지 않고, 문제되는 FieldSet 이 발견되면 exception 을 throw 할 것 입니다.

믿고 따라오세요.

잘못된 데이터가 DB 에 입력되는 것보다 로그를 남겨 운영으로 풀어내는 것이 시스템에 더 안전합니다.

 

6) StepConfiguration

다 끝났습니다.

Step configuration 에서 추가로 설정해야하는 것들을 보겠습니다.

return stepBuilderFactory.get("csvParsingStep")
        .transactionManager(transactionManager)
        .<CsvRawUserDTO, CsvUserEntity>chunk(CSV_PARSING_STEP_CHUNK_SIZE)
        .reader(itemReader) // csvItemReader
        .processor(itemProcessor)
        .writer(itemWriter)

        // for handle exception
        .faultTolerant()

        // skip exception when convert line to POJO
        .skip(FlatFileParseException.class)

        // default policy is allow exception skip, just little bit.
        .skipPolicy(new AlwaysSkipItemSkipPolicy())
        .build();

reader 에는 조금 전에 정의한 FlatFileItemReader 를 설정해줍니다. (jobParameters 와 다른 bean 을 사용할 수 있도록 bean 으로 등록해주면 reader 의 동적 제어가 수월해집니다.)

 

writer 까지 지정하고나면, faultTolerant() 를 chaining 합니다.

.faultTolerant()

 

일반적으로 Item Reader 는 exception 이 발생하면 Step 이 ExitStatus.FAILED 를 반환합니다.

(step listener 가 있다면 stepExecution 의 failureExceptions 에서 검출이 가능합니다)

이를 막고 해당 record 는 skip 할 수 있도록 exception 을 handling 하기 위해 사용합니다.

 

다음으로 무엇을(어떤 Exception) skip 할 지, listener 에 등록해줍니다.

.skip(FlatFileParseException.class)

이 예외 클래스는 FlatFileItemReader 와 LineMapper, LineTokenizer, FieldSetMapper 에서 발생한 Exception 을 포함합니다.

 

마지막으로 얼마나 이 예외가 발생할지 모르기 때문에 항상 skip 할 수 있도록 정책을 지정합니다.

.skipPolicy(new AlwaysSkipItemSkipPolicy())

(설정하지 않으면, 10회인가 지정된 예외가 발생하여 skip 했을 때 step 이 실패하게 됩니다)

 

 

4. 결과

id 이름 나이 이메일
1 홍길동 30 engineer.myoa@gmail.com
2 김철수 kimV3@naver.com
3 박민지 모름 minji
4 은유리 25살 silve_glass@s-glass.com

 

위 csv 파일을 이용해 item reader 에 input 을 넣게되면 예상되는 결과는 다음과 같습니다.

 

  • id == 1
    • 모든 필드가 정상이므로 record 가 parsing 됩니다.
  • id == 2
    • 나이 필드가 null 이지만 CsvRawUserDTO 의 age 필드는 Long.class 이기 때문에 nullable 입니다. 따라서 정상적으로 parsing 됩니다.
    • 만약 [김철수, , kimV3@naver.com] 이 아니라  [김철수, kimV3@naver.com] 와 같이 record 가 들어있었다면  해당 record 는 exception 이 발생되고 skip 됩니다.
  • id == 3
    • 나이 필드가 String 형태의 데이터 이기 때문에 Long.parseLong 에서 NumberFormatException 이 발생하게 됩니다. 따라서 skip 됩니다.
  • id == 4
    • 나이 필드에 나이는 포함되어 있지만 뒤에 “살” 이 붙어 있기 때문에 String 형태의 데이터입니다. 이는 CsvRawUserDTO 의 age 필드를 String 타입으로 변경하고, Long 으로 getter 를 만드는 방법으로 fool proof 를 만들 수도 있습니다.

 

5. 마치며

위 작업을 통해 csv item reader 를 이용한 step 을 만들 수 있게 됐습니다.

FlatFileItemReader 의 장점은 BufferedReader 를 통해 streaming 으로 record(line) 를 읽어오기 때문에 파일 크기에 제한이 거의 없다는 점입니다.

 

이어서 다음 포스팅은 Spring Batch 에서 JdbcCursorItemReader 를 직접 제어하고,

reocrds 를 Map reduce 의 형태로 집약할 수 있는 Aggregated Item Reader 를 만들어 사용하는 방법을 전달해드리고자 합니다.

 


155개의 댓글

답글 남기기

Avatar placeholder

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다