csv파일을 읽고 JPARepository를 활용하여 Batch Job을 진행하던 중,
적게는 5초, 길게는 40초 동안 pc가 멈추는 상황이 발생했고,
Batch최적화의 필요성을 느꼈습니다.
사용하고 있던 설정은, chunk size 1000, jpaRepository 였습니다.
당장 비동기 처리를 적용하여 속도를 향상 시킬 수도 있겠지만,
더 세세한 최적화가 필요해보였습니다.
1. Spring Data JPA Batch Insert
DB에 데이터를 삽입하는 등의 요청을 실시하는 것은 많은 비용이 듭니다.
하나의 요청에 한 번의 round-trip이 발생하고,
1000개의 요청에 1000번의 round-trip이 발생합니다.
(db와 server간의 왕복을 round-trip이라고 합니다)
따라서 이를 일괄적으로 요청한다면,
한 번의 round-trip으로 여러 개의 요청을 처리할 수 있으며,
네트워크 지연 및 오버헤드를 낮출 수 있습니다.
(1000개의 SQL문을 하나의 SQL문으로 엮는 것이 아니라, 한 번의 전송으로 여러 개의 요청을 처리하는 것입니다)
@Bean(name = "gasStationEntityManagerFactory")
public LocalContainerEntityManagerFactoryBean gasStationEntityManagerFactory(
@Qualifier("gasStationEntityManagerFactoryBuilder") EntityManagerFactoryBuilder builder,
@Qualifier("gasStationDataSource") DataSource dataSource) {
Map<String, Object> jpaProperties = new HashMap<>();
jpaProperties.put("hibernate.dialect", "org.hibernate.dialect.MariaDBDialect");
jpaProperties.put("hibernate.hbm2ddl.auto", "update");
jpaProperties.put("hibernate.show_sql", true);
jpaProperties.put("hibernate.format_sql", true);
jpaProperties.put("hibernate.jdbc.batch_size", 1000);
jpaProperties.put("hibernate.order_inserts", true);
jpaProperties.put("hibernate.order_deletes", true);
jpaProperties.put("hibernate.order_updates", true);
return builder
.dataSource(dataSource)
.properties(jpaProperties)
.packages("com.pj.oil.gasStation")
.persistenceUnit("gasStation")
.build();
}
다중 데이터 소스를 사용했기 때문에,
application.proeprties에 작성하지 않고,
의존성을 수동으로 주입해주었습니다.
코드를 살펴보면,
batch_size를 1000으로 하여 최대 1000개의 요청을 한 번의 round-trip으로 처리하게 했고,
insert, update, delete 같은 유형의 SQL문을 순차적으로 처리(순서 최적화)하게 했습니다.
jpaProperties.put("hibernate.jdbc.batch_size", 1000);
jpaProperties.put("hibernate.order_inserts", true);
jpaProperties.put("hibernate.order_deletes", true);
jpaProperties.put("hibernate.order_updates", true);
여러 트랜잭션을 무조건 한 번에 처리하는 것이 좋은 것은 아닙니다.
너무 많은 트랜잭션을 한 번에 처리하려고 하면,
실패 시 복구가 어려워질 수 있으며, DB가 하나의 트랜잭션을 처리하려다 잠길(LOCK) 수도 있기 때문입니다.
2.JPA -> JDBC
jpa를 활용했을 때, 여러 개의 SQL문을 하나로 묶어서 요청하는 것은 쉽지 않았습니다.
따라서 JdbcTemplate를 구현하여, batchUpdate메서드를 사용했습니다.
이를 통해 여러 개의 SQL문을 한 번에 요청하는 것이 가능해집니다.
@Bean(name = "gasStationJdbcTemplate")
public JdbcTemplate gasStationJdbcTemplate(@Qualifier("gasStationDataSource") DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
package com.pj.oil.batch.writer;
import com.pj.oil.gasStation.entity.maria.GasStationLpg;
import com.pj.oil.util.DateUtil;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
public class GasStationLpgWriter implements ItemWriter<GasStationLpg> {
private final JdbcTemplate jdbcTemplate;
private final DateUtil dateUtil;
public GasStationLpgWriter(
@Qualifier("gasStationJdbcTemplate") JdbcTemplate jdbcTemplate,
DateUtil dateUtil
) {
this.jdbcTemplate = jdbcTemplate;
this.dateUtil = dateUtil;
}
@Override
public void write(Chunk<? extends GasStationLpg> chunk) throws Exception {
List<? extends GasStationLpg> items = chunk.getItems(); // Chunk에서 아이템 리스트를 가져옴
String sql = "INSERT INTO GasStationLpg (uni_id, area, os_name, poll_div_name, new_address, update_date) VALUES (?, ?, ?, ?, ?, ?) " +
"ON DUPLICATE KEY UPDATE update_date = VALUES(update_date)";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
GasStationLpg item = items.get(i);
ps.setString(1, item.getUniId());
ps.setString(2, item.getArea());
ps.setString(3, item.getOsName());
ps.setString(4, item.getPollDivName());
ps.setString(5, item.getNewAddress());
ps.setString(6, dateUtil.getTodayDateString());
}
@Override
public int getBatchSize() {
return items.size();
}
});
}
}
JPA에서 JDBC로 최적화를 진행한 writer입니다.
PreparedStatement를 통해 SQL Injection을 예방했습니다.
3. 비동기 처리
TaskExecutor Bean을 생성 및 주입하여 비동기 작업을 할 수 있습니다.
이를 통해 작업 속도를 향상시킬 수 있습니다.
@Bean(name = "lpgTaskExecutor")
public TaskExecutor taskExecutor() {
SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
asyncTaskExecutor.setConcurrencyLimit(10);
return asyncTaskExecutor;
}
@Bean(name = "lpgImportStep")
public Step importStep() {
return new StepBuilder("lpgCsvImport", jobRepository)
.<GasStationLpg, GasStationLpg>chunk(1000, platformTransactionManager) // 한번에 처리하려는 레코드 라인 수
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.build();
}
하지만 동시에 실행되는 작업의 수(ConcurrencyLimit)가 너무 많으면 리소스가 빠르게 소진되어,
주요 서비스가 마비될 수 있으므로 주의가 필요합니다.
결과
chunk 1000 평균 25초
chunk 1000 + batch_size 1000 평균 20초
chunk 1000 + jdbc 평균 4초
chunk 1000 + jdbc + 비동기 평균 3초
요청을 한 번에 처리할 수 있는지 여부가 속도에 많은 영향을 미쳤습니다.
데이터 10만 건을 삽입할 때,
chunk size를 10으로 설정한 경우와 1000으로 설정한 경우,
chunk 10 평균 11분
chunk 1000 평균 4분과 같은 결과를 얻었습니다.
(상황에 따라 다른 결과가 나오며, 소요시간이 선형적으로 변하지 않습니다)
(비동기 및 chunk의 설정은 리소스에 따라 적절하게 조율하는 것이 필요해보입니다)
최적화는 성능 뿐만이 아니라 유지보수, 사용자 경험, 확장성 등 여러 가지를 끊임없이 고민해야 합니다.