发送方结果通道
从 4.0.3 版本开始,您可以配置 resultMetadataChannel 以接收 SenderResult<?>,从而确定发送的成功/失败。
SenderResult 包含 correlationMetadata,允许您将结果与发送关联起来;它还包含 RecordMetadata,指示发送记录的 TopicPartition 和偏移量。
resultMetadataChannel 必须 是 FluxMessageChannel 实例。
这是一个如何使用此功能的示例,其中关联元数据类型为 Integer:
@Bean
FluxMessageChannel sendResults() {
return new FluxMessageChannel();
}
@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
if (result.exception() != null) {
failureFor(result);
}
else {
successFor(result);
}
}
要在输出记录上设置关联元数据,请设置 CORRELATION_ID 头部:
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
当将此功能与 Function 一起使用时,函数输出类型必须是 Message<?>,并且关联 ID 头部必须设置为所需的值。
元数据应该是唯一的,至少在发送期间是如此。