Consuming Records
In the above `uppercase` function, we consume the record as a `Flux<String>` and then produce it as a `Flux<String>`.
There might be occasions in which you need to receive the record in the original received format - the `ReceiverRecord`.
Here is such a function.
```java
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
```
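As a side note, `new String(rec.value())` uses the platform default charset. When the payload encoding matters, you may want to pass an explicit charset. The following is a minimal, self-contained sketch of the per-record transformation performed inside `lowercase()` (the class name and sample payload are illustrative, not part of the binder API):

```java
import java.nio.charset.StandardCharsets;

public class LowercaseConversionDemo {

    // Mirrors the transformation applied to each record in the lowercase()
    // function, but with an explicit charset instead of the platform default.
    static String toLowercaseString(byte[] value) {
        return new String(value, StandardCharsets.UTF_8).toLowerCase();
    }

    public static void main(String[] args) {
        byte[] payload = "Hello Kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(toLowercaseString(payload));
    }
}
```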
In this function, note that we consume the record as a `Flux<ReceiverRecord<byte[], byte[]>>` and then produce it as a `Flux<String>`.
`ReceiverRecord` is the basic received record - a specialized Kafka `ConsumerRecord` in Reactor Kafka.
When using the reactive Kafka binder, the above function gives you access to the `ReceiverRecord` type for each incoming record.
However, in this case, you need to provide a custom implementation for a `RecordMessageConverter`.
By default, the reactive Kafka binder uses a `MessagingMessageConverter` that converts the payload and headers from the `ConsumerRecord`.
Therefore, by the time your handler method receives it, the payload has already been extracted from the received record and passed on to the method, as in the case of the first function we looked at above.
By providing a custom `RecordMessageConverter` implementation in the application, you can override the default behavior.
For example, if you want to consume the record as a raw `Flux<ReceiverRecord<byte[], byte[]>>`, you can provide the following bean definition in the application.
```java
@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }
    };
}
```
Then, you need to instruct the framework to use this converter for the required binding.
Here is an example based on our `lowercase` function.
```
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord
```
`lowercase-in-0` is the input binding name for our `lowercase` function.
For the outbound (`lowercase-out-0`), we still use the regular `MessagingMessageConverter`.
In the `toMessage` implementation above, we receive the raw `ConsumerRecord` (a `ReceiverRecord`, since we are in a reactive binder context) and then wrap it inside a `Message`.
Then that message payload, which is the `ReceiverRecord`, is provided to the user method.
If `reactiveAutoCommit` is `false` (the default), call `rec.receiverOffset().acknowledge()` (or `commit()`) to cause the offset to be committed; if `reactiveAutoCommit` is `true`, the flux supplies `ConsumerRecord`s instead.
Refer to the `reactor-kafka` documentation and javadocs for more information.
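To make the manual-acknowledgment case concrete, here is a sketch of how the `lowercase` function above might acknowledge each record when `reactiveAutoCommit` is `false`. This is an illustrative variation on the earlier bean, not code taken from the reference documentation; it assumes the same binding setup as before:

```java
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> {
        String payload = new String(rec.value()).toLowerCase();
        // With reactiveAutoCommit=false, acknowledge the offset so that
        // it becomes eligible for the next offset commit.
        rec.receiverOffset().acknowledge();
        return payload;
    });
}
```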