MQTT 支持

Spring Integration 提供入站和出站通道适配器来支持消息队列遥测传输 (MQTT) 协议。 你需要将此依赖项添加到你的项目中:

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>{project-version}</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:{project-version}"

当前实现使用 Eclipse Paho MQTT 客户端 库。 从 6.5 版本开始,org.eclipse.paho:org.eclipse.paho.client.mqttv3 依赖项是一个 optional 依赖项,因此必须显式包含在目标项目中以支持 MQTT v3。

XML 配置和本章的大部分内容都涉及 MQTT v3.1 协议支持和相应的 Paho 客户端。 有关相应的协议支持,请参阅 mqtt-v5 段落。

两个适配器的配置都通过 DefaultMqttPahoClientFactory 实现。 有关配置选项的更多信息,请参阅 Paho 文档。

我们建议配置一个 MqttConnectOptions 对象并将其注入到工厂中,而不是在工厂本身上设置(已弃用的)选项。

入站(消息驱动)通道适配器

入站通道适配器由 MqttPahoMessageDrivenChannelAdapter 实现。 为了方便起见,你可以使用命名空间对其进行配置。 最小配置可能如下所示:

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下列表显示了可用属性:

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  [id="CO1-1"]1
    url="tcp://localhost:1883"  [id="CO1-2"]2
    topics="bar,baz"  [id="CO1-3"]3
    qos="1,2"  [id="CO1-4"]4
    converter="myConverter"  [id="CO1-5"]5
    client-factory="clientFactory"  [id="CO1-6"]6
    send-timeout="123"  [id="CO1-7"]7
    error-channel="errors"  [id="CO1-8"]8
    recovery-interval="10000"  [id="CO1-9"]9
    manual-acks="false" [id="CO1-10"]10
    channel="out" />
 <1> 客户端 ID。
 <1> 代理 URL。
 <1> 以逗号分隔的主题列表,此适配器从中接收消息。
 <1> 以逗号分隔的 QoS 值列表。
它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。
 <1> `MqttMessageConverter`(可选)。
默认情况下,默认的 `DefaultPahoMessageConverter` 生成一个带有 `String` 负载的消息,其中包含以下头:
 * `mqtt_topic`:接收消息的主题
 * `mqtt_duplicate`:如果消息是重复的,则为 `true`
 * `mqtt_qos`:服务质量
你可以通过将其声明为 `<bean/>` 并将 `payloadAsBytes` 属性设置为 `true` 来配置 `DefaultPahoMessageConverter` 以在负载中返回原始 `byte[]`。
 <1> 客户端工厂。
 <1> `send()` 超时。
它仅在通道可能阻塞时(例如,当前已满的有界 `QueueChannel`)适用。
 <1> 错误通道。
如果提供,下游异常将以 `ErrorMessage` 的形式发送到此通道。
负载是一个 `MessagingException`,其中包含失败的消息和原因。
 <1> 恢复间隔。
它控制适配器在失败后尝试重新连接的间隔。
它默认为 `10000ms`(十秒)。
 <1> 确认模式;设置为 true 表示手动确认。

从 4.1 版本开始,你可以省略 URL。 相反,你可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。 这样做可以实现,例如,连接到高可用性 (HA) 集群。

从 4.2.2 版本开始,当适配器成功订阅主题时,会发布 MqttSubscribedEvent。 当连接或订阅失败时,会发布 MqttConnectionFailedEvent 事件。 这些事件可以由实现 ApplicationListener 的 bean 接收。

此外,一个名为 recoveryInterval 的新属性控制适配器在失败后尝试重新连接的间隔。 它默认为 10000ms(十秒)。

在 4.2.3 版本之前,客户端在适配器停止时总是取消订阅。 这是不正确的,因为如果客户端 QoS 大于 0,我们需要保持订阅处于活动状态,以便在适配器停止时到达的消息在下次启动时传递。 这还需要将客户端工厂上的 cleanSession 属性设置为 false。 它默认为 true。 从 4.2.3 版本开始,如果 cleanSession 属性为 false,则适配器不会(默认情况下)取消订阅。 此行为可以通过设置工厂上的 consumerCloseAction 属性来覆盖。 它可以具有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN。 后者(默认值)仅在 cleanSession 属性为 true 时取消订阅。 要恢复到 4.2.3 之前的行为,请使用 UNSUBSCRIBE_ALWAYS

从 5.0 版本开始,topicqosretained 属性被映射到 .RECEIVED_…​ 头(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED),以避免意外传播到出站消息,出站消息(默认情况下)使用 MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED 头。

运行时添加和删除主题

从 4.1 版本开始,你可以通过编程方式更改适配器订阅的主题。 Spring Integration 提供了 addTopic()removeTopic() 方法。 添加主题时,你可以选择指定 QoS(默认值:1)。 你还可以通过向 <control-bus/> 发送带有适当负载的消息来修改主题——例如:"myMqttAdapter.addTopic('foo', 1)"

停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。 这些更改不会在应用程序上下文的生命周期之外保留。 新的应用程序上下文将恢复到配置的设置。

在适配器停止(或与代理断开连接)时更改主题将在下次建立连接时生效。

手动确认

从 5.3 版本开始,你可以将 manualAcks 属性设置为 true。 通常用于异步确认交付。 当设置为 true 时,头 (IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK) 将添加到消息中,其值为 SimpleAcknowledgment。 你必须调用 acknowledge() 方法才能完成交付。 有关更多信息,请参阅 IMqttClient setManualAcks()messageArrivedComplete() 的 Javadoc。 为方便起见,提供了一个头访问器:

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

5.2.11 版本开始,当消息转换器抛出异常或从 MqttMessage 转换返回 null 时,MqttPahoMessageDrivenChannelAdapter 会将 ErrorMessage 发送到 errorChannel(如果提供)。 否则,会将此转换错误重新抛出到 MQTT 客户端回调中。

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

出站通道适配器

出站通道适配器由 MqttPahoMessageHandler 实现,它被封装在 ConsumerEndpoint 中。 为了方便起见,你可以使用命名空间对其进行配置。

从 4.1 版本开始,适配器支持异步发送操作,避免阻塞直到确认交付。 你可以发出应用程序事件,以使应用程序能够根据需要确认交付。

以下列表显示了出站通道适配器的可用属性:

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  [id="CO2-1"]1
    url="tcp://localhost:1883"  [id="CO2-2"]2
    converter="myConverter"  [id="CO2-3"]3
    client-factory="clientFactory"  [id="CO2-4"]4
    default-qos="1"  [id="CO2-5"]5
    qos-expression="" [id="CO2-6"]6
    default-retained="true"  [id="CO2-7"]7
    retained-expression="" [id="CO2-8"]8
    default-topic="bar"  [id="CO2-9"]9
    topic-expression="" [id="CO2-10"]10
    async="false"  [id="CO2-11"]11
    async-events="false"  [id="CO2-12"]12
    channel="target" />
 <1> 客户端 ID。
 <1> 代理 URL。
 <1> `MqttMessageConverter`(可选)。
默认的 `DefaultPahoMessageConverter` 识别以下头:
 * `mqtt_topic`:消息将发送到的主题
 * `mqtt_retained`:如果消息被保留,则为 `true`
 * `mqtt_qos`:服务质量
 <1> 客户端工厂。
 <1> 默认服务质量。
如果在 `mqtt_qos` 头中未找到或 `qos-expression` 返回 `null`,则使用此值。
如果你提供自定义 `converter`,则不使用此值。
 <1> 用于确定 qos 的表达式。
默认值为 `headers[mqtt_qos]`。
 <1> 保留标志的默认值。
如果在 `mqtt_retained` 头中未找到,则使用此值。
如果提供自定义 `converter`,则不使用此值。
 <1> 用于确定保留布尔值的表达式。
默认值为 `headers[mqtt_retained]`。
 <1> 消息发送到的默认主题(如果在 `mqtt_topic` 头中未找到,则使用此值)。
 <1> 用于确定目标主题的表达式。
默认值为 `headers['mqtt_topic']`。
 <1> 当 `true` 时,调用者不会阻塞。
相反,它会在发送消息时等待交付确认。
默认值为 `false`(发送会阻塞直到确认交付)。
 <1> 当 `async` 和 `async-events` 都为 `true` 时,会发出 `MqttMessageSentEvent`(参见 <<Events,mqtt-events>>)。
它包含消息、主题、客户端库生成的 `messageId`、`clientId` 和 `clientInstance`(每次客户端连接时递增)。
当客户端库确认交付时,会发出 `MqttMessageDeliveredEvent`。
它包含 `messageId`、`clientId` 和 `clientInstance`,从而可以将交付与 `send()` 相关联。
任何 `ApplicationListener` 或事件入站通道适配器都可以接收这些事件。
请注意,`MqttMessageDeliveredEvent` 可能会在 `MqttMessageSentEvent` 之前接收到。
默认值为 `false`。

从 4.1 版本开始,可以省略 URL。 相反,可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。 这使得,例如,连接到高可用性 (HA) 集群成为可能。

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

       @Bean
       public IntegrationFlow mqttOutboundFlow() {
           return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

事件

某些应用程序事件由适配器发布。

  • MqttConnectionFailedEvent - 如果连接失败或随后连接丢失,则由两个适配器发布。 对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,连接丢失的 causenull

  • MqttMessageSentEvent - 如果在异步模式下运行,则在发送消息时由出站适配器发布。

  • MqttMessageDeliveredEvent - 如果在异步模式下运行,则在客户端指示消息已送达时由出站适配器发布。

  • MqttMessageNotDeliveredEvent - 如果在异步模式下运行,则在客户端指示消息未送达时由出站适配器发布。

  • MqttSubscribedEvent - 在订阅主题后由入站适配器发布。

这些事件可以由 ApplicationListener<MqttIntegrationEvent>@EventListener 方法接收。

要确定事件的来源,请使用以下方法;你可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从 5.5.5 版本开始,spring-integration-mqtt 模块提供了 MQTT v5 协议的通道适配器实现。 org.eclipse.paho:org.eclipse.paho.mqttv5.client 是一个 optional 依赖项,因此必须显式包含在目标项目中。

由于 MQTT v5 协议支持 MQTT 消息中的额外任意属性,因此引入了 MqttHeaderMapper 实现,用于在发布和接收操作时映射到/从头。 默认情况下,(通过 * 模式)它映射所有接收到的 PUBLISH 帧属性(包括用户属性)。 在出站端,它映射 PUBLISH 帧的以下头子集:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData

MQTT v5 协议的出站通道适配器作为 Mqttv5PahoMessageHandler 存在。 它需要 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用。 它支持 MqttClientPersistence 选项,可以是 async,并且在这种情况下可以发出 MqttIntegrationEvent 对象(参见 asyncEvents 选项)。 如果请求消息负载是 org.eclipse.paho.mqttv5.common.MqttMessage,则通过内部 IMqttAsyncClient 原样发布。 如果负载是 byte[],则原样用作目标 MqttMessage 负载以发布。 如果负载是 String,则将其转换为 byte[] 以发布。 其余用例委托给提供的 MessageConverter,它是应用程序上下文中的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter bean。 注意:当请求的消息负载已经是 MqttMessage 时,不使用提供的 HeaderMapper<MqttProperties>。 以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}

org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageHandler 一起使用,因为其契约仅针对 MQTT v3 协议。

如果连接在启动时或运行时失败,Mqttv5PahoMessageHandler 会尝试在下一个生成到此处理器的消息时重新连接。 如果此手动重新连接失败,则连接异常将抛回给调用者。 在这种情况下,将应用标准的 Spring Integration 错误处理过程,包括请求处理器建议,例如重试或断路器。

有关更多信息,请参阅 Mqttv5PahoMessageHandler Javadoc 及其超类。

MQTT v5 协议的入站通道适配器作为 Mqttv5PahoMessageDrivenChannelAdapter 存在。 它需要 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用,以及要订阅和消费的主题。 它支持 MqttClientPersistence 选项,默认情况下是内存中的。 可以配置预期的 payloadType(默认为 byte[]),并将其传播到提供的 SmartMessageConverter,用于从接收到的 MqttMessagebyte[] 进行转换。 如果设置了 manualAck 选项,则 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头将添加到消息中,以生成 SimpleAcknowledgment 实例。 HeaderMapper<MqttProperties> 用于将 PUBLISH 帧属性(包括用户属性)映射到目标消息头中。 标准 MqttMessage 属性,例如 qosiddupretained,以及接收到的主题,总是映射到头中。 有关更多信息,请参阅 MqttHeaders

从 6.3 版本开始,Mqttv5PahoMessageDrivenChannelAdapter 提供了基于 MqttSubscription 的构造函数,用于细粒度配置,而不是普通主题名称。 当提供了这些订阅时,通道适配器的 qos 选项不能使用,因为这种 qos 模式是 MqttSubscription API 的一部分。

以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageDrivenChannelAdapter 一起使用,因为其契约仅针对 MQTT v3 协议。

有关更多信息,请参阅 Mqttv5PahoMessageDrivenChannelAdapter Javadoc 及其超类。

建议将 MqttConnectionOptions#setAutomaticReconnect(boolean) 设置为 true,以使内部 IMqttAsyncClient 实例处理重新连接。 否则,只有手动重启 Mqttv5PahoMessageDrivenChannelAdapter 才能处理重新连接,例如通过在断开连接时处理 MqttConnectionFailedEvent

共享 MQTT 客户端支持

如果多个集成需要单个 MQTT 客户端 ID,则不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能会限制每个客户端 ID 的连接数(通常只允许单个连接)。 为了将单个客户端重用于不同的通道适配器,可以使用 org.springframework.integration.mqtt.core.ClientManager 组件并将其传递给所需的任何通道适配器。 它将管理 MQTT 连接生命周期并在需要时执行自动重新连接。 此外,可以向客户端管理器提供自定义连接选项和 MqttClientPersistence,就像目前可以为通道适配器组件所做的那样。

请注意,MQTT v5 和 v3 通道适配器都受支持。

以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器:

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}

从 6.4 版本开始,现在可以使用相应的 ClientManager 通过 IntegrationFlowContext 在运行时添加 MqttPahoMessageDrivenChannelAdapterMqttv5PahoMessageDrivenChannelAdapter 的多个实例。

private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
                                  String topic, MessageChannel channel) {
    flowContext
        .registration(
            IntegrationFlow
                .from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
                .channel(channel)
                .get())
        .register();
}