대규모 데이터에 대해
데이터를 일괄적으로 처리하고,처리 중 발생한 오류를 처리하며,실패한 작업을 원활하게 재시작하는 것은 중요합니다.
Spring Batch는 대규모 데이터 처리를 위한 Spring 프레임워크입니다.
일반적으로 대규모 트랜잭션을 통해 데이터를 저장할 때 사용합니다.
트랜잭션을 추적하여 관리하고, 트랜잭션 최적화 및 파티셔닝 기술을 통한 고성능 배치 작업을 통해 작업 재시작 등 대용량 레코드 처리에 필수적인 기능을 제공합니다.

Job은 배치 작업을 의미합니다. Job에는 여러 Step이 있습니다.
은행에서 매일 고객의 입금, 출금 내역을 확인한 이후 요약하여 저장하는 Job을 한다면,
Step1.내역을 확인하고
Step2.요약하여
Step3.저장하는
단계로 나뉠 것입니다.
더 구체적으로 살펴보면, 아래와 같습니다.

JobInstance는 Job의 논리적인 실행단위로, Job의 특정 실행을 구분하는 데 사용하며,
실행될 때마다 고유하고 독립적인 JobInstance가 생성됩니다.
JobExecution은 JobInstance 한번의 실행에 포함되며,
JobInstance가 실제로 실행하면서 발생하는 모든 상황(성공, 실패, 재시작, 중지 등)에 대한 정보를 포함합니다.
StepExecution은 JobExecution에 포함된 Step의 개별적인 처리과정이며,
진행 중 발생한 모든 상황에 대한 세부 정보를 포함합니다.
은행에서 고객의 입금, 출금 내역을 확인한 이후 요약하여 저장하는 Job을 한다면,
1월 1일(JobInstance 1)에 Job을 실시(JobExecution 1)합니다.
1월 2일(JobInstance 2)에 Job을 실시(JobExecution 1)하지만, 실패하여 재실시 (JobExecution 2) 합니다.
1월 1일(JobInstance 1)에
실시된 JobExecution에는
Step1-1.내역을 확인(StepExecution 1)하고
Step1-2.요약(StepExecution 1)하지만, 실패하여 재실시 (StepExecution 2) 하며,
Step1-3.저장(StepExecution 1)하는 등의 과정을 포함합니다.
이처럼 JobInstance는 논리적 단위이며, Step은 개념적인 단위로 간주할 수 있습니다.
이제 전체적인 구조를 보겠습니다.

JobRepository는 Job에 대한 상태를 관리하고 추적하며, CRUD를 제공합니다.
JobLauncher는 Job에 대한 매개변수 및 실행에 대한 인터페이스입니다.
ItemReader는 데이터에 대한 입력 및 검색을 나타내는 추상화로, DB나 파일로부터 데이터를 읽습니다.
Job에 사용할 데이터를 소진(작업 끝)하면 null을 반환합니다.
ItemProcessor는 비즈니스 처리를 나타내는 추상화입니다.
유효성 검증, 예외처리 등을 포함합니다.
ItemWriter는 한 번에 하나의 데이터(또는 채널)의 출력을 나타내는 추상화로,
다음 프로세스에 대한 정보 없이 전달받은 데이터만을 처리합니다.
Step1. 내역 확인 (StepExecution)하기 위해
ItemReader가 DB에서 사용자 데이터를 읽고,
ItemProcessor가 각 데이터의 유효성을 검증하고,
ItemWriter가 유효한 데이터를 중간 저장소에 저장합니다.
이제 Batch의 기본적인 개념들에 대해 알아보았으니, demo project를 만들겠습니다.
프로젝트 생성

세부 설정을 해주고, 의존성을 추가합니다.

Entity, Repository, csv 구성
이제 Batch 작업을 할 자료를 생성하고, Entity를 구성하겠습니다.
package com.ride.batch.student;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.Id;
import lombok.Getter;
import lombok.Setter;
@Getter @Setter
@Entity
public class Student {
@Id
@GeneratedValue
private Long id;
private String name;
private String email;
private String age;
}
package com.ride.batch.student;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface StudentRepository extends JpaRepository<Student, Long> {
}
아래는 batch 작업을 통해 db에 저장할 csv 파일의 일부입니다.
id,name,email,age
1,Riley,riley1@email.com,25
2,Casey,casey2@service.com,20
3,Morgan,morgan3@internet.com,20
application.proeprties
spring.datasource.url=jdbc:mariadb://localhost:3306/file-upload
spring.datasource.username=ride
spring.datasource.password=mypassword
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MariaDBDialect
spring.jpa.hibernate.ddl-auto=create
spring.jpa.show-sql=false
spring.batch.jdbc.initialize-schema=ALWAYS
spring.batch.job.enabled=false
datasource는 localhost의 mariadb를 사용하고, file-upload라는 이름의 데이터베이스를 사용합니다.
db 사용자 명은 ride, 비밀번호는 mypassword입니다.
ddl-auto를 create으로 설정했기 때문에,
애플리케이션이 실행될 때마다 db 테이블이 생성됩니다.
(편의상 create을 사용했습니다, 이후 update로 변경하여 Entity관계 등이 변경되었을 때 적용하도록 할 수 있습니다)
애플리케이션이 실행될 때마다 batch 스키마가 초기화됩니다.
(이 옵션을 never로 하고 schema-mariadb.sql의 ddl을 실행해도 됩니다)



CREATE TABLE BATCH_JOB_INSTANCE (
JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_NAME VARCHAR(100) NOT NULL,
JOB_KEY VARCHAR(32) NOT NULL,
constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT ,
JOB_INSTANCE_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
JOB_EXECUTION_ID BIGINT NOT NULL ,
PARAMETER_NAME VARCHAR(100) NOT NULL ,
PARAMETER_TYPE VARCHAR(100) NOT NULL ,
PARAMETER_VALUE VARCHAR(2500) ,
IDENTIFYING CHAR(1) NOT NULL ,
constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY ,
VERSION BIGINT NOT NULL,
STEP_NAME VARCHAR(100) NOT NULL,
JOB_EXECUTION_ID BIGINT NOT NULL,
CREATE_TIME DATETIME(6) NOT NULL,
START_TIME DATETIME(6) DEFAULT NULL ,
END_TIME DATETIME(6) DEFAULT NULL ,
STATUS VARCHAR(10) ,
COMMIT_COUNT BIGINT ,
READ_COUNT BIGINT ,
FILTER_COUNT BIGINT ,
WRITE_COUNT BIGINT ,
READ_SKIP_COUNT BIGINT ,
WRITE_SKIP_COUNT BIGINT ,
PROCESS_SKIP_COUNT BIGINT ,
ROLLBACK_COUNT BIGINT ,
EXIT_CODE VARCHAR(2500) ,
EXIT_MESSAGE VARCHAR(2500) ,
LAST_UPDATED DATETIME(6),
constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
SHORT_CONTEXT VARCHAR(2500) NOT NULL,
SERIALIZED_CONTEXT TEXT ,
constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ENGINE=InnoDB;
CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ START WITH 1 MINVALUE 1 MAXVALUE 9223372036854775806 INCREMENT BY 1 NOCACHE NOCYCLE ENGINE=InnoDB;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ START WITH 1 MINVALUE 1 MAXVALUE 9223372036854775806 INCREMENT BY 1 NOCACHE NOCYCLE ENGINE=InnoDB;
CREATE SEQUENCE BATCH_JOB_SEQ START WITH 1 MINVALUE 1 MAXVALUE 9223372036854775806 INCREMENT BY 1 NOCACHE NOCYCLE ENGINE=InnoDB;
애플리케이션이 실행되면서 job이 실행되지 않게 합니다.
Batch Config 구성
앞서 작성한 것을 바탕으로 Batch Job을 생성하겠습니다.
package com.ride.batch.config;
import com.ride.batch.student.Student;
import com.ride.batch.student.StudentRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@RequiredArgsConstructor
public class BatchConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager platformTransactionManager; // 트랜잭션 관리자 객체
private final StudentRepository studentRepository;
@Bean
public FlatFileItemReader<Student> reader() {
FlatFileItemReader<Student> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("src/main/resources/students.csv")); // api 나 파일로부터 작업을 처리하도록 할 수 있음
itemReader.setName("csvReader"); // itemReader 이름 설정
itemReader.setLinesToSkip(1); // 건너뛸 줄 설정
itemReader.setLineMapper(lineMapper());
return itemReader;
}
@Bean
public StudentProcessor processor() {
return new StudentProcessor();
}
@Bean
public RepositoryItemWriter<Student> writer() {
RepositoryItemWriter<Student> writer = new RepositoryItemWriter<>();
writer.setRepository(studentRepository);
writer.setMethodName("save");
return writer;
}
@Bean
public Step importStep() {
return new StepBuilder("csvImport", jobRepository)
.<Student, Student>chunk(1000, platformTransactionManager) // 한번에 처리하려는 레코드 라인 수
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
asyncTaskExecutor.setConcurrencyLimit(10); // 비동기 작업 수 설정, -1은 동시성 제한 없는 것
return asyncTaskExecutor;
}
@Bean
public Job runJob() {
return new JobBuilder("importStudents", jobRepository)
.start(importStep()) // .next 를 사용하여 다음 작업 수행할 수도 있음
.build();
}
private LineMapper<Student> lineMapper() {
DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setDelimiter(","); // 데이터 쉼표로 구분
lineTokenizer.setStrict(false);
lineTokenizer.setNames("id", "name", "email", "age"); // 요소의 열을 구분
BeanWrapperFieldSetMapper<Student> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(Student.class); // 파일을 객체로 변환할 수 있도록 도와주는 객체
lineMapper.setLineTokenizer(lineTokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
return lineMapper;
}
}
이제 위 코드를 하나씩 보겠습니다.
private final JobRepository jobRepository;
private final PlatformTransactionManager platformTransactionManager; // 트랜잭션 관리자 객체
private final StudentRepository studentRepository;
JobRepository는 Job에 대한 상태를 관리하고 추적하며, CRUD를 제공합니다.
PlatformTransactionManager는 트랙잭션 관리자 객체입니다.
위 두 Bean은 application.properties에 있는 내용을 바탕으로 자동설정됩니다.
(예시: mariaDB, JPA)
(만약 멀티플 데이터 소스를 사용한다면, 수동으로 설정해줘야 합니다. 이 부분은 다른 게시글에서 다루도록 하겠습니다)
@Bean
public FlatFileItemReader<Student> reader() {
FlatFileItemReader<Student> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("src/main/resources/students.csv")); // api 나 파일로부터 작업을 처리하도록 할 수 있음
itemReader.setName("csvReader"); // itemReader 이름 설정
itemReader.setLinesToSkip(1); // 건너뛸 줄 설정
itemReader.setLineMapper(lineMapper());
return itemReader;
}
private LineMapper<Student> lineMapper() {
DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<>();
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setDelimiter(","); // 데이터 쉼표로 구분
lineTokenizer.setStrict(false);
lineTokenizer.setNames("id", "name", "email", "age"); // 요소의 열을 구분
BeanWrapperFieldSetMapper<Student> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(Student.class); // 파일을 객체로 변환할 수 있도록 도와주는 객체
lineMapper.setLineTokenizer(lineTokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
return lineMapper;
}
ItemReader와 lineMapper입니다.
Batch 작업에 사용할 파일을 읽어옵니다.
이 프로젝트에서는 csv파일을 읽어옵니다.
csv파일의 2번째 줄부터 읽을 것이고, 데이터를 쉼표로 구분하도록 합니다.
약 요소를 어떻게 읽을지 setName으로 지정합니다.
지정할 때는 DB의 컬럼이 아닌, Entity클래스에 정의된 필드를 기준으로 합니다.
id,name,email,age
1,Riley,riley1@email.com,25
2,Casey,casey2@service.com,20
3,Morgan,morgan3@internet.com,20
@Bean
public StudentProcessor processor() {
return new StudentProcessor();
}
ItemProcessor입니다.
ItemReader로 가져온 데이터를 가공하는 영역입니다.
비즈니스 로직을 적용할 수도 있습니다.
package com.ride.batch.config;
import com.ride.batch.student.Student;
import org.springframework.batch.item.ItemProcessor;
public class StudentProcessor implements ItemProcessor<Student, Student> { // <입력, 출력>
@Override
public Student process(Student student) {
// all the business logic goes here
// 비즈니스 로직을 아래에 작성
student.setId(null);
return student;
}
}
Sutudent 객체의 Id를 @GeneratedeValue로 했기 때문에 null로 처리할 수 있습니다.
위 예제에선 Student를 받아서 Student로 가공했지만,
StudentDto를 받아서 Student로 가공할 수도 있습니다.
@Bean
public RepositoryItemWriter<Student> writer() {
RepositoryItemWriter<Student> writer = new RepositoryItemWriter<>();
writer.setRepository(studentRepository);
writer.setMethodName("save");
return writer;
}
ItemWriter입니다.
데이터를 저장할 때 사용할 repository를 지정하고, 어떤 메서드를 사용할지 설정합니다.
(save는 JPARepository에서 기본으로 생성하는 메서드입니다,
필요에 따라 메서드를 repository에 정의하여 사용합니다)
@Bean
public Step importStep() {
return new StepBuilder("csvImport", jobRepository)
.<Student, Student>chunk(1000, platformTransactionManager) // 한번에 처리하려는 레코드 라인 수
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.build();
}
ItemStep입니다.
Step에 대한 세부 사항을 지정합니다.
jobRepository로 추적할 수 있도록 step의 이름을 지정합니다.
입력받은 데이터와 출력할 데이터의 형식을 지정하고, 한 번에 처리할 레코드 라인 수를 지정합니다.
(1000개의 데이터가 쌓이면 실행하도록 했습니다)
동일한 BatchConfig 내에 정의한 reader, processor, writer를 사용합니다.
(위에 정의한 메서드)
@Bean
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
asyncTaskExecutor.setConcurrencyLimit(10); // 비동기 작업 수 설정, -1은 동시성 제한 없는 것
return asyncTaskExecutor;
}
taskExecutor를 사용하여 Batch Job을 비동기 작업으로 진행합니다.
비동기 작업을 사용하여 Batch Job의 진행속도를 빠르게 만들 수 있습니다.
(10만개의 작업을 진행했을 때 제 PC에선 6분 -> 2분으로 크게 감소했습니다)
단, 다른 최적화 작업을 먼저 진행하는 것을 권장합니다.
(비동기 작업은 컴퓨터의 성능을 활용하는 것으로 언제나 빠른 속도를 보장합니다,
따라서 개발자가 시도할 수 있는 모든 최적화를 시도한 뒤 최후의 선택지로 비동기 작업을 진행하도록 합니다,
물론, 주어진 리소스에 따라 비동기 작업을 우선할 수 있습니다)
@Bean
public Job runJob() {
return new JobBuilder("importStudents", jobRepository)
.start(importStep()) // .next 를 사용하여 다음 작업 수행할 수도 있음
.build();
}
앞서 정의한 Step을 Job에 주입합니다.
이제 이 Job은 importStudents라는 이름으로 추적할 수 있습니다.
Job은 Bean이름 (runJob) 으로 실행합니다.
Controller
package com.ride.batch.student;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/students")
@RequiredArgsConstructor
public class StudentController {
private final JobLauncher jobLauncher;
private final Job job;
@PostMapping
public void importCsvToDBJob() {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("startAt", System.currentTimeMillis())
.toJobParameters();
try{
jobLauncher.run(job, jobParameters);
} catch (JobInstanceAlreadyCompleteException
| JobExecutionAlreadyRunningException
| JobParametersInvalidException
| JobRestartException e) {
e.printStackTrace();
}
}
}
localhost:8080/students를 Post로 접속하면 Job이 실행됩니다.
이 프로젝트에선 Job이 하나이기 때문에 Bean이름을 명시하지 않았습니다.
만약 여러 Job을 정의하고, 특정 Job을 실행하고자 한다면, 아래와 같이 작성합니다.
@RestController
@RequestMapping("/students")
public class StudentController {
private final JobLauncher jobLauncher;
private final Job myJob;
public StudentController(@Qualifier("runJob") Job myJob, JobLauncher jobLauncher){
this.myJob = myJob;
this.jobLauncher = jobLauncher;
}
}
지금까지 Spring Batch에 대해 가볍게 알아보았습니다.
Spring Batch를 통해 대규모 데이터 처리 작업을 효율적이고 안정적으로 처리할 수 있습니다.
이 게시글은 Spring Batch의 도입부에 해당합니다.
프로젝트에 맞게 적용하고, 최적화하는 것이 중요합니다.