AMQP 支持的消息通道
有两种可用的消息通道实现。
一种是点对点,另一种是发布-订阅。
这两种通道都为底层的 AmqpTemplate
和 SimpleMessageListenerContainer
提供了广泛的配置属性(如本章前面所示的通道适配器和网关)。
然而,我们这里展示的例子具有最小的配置。
请查阅 XML 模式以查看可用属性。
点对点通道可能如下例所示:
<int-amqp:channel id="p2pChannel"/>
在幕后,上述示例会导致声明一个名为 si.p2pChannel
的 Queue
,并且此通道将消息发送到该 Queue
(技术上,通过将消息发送到路由键与此 Queue
名称匹配的匿名直连交换机)。
此通道还会在该 Queue
上注册一个消费者。
如果您希望通道是“可轮询的”而不是消息驱动的,请将 message-driven
标志设置为 false
,如下例所示:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布-订阅通道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在幕后,上述示例会导致声明一个名为 si.fanout.pubSubChannel
的扇出交换机,并且此通道将消息发送到该扇出交换机。
此通道还会声明一个服务器命名的排他、自动删除、非持久化 Queue
,并将其绑定到扇出交换机,同时在该 Queue
上注册一个消费者以接收消息。
发布-订阅通道没有“可轮询”选项。
它必须是消息驱动的。
从版本 4.1 开始,AMQP 支持的消息通道(结合 channel-transacted
)支持 template-channel-transacted
,用于分离 AbstractMessageListenerContainer
和 RabbitTemplate
的 transactional
配置。
请注意,以前 channel-transacted
默认为 true
。
现在,对于 AbstractMessageListenerContainer
,它默认为 false
。
在 4.3 版本之前,AMQP 支持的通道只支持带有 Serializable
负载和消息头的消息。
整个消息被转换(序列化)并发送到 RabbitMQ。
现在,您可以将 extract-payload
属性(或使用 Java 配置时设置 setExtractPayload()
)设置为 true
。
当此标志为 true
时,消息负载将被转换,并且消息头将被映射,其方式类似于您使用通道适配器时。
这种安排允许 AMQP 支持的通道与不可序列化的负载一起使用(可能与另一个消息转换器一起使用,例如 Jackson2JsonMessageConverter
)。
有关默认映射消息头的更多信息,请参阅 AMQP 消息头。
您可以通过提供使用 outbound-header-mapper
和 inbound-header-mapper
属性的自定义映射器来修改映射。
您现在还可以指定 default-delivery-mode
,它用于在没有 amqp_deliveryMode
消息头时设置传递模式。
默认情况下,Spring AMQP MessageProperties
使用 PERSISTENT
传递模式。
与其他持久化通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不旨在将工作分发给其他对等应用程序。 为此,请改用通道适配器。
从版本 5.0 开始,可轮询通道现在会阻塞轮询器线程,直到达到指定的 receiveTimeout
(默认为 1 秒)。
以前,与其他 PollableChannel
实现不同,如果消息不可用,无论接收超时时间如何,线程都会立即返回到调度程序。
阻塞比使用 basicGet()
检索消息(没有超时)稍微昂贵一些,因为必须为接收每条消息创建一个消费者。
要恢复以前的行为,请将轮询器的 receiveTimeout
设置为 0。
使用 Java 配置
以下示例展示了如何使用 Java 配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 配置
以下示例展示了如何使用 Java DSL 配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}