Item processing

The ItemReader and ItemWriter interfaces are both very useful for their specific tasks, but what if you want to insert business logic before writing? One option for both reading and writing is to use the composite pattern: create an ItemWriter that contains another ItemWriter, or an ItemReader that contains another ItemReader. The following code shows an example:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    // The wrapped writer that performs the actual write
    private ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    @Override
    public void write(Chunk<? extends T> items) throws Exception {
        // Add business logic here, then delegate to the wrapped writer
        itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }
}

The preceding class contains another ItemWriter to which it delegates after having provided some business logic. This pattern could easily be used for an ItemReader as well, perhaps to obtain more reference data based on the input that was provided by the main ItemReader. It is also useful if you need to control the call to write yourself. However, if you only want to “transform” the item passed in for writing before it is actually written, you need not call write yourself. You can just modify the item. For this scenario, Spring Batch provides the ItemProcessor interface, as the following interface definition shows:

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

An ItemProcessor is simple. Given one object, transform it and return another. The returned object may or may not be of the same type as the one provided. The point is that business logic may be applied within the process, and it is completely up to the developer to create that logic. An ItemProcessor can be wired directly into a step. For example, assume an ItemReader provides objects of type Foo and that they need to be converted to type Bar before being written out. The following example shows an ItemProcessor that performs the conversion:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar> {
    public void write(Chunk<? extends Bar> bars) throws Exception {
        //write bars
    }
}

In the preceding example, there is a class named Foo, a class named Bar, and a class named FooProcessor that adheres to the ItemProcessor interface. The transformation is simple, but any type of transformation could be done here. The BarWriter writes Bar objects, throwing an exception if any other type is provided. Similarly, the FooProcessor throws an exception if anything but a Foo is provided. The FooProcessor can then be injected into a Step, as the following example shows:

Java Configuration
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Bar>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(barWriter())
				.build();
}

XML Configuration
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

A difference between ItemProcessor and ItemReader or ItemWriter is that an ItemProcessor is optional for a Step.

Chaining ItemProcessors

Performing a single transformation is useful in many scenarios, but what if you want to “chain” together multiple ItemProcessor implementations? You can do so by using the composite pattern mentioned previously. To extend the previous single-transformation example, Foo is transformed to Bar, which is transformed to Foobar and written out, as the following example shows:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(Chunk<? extends Foobar> items) throws Exception {
        //write items
    }
}

A FooProcessor and a BarProcessor can be 'chained' together to give the resultant Foobar, as shown in the following example:

CompositeItemProcessor<Foo, Foobar> compositeProcessor = new CompositeItemProcessor<>();
// Delegates run in list order: FooProcessor first, then BarProcessor
List<ItemProcessor<?, ?>> itemProcessors = new ArrayList<>();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);

Just as with the previous example, you can configure the composite processor into the Step:

Java Configuration
@Bean
public Job ioSampleJob(JobRepository jobRepository, Step step1) {
	return new JobBuilder("ioSampleJob", jobRepository)
				.start(step1)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<Foo, Foobar>chunk(2, transactionManager)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}

@Bean
public CompositeItemProcessor<Foo, Foobar> compositeProcessor() {
	// Delegates run in list order: FooProcessor first, then BarProcessor
	List<ItemProcessor<?, ?>> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());

	CompositeItemProcessor<Foo, Foobar> processor = new CompositeItemProcessor<>();

	processor.setDelegates(delegates);

	return processor;
}

XML Configuration
<job id="ioSampleJob">
    <step name="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>
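
Conceptually, chaining feeds the output of one processor into the next. The following is a minimal sketch of that idea, not the library's CompositeItemProcessor: the ItemProcessor interface is redeclared locally (mirroring the definition shown earlier) so that the sketch compiles without Spring Batch on the classpath, and the Chains class and its chain method are hypothetical names used only for illustration. The sketch assumes that a null from the first processor ends the chain, so the second processor is never called for that item.

```java
// Local stand-in that mirrors the ItemProcessor interface shown earlier.
// Declared here only so the sketch compiles without Spring Batch.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical helper, for illustration only.
final class Chains {

    private Chains() {
    }

    // Composes two processors: the output of 'first' becomes the input of 'second'.
    static <A, B, C> ItemProcessor<A, C> chain(ItemProcessor<A, B> first,
                                               ItemProcessor<B, C> second) {
        return item -> {
            B intermediate = first.process(item);
            // A null result means the item was filtered; do not call 'second'.
            return intermediate == null ? null : second.process(intermediate);
        };
    }
}
```

With the classes from this section, Chains.chain(new FooProcessor(), new BarProcessor()) would have the type ItemProcessor<Foo, Foobar>, which is the same shape the composite processor exposes to the Step.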

Filtering Records

One typical use for an item processor is to filter out records before they are passed to the ItemWriter. Filtering is an action distinct from skipping. Skipping indicates that a record is invalid, while filtering indicates that a record should not be written.

For example, consider a batch job that reads a file containing three different types of records: records to insert, records to update, and records to delete. If record deletion is not supported by the system, we would not want to send any deletable records to the ItemWriter. However, since these records are not actually bad records, we would want to filter them out rather than skip them. As a result, the ItemWriter would receive only insertable and updatable records.

To filter a record, you can return null from the ItemProcessor. The framework detects that the result is null and avoids adding that item to the list of records delivered to the ItemWriter. By contrast, an exception thrown from the ItemProcessor results in a skip, provided that the step is configured to skip that exception type.
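
The insert/update/delete scenario described above might look roughly like the following sketch. The ItemProcessor interface is redeclared locally so the sketch compiles without Spring Batch, and Action, FileRecord, and DeleteFilteringProcessor are hypothetical names that do not come from the framework.

```java
// Local stand-in that mirrors the ItemProcessor interface shown earlier.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical record types for the three kinds of lines in the input file.
enum Action { INSERT, UPDATE, DELETE }

// Hypothetical item type read from the file.
final class FileRecord {
    final Action action;
    final String payload;

    FileRecord(Action action, String payload) {
        this.action = action;
        this.payload = payload;
    }
}

// Filters out deletable records so that the writer never sees them.
class DeleteFilteringProcessor implements ItemProcessor<FileRecord, FileRecord> {

    @Override
    public FileRecord process(FileRecord item) {
        // Returning null filters the record; no exception is thrown,
        // because a delete record is valid input that is simply not written.
        return item.action == Action.DELETE ? null : item;
    }
}
```

Insert and update records pass through unchanged, while delete records yield null and are therefore left out of the items handed to the ItemWriter.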

Validating Input

The ItemReaders and ItemWriters chapter discusses multiple approaches to parsing input. Each major implementation throws an exception if the input is not “well formed.” The FixedLengthTokenizer throws an exception if a range of data is missing. Similarly, attempting to access an index in a RowMapper or FieldSetMapper that does not exist or is in a different format than the one expected causes an exception to be thrown. All of these types of exceptions are thrown before read returns. However, they do not address the issue of whether or not the returned item is valid. For example, if one of the fields is an age, it cannot be negative. A negative age may parse correctly, because it exists and is a number, so it does not cause an exception. Because a plethora of validation frameworks already exists, Spring Batch does not attempt to provide yet another. Rather, it provides a simple interface, called Validator, that can be implemented with any number of frameworks, as the following interface definition shows:

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

The contract is that the validate method throws an exception if the object is invalid and returns normally if it is valid. Spring Batch provides a ValidatingItemProcessor, as the following bean definition shows:

Java Configuration
@Bean
public ValidatingItemProcessor itemProcessor() {
	ValidatingItemProcessor processor = new ValidatingItemProcessor();

	processor.setValidator(validator());

	return processor;
}

@Bean
public SpringValidator validator() {
	SpringValidator validator = new SpringValidator();

	validator.setValidator(new TradeValidator());

	return validator;
}

XML Configuration
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
	<property name="validator">
		<bean class="org.springframework.batch.samples.domain.trade.internal.validator.TradeValidator"/>
	</property>
</bean>
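
The negative-age case mentioned earlier could be expressed directly against the Validator contract, roughly as in the following sketch. Validator and ValidationException are redeclared locally as stand-ins so the sketch compiles without Spring Batch, and Customer and AgeValidator are hypothetical names introduced only for illustration.

```java
// Local stand-in for the framework's ValidationException (assumed unchecked here).
class ValidationException extends RuntimeException {
    ValidationException(String message) {
        super(message);
    }
}

// Local stand-in that mirrors the Validator interface shown above.
interface Validator<T> {
    void validate(T value) throws ValidationException;
}

// Hypothetical item type with a field that parses correctly but may be invalid.
final class Customer {
    final String name;
    final int age;

    Customer(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

// Rejects items whose age is negative; returns normally for valid items.
class AgeValidator implements Validator<Customer> {

    @Override
    public void validate(Customer value) {
        if (value.age < 0) {
            throw new ValidationException("Age must not be negative: " + value.age);
        }
    }
}
```

A validator of this shape follows the contract described above: it returns normally for a valid item and throws for an invalid one.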

You can also use the BeanValidatingItemProcessor to validate items annotated with the Bean Validation API (JSR-303) annotations. For example, consider the following type Person:

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}

You can validate items by declaring a BeanValidatingItemProcessor bean in your application context and registering it as a processor in your chunk-oriented step:

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}
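
The setFilter(true) call in the preceding bean switches the processor from rejecting invalid items to filtering them. The following sketch illustrates that choice under stated assumptions: it is not the framework's implementation, the three small types at the top are local stand-ins so the code compiles without Spring Batch, and ValidateOrFilterSketch is a hypothetical name.

```java
// Local stand-ins for the framework types, declared only for this sketch.
class ValidationException extends RuntimeException {
    ValidationException(String message) {
        super(message);
    }
}

interface Validator<T> {
    void validate(T value) throws ValidationException;
}

interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical processor showing the two ways to treat an invalid item.
class ValidateOrFilterSketch<T> implements ItemProcessor<T, T> {

    private final Validator<? super T> validator;
    private final boolean filter;

    ValidateOrFilterSketch(Validator<? super T> validator, boolean filter) {
        this.validator = validator;
        this.filter = filter;
    }

    @Override
    public T process(T item) {
        try {
            validator.validate(item);
        } catch (ValidationException e) {
            if (filter) {
                return null; // filter: the item is silently left out of the write
            }
            throw e; // reject: the exception propagates to the step
        }
        return item; // valid items pass through unchanged
    }
}
```

Filtering suits items that are merely unwanted, while propagating the exception suits items that should be counted as skipped or should fail the step.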

Fault Tolerance

When a chunk is rolled back, items that have been cached during reading may be reprocessed. If a step is configured to be fault-tolerant (typically by using skip or retry processing), any ItemProcessor used should be implemented in a way that is idempotent. Typically, this means making no changes to the input item and updating only the instance that is returned as the result.
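
As a rough illustration of that guideline, the following sketch builds a new result object instead of mutating its input, so processing the same cached item a second time yields the same result. The ItemProcessor interface is redeclared locally so the sketch compiles without Spring Batch, and PricedItem and DiscountProcessor are hypothetical names.

```java
// Local stand-in that mirrors the ItemProcessor interface shown earlier.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical mutable item type.
class PricedItem {
    String sku;
    long priceCents;

    PricedItem(String sku, long priceCents) {
        this.sku = sku;
        this.priceCents = priceCents;
    }
}

// Applies a 10% discount without touching the input item.
class DiscountProcessor implements ItemProcessor<PricedItem, PricedItem> {

    @Override
    public PricedItem process(PricedItem item) {
        // Not idempotent (avoid): item.priceCents = item.priceCents * 90 / 100;
        // Reprocessing the same cached item after a rollback would then
        // apply the discount a second time.

        // Idempotent: leave the input as read and return a fresh result.
        return new PricedItem(item.sku, item.priceCents * 90 / 100);
    }
}
```

Because the input keeps its original price, a retry after rollback produces the same discounted value rather than a compounded one.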