Как стать автором
Обновить
Usetech
Международная IT-компания

Как упростить пакетную обработку данных со Spring Batch

Время на прочтение21 мин
Количество просмотров19K
Евгений Тришечкин

Ведущий Java разработчик

Добрый день! Меня зовут Евгений Тришечкин, я ведущий Java разработчик ростовского офиса компании Usetech. Сегодня я хочу рассказать вам о том, как упростить пакетную обработку данных со Spring Batch и приведу несколько примеров. Статья рассчитана на новичков, которые не работали с этой средой, но опытные специалисты могут рассказать о том, как они используют Spring Batch. Итак, начнём.

Введение:

Spring Batch — это среда, созданная для пакетной обработки больших объёмов данных. В основе Spring Batch лежит понятие задания (Job). Каждое задание может состоять из нескольких этапов (Step). Этап в свою очередь может быть либо фрагментом произвольного кода (Tasklet), либо иметь более сложную структуру, состоящую из считывателей элементов (ItemReader), обработчиков элементов (ItemProcessor) и записывателей элементов (ItemWriter).

Рассмотрим конкретную бизнес-задачу — загрузку отчёта о продажах из csv файла в базу данных. Для простоты решим, что историчность данных нам не важна и файл отчёта является накопительным. Под заданием в данном случае понимается последовательность шагов:

  1. Удаление ранее загруженных данных.

  2. Загрузка всех строк из файла отчёта.

Второй шаг в свою очередь подразумевает парсинг каждой строки отчета и преобразование её в сущность (POJO), валидацию сущностей на предмет ошибок и запись каждой сущности как отдельной строки в базу данных.

Для создания каркаса будущего приложения воспользуемся сервисом.

Перейдём по ссылке и заполним метаданные будущего проекта:

Далее в этом же окне выберем зависимости, которые потребуются для реализации логики:

Далее сгенерируем и сохраним проект.

Как было написано выше, каждая строка отчета будет трансформирована в Java объект, поэтому нам нужно создать класс SalesReportItem в одном пакете со сгенерированным классом SpringBatchDemoApplication со следующим исходным кодом:

package ru.usetech.springbatchdemo;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import java.math.BigDecimal;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class SalesReportItem {
	// регион
	private Long regionId;
	// точка продаж
	private Long outletId;
	// Сумма выручки от продажи смартфонов, карт памяти и ноутбуков
	private BigDecimal smartphones;
	private BigDecimal memoryCards;
	private BigDecimal notebooks;
	// Суммарная выручка по точке продаж
	private BigDecimal total;

}

Фрагмент csv файла отчета выглядит так:

1|102|9589|4894|4955

1|103|831|2637|126

1|104|9343|963|3130

 Поля отчёта будут проецироваться на соответствующие поля класса SalesReportItem:

regionId|outletId|smartphones|memoryCards|notebooks

Видно, что в отчёте нет значения для поля total, суммарное значение по каждой точке продаж мы вычислим в момент загрузки отчёта. Также в момент загрузки отчёта будет выполнена валидация значений. В реальных отчётах валидация может содержать сложную логику, в этом примере для простоты мы будем проверять, что сумма выручки не является отрицательной величиной. Признаком некорректного значения в строке отчёта будет возникновение исключения IncorrectValueException с кодом:

package ru.usetech.springbatchdemo;
public class IncorrectValueException extends RuntimeException {
}
Основной конфигурационный файл проекта имеет следующий код:
package ru.usetech.springbatchdemo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.jdbc.core.JdbcTemplate;
import javax.sql.DataSource;
import java.math.BigDecimal;
@Configuration
@EnableBatchProcessing //(1)
public class BatchConfig {
	private static final Log log = LogFactory.getLog(BatchConfig.class);
	@Bean // (2)
	public Tasklet clearTableTasklet(JdbcTemplate jdbcTemplate) {
        return (stepContribution, chunkContext) -> {
            log.info("Очистка таблицы sales_report");
            jdbcTemplate.update("delete from sales_report");
            return RepeatStatus.FINISHED;
        };
    }

	@Bean //(3)
	public Step setupStep(Tasklet clearTableTasklet,
                      	StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("clear-report-table")
            	.tasklet(clearTableTasklet)
            	.build();
    }

@Bean // (4)
	public Step loadCsvStep(StepBuilderFactory stepBuilderFactory,
                            FlatFileItemReader<SalesReportItem> csvReader,
                            ItemProcessor<SalesReportItem, SalesReportItem> totalCalculatingProcessor,
	                        JdbcBatchItemWriter<SalesReportItem> dbWriter) {
        return stepBuilderFactory.get("load-csv-file")
            	.<SalesReportItem, SalesReportItem>chunk(10) // (5)
            	.faultTolerant()
            	.skip(IncorrectValueException.class) // (6)
            	.skipLimit(3) // (7)
            	.reader(csvReader) // (8)
            	.processor(totalCalculatingProcessor) // (9)
            	.writer(dbWriter) // (10)
            	.build();
    }
@Bean
    @StepScope // (11)
	public FlatFileItemReader<SalesReportItem> csvReader() {
        return new FlatFileItemReaderBuilder<SalesReportItem>().name("csv-reader")
            	.resource(new ClassPathResource("report_data.csv"))
           	 .targetType(SalesReportItem.class)
            	.delimited()
            	.delimiter("|")
            	.names("regionId", "outletId", "smartphones", "memoryCards", "notebooks").build();
	}
@Bean // (12)
	public ItemProcessor<SalesReportItem, SalesReportItem> totalCalculatingProcessor() {
    	return item -> {
        	if (BigDecimal.ZERO.compareTo(item.getSmartphones()) > 0
            	|| BigDecimal.ZERO.compareTo(item.getMemoryCards()) > 0
            	|| BigDecimal.ZERO.compareTo(item.getNotebooks()) > 0) {
            	throw new IncorrectValueException();
        	}
        	item.setTotal(BigDecimal.ZERO.add(item.getSmartphones())
                	.add(item.getMemoryCards()
                      	  .add(item.getNotebooks())));
        	return item;
    	};
	}
@Bean // (13)
	public JdbcBatchItemWriter<SalesReportItem> dbWriter(DataSource dataSource) {
    	return new JdbcBatchItemWriterBuilder<SalesReportItem>()
            	.dataSource(dataSource)
            	.sql("insert into sales_report (region_id, outlet_id, smartphones, memory_cards, notebooks, total) " +
                    	"values (:regionId, :outletId, :smartphones, :memoryCards, :notebooks, :total)")
            	.beanMapped()
            	.build();
	}
@Bean // (14)
	public Job importReportJob(JobBuilderFactory jobBuilderFactory,
                           	Step setupStep,
                           	Step loadCsvStep,
                               ReportImportListener reportImportListener) {
    	return jobBuilderFactory.get("import-report-job")
            	.incrementer(new RunIdIncrementer())
            	.listener(reportImportListener) // (15)
            	.start(setupStep)
            	.next(loadCsvStep)
            	.build();
	}

}

Маркерами отмечены основные моменты:

(1) @EnableBatchProcessing — аннотация, активирующая многие функции экосистемы Spring Batch, в частности конфигурирует фабрики JobBuilderFactory и StepBuilderFactory, создает реестр заданий и многие другие инфраструктурные элементы.

(2) Создание тасклета, т.е. произвольного кода, который будет выполнен на первом этапе загрузки отчёта. Этот код, как видно, удаляет все существующие данные в таблице отчёта.

(3) Создание первого шага обработки отчёта на основе тасклета.

(4) Создание второго шага, непосредственной загрузки данных отчёта из файла.

(5) Здесь задаётся размер порции данных. То есть количество строк отчёта, обработка которых считается неделимой операцией и коммитится. Обработка данных порциями имеет много преимуществ, например, загрузка может выполняться в несколько потоков, где каждый поток будет обрабатывать свою порцию данных. Если загрузка отчёта была прервана в результате возникшей ошибки, то повторный перезапуск после исправления входного файла позволит не загружать ранее загруженные порции.

(6) — (7) Это относится к валидации строк отчёта, задаём, что исключение IncorrectValueException, которое мы создали ранее, не приведёт к падению загрузки отчёта, если оно возникло не более трёх раз. 

(8) — (10) Задаём считыватель данных из файла, обработчик и записыватель данных в БД.

(11) Считыватель данных. Тут мы воспользовались готовым классом FlatFileItemReader из библиотеки Spring Batch. В комплект поставки  Spring Batch входит множество различных считывателей, позволяющих загружать данные из курсоров БД, из JSON файлов, из топиков Kafka и др. Также можно написать свой считыватель, реализовав интерфейс ItemReader.

(12) Процессор данных, реализующий функциональный интерфейс ItemProcessor. В нашем примере процессор выполняет валидацию значений и вычисляет суммарную выручку по точке продаж. 

(13) Записыватель данных, который отображает каждый объект POJO в строку в базе данных. 

(14) Создание объекта задания (Job), состоящего из последовательности двух шагов.

(15) Слушатель, отслеживающий этапы жизненного цикла задания.

Исходный код слушателя представлен ниже:

package ru.usetech.springbatchdemo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

@Service
public class ReportImportListener extends JobExecutionListenerSupport {

	private static final Log log = LogFactory.getLog(ReportImportListener.class);

	private final JdbcTemplate jdbcTemplate;

	public ReportImportListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

	@Override
	public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("Отчёт загружен в базу данных");
            logAll();
        }
    }

	@Override
	public void beforeJob(JobExecution jobExecution) {
        log.info("Данные в таблице до загрузки отчёта");
        logAll();
    }

	private void logAll() {
        jdbcTemplate.query("SELECT region_id, outlet_id, smartphones, memory_cards, notebooks, total FROM sales_report",
            	(rs, row) -> new SalesReportItem(
                    	rs.getLong(1),
                    	rs.getLong(2),
                    	rs.getBigDecimal(3),
                    	rs.getBigDecimal(4),
                    	rs.getBigDecimal(5),
                    	rs.getBigDecimal(6))
        ).forEach(log::info);
    }

}

Слушатель реализует два метода:

  1. beforeJob — выполняется перед выполнением задания и отображает содержимое таблицы отчёта до загрузки.

  2. afterJob — выполняется после выполнения задания и также выводит содержимое таблицы отчёта.

Для запуска нашего Spring Batch задания потребуется создать ещё несколько файлов ресурсов:

application.yml с настройками подключения к in memory базе данных:

spring:
  datasource:
	url: jdbc:h2:mem:mydb
	username: sales
	password: sales
	driverClassName: org.h2.Driver

schema-all.sql — скрипт создания таблицы отчета sales_report:

CREATE TABLE IF NOT EXISTS sales_report
(
    region_id BIGINT,
    outlet_id BIGINT,
    smartphones NUMBER,
    memory_cards NUMBER,
    notebooks NUMBER,
    total NUMBER
);

data.sql — данные начальной инициализации таблицы отчёта (для имитации наличия данных перед началом загрузки отчёта):

insert into sales_report (region_id, outlet_id, smartphones, memory_cards, notebooks, totalvalues (1, 121, 2365, 4377, 9256, 15998);

insert into sales_report (region_id, outlet_id, smartphones, memory_cards, notebooks, totalvalues (1, 122, 9589, 4894, 4955, 19438);

insert into sales_report (region_id, outlet_id, smartphones, memory_cards, notebooks, totalvalues (2, 233, 6836, 5318, 9563, 21717);

report_data.csv — тестовый файл отчёта:

1|101|2365|4377|-9256

1|102|9589|4894|4955

1|103|831|2637|126

1|104|9343|963|3130

1|105|8764|2605|985

1|106|4886|2789|5797

1|107|8201|9634|291

1|108|3287|-4926|2015

1|109|5411|9759|8777

1|110|2092|52|8092

2|201|4933|1308|4011

2|202|7868|2806|3763

2|203|6836|5318|9563

2|204|5085|-675|5987

2|205|7362|4139|5800

2|206|4729|554|7194

2|207|4112|3782|8052

2|208|4146|5560|2037

2|209|8435|2201|4973

2|210|1424|9298|4993

Обратим внимание, что три строки отчёта содержат некорректные отрицательные значения.

Теперь всё готово, чтобы запустить наше Spring Boot приложение. Запускаем его в IDE и наблюдаем результат выполнения задания в логе:

2022-04-29 01:28:14.066  INFO 7381 --- [       main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []

2022-04-29 01:28:14.112  INFO 7381 --- [       main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=import-report-job]] launched with the following parameters: [{run.id=1}]

2022-04-29 01:28:14.127  INFO 7381 --- [       main] r.u.s.ReportImportListener               : Данные в таблице до загрузки отчёта

2022-04-29 01:28:14.147  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=121, smartphones=2365, memoryCards=4377, notebooks=9256, total=15998)

2022-04-29 01:28:14.147  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=122, smartphones=9589, memoryCards=4894, notebooks=4955, total=19438)

2022-04-29 01:28:14.147  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=233, smartphones=6836, memoryCards=5318, notebooks=9563, total=21717)

2022-04-29 01:28:14.153  INFO 7381 --- [       main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [clear-report-table]

2022-04-29 01:28:14.162  INFO 7381 --- [       main] ru.usetech.springbatchdemo.BatchConfig   : Очистка таблицы sales_report

2022-04-29 01:28:14.169  INFO 7381 --- [       main] o.s.batch.core.step.AbstractStep         : Step: [clear-report-table] executed in 16ms

2022-04-29 01:28:14.176  INFO 7381 --- [      main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [load-csv-file]

2022-04-29 01:28:14.244  INFO 7381 --- [       main] o.s.batch.core.step.AbstractStep         : Step: [load-csv-file] executed in 68ms

2022-04-29 01:28:14.247  INFO 7381 --- [       main] r.u.s.ReportImportListener               : Отчёт загружен в базу данных

2022-04-29 01:28:14.249  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=102, smartphones=9589, memoryCards=4894, notebooks=4955, total=19438)

2022-04-29 01:28:14.249  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=103, smartphones=831, memoryCards=2637, notebooks=126, total=3594)

2022-04-29 01:28:14.249  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=104, smartphones=9343, memoryCards=963, notebooks=3130, total=13436)

2022-04-29 01:28:14.249  INFO 7381 --- [      main] r.u.s.ReportImportListener           : SalesReportItem(regionId=1, outletId=105, smartphones=8764, memoryCards=2605, notebooks=985, total=12354)

2022-04-29 01:28:14.249  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=106, smartphones=4886, memoryCards=2789, notebooks=5797, total=13472)

2022-04-29 01:28:14.249  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=107, smartphones=8201, memoryCards=9634, notebooks=291, total=18126)

2022-04-29 01:28:14.249  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=109, smartphones=5411, memoryCards=9759, notebooks=8777, total=23947)

2022-04-29 01:28:14.249  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=110, smartphones=2092, memoryCards=52, notebooks=8092, total=10236)

2022-04-29 01:28:14.249  INFO 7381 --- [           main] r.u.s.ReportImportListener           : SalesReportItem(regionId=2, outletId=201, smartphones=4933, memoryCards=1308, notebooks=4011, total=10252)

2022-04-29 01:28:14.250  INFO 7381 --- [       main] r.u.s.ReportImportListener           : SalesReportItem(regionId=2, outletId=202, smartphones=7868, memoryCards=2806, notebooks=3763, total=14437)

2022-04-29 01:28:14.250  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=203, smartphones=6836, memoryCards=5318, notebooks=9563, total=21717)

2022-04-29 01:28:14.250  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=205, smartphones=7362, memoryCards=4139, notebooks=5800, total=17301)

2022-04-29 01:28:14.250  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=206, smartphones=4729, memoryCards=554, notebooks=7194, total=12477)

2022-04-29 01:28:14.250  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=207, smartphones=4112, memoryCards=3782, notebooks=8052, total=15946)

2022-04-29 01:28:14.250  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=208, smartphones=4146, memoryCards=5560, notebooks=2037, total=11743)

2022-04-29 01:28:14.250  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=209, smartphones=8435, memoryCards=2201, notebooks=4973, total=15609)

2022-04-29 01:28:14.250  INFO 7381 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=210, smartphones=1424, memoryCards=9298, notebooks=4993, total=15715)

2022-04-29 01:28:14.252  INFO 7381 --- [       main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=import-report-job]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 125ms

В сообщениях лога видно, что перед стартом задания таблица sales_report содержала 3 записи по точкам продаж 121, 122 и 233. Далее была выполнена очистка таблицы, загрузка отчёта и было выведено содержимое таблицы отчёта после загрузки. Видно, что данные по точкам 121, 122 и 233 отсутствуют, а также отсутствуют данные по точкам 101, 108 и 204, потому что они содержали некорректные значения и не были загружены.

Теперь попробуем изменить какую-нибудь строку во входном файле, чтобы ошибочных строк было больше трёх и запустить задание снова.

2022-04-29 01:29:11.070  INFO 7482 --- [       main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []

2022-04-29 01:29:11.114  INFO 7482 --- [       main] o.s.b.c.l.support.SimpleJobLauncher  : Job: [SimpleJob: [name=import-report-job]] launched with the following parameters: [{run.id=1}]

2022-04-29 01:29:11.128  INFO 7482 --- [       main] r.u.s.ReportImportListener               : Данные в таблице до загрузки отчёта

2022-04-29 01:29:11.149  INFO 7482 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=121, smartphones=2365, memoryCards=4377, notebooks=9256, total=15998)

2022-04-29 01:29:11.149  INFO 7482 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=1, outletId=122, smartphones=9589, memoryCards=4894, notebooks=4955, total=19438)

2022-04-29 01:29:11.149  INFO 7482 --- [       main] r.u.s.ReportImportListener               : SalesReportItem(regionId=2, outletId=233, smartphones=6836, memoryCards=5318, notebooks=9563, total=21717)

2022-04-29 01:29:11.156  INFO 7482 --- [       main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [clear-report-table]

2022-04-29 01:29:11.164  INFO 7482 --- [       main] ru.usetech.springbatchdemo.BatchConfig   : Очистка таблицы sales_report

2022-04-29 01:29:11.170  INFO 7482 --- [       main] o.s.batch.core.step.AbstractStep         : Step: [clear-report-table] executed in 14ms

2022-04-29 01:29:11.177  INFO 7482 --- [       main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [load-csv-file]

2022-04-29 01:29:11.241 ERROR 7482 --- [       main] o.s.batch.core.step.AbstractStep     : Encountered an error executing step load-csv-file in job import-report-job

 

org.springframework.batch.core.step.skip.SkipLimitExceededException: Skip limit of '3' exceeded

         at org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy.shouldSkip(LimitCheckingItemSkipPolicy.java:133) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy.shouldSkip(ExceptionClassifierSkipPolicy.java:70) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.shouldSkip(FaultTolerantChunkProcessor.java:519) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.access$500(FaultTolerantChunkProcessor.java:56) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$2.recover(FaultTolerantChunkProcessor.java:289) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:539) ~[spring-retry-1.3.3.jar:na]

         at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:387) ~[spring-retry-1.3.3.jar:na]

         at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255) ~[spring-retry-1.3.3.jar:na]

         at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.transform(FaultTolerantChunkProcessor.java:308) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.3.19.jar:5.3.19]

         at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.3.5.jar:4.3.5]

         at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.3.5.jar:4.3.5]

         at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:413) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:320) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:149) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.3.19.jar:5.3.19]

         at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]

         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]

         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]

         at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]

         at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.3.19.jar:5.3.19]

         at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.3.19.jar:5.3.19]

         at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.19.jar:5.3.19]

         at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.19.jar:5.3.19]

         at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-5.3.19.jar:5.3.19]

         at com.sun.proxy.$Proxy54.run(Unknown Source) ~[na:na]

         at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.execute(JobLauncherApplicationRunner.java:199) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]

         at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.executeLocalJobs(JobLauncherApplicationRunner.java:173) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]

         at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.launchJobFromProperties(JobLauncherApplicationRunner.java:160) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]

         at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:155) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]

         at org.springframework.boot.autoconfigure.batch.JobLauncherApplicationRunner.run(JobLauncherApplicationRunner.java:150) ~[spring-boot-autoconfigure-2.6.7.jar:2.6.7]

         at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768) ~[spring-boot-2.6.7.jar:2.6.7]

         at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:758) ~[spring-boot-2.6.7.jar:2.6.7]

         at org.springframework.boot.SpringApplication.run(SpringApplication.java:310) ~[spring-boot-2.6.7.jar:2.6.7]

         at org.springframework.boot.SpringApplication.run(SpringApplication.java:1312) ~[spring-boot-2.6.7.jar:2.6.7]

         at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301) ~[spring-boot-2.6.7.jar:2.6.7]

         at ru.usetech.springbatchdemo.SpringBatchDemoApplication.main(SpringBatchDemoApplication.java:10) ~[classes/:na]

Caused by: ru.usetech.springbatchdemo.IncorrectValueException: null

         at ru.usetech.springbatchdemo.BatchConfig.lambda$totalCalculatingProcessor$1(BatchConfig.java:84) ~[classes/:na]

         at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$1.doWithRetry(FaultTolerantChunkProcessor.java:239) ~[spring-batch-core-4.3.5.jar:4.3.5]

         at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar:na]

         ... 44 common frames omitted

2022-04-29 01:29:11.243  INFO 7482 --- [       main] o.s.batch.core.step.AbstractStep     : Step: [load-csv-file] executed in 66ms

2022-04-29 01:29:11.249  INFO 7482 --- [       main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=import-report-job]] completed with the following parameters: [{run.id=1}] and the following status: [FAILED] in 124ms

В этот раз выполнение задания завершено с ошибкой Skip limit of '3' exceeded, что соответствует настройкам второго шага задания.

В этой статье показаны основы применения Spring Batch на примере пакетной загрузки отчёта. Множество других функций, таких как статистика выполнения заданий, диспетчеризация, управление транзакциями, перезапуск и пропуск заданий, распределённая обработка, интеграция с bpm системами не могут быть описаны в одной статье и требуют детального рассмотрения.

Теги:
Хабы:
Всего голосов 3: ↑1 и ↓2-1
Комментарии4

Публикации

Информация

Сайт
usetech.ru
Дата регистрации
Дата основания
Численность
501–1 000 человек
Местоположение
Россия
Представитель
Usetech