Stream Support
在许多情况下,应用程序数据是从流获取的。不建议将对流的引用作为消息有效负载发送到使用者。相反,将从输入流读取的数据创建消息,消息有效负载将逐个写入输出流。
In many cases, application data is obtained from a stream. It is not recommended sending a reference to a stream as a message payload to a consumer. Instead, messages are created from data that is read from an input stream, and message payloads are written to an output stream one by one.
你需要将此依赖项包含在你的项目中:
You need to include this dependency into your project:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:{project-version}"
Reading from Streams
Spring Integration 提供了两个流适配器。ByteStreamReadingMessageSource
和 CharacterStreamReadingMessageSource
都实现了 MessageSource
。在通道适配器元素中配置其中一个,可以配置轮询周期,消息总线可以自动检测和调度它们。字节流版本需要一个 InputStream
,字符流版本需要一个 Reader
作为单个构造函数参数。ByteStreamReadingMessageSource
还接受 bytesPerMessage
属性,以确定它尝试写入每个 Message
的字节数。默认值为 1024
。以下示例创建了一个输入流,创建每个包含 2048 个字节的消息:
Spring Integration provides two adapters for streams.
Both ByteStreamReadingMessageSource
and CharacterStreamReadingMessageSource
implement MessageSource
.
By configuring one of these within a channel-adapter element, the polling period can be configured and the message bus can automatically detect and schedule them.
The byte stream version requires an InputStream
, and the character stream version requires a Reader
as the single constructor argument.
The ByteStreamReadingMessageSource
also accepts the 'bytesPerMessage' property to determine how many bytes it tries to read into each Message
.
The default value is 1024
.
The following example creates an input stream that creates messages that each contain 2048 bytes:
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
<constructor-arg ref="someInputStream"/>
<property name="bytesPerMessage" value="2048"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
<constructor-arg ref="someReader"/>
</bean>
CharacterStreamReadingMessageSource
将读取器包装在一个 BufferedReader
中(如果它不是一个 BufferedReader
)。您可以在第二个构造函数参数中设置缓冲读取器使用的缓冲区大小。从版本 5.0 开始,第三个构造函数参数(blockToDetectEOF
)控制 CharacterStreamReadingMessageSource
的行为。当为 false
(默认值)时,receive()
方法检查读取器是否为 ready()
,如果不是,则返回 null。在这种情况下,不会检测到 EOF(文件结束)。当为 true
时,receive()
方法会阻塞,直到有数据可用或在基础流上检测到 EOF。检测到 EOF 时,会发布 StreamClosedEvent
(应用程序事件)。您可以使用实现了 ApplicationListener<StreamClosedEvent>
的 Bean 来使用这个事件。
The CharacterStreamReadingMessageSource
wraps the reader in a BufferedReader
(if it is not one already).
You can set the buffer size used by the buffered reader in the second constructor argument.
Starting with version 5.0, a third constructor argument (blockToDetectEOF
) controls the behavior of the CharacterStreamReadingMessageSource
.
When false
(the default), the receive()
method checks whether the reader is ready()
and returns null if not.
EOF (end of file) is not detected in this case.
When true
, the receive()
method blocks until data is available or EOF is detected on the underlying stream.
When EOF is detected, a StreamClosedEvent
(application event) is published.
You can consume this event with a bean that implements ApplicationListener<StreamClosedEvent>
.
为促进 EOF 检测,轮询线程在 |
To facilitate EOF detection, the poller thread blocks in the |
轮询在检测到 EOF 后继续在每次轮询中发布事件。应用程序侦听器可以停止适配器以防止这种情况发生。事件在轮询线程上发布。停止适配器会导致线程中断。如果你打算在停止适配器后执行一些可中断任务,那么你必须在其他线程上执行 stop()
或为这些下游活动使用其他线程。请注意,发送到 QueueChannel
是可中断的,所以,如果你想要从此侦听器发送消息,那么请在停止适配器之前执行此操作。
The poller continues to publish an event on each poll once EOF has been detected.
The application listener can stop the adapter to prevent this.
The event is published on the poller thread.
Stopping the adapter causes the thread to be interrupted.
If you intend to perform some interruptible task after stopping the adapter, you must either perform the stop()
on a different thread or use a different thread for those downstream activities.
Note that sending to a QueueChannel
is interruptible, so, if you wish to send a message from the listener, do it before stopping the adapter.
这促进了“管道
”或将数据重定向到 stdin
,如下面的两个示例所示:
This facilitates “piping” or redirecting data to stdin
, as the following two examples shows:
cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt
此方法让应用程序在管道关闭时停止。
This approach lets the application stop when the pipe is closed.
提供了四个方便的工厂方法:
Four convenient factory methods are available:
public static final CharacterStreamReadingMessageSource stdin() { ... }
public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }
public static final CharacterStreamReadingMessageSource stdinPipe() { ... }
public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
Writing to Streams
对于目标流,可以使用以下两个实现中的任一个:ByteStreamWritingMessageHandler
或 CharacterStreamWritingMessageHandler
。每个实现都需要一个构造函数参数(OutputStream
用于字节流或 Writer
用于字符流),并且每个实现都提供一个添加可选的“bufferSize”的第二个构造函数。由于这两个实现最终都实现了 MessageHandler
接口,因此可以从 channel-adapter
配置中引用它们,如 Channel Adapter 中所述。
For target streams, you can use either of two implementations: ByteStreamWritingMessageHandler
or CharacterStreamWritingMessageHandler
.
Each requires a single constructor argument (OutputStream
for byte streams or Writer
for character streams), and each provides a second constructor that adds the optional 'bufferSize'.
Since both of these ultimately implement the MessageHandler
interface, you can reference them from a channel-adapter
configuration, as described in Channel Adapter.
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
<constructor-arg ref="someOutputStream"/>
<constructor-arg value="1024"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
<constructor-arg ref="someWriter"/>
</bean>
Stream Namespace Support
Spring Integration 定义了一个命名空间来减少流相关通道适配器所需的配置。使用它需要以下模式位置:
Spring Integration defines a namespace to reduce the configuration needed for stream-related channel adapters. The following schema locations are needed to use it:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/stream
https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
以下代码片段显示了支持的不同配置选项,用于配置入站通道适配器:
The following code snippet shows the different configuration options that are supported to configure the inbound channel adapter:
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>
<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
从 5.0 版本开始,可以设置 detect-eof
属性,它设置 blockToDetectEOF
属性。有关详细信息,请参见 Reading from Streams。
Starting with version 5.0, you can set the detect-eof
attribute, which sets the blockToDetectEOF
property.
See Reading from Streams for more information.
要配置出站通道适配器,您也可以使用命名空间支持。以下示例显示出站通道适配器的不同配置:
To configure the outbound channel adapter, you can use the namespace support as well. The following example shows the different configuration for an outbound channel adapters:
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
channel="testChannel"/>
<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
channel="testChannel"/>
<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>
<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
channel="testChannel"/>