ZeroMQ 支持

Spring Integration 提供了组件来支持应用程序中的 ZeroMQ 通信。 该实现基于 JeroMQ 库的良好支持的 Java API。 所有组件都封装了 ZeroMQ 套接字生命周期并在内部管理它们的线程,使得与这些组件的交互是无锁和线程安全的。 你需要将此依赖项添加到你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:{project-version}"

ZeroMQ 代理

ZeroMqProxy 是内置 ZMQ.proxy() 函数 的 Spring 友好型封装。 它封装了套接字生命周期和线程管理。 此代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。 除了标准的 ZContext,它还需要一种众所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。 这样,就为代理的前端和后端使用了适当的 ZeroMQ 套接字类型对。 详见 ZeroMqProxy.Type

ZeroMqProxy 实现了 SmartLifecycle,用于创建、绑定和配置套接字,并在 Executor(如果有)的专用线程中启动 ZMQ.proxy()。 前端和后端套接字的绑定通过 tcp:// 协议在所有可用的网络接口上使用提供的端口完成。 否则,它们将绑定到随机端口,这些端口稍后可以通过相应的 getFrontendPort()getBackendPort() API 方法获取。

控制套接字作为 SocketType.PAIR 暴露,使用 "inproc://" + beanName + ".control" 地址进行线程间传输;它可以通过 getControlAddress() 获取。 它应该与同一应用程序中的另一个 SocketType.PAIR 套接字一起使用,以发送 ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。 当调用其生命周期的 stop() 时,ZeroMqProxy 执行 ZMQ.PROXY_TERMINATE 命令,以终止 ZMQ.proxy() 循环并优雅地关闭所有绑定的套接字。

setExposeCaptureSocket(boolean) 选项使此组件绑定一个额外的线程间套接字,类型为 SocketType.PUB,以捕获和发布前端和后端套接字之间的所有通信,正如 ZMQ.proxy() 实现所述。 此套接字绑定到 "inproc://" + beanName + ".capture" 地址,并且不期望任何特定的订阅进行过滤。

前端和后端套接字可以通过附加属性(例如读/写超时或安全性)进行自定义。 此自定义可通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回调分别实现。

ZeroMqProxy 可以作为一个简单的 bean 提供,如下所示:

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

所有客户端节点都应该通过 tcp:// 连接到此代理的主机,并使用其感兴趣的相应端口。

ZeroMQ 消息通道

ZeroMqChannel 是一个 SubscribableChannel,它使用一对 ZeroMQ 套接字连接发布者和订阅者进行消息交互。 它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用 PAIR 套接字)——在这种情况下不提供 connectUrl。 在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与其他连接到同一代理的类似通道交换消息。 连接 URL 选项是一个标准的 ZeroMQ 连接字符串,包含协议和主机以及 ZeroMQ 代理前端和后端套接字的一对端口,用冒号分隔。 为方便起见,如果通道与代理在同一应用程序中配置,则可以为其提供 ZeroMqProxy 实例而不是连接字符串。

发送和接收套接字都在其各自的专用线程中管理,使得此通道支持并发。 这样,我们可以从不同的线程向/从 ZeroMqChannel 发布和消费而无需同步。

默认情况下,ZeroMqChannel 使用 EmbeddedJsonHeadersMessageMapper 通过 Jackson JSON 处理器将 Message(包括头部)从/到 byte[] 进行(反)序列化。 此逻辑可以通过 setMessageMapper(BytesMessageMapper) 进行配置。

发送和接收套接字可以通过各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回调进行任何选项(读/写超时、安全性等)的自定义。

ZeroMqChannel 的内部逻辑基于 Project Reactor FluxMono 运算符的响应式流。 这提供了更简单的线程控制,并允许向/从通道进行无锁并发发布和消费。 本地 PUB/SUB 逻辑实现为 Flux.publish() 运算符,以允许此通道的所有本地订阅者接收相同的已发布消息,就像分布式订阅者接收 PUB 套接字一样。

以下是 ZeroMqChannel 配置的简单示例:

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

ZeroMQ 入站通道适配器

ZeroMqMessageProducer 是一个具有响应式语义的 MessageProducerSupport 实现。 它以非阻塞方式不断从 ZeroMQ 套接字读取数据,并将消息发布到无限的 Flux 中,该 FluxFluxMessageChannel 订阅,或者如果输出通道不是响应式的,则在 start() 方法中显式订阅。 当套接字上没有收到数据时,会在下一次读取尝试之前应用 consumeDelay(默认为 1 秒)。

ZeroMqMessageProducer 仅支持 SocketType.PAIRSocketType.PULLSocketType.SUB。 此组件可以连接到远程套接字或通过提供的或随机端口绑定到 TCP 协议。 在组件启动并绑定 ZeroMQ 套接字后,可以通过 getBoundPort() 获取实际端口。 套接字选项(例如安全性或写入超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。

如果 receiveRaw 选项设置为 true,则从套接字消费的 ZMsg 将按原样发送到生成的消息的有效负载中:下游流负责解析和转换 ZMsg。 否则,使用 InboundMessageMapper 将消费的数据转换为 Message。 如果接收到的 ZMsg 是多帧的,则第一帧被视为此 ZeroMQ 消息发布到的 ZeroMqHeaders.TOPIC 头部。

如果 unwrapTopic 选项设置为 false,则传入消息被认为由两帧组成:主题和 ZeroMQ 消息。 否则,默认情况下,ZMsg 被认为由三帧组成:第一帧包含主题,最后一帧包含消息,中间有一个空帧。

使用 SocketType.SUB 时,ZeroMqMessageProducer 使用提供的 topics 选项进行订阅;默认为订阅所有。 可以使用 subscribeToTopics()unsubscribeFromTopics() @ManagedOperation 在运行时调整订阅。

以下是 ZeroMqMessageProducer 配置的示例:

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

ZeroMQ 出站通道适配器

ZeroMqMessageHandler 是一个 ReactiveMessageHandler 实现,用于将发布消息生成到 ZeroMQ 套接字中。 仅支持 SocketType.PAIRSocketType.PUSHSocketType.PUB。 此组件可以连接到远程套接字或通过提供的或随机端口绑定到 TCP 协议。 在组件启动并绑定 ZeroMQ 套接字后,可以通过 getBoundPort() 获取实际端口。

当使用 SocketType.PUB 时,topicExpression 会根据请求消息进行评估,以将主题帧注入 ZeroMQ 消息(如果它不为 null)。 订阅方(SocketType.SUB)必须先接收主题帧,然后才能解析实际数据。

如果 wrapTopic 选项设置为 false,则 ZeroMQ 消息帧将在注入的主题(如果存在)之后发送。 默认情况下,在主题和消息之间会发送一个额外的空帧。

当请求消息的有效负载是 ZMsg 时,不执行转换或主题提取:ZMsg 按原样发送到套接字,并且不会被销毁以备将来重用。 否则,使用 OutboundMessageMapper<byte[]> 将请求消息(或仅其有效负载)转换为 ZeroMQ 帧以进行发布。 默认情况下,使用 ConvertingBytesMessageMapperConfigurableCompositeMessageConverter。 套接字选项(例如安全性或写入超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。

以下是连接到套接字的 ZeroMqMessageHandler 配置示例:

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

以下是绑定到指定端口的 ZeroMqMessageHandler 配置示例:

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

ZeroMQ Java DSL 支持

spring-integration-zeromq 通过 ZeroMq 工厂和 IntegrationComponentSpec 实现为上述组件提供了便捷的 Java DSL 流式 API。

这是 ZeroMqChannel 的 Java DSL 示例:

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://localhost:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的入站通道适配器是:

IntegrationFlow.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://localhost:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的出站通道适配器是:

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}