Application Events
侦听器容器及其消费者会发布以下 Spring 应用事件:
-
ConsumerStartingEvent: 在消费者线程首次启动后发布,在它开始轮询之前。 -
ConsumerStartedEvent: 在消费者准备开始轮询时发布。 -
ConsumerFailedToStartEvent: 如果容器属性consumerStartTimeout`中未发布 `ConsumerStartingEvent,则发布。此事件可能表明配置的任务执行器没有足够的线程来支持它所使用的容器及其并发。此条件发生时,也会记录一条错误消息。 -
ListenerContainerIdleEvent: 如果 `idleInterval`中未收到任何消息(如果配置),则发布。 -
ListenerContainerNoLongerIdleEvent: 在之前发布了 `ListenerContainerIdleEvent`之后消费记录时发布。 -
ListenerContainerPartitionIdleEvent: 如果 `idlePartitionEventInterval`中未从该分区收到任何消息(如果配置),则发布。 -
ListenerContainerPartitionNoLongerIdleEvent: 当从以前发布了 `ListenerContainerPartitionIdleEvent`的分区中消费记录时发布。 -
NonResponsiveConsumerEvent: 当消费者似乎在 `poll`方法中被阻止时发布。 -
ConsumerPartitionPausedEvent: 当分区暂停时,每个消费者发布。 -
ConsumerPartitionResumedEvent: 当分区恢复时,每个消费者发布。 -
ConsumerPausedEvent: 当容器暂停时,每个消费者发布。 -
ConsumerResumedEvent: 当容器恢复时,每个消费者发布。 -
ConsumerStoppingEvent: 每个消费者在停止之前发布。 -
ConsumerStoppedEvent: 消费者关闭后发布。查看 Thread Safety。 -
ConsumerRetryAuthEvent: 当消费者的身份验证或授权失败并且正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent: 当身份验证或授权已成功重试时发布。仅当之前存在 `ConsumerRetryAuthEvent`时才可能发生。 -
ContainerStoppedEvent: 在所有消费者停止时发布。
默认情况下,应用程序上下文的事件多播器将在调用线程上调用事件侦听器。如果您更改多播器以使用异步执行器,则当事件包含对消费者的引用时,您不得调用任何 Consumer 方法。
ListenerContainerIdleEvent 具有以下属性:
-
source: 发布事件的监听器容器实例。 -
container: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
id: 监听器 ID(或容器 bean 名称)。 -
idleTime: 当事件发布时容器处于空闲状态的时间。 -
topicPartitions: 生成事件时分配给容器的主题和分区。 -
consumer: 对 KafkaConsumer`对象的引用。例如,如果以前调用了消费者的 `pause()`方法,则当收到事件时,它可以 `resume()。 -
paused: 容器当前是否暂停。有关详细信息,请参阅 Pausing and Resuming Listener Containers。
ListenerContainerNoLongerIdleEvent 具有相同的属性,但 idleTime 和 paused 除外。
ListenerContainerPartitionIdleEvent 具有以下属性:
-
source: 发布事件的监听器容器实例。 -
container: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
id: 监听器 ID(或容器 bean 名称)。 -
idleTime: 事件发布时时间分区消费处于空闲状态。 -
topicPartition: 触发事件的主题和分区。 -
consumer: 对 KafkaConsumer`对象的引用。例如,如果以前调用了消费者的 `pause()`方法,则当收到事件时,它可以 `resume()。 -
paused: 该分区消费是否当前针对该消费者暂停。有关更多信息,请参见 Pausing and Resuming Listener Containers。
ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,但 idleTime 和 paused 除外。
NonResponsiveConsumerEvent 具有以下属性:
-
source: 发布事件的监听器容器实例。 -
container: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
id: 监听器 ID(或容器 bean 名称)。 -
Pausing and Resuming Listener Containers: 容器上次调用
timeSinceLastPoll之前的时间。 -
topicPartitions: 生成事件时分配给容器的主题和分区。 -
consumer: 对 KafkaConsumer`对象的引用。例如,如果以前调用了消费者的 `pause()`方法,则当收到事件时,它可以 `resume()。 -
paused: 容器当前是否暂停。有关详细信息,请参阅 Pausing and Resuming Listener Containers。
ConsumerPausedEvent、ConsumerResumedEvent 和 ConsumerStopping 事件具有以下属性:
-
source: 发布事件的监听器容器实例。 -
container: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
timeSinceLastPoll: 涉及的poll()实例。
ConsumerPartitionPausedEvent、ConsumerPartitionResumedEvent 事件具有以下属性:
-
source: 发布事件的监听器容器实例。 -
container: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
poll(): 涉及的partitions实例。
ConsumerRetryAuthEvent 事件具有以下属性:
-
source: 发布事件的监听器容器实例。 -
container: 如果源容器是子容器,则为主监听器容器或父监听器容器。 -
reason:-
partitions- 由于身份验证异常而发布事件。 -
TopicPartition- 由于授权异常而发布事件。
-
ConsumerStartingEvent、ConsumerStartingEvent、ConsumerFailedToStartEvent、ConsumerStoppedEvent、ConsumerRetryAuthSuccessfulEvent 和 ContainerStoppedEvent 事件具有以下属性:
-
source: 发布事件的监听器容器实例。 -
container: 如果源容器是子容器,则为主监听器容器或父监听器容器。
所有容器(包括子级或父级)都发布 ContainerStoppedEvent。对于父级容器,source 和 container 属性是相同的。
此外,ConsumerStoppedEvent 还具有以下额外的属性:
-
reason:-
partition- 消费者正常停止(容器停止)。 -
TopicPartition- 抛出AUTHENTICATION。 -
AUTHENTICATION- 事务性生产者被隔离,且AUTHORIZATION容器属性为NORMAL。 -
ERROR- 抛出java.lang.Error或FENCED。且未配置stopContainerWhenFenced。 -
true- 没有分区的偏移量,且AUTH策略为AuthenticationException。
-
可以在出现此类情况后使用此事件重新启动容器:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
Detecting Idle and Non-Responsive Consumers
尽管效率很高,但异步消费者检测何时处于空闲状态时存在一个问题。如果在一段时间内没有消息到达,您可能需要采取一些措施。
您可以将侦听器容器配置为在一段时间没有消息传送到时发布 ListenerContainerIdleEvent。在容器空闲时,每隔 idleEventInterval 毫秒发布一个事件。
要配置此功能,请在容器上设置 idleEventInterval。以下示例演示如何执行此操作:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例展示了如何为 @KafkaListener 设置 idleEventInterval:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在每个案例中,容器空闲时每分钟发布一次事件。
如果由于某种原因,使用者 poll() 方法没有退出,没有接收到任何消息,也无法生成空闲事件(当代理不可访问时,早期版本的 kafka-clients 存在此问题)。在这种情况下,如果轮回没有在 3x 的 pollTimeout 属性内返回,则容器会发布 NonResponsiveConsumerEvent。默认情况下,在每个容器中每 30 秒执行一次此检查。可以在配置侦听器容器时,在 ContainerProperties 中设置 monitorInterval(默认 30 秒)和 noPollThreshold(默认 3.0)属性来修改此行为。noPollThreshold 应大于 1.0,以避免因竞争条件而获得虚假事件。收到此类事件后,可以停止容器,从而唤醒使用者以便它停止。
从 2.6.2 版本开始,如果容器已发布 ListenerContainerIdleEvent,则在随后收到记录时将发布 ListenerContainerNoLongerIdleEvent。
Event Consumption
可以通过实现 ApplicationListener 来捕获这些事件,既可以是通用侦听器,也可以是仅接收此特定事件的窄化侦听器。您还可以使用 Spring Framework 4.2 中引入的 @EventListener。
下一个示例将 @KafkaListener 和 @EventListener 组合到一个类中。您应该明白,应用程序侦听器获取所有容器的事件,因此如果您想根据空闲的特定容器采取具体措施,您可能需要检查侦听器 ID。您还可以为此目的使用 @EventListener 的 condition。
请参阅 Application Events 来了解事件属性的信息。
该事件通常在使用者线程上发布,因此与 Consumer 对象进行交互是安全的。
以下示例同时使用了 @KafkaListener 和 @EventListener:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器可以看到所有容器的事件。因此,在下例中,我们会根据侦听器 ID 缩小接收的事件范围。由于为 @KafkaListener 创建的容器支持并发,所以实际容器会命名为 id-n,其中 n 是用于支持并发的每个实例的唯一值。这就是我们在条件中使用 startsWith 的原因。
如果您希望使用空闲事件停止侦听器容器,则不应在调用侦听器的线程上调用 container.stop()。这会导致延迟和不必要的日志消息。相反,您应该将事件传递给可以停止容器的其他线程。此外,如果它是子容器,则您不应 stop() 容器实例。您应该停止并发容器。
Current Positions when Idle
请注意,你可以在空闲状态被检测到时,通过在侦听器中实现 ConsumerSeekAware 来获得当前的位置。请参阅 seek 中的 onIdleContainer()。