[Spring Batch] Spring Batch with Multiple Data Sources
I implemented Spring Batch to load two kinds of data into the DB: JSON data from the KNOC Opinet external API (nationwide average fuel prices, the cheapest gas stations per region, and so on), and CSV files from Opinet containing the full list of gas stations and their prices.
Multiple data sources
The reasons for using multiple data sources in this project are as follows. While a Batch Job runs it can put load on the DB, and that load risked degrading read performance. Therefore, MariaDB handles data ingestion while Redis serves reads, which improves query performance.
Redis, an in-memory data store, was chosen as the cache layer because it offers sub-2ms read latency, handles a variety of data structures, and, being NoSQL, scales out horizontally with great flexibility.
The databases are also split by domain. With a database per domain, if the access frequency or data volume of one domain spikes, servers can be added around that data (horizontal scaling, i.e. scale-out), increasing throughput efficiently while keeping the overall system stable.
Vertical scaling (scale-up) means raising the spec of a single server; horizontal scaling (scale-out) means adding more servers.
Since the data has to be fetched every day, a Scheduler is used as well.
When data arrives from the API it is written to the DB and to the cache storage at the same time, keeping the two consistent; a rough sketch of that write-through flow follows.
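As a sketch of the write-through idea, assuming hypothetical names (the service class, the entity types, and the AverageAllPriceRedis.from(...) mapper are illustrative only; the two repository interfaces do appear later in this post):
package com.pj.oil.gasStation;

import com.pj.oil.gasStation.entity.maria.AverageAllPrice;          // hypothetical entity paths
import com.pj.oil.gasStation.entity.redis.AverageAllPriceRedis;
import com.pj.oil.gasStation.repository.maria.AverageAllPriceRepository;
import com.pj.oil.gasStation.repository.redis.AverageAllPriceRedisRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Sketch: persist to MariaDB first (source of truth), then mirror into Redis (read cache).
@Service
@RequiredArgsConstructor
public class AverageAllPriceService {

    private final AverageAllPriceRepository mariaRepository;      // MariaDB, write side
    private final AverageAllPriceRedisRepository redisRepository; // Redis, read side

    @Transactional("gasStationTransactionManager")
    public void save(AverageAllPrice price) {
        mariaRepository.save(price);                            // write to the DB first
        redisRepository.save(AverageAllPriceRedis.from(price)); // then mirror into the cache
    }
}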
build.gradle
First, add the dependencies.
//actuator
implementation 'org.springframework.boot:spring-boot-starter-actuator'
//jpa
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
//db
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
runtimeOnly 'org.mariadb.jdbc:mariadb-java-client'
//lombok
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
//batch
implementation 'org.springframework.boot:spring-boot-starter-batch'
Add the dependencies above, including Actuator for monitoring.
application.properties
In application.properties, declare the values Spring auto-configuration will read, along with the environment variables the application itself uses.
logging.level.org.springframework=info
logging.level.com.pj=debug
# spring boot env
api.baseurl=https://www.opinet.co.kr/api/
api.datatype=json
api.key=your-opinet-api-key
# mariadb gas_station
spring.datasource.gas-station.jdbc-url=jdbc:mariadb://mariadb1:3306/gas_station
spring.datasource.gas-station.username=ride
spring.datasource.gas-station.password=mypassword
# mariadb member_post
spring.datasource.member-post.jdbc-url=jdbc:mariadb://mariadb2:3306/member_post
spring.datasource.member-post.username=ride
spring.datasource.member-post.password=mypassword
# redis
spring.refresh-token.redis.host=redis
spring.refresh-token.redis.port=6379
spring.gas-station.redis.host=redis2
spring.gas-station.redis.port=6380
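Note that spring.gas-station.redis.* and spring.refresh-token.redis.* are custom keys, not Spring Boot's own spring.data.redis.* prefix, so Boot will not auto-configure connections from them. A minimal sketch of wiring one of the two Redis connections by hand (the class and bean names here are assumptions):
package com.pj.oil.config;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class GasStationRedisConfig {

    @Bean(name = "gasStationRedisConnectionFactory")
    public RedisConnectionFactory gasStationRedisConnectionFactory(
            @Value("${spring.gas-station.redis.host}") String host,
            @Value("${spring.gas-station.redis.port}") int port) {
        // connects to the redis2 container defined in docker-compose below
        return new LettuceConnectionFactory(host, port);
    }

    @Bean(name = "gasStationRedisTemplate")
    public RedisTemplate<String, Object> gasStationRedisTemplate(
            @Qualifier("gasStationRedisConnectionFactory") RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        return template;
    }
}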
docker-compose
version: '3.8'
services:
  springboot-app:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - ./src/main/resources/application.properties:/app/src/main/resources/application.properties
    environment:
      TZ: Asia/Seoul
    ports:
      - "8080:8080"
    depends_on:
      - mariadb1
      - mariadb2
      - redis
      - redis2
  redis:
    image: redis:latest
    container_name: redis
    ports:
      - "6379:6379"
    command: ["redis-server", "--port", "6379"]
    environment:
      TZ: Asia/Seoul
  redis2:
    image: redis:latest
    container_name: redis2
    ports:
      - "6380:6380"   # the server listens on 6380 (see command), so map 6380 on both sides
    command: ["redis-server", "--port", "6380"]
    environment:
      TZ: Asia/Seoul
  mariadb1:
    image: mariadb:latest
    container_name: mariadb1
    volumes:
      - mariadb1_data:/var/lib/mysql
    ports:
      - "3306:3306"
    environment:
      MARIADB_ROOT_PASSWORD: mypassword
      MARIADB_USER: ride
      MARIADB_PASSWORD: mypassword
      MARIADB_DATABASE: gas_station
      TZ: Asia/Seoul
  mariadb2:
    image: mariadb:latest
    container_name: mariadb2
    volumes:
      - mariadb2_data:/var/lib/mysql
    ports:
      - "3307:3306"
    environment:
      MARIADB_ROOT_PASSWORD: mypassword
      MARIADB_USER: ride
      MARIADB_PASSWORD: mypassword
      MARIADB_DATABASE: member_post
      TZ: Asia/Seoul
volumes:
  mariadb1_data:
  mariadb2_data:
The volumes are declared so data survives container shutdown.
The springboot-app service builds from a Dockerfile, which allows further customization.
# Build stage: base image with the Java 17 JDK
FROM openjdk:17-jdk-slim as build
# Set the working directory
WORKDIR /app
# Copy the Gradle wrapper and project files
COPY gradlew .
COPY gradle gradle
COPY build.gradle .
COPY settings.gradle .
COPY src src
# Normalize gradlew line endings to LF and make it executable
RUN sed -i 's/\r$//' ./gradlew && chmod +x ./gradlew
# Build the application (skipping unit tests)
RUN ./gradlew build -x test

# Runtime stage: copy the jar produced by the build stage
FROM openjdk:17-jdk-slim
WORKDIR /app
COPY --from=build /app/build/libs/*.jar app.jar
ENTRYPOINT ["java","-Dfile.encoding=UTF-8","-jar","app.jar"]
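With this Dockerfile and the compose file above in place, the whole stack can be brought up with docker compose up --build.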
DataSourceConfig
To use multiple data sources, the DB connection details are defined in application.properties and injected into each DataSource.
package com.pj.oil;

import jakarta.persistence.EntityManagerFactory;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.beans.factory.annotation.Qualifier;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        basePackages = "com.pj.oil.gasStation",
        transactionManagerRef = "gasStationTransactionManager",
        entityManagerFactoryRef = "gasStationEntityManagerFactory"
)
@EntityScan("com.pj.oil.gasStation")
public class DataSourceConfig {

    @Bean(name = "gasStationDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.gas-station")
    public DataSource gasStationDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "gasStationEntityManagerFactoryBuilder")
    public EntityManagerFactoryBuilder gasStationEntityManagerFactoryBuilder() {
        return new EntityManagerFactoryBuilder(new HibernateJpaVendorAdapter(), new HashMap<>(), null);
    }

    @Bean(name = "gasStationEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean gasStationEntityManagerFactory(
            @Qualifier("gasStationEntityManagerFactoryBuilder") EntityManagerFactoryBuilder builder,
            @Qualifier("gasStationDataSource") DataSource dataSource) {
        Map<String, Object> jpaProperties = new HashMap<>();
        jpaProperties.put("hibernate.dialect", "org.hibernate.dialect.MariaDBDialect");
        jpaProperties.put("hibernate.hbm2ddl.auto", "update");
        jpaProperties.put("hibernate.show_sql", true);
        jpaProperties.put("hibernate.format_sql", true);
        jpaProperties.put("hibernate.jdbc.batch_size", 1000);
        jpaProperties.put("hibernate.order_inserts", true);
        jpaProperties.put("hibernate.order_deletes", true);
        jpaProperties.put("hibernate.order_updates", true);
        return builder
                .dataSource(dataSource)
                .properties(jpaProperties)
                .packages("com.pj.oil.gasStation")
                .persistenceUnit("gasStation")
                .build();
    }

    @Bean(name = "gasStationTransactionManager")
    public PlatformTransactionManager gasStationTransactionManager(
            @Qualifier("gasStationEntityManagerFactory") EntityManagerFactory entityManagerFactory) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory);
        return transactionManager;
    }

    @Bean(name = "gasStationJdbcTemplate")
    public JdbcTemplate gasStationJdbcTemplate(@Qualifier("gasStationDataSource") DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}
This defines the Spring configuration for the DataSource manually. Three beans deserve a close look.
The first is the gasStationDataSource bean, which is bound to the username, password, and jdbc-url declared in application.properties.
@Bean(name = "gasStationDataSource")
@ConfigurationProperties(prefix = "spring.datasource.gas-station")
public DataSource gasStationDataSource() {
return DataSourceBuilder.create().build();
}
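The member_post database declared earlier would presumably be wired with an analogous set of beans; as a sketch under that assumption:
// Sketch (assumption): the member_post side mirrors the gas-station wiring,
// binding spring.datasource.member-post.* from application.properties.
@Bean(name = "memberPostDataSource")
@ConfigurationProperties(prefix = "spring.datasource.member-post")
public DataSource memberPostDataSource() {
    return DataSourceBuilder.create().build();
}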
The second is the gasStationEntityManagerFactory bean, which builds the EntityManagerFactory.
(The EntityManagerFactory manages entities and provides the API needed to perform database operations through JPA.)
@Bean(name = "gasStationEntityManagerFactory")
public LocalContainerEntityManagerFactoryBean gasStationEntityManagerFactory(
@Qualifier("gasStationEntityManagerFactoryBuilder") EntityManagerFactoryBuilder builder,
@Qualifier("gasStationDataSource") DataSource dataSource) {
Map<String, Object> jpaProperties = new HashMap<>();
jpaProperties.put("hibernate.dialect", "org.hibernate.dialect.MariaDBDialect");
jpaProperties.put("hibernate.hbm2ddl.auto", "update");
jpaProperties.put("hibernate.show_sql", true);
jpaProperties.put("hibernate.format_sql", true);
jpaProperties.put("hibernate.jdbc.batch_size", 1000);
jpaProperties.put("hibernate.order_inserts", true);
jpaProperties.put("hibernate.order_deletes", true);
jpaProperties.put("hibernate.order_updates", true);
return builder
.dataSource(dataSource)
.properties(jpaProperties)
.packages("com.pj.oil.gasStation")
.persistenceUnit("gasStation")
.build();
}
Here the JPA settings that would otherwise live in application.properties are supplied directly in code.
The order_* options let Hibernate group statements of the same type so they run consecutively (optimizing the processing order); for example, with order_inserts enabled, inserts interleaved across tables A, B, A can be regrouped as A, A, B so the inserts into the same table batch together.
hibernate.jdbc.batch_size sets how many SQL statements Hibernate may send to the DB at once. (One exchange between the server and the DB is called a round-trip.) If 1,000 SQL statements are processed individually, 1,000 round-trips occur, and network latency and per-request overhead can drag performance down. Setting a batch size lets many statements travel in a single round-trip.
(The 1,000 statements are not merged into one SQL statement; rather, one transmission carries many requests.)
The third bean is gasStationJdbcTemplate. It was added to optimize the Batch Job, as the writer further below shows.
@Bean(name = "gasStationJdbcTemplate")
public JdbcTemplate gasStationJdbcTemplate(@Qualifier("gasStationDataSource") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
batch
Below is the shared infrastructure class for the batch configuration.
(It sets up the JobRepository, which manages batch metadata such as execution status and retry counts.)
package com.pj.oil.batch;

import org.springframework.batch.core.DefaultJobKeyGenerator;
import org.springframework.batch.core.repository.ExecutionContextSerializer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.dao.DefaultExecutionContextSerializer;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.database.support.DefaultDataFieldMaxValueIncrementerFactory;
import org.springframework.batch.support.DatabaseType;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.convert.support.ConfigurableConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class BatchInfrastructureConfig {

    @Bean(name = "gasStationJobRepository")
    public JobRepository jobRepository(@Qualifier("gasStationDataSource") DataSource dataSource,
                                       @Qualifier("gasStationTransactionManager") PlatformTransactionManager transactionManager) {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        // configure the JdbcOperations
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        factory.setJdbcOperations(jdbcTemplate);
        // configure the ConversionService
        ConfigurableConversionService conversionService = new DefaultConversionService();
        factory.setConversionService(conversionService);
        // configure the ExecutionContext serializer
        ExecutionContextSerializer serializer = new DefaultExecutionContextSerializer();
        factory.setSerializer(serializer);
        // configure the incrementer factory
        DefaultDataFieldMaxValueIncrementerFactory incrementerFactory =
                new DefaultDataFieldMaxValueIncrementerFactory(dataSource);
        factory.setIncrementerFactory(incrementerFactory);
        // configure the job key generator
        factory.setJobKeyGenerator(new DefaultJobKeyGenerator());
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        factory.setTablePrefix("BATCH_");
        // set the database type
        factory.setDatabaseType(DatabaseType.MARIADB.name());
        try {
            return factory.getObject();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create JobRepository", e);
        }
    }
}
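One thing to keep in mind (not shown above): the BATCH_-prefixed metadata tables must already exist in the gas_station schema before the first run. Spring Batch ships a DDL script for this (org/springframework/batch/core/schema-mariadb.sql inside the spring-batch-core jar), which can be applied manually, or via Boot's spring.batch.jdbc.initialize-schema property in setups where Boot manages the batch DataSource.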
Below is the config class that defines the Batch Job.
package com.pj.oil.batch.apiConfig;

import com.pj.oil.batch.BeforeJobExecutionListener;
import com.pj.oil.batch.process.GasStationLpgProcess;
import com.pj.oil.batch.writer.GasStationLpgWriter;
import com.pj.oil.gasStation.entity.maria.GasStationLpg;
import com.pj.oil.util.DateUtil;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing(
        dataSourceRef = "gasStationDataSource",
        transactionManagerRef = "gasStationTransactionManager")
public class LpgBatchConfig {

    private final PlatformTransactionManager platformTransactionManager;
    private final JobRepository jobRepository;
    private final DateUtil dateUtil;
    private final String READER_PATH;
    private final BeforeJobExecutionListener beforeJobExecutionListener;
    private final JdbcTemplate jdbcTemplate;

    public LpgBatchConfig(@Qualifier("gasStationTransactionManager") PlatformTransactionManager platformTransactionManager,
                          @Qualifier("gasStationJobRepository") JobRepository jobRepository,
                          DateUtil dateUtil,
                          BeforeJobExecutionListener beforeJobExecutionListener,
                          @Qualifier("gasStationJdbcTemplate") JdbcTemplate jdbcTemplate
    ) {
        this.platformTransactionManager = platformTransactionManager;
        this.jobRepository = jobRepository;
        this.dateUtil = dateUtil;
        this.READER_PATH = "src/main/resources/csv/" + dateUtil.getTodayDateString() + "/" + dateUtil.getTodayDateString() + "-";
        this.beforeJobExecutionListener = beforeJobExecutionListener;
        this.jdbcTemplate = jdbcTemplate;
    }

    @Bean(name = "lpgReader")
    @StepScope
    public FlatFileItemReader<GasStationLpg> reader() {
        FlatFileItemReader<GasStationLpg> itemReader = new FlatFileItemReader<>();
        String path = READER_PATH + "basic-info-lpg.csv";
        itemReader.setResource(new FileSystemResource(path)); // the reader can be fed from a file (or another source such as an API)
        itemReader.setName("csvReader"); // name of this itemReader
        itemReader.setEncoding("UTF-8");
        itemReader.setLinesToSkip(1); // number of lines to skip (the CSV header)
        itemReader.setLineMapper(lineMapper());
        return itemReader;
    }

    @Bean(name = "lpgProcess")
    public GasStationLpgProcess processor() {
        return new GasStationLpgProcess();
    }

    private LineMapper<GasStationLpg> lineMapper() {
        DefaultLineMapper<GasStationLpg> lineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setDelimiter(","); // fields are comma-separated
        lineTokenizer.setStrict(false);
        lineTokenizer.setIncludedFields(0, 1, 2, 3, 4); // pick only the needed columns from the csv
        lineTokenizer.setNames("uniId", "area", "osName", "pollDivName", "newAddress"); // map each column to a field name
        // CSV columns: unique id, region, station name, brand, address, phone number
        BeanWrapperFieldSetMapper<GasStationLpg> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
        fieldSetMapper.setTargetType(GasStationLpg.class); // maps each tokenized line onto the target object
        lineMapper.setLineTokenizer(lineTokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);
        return lineMapper;
    }

    @Bean(name = "lpgWriter")
    public ItemWriter<GasStationLpg> writer() {
        return new GasStationLpgWriter(jdbcTemplate, dateUtil);
    }

    @Bean(name = "lpgImportStep")
    public Step importStep() {
        return new StepBuilder("lpgCsvImport", jobRepository)
                .<GasStationLpg, GasStationLpg>chunk(1000, platformTransactionManager) // number of records processed per chunk
                .reader(reader())
                .processor(processor())
                .writer(writer())
                // .taskExecutor(taskExecutor())
                .build();
    }

    // @Bean
    // public TaskExecutor taskExecutor() {
    //     SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
    //     asyncTaskExecutor.setConcurrencyLimit(10); // max concurrent tasks; -1 means no limit
    //     return asyncTaskExecutor;
    // }

    @Bean(name = "lpgJob")
    public Job runJob() {
        return new JobBuilder("importLpg", jobRepository)
                .start(importStep()) // .next(...) could chain further steps here
                .listener(beforeJobExecutionListener)
                .build();
    }
}
The lineMapper converts each CSV line into a Java object, and setIncludedFields picks out only the columns we need before inserting into the DB.
(Note that setNames takes entity field names, not DB column names.)
The chunk size makes the step process 1,000 rows at a time; working in chunks also keeps memory usage down and improves performance.
taskExecutor is the method that would enable asynchronous execution, which speeds the Batch Job up. It is commented out here so I could measure how effective the other optimizations were on their own.
The Job defined this way is registered under the bean name lpgJob for injection elsewhere.
package com.pj.oil.batch.process;

import com.pj.oil.gasStation.entity.maria.GasStationLpg;
import io.micrometer.common.util.StringUtils;
import org.springframework.batch.item.ItemProcessor;

public class GasStationLpgProcess implements ItemProcessor<GasStationLpg, GasStationLpg> {

    @Override
    public GasStationLpg process(GasStationLpg item) {
        // filter out records whose ID is null or blank
        if (StringUtils.isBlank(item.getUniId())) {
            return null; // returning null skips this record
        }
        return item;
    }
}
This Processor skips unusable CSV records (blank or null IDs).
(The ItemProcessor&lt;input type, output type&gt; generics also allow the input and output types to differ, as sketched below.)
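For instance, a processor could map a raw CSV-bound type to the entity being written. A sketch with hypothetical names (GasStationLpgCsvRow and the GasStationLpg constructor used here are illustrative only, not the project's actual types):
package com.pj.oil.batch.process;

import com.pj.oil.gasStation.entity.maria.GasStationLpg;
import io.micrometer.common.util.StringUtils;
import org.springframework.batch.item.ItemProcessor;

public class GasStationLpgRowProcessor implements ItemProcessor<GasStationLpgCsvRow, GasStationLpg> {

    @Override
    public GasStationLpg process(GasStationLpgCsvRow row) {
        // same filtering rule as above: drop rows without an ID
        if (StringUtils.isBlank(row.getUniId())) {
            return null;
        }
        // convert the raw CSV-bound type into the entity that gets written
        return new GasStationLpg(row.getUniId().trim(), row.getArea(), row.getOsName(),
                row.getPollDivName(), row.getNewAddress());
    }
}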
package com.pj.oil.batch.writer;

import com.pj.oil.gasStation.entity.maria.GasStationLpg;
import com.pj.oil.util.DateUtil;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class GasStationLpgWriter implements ItemWriter<GasStationLpg> {

    private final JdbcTemplate jdbcTemplate;
    private final DateUtil dateUtil;

    public GasStationLpgWriter(
            @Qualifier("gasStationJdbcTemplate") JdbcTemplate jdbcTemplate,
            DateUtil dateUtil
    ) {
        this.jdbcTemplate = jdbcTemplate;
        this.dateUtil = dateUtil;
    }

    @Override
    public void write(Chunk<? extends GasStationLpg> chunk) throws Exception {
        List<? extends GasStationLpg> items = chunk.getItems(); // pull the item list out of the Chunk
        String sql = "INSERT INTO GasStationLpg (uni_id, area, os_name, poll_div_name, new_address, update_date) VALUES (?, ?, ?, ?, ?, ?) " +
                "ON DUPLICATE KEY UPDATE update_date = VALUES(update_date)";
        jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                GasStationLpg item = items.get(i);
                ps.setString(1, item.getUniId());
                ps.setString(2, item.getArea());
                ps.setString(3, item.getOsName());
                ps.setString(4, item.getPollDivName());
                ps.setString(5, item.getNewAddress());
                ps.setString(6, dateUtil.getTodayDateString());
            }

            @Override
            public int getBatchSize() {
                return items.size();
            }
        });
    }
}
This writer is where the optimization from JPA down to plain JDBC happened.
The PreparedStatement placeholders guard against SQL injection, and JdbcTemplate's batchUpdate sends the whole chunk of statements to the DB as one batch rather than one round-trip per row.
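One caveat worth noting: ON DUPLICATE KEY UPDATE is MariaDB/MySQL-specific syntax. When an incoming row collides with an existing key, only update_date is refreshed instead of the insert failing.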
package com.pj.oil.batch;

import com.pj.oil.gasStation.repository.maria.*;
import com.pj.oil.gasStation.repository.redis.AreaAverageRecentPriceRedisRepository;
import com.pj.oil.gasStation.repository.redis.AverageAllPriceRedisRepository;
import com.pj.oil.gasStation.repository.redis.AverageRecentPriceRedisRepository;
import com.pj.oil.gasStation.repository.redis.LowTop20PriceRedisRepository;
import com.pj.oil.util.DateUtil;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Transactional("gasStationTransactionManager")
@RequiredArgsConstructor
public class BeforeJobExecutionListener implements JobExecutionListener {

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
    private final LowTop20PriceRepository lowTop20PriceRepository;
    private final AreaAverageRecentPriceRepository areaAverageRecentPriceRepository;
    private final AverageRecentPriceRepository averageRecentPriceRepository;
    private final AverageAllPriceRepository averageAllPriceRepository;
    private final LowTop20PriceRedisRepository lowTop20PriceRedisRepository;
    private final AreaAverageRecentPriceRedisRepository areaAverageRecentPriceRedisRepository;
    private final AverageRecentPriceRedisRepository averageRecentPriceRedisRepository;
    private final AverageAllPriceRedisRepository averageAllPriceRedisRepository;
    private final PriceOilRepository priceOilRepository;
    private final PriceLpgRepository priceLpgRepository;
    private final DateUtil dateUtil;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        String jobName = jobExecution.getJobInstance().getJobName();
        switch (jobName) {
            case "importPriceLpg":
                priceLpgRepository.deleteAllData();
                LOGGER.info("Deleted all data from PriceLpgRepository before starting the job: {}", jobName);
                break;
            case "importPriceOil":
                priceOilRepository.deleteAllData();
                LOGGER.info("Deleted all data from PriceOilRepository before starting the job: {}", jobName);
                break;
            case "importLowTop20Price":
                lowTop20PriceRepository.deleteAllData();
                LOGGER.info("Deleted all data from LowTop20PriceRepository before starting the job: {}", jobName);
                break;
            case "importAverageAllPrice":
                averageAllPriceRepository.deleteByTradeDate(dateUtil.getYesterdayDateString());
                LOGGER.info("Deleted yesterday's data from AverageAllPriceRepository before starting the job: {}", jobName);
                break;
            case "importAverageRecentPrice":
                averageRecentPriceRepository.deleteByDate(dateUtil.getYesterdayDateString());
                LOGGER.info("Deleted yesterday's data from AverageRecentPriceRepository before starting the job: {}", jobName);
                break;
            case "importAreaAverageRecentPrice":
                areaAverageRecentPriceRepository.deleteByBaseDate(dateUtil.getYesterdayDateString());
                LOGGER.info("Deleted yesterday's data from AreaAverageRecentPriceRepository before starting the job: {}", jobName);
                break;
            default:
                LOGGER.warn("No specific pre-processing required for this job: {}", jobName);
                break;
        }
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        String jobName = jobExecution.getJobInstance().getJobName();
        switch (jobName) {
            case "importLowTop20Price":
                lowTop20PriceRedisRepository.deleteAll();
                LOGGER.info("Deleted all data from LowTop20PriceRedisRepository after completing the job: {}", jobName);
                break;
            case "importAverageAllPrice":
                averageAllPriceRedisRepository.deleteAll();
                LOGGER.info("Deleted all data from AverageAllPriceRedisRepository after completing the job: {}", jobName);
                break;
            case "importAverageRecentPrice":
                averageRecentPriceRedisRepository.deleteAll();
                LOGGER.info("Deleted all data from AverageRecentPriceRedisRepository after completing the job: {}", jobName);
                break;
            case "importAreaAverageRecentPrice":
                areaAverageRecentPriceRedisRepository.deleteAll();
                LOGGER.info("Deleted all data from AreaAverageRecentPriceRedisRepository after completing the job: {}", jobName);
                break;
            default:
                LOGGER.warn("No specific post-processing required for this job: {}", jobName);
                break;
        }
    }
}
This listener runs before a job starts and after it completes; the deletion logic was added to meet the requirements. A sketch of one of the custom delete methods it calls follows.
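deleteAllData() and the delete-by-date methods are custom repository methods. A minimal sketch of how deleteAllData() might be declared, assuming a PriceLpg entity and a bulk JPQL delete (the actual entity name, ID type, and query may differ in the project):
package com.pj.oil.gasStation.repository.maria;

import com.pj.oil.gasStation.entity.maria.PriceLpg;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;

public interface PriceLpgRepository extends JpaRepository<PriceLpg, Long> {

    @Modifying // requires an active transaction; the listener above is @Transactional
    @Query("delete from PriceLpg p")
    void deleteAllData();
}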
scheduler
The logic described above runs at fixed times via the Spring Scheduler.
package com.pj.oil.scheduler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
@Configuration
@EnableScheduling
public class SchedulerConfig {
}
package com.pj.oil.scheduler;

import com.pj.oil.config.PropertyConfiguration;
import com.pj.oil.util.CrawlerUtil;
import com.pj.oil.util.EncodingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class GasStationScheduler {

    private static final Logger LOGGER = LoggerFactory.getLogger(GasStationScheduler.class);
    private final CrawlerUtil crawlerUtil;
    private final EncodingUtil encodingUtil;
    private final PropertyConfiguration config;
    private final JobLauncher jobLauncher;
    //
    private final Job oilJob;
    private final Job priceOilJob;
    private final Job lpgJob;
    private final Job priceLpgJob;
    //
    private final Job averageRecentPriceJob;
    private final Job areaAverageRecentPriceJob;
    private final Job averageAllPriceJob;
    private final Job lowTop20PriceJob;

    public GasStationScheduler(
            CrawlerUtil crawlerUtil,
            EncodingUtil encodingUtil,
            PropertyConfiguration config,
            JobLauncher jobLauncher,
            //
            @Qualifier("oilJob") Job oilJob,
            @Qualifier("priceOilJob") Job priceOilJob,
            @Qualifier("lpgJob") Job lpgJob,
            @Qualifier("priceLpgJob") Job priceLpgJob,
            //
            @Qualifier("averageRecentPriceJob") Job averageRecentPriceJob,
            @Qualifier("areaAverageRecentPriceJob") Job areaAverageRecentPriceJob,
            @Qualifier("averageAllPriceJob") Job averageAllPriceJob,
            @Qualifier("lowTop20PriceJob") Job lowTop20PriceJob
    ) {
        this.crawlerUtil = crawlerUtil;
        this.encodingUtil = encodingUtil;
        this.config = config;
        this.jobLauncher = jobLauncher;
        this.oilJob = oilJob;
        this.priceOilJob = priceOilJob;
        this.lpgJob = lpgJob;
        this.priceLpgJob = priceLpgJob;
        this.averageRecentPriceJob = averageRecentPriceJob;
        this.areaAverageRecentPriceJob = areaAverageRecentPriceJob;
        this.averageAllPriceJob = averageAllPriceJob;
        this.lowTop20PriceJob = lowTop20PriceJob;
    }
@Scheduled(cron = "0 0 1 * * ?") // 매일 새벽 1시에 실행
public void saveCsv() {
LOGGER.info("스케줄링 작업 실행 중...");
boolean downloadCSVCompleted = crawlerUtil.downloadCSVFromWeb();
if (downloadCSVCompleted) {
String[] files = {config.getBasicInfoLpg(), config.getBasicInfoOil(), config.getCurrentPriceLpg(), config.getCurrentPriceOil()};
String[] names = {"basic-info-lpg.csv", "basic-info-oil.csv", "current-price-lpg.csv", "current-price-oil.csv"};
boolean encodingCompleted = false;
for (int i = 0; i < files.length; i++) {
encodingCompleted = encodingUtil.convertFileEncoding(files[i], names[i]);
LOGGER.info("Encoding completed for: {}", names[i]);
}
if (downloadCSVCompleted && encodingCompleted) {
LOGGER.info("스케줄링 작업 완료");
} else {
LOGGER.warn("스케줄링 작업 실패");
}
}
}
@Scheduled(cron = "0 5 1 * * ?")
public void saveOil() {
LOGGER.info("saveOil 실행 중...");
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(oilJob, jobParameters);
LOGGER.info("Job ID: {} 상태: {}",jobExecution.getId(), jobExecution.getStatus());
} catch (Exception e) {
LOGGER.error("작업 실행 중 오류가 발생했습니다", e);
}
}
@Scheduled(cron = "0 10 1 * * ?")
public void savePriceOil() {
LOGGER.info("savePriceOil 실행 중...");
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(priceOilJob, jobParameters);
LOGGER.info("Job ID: {} 상태: {}", jobExecution.getId(), jobExecution.getStatus());
} catch (Exception e) {
LOGGER.error("작업 실행 중 오류가 발생했습니다", e);
}
}
@Scheduled(cron = "0 15 1 * * ?")
public void saveLpg() {
LOGGER.info("saveLpg 실행 중...");
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(lpgJob, jobParameters);
LOGGER.info("Job ID: {} 상태: {}", jobExecution.getId(), jobExecution.getStatus());
} catch (Exception e) {
LOGGER.error("작업 실행 중 오류가 발생했습니다", e);
}
}
@Scheduled(cron = "0 20 1 * * ?")
public void savePriceLpg() {
LOGGER.info("savePriceLpg 실행 중...");
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(priceLpgJob, jobParameters);
LOGGER.info("Job ID: {} 상태: {}", jobExecution.getId(), jobExecution.getStatus());
} catch (Exception e) {
LOGGER.error("작업 실행 중 오류가 발생했습니다", e);
}
}
@Scheduled(cron = "0 25 1 * * ?")
public void saveLowTop20Price() {
LOGGER.info("saveLowTop20Price 실행 중...");
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(lowTop20PriceJob, jobParameters);
LOGGER.info("Job ID: {} 상태: {}", jobExecution.getId(), jobExecution.getStatus());
} catch (Exception e) {
LOGGER.error("작업 실행 중 오류가 발생했습니다", e);
}
}
@Scheduled(cron = "0 26 1 * * ?")
public void saveAverageAllPrice() {
LOGGER.info("saveAverageAllPrice 실행 중...");
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(averageAllPriceJob, jobParameters);
LOGGER.info("Job ID: {} 상태: {}", jobExecution.getId(), jobExecution.getStatus());
} catch (Exception e) {
LOGGER.error("작업 실행 중 오류가 발생했습니다", e);
}
}
@Scheduled(cron = "0 27 1 * * ?")
public void saveAverageRecentPrice() {
LOGGER.info("saveAverageRecentPrice 실행 중...");
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(averageRecentPriceJob, jobParameters);
LOGGER.info("Job ID: {} 상태: {}", jobExecution.getId(), jobExecution.getStatus());
} catch (Exception e) {
LOGGER.error("작업 실행 중 오류가 발생했습니다", e);
}
}
@Scheduled(cron = "0 28 1 * * ?")
public void saveAreaAverageRecentPrice() {
LOGGER.info("saveAreaAverageRecentPrice 실행 중...");
try {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(areaAverageRecentPriceJob, jobParameters);
LOGGER.info("Job ID: {} 상태: {}", jobExecution.getId(), jobExecution.getStatus());
} catch (Exception e) {
LOGGER.error("작업 실행 중 오류가 발생했습니다", e);
}
}
}
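For reference, Spring's cron expressions use six fields (second, minute, hour, day of month, month, day of week), so "0 0 1 * * ?" fires at 01:00:00 every day and "0 5 1 * * ?" five minutes later.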
The schedule leaves some slack between consecutive jobs.
Because the free API key is capped at 1,500 calls per day, the scheduler runs only once a day. If the logic were changed to download the CSV files from the website instead of calling the free API, the runs could be made more frequent.