Stream Support

在许多情况下,应用程序数据是从流获取的。不建议将对流的引用作为消息有效负载发送到使用者。相反,将从输入流读取的数据创建消息,消息有效负载将逐个写入输出流。

In many cases, application data is obtained from a stream. It is not recommended sending a reference to a stream as a message payload to a consumer. Instead, messages are created from data that is read from an input stream, and message payloads are written to an output stream one by one.

你需要将此依赖项包含在你的项目中:

You need to include this dependency into your project:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:{project-version}"

Reading from Streams

Spring Integration 提供了两个流适配器。ByteStreamReadingMessageSourceCharacterStreamReadingMessageSource 都实现了 MessageSource。在通道适配器元素中配置其中一个,可以配置轮询周期,消息总线可以自动检测和调度它们。字节流版本需要一个 InputStream,字符流版本需要一个 Reader 作为单个构造函数参数。ByteStreamReadingMessageSource 还接受 bytesPerMessage 属性,以确定它尝试写入每个 Message 的字节数。默认值为 1024。以下示例创建了一个输入流,创建每个包含 2048 个字节的消息:

Spring Integration provides two adapters for streams. Both ByteStreamReadingMessageSource and CharacterStreamReadingMessageSource implement MessageSource. By configuring one of these within a channel-adapter element, the polling period can be configured and the message bus can automatically detect and schedule them. The byte stream version requires an InputStream, and the character stream version requires a Reader as the single constructor argument. The ByteStreamReadingMessageSource also accepts the 'bytesPerMessage' property to determine how many bytes it tries to read into each Message. The default value is 1024. The following example creates an input stream that creates messages that each contain 2048 bytes:

<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
  <constructor-arg ref="someInputStream"/>
  <property name="bytesPerMessage" value="2048"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
  <constructor-arg ref="someReader"/>
</bean>

CharacterStreamReadingMessageSource 将读取器包装在一个 BufferedReader 中(如果它不是一个 BufferedReader)。您可以在第二个构造函数参数中设置缓冲读取器使用的缓冲区大小。从版本 5.0 开始,第三个构造函数参数(blockToDetectEOF)控制 CharacterStreamReadingMessageSource 的行为。当为 false(默认值)时,receive() 方法检查读取器是否为 ready(),如果不是,则返回 null。在这种情况下,不会检测到 EOF(文件结束)。当为 true 时,receive() 方法会阻塞,直到有数据可用或在基础流上检测到 EOF。检测到 EOF 时,会发布 StreamClosedEvent(应用程序事件)。您可以使用实现了 ApplicationListener<StreamClosedEvent> 的 Bean 来使用这个事件。

The CharacterStreamReadingMessageSource wraps the reader in a BufferedReader (if it is not one already). You can set the buffer size used by the buffered reader in the second constructor argument. Starting with version 5.0, a third constructor argument (blockToDetectEOF) controls the behavior of the CharacterStreamReadingMessageSource. When false (the default), the receive() method checks whether the reader is ready() and returns null if not. EOF (end of file) is not detected in this case. When true, the receive() method blocks until data is available or EOF is detected on the underlying stream. When EOF is detected, a StreamClosedEvent (application event) is published. You can consume this event with a bean that implements ApplicationListener<StreamClosedEvent>.

为促进 EOF 检测,轮询线程在 receive() 方法中阻塞,直到数据到达或检测到 EOF。

To facilitate EOF detection, the poller thread blocks in the receive() method until either data arrives or EOF is detected.

轮询在检测到 EOF 后继续在每次轮询中发布事件。应用程序侦听器可以停止适配器以防止这种情况发生。事件在轮询线程上发布。停止适配器会导致线程中断。如果你打算在停止适配器后执行一些可中断任务,那么你必须在其他线程上执行 stop() 或为这些下游活动使用其他线程。请注意,发送到 QueueChannel 是可中断的,所以,如果你想要从此侦听器发送消息,那么请在停止适配器之前执行此操作。

The poller continues to publish an event on each poll once EOF has been detected. The application listener can stop the adapter to prevent this. The event is published on the poller thread. Stopping the adapter causes the thread to be interrupted. If you intend to perform some interruptible task after stopping the adapter, you must either perform the stop() on a different thread or use a different thread for those downstream activities. Note that sending to a QueueChannel is interruptible, so, if you wish to send a message from the listener, do it before stopping the adapter.

这促进了“管道”或将数据重定向到 stdin,如下面的两个示例所示:

This facilitates “piping” or redirecting data to stdin, as the following two examples shows:

cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt

此方法让应用程序在管道关闭时停止。

This approach lets the application stop when the pipe is closed.

提供了四个方便的工厂方法:

Four convenient factory methods are available:

public static final CharacterStreamReadingMessageSource stdin() { ... }

public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }

public static final CharacterStreamReadingMessageSource stdinPipe() { ... }

public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }

Writing to Streams

对于目标流,可以使用以下两个实现中的任一个:ByteStreamWritingMessageHandlerCharacterStreamWritingMessageHandler。每个实现都需要一个构造函数参数(OutputStream 用于字节流或 Writer 用于字符流),并且每个实现都提供一个添加可选的“bufferSize”的第二个构造函数。由于这两个实现最终都实现了 MessageHandler 接口,因此可以从 channel-adapter 配置中引用它们,如 Channel Adapter 中所述。

For target streams, you can use either of two implementations: ByteStreamWritingMessageHandler or CharacterStreamWritingMessageHandler. Each requires a single constructor argument (OutputStream for byte streams or Writer for character streams), and each provides a second constructor that adds the optional 'bufferSize'. Since both of these ultimately implement the MessageHandler interface, you can reference them from a channel-adapter configuration, as described in Channel Adapter.

<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
  <constructor-arg ref="someOutputStream"/>
  <constructor-arg value="1024"/>
</bean>

<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
  <constructor-arg ref="someWriter"/>
</bean>

Stream Namespace Support

Spring Integration 定义了一个命名空间来减少流相关通道适配器所需的配置。使用它需要以下模式位置:

Spring Integration defines a namespace to reduce the configuration needed for stream-related channel adapters. The following schema locations are needed to use it:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
      https://www.springframework.org/schema/beans/spring-beans.xsd
      http://www.springframework.org/schema/integration/stream
      https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

以下代码片段显示了支持的不同配置选项,用于配置入站通道适配器:

The following code snippet shows the different configuration options that are supported to configure the inbound channel adapter:

<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>

<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>

从 5.0 版本开始,可以设置 detect-eof 属性,它设置 blockToDetectEOF 属性。有关详细信息,请参见 Reading from Streams

Starting with version 5.0, you can set the detect-eof attribute, which sets the blockToDetectEOF property. See Reading from Streams for more information.

要配置出站通道适配器,您也可以使用命名空间支持。以下示例显示出站通道适配器的不同配置:

To configure the outbound channel adapter, you can use the namespace support as well. The following example shows the different configuration for an outbound channel adapters:

<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
    channel="testChannel"/>

<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
    channel="testChannel"/>

<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>

<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
    channel="testChannel"/>