入站通道适配器
以下列表显示了AMQP入站通道适配器可能的配置选项:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
<int-amqp:inbound-channel-adapter
id="inboundAmqp" [id="CO1-1"]1
channel="inboundChannel" [id="CO1-2"]2
queue-names="si.test.queue" [id="CO1-3"]3
acknowledge-mode="AUTO" [id="CO1-4"]4
advice-chain="" [id="CO1-5"]5
channel-transacted="" [id="CO1-6"]6
concurrent-consumers="" [id="CO1-7"]7
connection-factory="" [id="CO1-8"]8
error-channel="" [id="CO1-9"]9
expose-listener-channel="" [id="CO1-10"]10
header-mapper="" [id="CO1-11"]11
mapped-request-headers="" [id="CO1-12"]12
listener-container="" [id="CO1-13"]13
message-converter="" [id="CO1-14"]14
message-properties-converter="" [id="CO1-15"]15
phase="" [id="CO1-16"]16
prefetch-count="" [id="CO1-17"]17
receive-timeout="" [id="CO1-18"]18
recovery-interval="" [id="CO1-19"]19
missing-queues-fatal="" [id="CO1-20"]20
shutdown-timeout="" [id="CO1-21"]21
task-executor="" [id="CO1-22"]22
transaction-attribute="" [id="CO1-23"]23
transaction-manager="" [id="CO1-24"]24
batch-size="" [id="CO1-25"]25
consumers-per-queue [id="CO1-26"]26
batch-mode="MESSAGES"/> [id="CO1-27"]27
<1> 此适配器的唯一ID。
可选。
<2> 转换后的消息应发送到的消息通道。
必需。
<3> 消息应从中消费的AMQP队列名称(逗号分隔列表)。
必需。
<4> `MessageListenerContainer`的确认模式。
当设置为`MANUAL`时,交付标签和通道分别在消息头`amqp_deliveryTag`和`amqp_channel`中提供。
用户应用程序负责确认。
`NONE`表示不确认(`autoAck`)。
`AUTO`表示当下游流完成时适配器的容器进行确认。
可选(默认为AUTO)。
请参阅xref:amqp/inbound-ack.adoc[入站端点确认模式]。
<5> 用于处理与此入站通道适配器相关的横切行为的额外AOP Advice。
可选。
<6> 标志,指示此组件创建的通道是否具有事务性。
如果为true,它会告诉框架使用事务性通道,并根据结果,以提交或回滚结束所有操作(发送或接收),如果出现异常则表示回滚。
可选(默认为false)。
<7> 指定要创建的并发消费者数量。
默认值为`1`。
我们建议增加并发消费者数量以扩展来自队列的消息消费。
但是,请注意,一旦注册了多个消费者,任何排序保证都会丢失。
通常,对于低流量队列,请使用一个消费者。
当设置了'consumers-per-queue'时不允许。
可选。
<8> RabbitMQ `ConnectionFactory`的Bean引用。
可选(默认为`connectionFactory`)。
<9> 错误消息应发送到的消息通道。
可选。
<10> 侦听器通道(com.rabbitmq.client.Channel)是否暴露给已注册的`ChannelAwareMessageListener`。
可选(默认为true)。
<11> 对接收AMQP消息时使用的`AmqpHeaderMapper`的引用。
可选。
默认情况下,只有标准的AMQP属性(例如`contentType`)会复制到Spring Integration `MessageHeaders`。
AMQP `MessageProperties`中任何用户定义的头都不会被默认的`DefaultAmqpHeaderMapper`复制到消息中。
如果提供了'request-header-names',则不允许。
<12> 逗号分隔的AMQP头名称列表,用于从AMQP请求映射到`MessageHeaders`中。
这只能在未提供'header-mapper'引用的情况下提供。
此列表中的值也可以是用于匹配头名称的简单模式(例如"*"或"thing1*, thing2"或"*something")。
<13> 对用于接收AMQP消息的`AbstractMessageListenerContainer`的引用。
如果提供了此属性,则不应提供与侦听器容器配置相关的任何其他属性。
换句话说,通过设置此引用,您必须对侦听器容器配置承担全部责任。
唯一的例外是`MessageListener`本身。
由于这是此通道适配器实现的核心职责,因此引用的侦听器容器不得已经拥有自己的`MessageListener`。
可选。
<14> 接收AMQP消息时使用的`MessageConverter`。
可选。
<15> 接收AMQP消息时使用的`MessagePropertiesConverter`。
可选。
<16> 指定底层`AbstractMessageListenerContainer`启动和停止的阶段。
启动顺序从低到高,停止顺序与此相反。
默认情况下,此值为`Integer.MAX_VALUE`,这意味着此容器尽可能晚启动,尽可能早停止。
可选。
<17> 告诉AMQP代理在单个请求中向每个消费者发送多少消息。
通常,您可以将此值设置得很高以提高吞吐量。
它应大于或等于事务大小(请参阅此列表后面的`batch-size`属性)。
可选(默认为`1`)。
<18> 接收超时(毫秒)。
可选(默认为`1000`)。
<19> 指定底层`AbstractMessageListenerContainer`恢复尝试之间的间隔(毫秒)。
可选(默认为`5000`)。
<20> 如果为'true'并且代理上没有队列可用,容器在启动期间会抛出致命异常并停止(如果在容器运行时队列被删除,则在三次尝试被动声明队列后)。
如果为`false`,容器不会抛出异常并进入恢复模式,尝试根据`recovery-interval`重新启动。
可选(默认为`true`)。
<21> 在底层`AbstractMessageListenerContainer`停止后,AMQP连接被强制关闭之前,等待工作者(毫秒)的时间。
如果在关闭信号到来时有任何工作者处于活动状态,只要它们能在此超时内完成处理,就允许它们完成处理。
否则,连接将关闭,消息将保持未确认(如果通道是事务性的)。
可选(默认为`5000`)。
<22> 默认情况下,底层`AbstractMessageListenerContainer`使用`SimpleAsyncTaskExecutor`实现,它为每个任务启动一个新线程,异步运行。
默认情况下,并发线程数是无限的。
请注意,此实现不重用线程。
考虑使用线程池`TaskExecutor`实现作为替代。
可选(默认为`SimpleAsyncTaskExecutor`)。
<23> 默认情况下,底层`AbstractMessageListenerContainer`创建`DefaultTransactionAttribute`的新实例(它采用EJB方法对运行时异常进行回滚,但不对已检查异常进行回滚)。
可选(默认为`DefaultTransactionAttribute`)。
<24> 在底层`AbstractMessageListenerContainer`上设置对外部`PlatformTransactionManager`的bean引用。
事务管理器与`channel-transacted`属性协同工作。
如果在框架发送或接收消息时已经存在事务,并且`channelTransacted`标志为`true`,则消息事务的提交或回滚将推迟到当前事务结束。
如果`channelTransacted`标志为`false`,则事务语义不适用于消息操作(它是自动确认的)。
有关更多信息,请参阅
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[使用Spring AMQP进行事务]。
可选。
<25> 告诉`SimpleMessageListenerContainer`在单个请求中处理多少条消息。
为了获得最佳结果,它应小于或等于`prefetch-count`中设置的值。
当设置了'consumers-per-queue'时不允许。
可选(默认为`1`)。
<26> 指示底层侦听器容器应为`DirectMessageListenerContainer`而不是默认的`SimpleMessageListenerContainer`。
有关更多信息,请参阅https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP参考手册]。
<27> 当容器的`consumerBatchEnabled`为`true`时,确定适配器如何在消息有效负载中呈现消息批次。
当设置为`MESSAGES`(默认)时,有效负载是`List<Message<?>>`,其中每条消息都包含从传入AMQP `Message`映射的头,并且有效负载是转换后的`body`。
当设置为`EXTRACT_PAYLOADS`时,有效负载是`List<?>`,其中元素从AMQP `Message`主体转换而来。
`EXTRACT_PAYLOADS_WITH_HEADERS`类似于`EXTRACT_PAYLOADS`,但此外,从`MessageProperties`映射的每个消息的头会以`List<Map<String, Object>`的形式存储在相应的索引处;头名称是`AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`。
container
请注意,当使用XML配置外部容器时,您不能使用Spring AMQP命名空间来定义容器。 这是因为命名空间至少需要一个`<listener/>`元素。 在此环境中,侦听器是适配器内部的。 因此,您必须使用正常的Spring `<bean/>`定义来定义容器,如以下示例所示:
|
尽管Spring Integration JMS和AMQP支持类似,但存在重要差异。 JMS入站通道适配器在底层使用`JmsDestinationPollingSource`,并期望配置一个轮询器。 AMQP入站通道适配器使用`AbstractMessageListenerContainer`,并且是消息驱动的。 在这方面,它更类似于JMS消息驱动通道适配器。
从版本5.5开始,AmqpInboundChannelAdapter`可以配置一个`org.springframework.amqp.rabbit.retry.MessageRecoverer`策略,该策略在内部调用重试操作时用于`RecoveryCallback
。
有关更多信息,请参阅`setMessageRecoverer()` JavaDocs。
@Publisher`注解也可以与
@RabbitListener`结合使用:
@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {
@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}
@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {
}
}
默认情况下,@Publisher
AOP拦截器处理方法调用的返回值。
但是,@RabbitListener`方法的返回值被视为AMQP回复消息。
因此,这种方法不能与
@Publisher`一起使用,所以建议使用带有针对方法参数的SpEL表达式的`@Payload`注解。
有关`@Publisher`的更多信息,请参阅注解驱动配置部分。
当在侦听器容器中使用独占或单活动消费者时,建议将容器属性`forceStop`设置为`true`。 这将防止竞态条件,即在停止容器后,另一个消费者可能在此实例完全停止之前开始消费消息。
批处理消息
有关批处理消息的更多信息,请参阅Spring AMQP文档。
要使用Spring Integration生成批处理消息,只需使用`BatchingRabbitTemplate`配置出站端点。
接收批处理消息时,默认情况下,侦听器容器会提取每个片段消息,并且适配器会为每个片段生成一个`Message<?>`。 从版本5.2开始,如果容器的`deBatchingEnabled`属性设置为`false`,则由适配器执行解批处理,并生成单个`Message<List<?>>`,其有效负载是片段有效负载的列表(如果适用,经过转换)。
默认的`BatchingStrategy`是`SimpleBatchingStrategy`,但可以在适配器上重写。
当重试操作需要恢复时,`org.springframework.amqp.rabbit.retry.MessageBatchRecoverer`必须与批处理一起使用。 |