Chris's Blog

Keep Walking......

The Foundation of Spring Batch

Spring Batch是埃森哲贡献给Spring的一个开源项目,现在由双方共同维护。通过Spring Batch可以构建出轻量级的大数据量并⾏处理应⽤,⽀持事务、并发、流程、监控、纵向和横向扩展,提供统一的接口管理。Spring Batch并不包含scheduler,它只是一个通用的batch处理框架,你可以通过QuartZ,Control-M等scheduler去调用。

Spring Batch能够处理大批量数据的导入、导出和业务逻辑计算,执行过程无需人工干预。我们的系统是一个金融产品信息的数据中心,需要从不同的系统读取产品信息,也需要将这些信息提供给其它的系统,这种data loading就是Spring Batch的一种应用场景。

Spring Batch Common Process

上图就是Spring Batch的常用方式,从DB或是不同类型的文件中读取数据,经过转换后再写入DB或者文件。

Spring Batch Domain

上图是一个Job执行的结构图,下面是对这些domain对象的简单描述,只是学习时随笔记下来的,不够详细,可以查看Spring Batch文档中的描述。

Job

Job是对一个batch处理流程的定义,可以简单理解为Spring中的一个Job配置,一般都包括reader和writer。

JobInstance

JobInstance是Job运行时产生的实例,如果这个Job是按天执行的,则每天都会创建一个JobInstance;当Job执行失败时,restart这个Job将会重用JobInstance,以便于从执行失败的地方开始重新执行。

JobParameter

JobParameter用于标识一个JobInstance,可以将它理解为JobInstance的ID。

JobListener

JobListener可以监听JobInstance生命周期中的两个事件:

beforeJob - 在Job执行前被调用。

afterJob - 不论Job执行成功或失败都会被调用,Job的status可以从JobExecution中获取。

Parent Job

对于相似的Job,可以取出相同的部分定义成一个parent Job,只需要在Job定义时添加abstract=”true”,然后在sub Job中通过parent属性来指定parent Job。

JobRepository

MapJobRepository通常用于测试环境或是standalone的batch。它不够稳定;不允许在不同JVM实例之间执行restart;也不能保证有相同JobParameter的两个JobInstance同时运行;它不适合在多线程Job或本地的partition step中使用。但是它仍然需要配置transaction manager,因为在Job的实现中会涉及到rollback的问题,所以可以用ResourcelessTransactionManager来代替。

JobLauncher

JobLauncher使用spring的taskExecutor来实现异步处理,只需要在配置中指定taskExecutor属性。如果batch是通过http request的方式触发的,那应当使用异步的方式来处理Job以避免长时间的占用链接。

JobOperator

JobOperator的stop()不会立刻停止一个Job,如果当前流程处理的控制权在framework,则会将StepExecution的status设置为STOPPED并保存,然后按正常的处理流程结束Job。

如果不希望一个restartable的Job在执行失败后restart,可以将status设置为ABORTED。

JobParametersIncrementer

JobParametersIncrementer可强制创建一个新的jobInstance,以避免在使用相同JobParameter时不可以再次执行Job。这只是个接口,需要自己去实现getNext(),然后在Job中定义incremented属性来引用它。

Step

start-limit: 用于step的restart,用来控制一个start的次数。默认是Integer.MAX_VALUE。
allow-start-if-complete: 强制执行step,不管之前执行成功或失败。

Skip

通过指定exception来决定是否skip有问题的数据,可以结合include和exclude来定义exception列表。

skip-limit:用来控制允许skip的数据的最大数量,在step execution中分别保存有针对read,process和write中skip的数量。

Retry

通过指定exception来指定是否允许重试当前有问题的错误数据,对于一些通过重试可以解决问题的数据是非常有用的,比如更新当前数据到DB时,该条数据被其它进程lock了,则在重试时可能就可以正常更新了。

retry-limit:用来控制每个item允许重试的次数。

Rollback

通过指定exception来忽略rollback操作。对于Skip和Retry,如果Exception是由ItemWriter抛出的,则step中被当前transaction控制的数据会被rollback,因此要配合使用no-rollback-exception-classes来决定是否应该执行rollback。

Step通常会缓存Reader读入的数据,如果发生了rollback则不需要重新读入数据,但是对于一些基于transaction资源的Reader,比如从JMS queue中读取数据的Reader,JMS message也会执行rollback,则需要通过is-reader-transactional-queue来标识不需要缓存读入数据。

ItemStream

在Step执行失败需要restart时,可以通过ItemStream获取存储在execution之间状态信息。如果ItemReader,ItemProcessor或者ItemWriter实现了ItemStream接口,则会自动被注册在Spring Context中;否则需要单独注册streams。对于CompositeItemWriter,如果delegate的ItemWriter实现了ItemStream接口,也需要主动注册。

StepListener

和ItemStream一样,如果ItemReader,ItemProcessor或者ItemWriter实现了StepListener接口,则会被自动注册。

StepExecutionListener

beforeStep - 在step执行之前调用。
afterStep - 在step结束时调用,不管执行成功或失败。可以在这里更改ExitStatus。

ChunkListener

beforeChunk - 被调用在transaction开始后,但在ItemReader的read方法执行前。
afterChunk - chunk被commit/rollback后被调用。

ChunkListener也可被用在未使用chunk方式的step中,比如Tasklet, 会在tasklet执行前后被调用。

ItemReadListener

beforeRead - 在read方法执行前被调用。
afterRead - 在read方法执行成功后被调用,并返回读到的item作为参数。
onReadError - 在read方法出现异常时被调用,并提供异常的类型作为参数。

ItemProcessListener

beforeProcess - 在ItemProcessor的process方法执行前被调用。
afterProcess - 在process方法执行成功后被调用。
onProcessError - 在process方法出现异常时被调用,并提供item和异常作为参数。

ItemWriteListener

beforeWrite - 在ItemWriter的write方法执行前被调用。
afterWrite - 在ItemWriter的write方法执行成功后被调用。
onWriteError - 在ItemWriter的write方法出现异常时被调用,并提供chunk data和异常作为参数。

SkipListener

onSkipInRead - 当item在读取阶段skip时被调用。
onSkipInProcess - 当item在process阶段skip时被调用。
onSkipInWrite - 当item在写入阶段skip时被调用,并且在transaction被commit之前。

Batch Status vs. Exit Status

Conditional Flow的配置中的on属性使用Exit Status,通常情况下Batch Status和Exit Status是一样的,但StepExecutionListener可以更改Exit Status。

Step Scope

使用Job和Step的属性延迟绑定特性时,必须将Bean的scope设置为step。使用该属性可以通过定义Spring Batch的namespace或者定义StepScope。

applicationContext.xml
1
   <bean class="org.springframework.batch.core.scope.StepScope" />

Resources

Official site: http://www.springsource.org/spring-batch

IBM DeveloperWorks - 使用 Spring Batch 构建企业级批处理应用:
- http://www.ibm.com/developerworks/cn/java/j-lo-springbatch1/
- http://www.ibm.com/developerworks/cn/java/j-lo-springbatch2/

Comments