Building an IoT Platform (IoT Platform Tooling): Writing an MQTT Gateway
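The Internet of Things (IoT) is a popular area of software today. Many IoT vendors run their own IoT platform, and one of the core modules of such a platform is the MQTT gateway.
This article is shared from the Huawei Cloud community post 《一文带你掌握物联网mqtt网关搭建背后的技术原理-云社区-华为云》, author: 张俭.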
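The goal of this article is to walk you step by step through writing an MQTT gateway. Its backend storage supports Kafka and Pulsar, and it supports MQTT connect, disconnect, publish, and subscribe.

Technology choices:
Netty: the most popular networking framework in Java
netty-codec-mqtt: a Netty subproject that provides the MQTT codec
Pulsar/Kafka: popular messaging middleware, used as the backend storage

The core pom dependencies are as follows (groupId:artifactId):
io.netty:netty-codec-mqtt
io.netty:netty-common
io.netty:netty-transport
org.apache.pulsar:pulsar-client-original, version ${pulsar.version}
org.apache.kafka:kafka-clients, version ${kafka.version}
org.eclipse.paho:org.eclipse.paho.client.mqttv3, version ${mqtt-client.version}, scope test

Software parameter design

Software parameters are everywhere. A complex open source project can have hundreds of them, with configuration files running to thousands of lines. The configuration we need is described below.

The port MqttServer listens on

Making the listening port configurable is necessary even for a demo, because it is used constantly in unit tests. After a unit test finishes, the operating system does not release the port immediately even though the network server has been closed, so it is important for unit tests to use a random port. In Java, we can obtain a free port with a utility class like the following.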
If the port is not configured, we use the default MQTT port, 1883.

package io.github.protocol.mqtt.broker.util;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class SocketUtil {

    public static int getFreePort() {
        // Port 0 asks the operating system to pick any free port.
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

Backend storage configuration

Our MQTT gateway has no reliable storage of its own. It relies on the backend messaging middleware for persistence.
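The backend is planned to support two types, Pulsar and Kafka. The enum is defined as follows:

public enum ProcessorType {
    KAFKA,
    PULSAR,
}

The corresponding KafkaProcessorConfig and PulsarProcessorConfig are simple and only contain the basic connection addresses. If performance tuning or security is added later, this part will need more configuration items.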
@Setter
@Getter
public class KafkaProcessorConfig {

    private String bootstrapServers = "localhost:9092";

    public KafkaProcessorConfig() {
    }
}

@Setter
@Getter
public class PulsarProcessorConfig {

    private String httpUrl = "http://localhost:8080";

    private String serviceUrl = "pulsar://localhost:6650";

    public PulsarProcessorConfig() {
    }
}

Starting the Netty MqttServer

We start an MqttServer with Netty and add the MQTT decoder and encoder to the pipeline.

package io.github.protocol.mqtt.broker;

import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
import io.github.protocol.mqtt.broker.processor.MqttProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
import io.github.protocol.mqtt.broker.util.SocketUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttServer {

    private final MqttServerConfig mqttServerConfig;

    public MqttServer() {
        this(new MqttServerConfig());
    }

    public MqttServer(MqttServerConfig mqttServerConfig) {
        this.mqttServerConfig = mqttServerConfig;
        // A configured port of 0 means "pick any free port", which is handy in unit tests.
        if (mqttServerConfig.getPort() == 0) {
            mqttServerConfig.setPort(SocketUtil.getFreePort());
        }
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // decoder
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private MqttProcessor processor(MqttServerConfig config) {
        return switch (config.getProcessorType()) {
            case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
            case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
        };
    }

    public int getPort() {
        return mqttServerConfig.getPort();
    }
}

MqttServerStarter.java

We write a simple main function to start MqttServer, which makes debugging easier.
package io.github.protocol.mqtt.broker;

public class MqttServerStarter {

    public static void main(String[] args) throws Exception {
        new MqttServer().start();
    }
}

On the client side, we test with the Eclipse MQTT client (Paho).

package io.github.protocol.mqtt;

import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Log4j2
public class MqttClientPublishExample {

    public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishExample";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            log.info("Connecting to broker: {}", broker);
            sampleClient.connect(connOpts);
            log.info("Connected");
            log.info("Publishing message: {}", content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            log.info("Message published");
            sampleClient.disconnect();
            log.info("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
        }
    }
}

We then run MqttServer first and MqttClient second, and find that MqttClient hangs at this log line:
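Connecting to broker: tcp://127.0.0.1:1883

Why is that? A packet capture shows that only the client sent an MQTT CONNECT packet, and the server never responded.
According to the MQTT specification, however, a CONNECT packet is expected to be answered with a CONNACK.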
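To make the missing response concrete, here is a minimal sketch of what a CONNACK looks like on the wire in MQTT 3.1.1: a fixed header byte of 0x20, a remaining length of 2, an acknowledge-flags byte, and a return code. The names ConnAckBytes and encode are hypothetical and exist only for illustration; the gateway in this article leaves this encoding to netty-codec-mqtt.

```java
import java.util.Arrays;

class ConnAckBytes {

    // Return code 0 means "connection accepted" in MQTT 3.1.1.
    static final int CONNECTION_ACCEPTED = 0x00;

    /** Builds the four bytes of an MQTT 3.1.1 CONNACK packet. */
    static byte[] encode(boolean sessionPresent, int returnCode) {
        return new byte[] {
            0x20,                             // packet type CONNACK (2) in the high nibble, flags 0
            0x02,                             // remaining length: two bytes follow
            (byte) (sessionPresent ? 1 : 0),  // acknowledge flags, bit 0 = session present
            (byte) returnCode                 // connect return code
        };
    }

    public static void main(String[] args) {
        // prints [32, 2, 0, 0]
        System.out.println(Arrays.toString(encode(false, CONNECTION_ACCEPTED)));
    }
}
```

Until the server writes these four bytes back, a blocking client has nothing to unblock on, which matches the hang seen above.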
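So after receiving CONNECT we need to return a CONNACK message. We create an MqttHandler that extends ChannelInboundHandlerAdapter and takes over the messages once MqttDecoder has decoded them. The methods that matter most here are channelRead, which we override to handle messages, and channelInactive, which we override to release the resources that must be freed when the connection drops.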
package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }
}

Then we add this handler to Netty's handler chain (the pipeline), right after the decoder.
We insert our code into the MQTT handler:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    super.channelRead(ctx, msg);
    if (msg instanceof MqttConnectMessage) {
        handleConnect(ctx, (MqttConnectMessage) msg);
    } else {
        log.error("Unsupported type msg [{}]", msg);
    }
}

private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
    log.info("connect msg is [{}]", connectMessage);
}

The printed connectMessage looks like this:

[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]

An MQTT CONNECT message usually carries information such as the QoS, username, and password. Because we started the client without a username or password, those fields are all null here. For now we skip validating them and simply return a CONNACK message to the client to signal that the connection succeeded:
final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
ctx.channel().writeAndFlush(ackMessage);
We run the server and the client again and can see that the CONNECT stage now passes and the client moves on to publishing a message. Next we implement the remaining scenarios.
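The remainingLength=22 in the logged CONNECT message can be checked by hand. In MQTT 3.1.1 the CONNECT variable header takes 10 bytes (the length-prefixed protocol name "MQTT", the protocol level, the connect flags, and the keep alive), and the payload here is just the length-prefixed client identifier. The sketch below recomputes that number and shows the variable-length encoding the fixed header uses to carry it. RemainingLength and its methods are hypothetical names for illustration; netty-codec-mqtt does this internally.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class RemainingLength {

    static final int MAX = 268_435_455; // largest value that fits in four encoded bytes

    /** Encodes a length with 7 data bits per byte; the top bit marks "more bytes follow". */
    static byte[] encode(int length) {
        if (length < 0 || length > MAX) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80;
            }
            out.write(digit);
        } while (length > 0);
        return out.toByteArray();
    }

    /** Decodes at most four bytes back into the length. */
    static int decode(byte[] bytes) {
        int value = 0;
        int multiplier = 1;
        for (int i = 0; i < bytes.length && i < 4; i++) {
            value += (bytes[i] & 0x7F) * multiplier;
            multiplier *= 128;
            if ((bytes[i] & 0x80) == 0) {
                break;
            }
        }
        return value;
    }

    /** Remaining length of a CONNECT that carries only a client identifier. */
    static int connectLength(String clientId) {
        int variableHeader = (2 + 4) + 1 + 1 + 2; // "MQTT" with length prefix, level, flags, keep alive
        int payload = 2 + clientId.getBytes(StandardCharsets.UTF_8).length;
        return variableHeader + payload;
    }

    public static void main(String[] args) {
        // prints 22
        System.out.println(connectLength("JavaSample"));
    }
}
```

Lengths below 128 fit in a single byte, so the 22 above travels as the single byte 0x16.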
Here is the MqttHandler code at this stage:

package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
        // The three parts of the CONNECT message; not validated yet at this stage.
        final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
        final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
        final MqttConnectPayload connectPayload = connectMessage.payload();
        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);
    }
}
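At the moment all the logic lives in MqttHandler, which makes later extension awkward. We therefore extract an MqttProcessor interface to handle the concrete requests, while MqttHandler is responsible for determining the MqttMessage type and dispatching. The MqttProcessor interface is designed as follows.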
package io.github.protocol.mqtt.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;

public interface MqttProcessor {

    void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;

    void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;

    void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;

    void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;

    void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;

    void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;

    void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;

    void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;

    void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processDisconnect(ChannelHandlerContext ctx) throws Exception;

    void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
}

We allow these methods to throw exceptions. When a failure is very hard to handle (a backend storage outage, for example), we drop the MQTT connection and wait for the client to reconnect. MqttHandler calls MqttProcessor; the relevant MqttHandler code is as follows.
Preconditions.checkArgument(message instanceof MqttMessage);
MqttMessage msg = (MqttMessage) message;
try {
    if (msg.decoderResult().isFailure()) {
        Throwable cause = msg.decoderResult().cause();
        if (cause instanceof MqttUnacceptableProtocolVersionException) {
            // Unsupported protocol version
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(
                            MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false),
                    null);
            ctx.writeAndFlush(connAckMessage);
            log.error("connection refused due to invalid protocol, client address [{}]",
                    ctx.channel().remoteAddress());
            ctx.close();
            return;
        } else if (cause instanceof MqttIdentifierRejectedException) {
            // ineligible clientId
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(
                            MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
                    null);
            ctx.writeAndFlush(connAckMessage);
            log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        throw new IllegalStateException(msg.decoderResult().cause().getMessage());
    }
    MqttMessageType messageType = msg.fixedHeader().messageType();
    if (log.isDebugEnabled()) {
        log.debug("Processing MQTT Inbound handler message, type={}", messageType);
    }
    switch (messageType) {
        case CONNECT:
            Preconditions.checkArgument(msg instanceof MqttConnectMessage);
            processor.processConnect(ctx, (MqttConnectMessage) msg);
            break;
        case CONNACK:
            Preconditions.checkArgument(msg instanceof MqttConnAckMessage);
            processor.processConnAck(ctx, (MqttConnAckMessage) msg);
            break;
        case PUBLISH:
            Preconditions.checkArgument(msg instanceof MqttPublishMessage);
            processor.processPublish(ctx, (MqttPublishMessage) msg);
            break;
        case PUBACK:
            Preconditions.checkArgument(msg instanceof MqttPubAckMessage);
            processor.processPubAck(ctx, (MqttPubAckMessage) msg);
            break;
        case PUBREC:
            processor.processPubRec(ctx, msg);
            break;
        case PUBREL:
            processor.processPubRel(ctx, msg);
            break;
        case PUBCOMP:
            processor.processPubComp(ctx, msg);
            break;
        case SUBSCRIBE:
            Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);
            processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);
            break;
        case SUBACK:
            Preconditions.checkArgument(msg instanceof MqttSubAckMessage);
            processor.processSubAck(ctx, (MqttSubAckMessage) msg);
            break;
        case UNSUBSCRIBE:
            Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);
            processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
            break;
        case UNSUBACK:
            Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);
            processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);
            break;
        case PINGREQ:
            processor.processPingReq(ctx, msg);
            break;
        case PINGRESP:
            processor.processPingResp(ctx, msg);
            break;
        case DISCONNECT:
            processor.processDisconnect(ctx);
            break;
        case AUTH:
            processor.processAuth(ctx, msg);
            break;
        default:
            throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
    }
} catch (Throwable ex) {
    ReferenceCountUtil.safeRelease(msg);
    log.error("Exception was caught while processing MQTT message, ", ex);
    ctx.close();
}

This code mainly calls a different MqttProcessor method for each type of MqttMessage. Two points are worth mentioning:
Some decoding errors are checked up front, so that we fail fast.
Exceptions are caught globally, and the connection is closed when one occurs.

Maintaining the MqttSession

The MQTT session is maintained mainly to keep track of the client's session information and of the resources the client occupies in the system. Because the session has to be maintained whatever the backend implementation is, we build an AbstractProcessor to do it.

package io.github.protocol.mqtt.broker.processor;

import io.github.protocol.mqtt.broker.MqttSessionKey;
import io.github.protocol.mqtt.broker.auth.MqttAuth;
import io.github.protocol.mqtt.broker.util.ChannelUtils;
import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.stream.IntStream;

@Slf4j
public abstract class AbstractProcessor implements MqttProcessor {

    protected final MqttAuth mqttAuth;

    public AbstractProcessor(MqttAuth mqttAuth) {
        this.mqttAuth = mqttAuth;
    }

    @Override
    public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
        String clientId = msg.payload().clientIdentifier();
        String username = msg.payload().userName();
        byte[] pwd = msg.payload().passwordInBytes();
        if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(
                            MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
                    null);
            ctx.writeAndFlush(connAckMessage);
            log.error("the clientId username pwd cannot be empty, client address[{}]",
                    ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        if (!mqttAuth.connAuth(clientId, username, pwd)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(
                            MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false),
                    null);
            ctx.writeAndFlush(connAckMessage);
            // The original logged the "cannot be empty" message here as well; this branch is an auth failure.
            log.error("auth failed, clientId {} username {} client address[{}]",
                    clientId, username, ctx.channel().remoteAddress());
            ctx.close();
            return;
        }

        MqttSessionKey mqttSessionKey = new MqttSessionKey();
        mqttSessionKey.setUsername(username);
        mqttSessionKey.setClientId(clientId);
        ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);
        log.info("username {} clientId {} remote address {} connected",
                username, clientId, ctx.channel().remoteAddress());
        onConnect(mqttSessionKey);
        MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
                null);
        ctx.writeAndFlush(mqttConnectMessage);
    }

    protected void onConnect(MqttSessionKey mqttSessionKey) {
    }

    @Override
    public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("publish, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {
            log.error("failure. clientId {}, username {} ",
                    mqttSession.getClientId(), mqttSession.getUsername());
            return;
        }
        if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
            log.error("does not support QoS2 protocol. clientId {}, username {} ",
                    mqttSession.getClientId(), mqttSession.getUsername());
            return;
        }
        onPublish(ctx, mqttSession, msg);
    }

    protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                             MqttPublishMessage msg) throws Exception {
    }

    @Override
    public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("sub, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
            // Stop here; the original fell through and called onSubscribe with a null session.
            return;
        }
        onSubscribe(ctx, mqttSession, msg.payload());
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,
                false, MqttQoS.AT_MOST_ONCE, false, 0);
        IntStream intStream = msg.payload().topicSubscriptions().stream()
                .mapToInt(s -> s.qualityOfService().value());
        MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());
        ctx.writeAndFlush(MqttMessageFactory.newMessage(
                fixedHeader,
                MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
                payload));
    }

    protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                               MqttSubscribePayload subscribePayload) throws Exception {
    }

    @Override
    public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        ctx.writeAndFlush(MqttMessageUtil.pingResp());
    }

    @Override
    public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processDisconnect(ChannelHandlerContext ctx) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());
        }
        onDisconnect(mqttSession);
    }

    protected void onDisconnect(MqttSessionKey mqttSessionKey) {
    }

    @Override
    public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("auth, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }
}
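As you can see, AbstractProcessor mainly maintains and validates the MqttSessionKey, and rejects the QoS levels that publish does not support (QoS 2 and FAILURE). It also answers MQTT heartbeat (PINGREQ) requests.
Likewise, we allow onPublish and onSubscribe to throw exceptions. The basic idea behind an MQTT gateway built on a message queue is simple: when a publish message arrives, produce it to the message queue; when there is a subscription, pull messages from the message queue.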
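The produce-on-publish, consume-on-subscribe idea can be sketched without any broker. In the example below an in-memory map of queues stands in for Kafka or Pulsar. InMemoryGateway, onPublish, and poll are hypothetical names used only for illustration, and a real gateway pushes messages to subscribers instead of letting them poll.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class InMemoryGateway {

    // Stand-in for the message queue: one FIFO queue per topic.
    private final Map<String, Queue<byte[]>> topics = new ConcurrentHashMap<>();

    /** MQTT PUBLISH side: produce the payload to the topic's queue. */
    void onPublish(String topic, byte[] payload) {
        topics.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(payload);
    }

    /** MQTT SUBSCRIBE side: fetch the next message of the topic, or null if there is none. */
    byte[] poll(String topic) {
        Queue<byte[]> queue = topics.get(topic);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        InMemoryGateway gateway = new InMemoryGateway();
        gateway.onPublish("sensors/temp", "21.5".getBytes(StandardCharsets.UTF_8));
        // prints 21.5
        System.out.println(new String(gateway.poll("sensors/temp"), StandardCharsets.UTF_8));
    }
}
```

Swapping the map for a real producer and consumer is what the Kafka and Pulsar processors do, which is why they need the bookkeeping of topics, producers, and consumers per session.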
Following from this, we may need to maintain the mapping between each MQTT topic and its producer and consumer, because the consumers of messaging middleware such as Kafka and Pulsar are bound to topics. A fragment of the shared code is shown below:

// Fragment of AbstractMqProcessor. The class declaration is omitted in the source;
// P and C are its producer and consumer type parameters.
protected final ReentrantReadWriteLock.ReadLock rLock;

protected final ReentrantReadWriteLock.WriteLock wLock;

protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;

protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;

protected final Map<MqttTopicKey, P> producerMap;

protected final Map<MqttTopicKey, C> consumerMap;

public AbstractMqProcessor(MqttAuth mqttAuth) {
    super(mqttAuth);
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    rLock = lock.readLock();
    wLock = lock.writeLock();
    this.sessionProducerMap = new HashMap<>();
    this.sessionConsumerMap = new HashMap<>();
    this.producerMap = new HashMap<>();
    this.consumerMap = new HashMap<>();
}

@Override
protected void onConnect(MqttSessionKey mqttSessionKey) {
    wLock.lock();
    try {
        sessionProducerMap.put(mqttSessionKey, new ArrayList<>());
        sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());
    } finally {
        wLock.unlock();
    }
}

@Override
protected void onDisconnect(MqttSessionKey mqttSessionKey) {
    wLock.lock();
    try {
        // find producers
        List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);
        if (produceTopicKeys != null) {
            for (MqttTopicKey mqttTopicKey : produceTopicKeys) {
                P producer = producerMap.get(mqttTopicKey);
                if (producer != null) {
                    ClosableUtils.close(producer);
                    producerMap.remove(mqttTopicKey);
                }
            }
        }
        sessionProducerMap.remove(mqttSessionKey);
        // find consumers
        List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);
        if (consumeTopicKeys != null) {
            for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {
                C consumer = consumerMap.get(mqttTopicKey);
                if (consumer != null) {
                    ClosableUtils.close(consumer);
                    consumerMap.remove(mqttTopicKey);
                }
            }
        }
        sessionConsumerMap.remove(mqttSessionKey);
    } finally {
        wLock.unlock();
    }
}
}

Kafka processor implementation
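Because a Kafka producer is not bound to a topic, we can reuse a single producer inside the Kafka processor. If a single Kafka producer ever reaches its performance limit, we can extend it into a list of Kafka producers and rotate through them. Consumers are different: the MQTT protocol may require different behavior for each subscribed topic, so sharing one consumer instance is not appropriate.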
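The producer-list idea is not implemented in this article. As a rough sketch of how the rotation might work, the helper below picks items from a fixed list in round-robin order. RoundRobin is a hypothetical name, and the strings in main merely stand in for KafkaProducer instances.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobin<T> {

    private final List<T> items;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobin(List<T> items) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("items must not be empty");
        }
        this.items = List.copyOf(items);
    }

    /** Returns the next item; floorMod keeps the index valid after the counter overflows. */
    T next() {
        int index = Math.floorMod(counter.getAndIncrement(), items.size());
        return items.get(index);
    }

    public static void main(String[] args) {
        RoundRobin<String> producers = new RoundRobin<>(List.of("producer-0", "producer-1"));
        // prints producer-0, producer-1, producer-0 on separate lines
        for (int i = 0; i < 3; i++) {
            System.out.println(producers.next());
        }
    }
}
```

An atomic counter keeps the selection safe when several Netty worker threads publish at the same time.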
我们在构造函数中启动KafkaProducerprivatefinal KafkaProcessorConfig kafkaProcessorConfig; privatefinal KafkaProducer producer;
publicKafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig){ super(mqttAuth);
this.kafkaProcessorConfig = kafkaProcessorConfig; this.producer = createProducer(); }
protected KafkaProducer createProducer(){ Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer
.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class
); returnnew KafkaProducer<>(properties); } 处理MqttPublish消息,MqttPublish消息包含如下几个关键参数MqttQoS
mqttQoS = publishMessage.fixedHeader().qosLevel(); String topic = publishMessage.variableHeader().topicName();
ByteBuffer byteBuffer = publishMessage.payload().nioBuffer(); 其中qos代表这条消息的质量级别,0没有任何保障,1代表至少一次,2代表恰好一次。
当前仅支持qos0、qos1topicName就是topic的名称ByteBuffer就是消息的内容根据topic、qos发送消息,代码如下 String topic = msg.variableHeader().topicName(); ProducerRecord record =
new ProducerRecord<>(topic, msg.payload().nioBuffer()); switch (msg.fixedHeader().qosLevel()) {
case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> { if (exception !=
null) { log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey,
exception); return; } log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}"
, mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset()); });
case AT_LEAST_ONCE -> { try { RecordMetadata recordMetadata = producer.send(record).get(); log.info(
"mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}", mqttSessionKey, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId())); }
catch (Exception e) { log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e); } }
case EXACTLY_ONCE, FAILURE -> thrownew IllegalStateException( String.format("mqttSessionKey %s can not reach here"
, mqttSessionKey)); } 处理订阅消息,我们暂时仅根据订阅的topic,创建topic进行消费即可,由于kafka原生客户端建议的消费代码模式如下while (true
) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord
byte[]> record : records) { // do logic } } 我们需要切换到其他线程对consumer进行消息,书写一个KafkaConsumerListenerWrapper
的wrapper,转换为listener异步消费模型package io.github.protocol.mqtt.broker.processor; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections;
import java.util.Properties; import java.util.concurrent.ExecutionException; @Slf4j publicclassKafkaConsumerListenerWrapper
implementsAutoCloseable{ privatefinal AdminClient adminClient; privatefinal KafkaConsumer
byte[]> consumer; publicKafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username)
{ Properties adminProperties = new Properties(); adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
this.adminClient = KafkaAdminClient.create(adminProperties); Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, username); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer
.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer
.class); this.consumer = new KafkaConsumer<>(properties); } publicvoidstart(String topic, KafkaMessageListener listener)
throws Exception { try { TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic)) .values().get(topic).get(); log.info(
"topic info is {}", topicDescription); } catch (ExecutionException ee) { if (ee.getCause()
instanceof UnknownTopicOrPartitionException) { log.info("topic {} not exist, create it"
, topic); adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (
short) 1))); } else { log.error("find topic info {} error", topic, ee); } }
catch (Exception e) { thrownew IllegalStateException("find topic info error", e); } consumer.subscribe(Collections.singletonList(topic)); log.info(
"consumer topic {} start", topic); new Thread(() -> { try { while
(true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(
1)); for (ConsumerRecord record : records) { listener.messageReceived(record); } } }
catch (WakeupException we) { consumer.close(); } catch (Exception e) { log.error(
"consumer topic {} consume error", topic, e); consumer.close(); } }).start(); Thread.sleep(
5_000); } @Overridepublicvoidclose()throws Exception { log.info("wake up {} consumer"
, consumer); consumer.wakeup(); } } @Override protectedvoidonSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, MqttSubscribePayload subscribePayload
) throws Exception { for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) { KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName()); subscribe(ctx, consumer, topicSubscription.topicName()); } }

private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {
    MqttTopicKey mqttTopicKey = new MqttTopicKey();
    mqttTopicKey.setTopic(topic);
    mqttTopicKey.setMqttSessionKey(mqttSessionKey);
    wLock.lock();
    try {
        KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);
        if (consumer == null) {
            consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());
            sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
                if (mqttTopicKeys == null) {
                    mqttTopicKeys = new ArrayList<>();
                }
                mqttTopicKeys.add(mqttTopicKey);
                return mqttTopicKeys;
            });
            consumerMap.put(mqttTopicKey, consumer);
        }
        return consumer;
    } finally {
        wLock.unlock();
    }
}

protected void subscribe(ChannelHandlerContext ctx, KafkaConsumerListenerWrapper consumer,
                         String topic) throws Exception {
    BoundInt boundInt = new BoundInt(65535);
    consumer.start(topic, record -> {
        log.info("receive message from kafka, topic {}, partition {}, offset {}",
                record.topic(), record.partition(), record.offset());
        MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
                MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());
        ctx.writeAndFlush(mqttPublishMessage);
    });
}

One point in the code above deserves attention throughout the project: every log line should carry the key identifiers, such as the topic, the MQTT username, and the MQTT clientId. This feels unnecessary while writing a demo, but when you have to track down a problem under massive request volume, these are exactly the fields you will need.
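One way to keep those identifiers consistent is to build the log prefix in a single place. The following is only a minimal sketch: the class name SessionLogContext and its methods are hypothetical and are not part of the article's code base. With SLF4J, the same fields could instead be placed into the MDC.

```java
/** Hypothetical helper (not from the article): bundles the identifiers worth repeating in every log line. */
final class SessionLogContext {
    private final String clientId;
    private final String username;
    private final String topic;

    SessionLogContext(String clientId, String username, String topic) {
        this.clientId = clientId;
        this.username = username;
        this.topic = topic;
    }

    /** Renders the identifiers in a fixed order so log lines can be searched by any of them. */
    String prefix() {
        return String.format("clientId [%s], username [%s], topic [%s]", clientId, username, topic);
    }

    /** Prepends the identifier prefix to a free-form message. */
    String format(String message) {
        return prefix() + ". " + message;
    }

    public static void main(String[] args) {
        SessionLogContext context = new SessionLogContext("device-001", "alice", "sensors/temperature");
        System.out.println(context.format("send message to kafka success"));
    }
}
```

The bracketed layout mirrors the "clientId [{}], username [{}]" pattern that the Pulsar processor's log statements already use.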
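The simple BoundInt utility class generates packet IDs up to 65535, the largest value that the protocol's 16-bit packet identifier can hold.

Pulsar processor implementation

Compared with Kafka, Pulsar is a better fit as the backend of an MQTT gateway, for the following reasons:
- Pulsar supports millions of topics, and its topics are more lightweight.
- Pulsar natively supports a listener-based consumption model, so there is no need to start a thread per consumer.
- Pulsar supports the Shared subscription type, which makes consumption more flexible.
- A Pulsar consumer's subscribe call guarantees that the subscription has been created; Kafka consumers offer no such semantic guarantee.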
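The source of the BoundInt helper used in the Kafka subscribe path is not listed in the article. A minimal sketch of such a counter might look like the following. It assumes the counter should cycle through 1..max and never return 0, because MQTT requires a non-zero packet identifier on QoS 1 PUBLISH packets. The class name PacketIdSequence is hypothetical, and the real BoundInt may behave differently.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical stand-in for the article's BoundInt helper (its source is not shown).
 * Hands out values 1..max and then wraps back to 1, so 0 is never returned.
 */
final class PacketIdSequence {
    private final int max;
    private final AtomicInteger last = new AtomicInteger(0);

    PacketIdSequence(int max) {
        if (max < 1) {
            throw new IllegalArgumentException("max must be >= 1");
        }
        this.max = max;
    }

    /** Returns the next id in 1..max; safe to call from several threads. */
    int nextVal() {
        return last.updateAndGet(current -> current >= max ? 1 : current + 1);
    }

    public static void main(String[] args) {
        PacketIdSequence ids = new PacketIdSequence(65535);
        System.out.println(ids.nextVal()); // 1
        System.out.println(ids.nextVal()); // 2
    }
}
```

An atomic counter is used because the message listener may be invoked from a different thread than the one that created the sequence.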

protected final ReentrantReadWriteLock.ReadLock rLock;
protected final ReentrantReadWriteLock.WriteLock wLock;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;
protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;
private final PulsarProcessorConfig pulsarProcessorConfig;
private final PulsarAdmin pulsarAdmin;
private final PulsarClient pulsarClient;

public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {
    super(mqttAuth);
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    rLock = lock.readLock();
    wLock = lock.writeLock();
    this.sessionProducerMap = new HashMap<>();
    this.sessionConsumerMap = new HashMap<>();
    this.producerMap = new HashMap<>();
    this.consumerMap = new HashMap<>();
    this.pulsarProcessorConfig = pulsarProcessorConfig;
    try {
        this.pulsarAdmin = PulsarAdmin.builder()
                .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl())
                .build();
        this.pulsarClient = PulsarClient.builder()
                .serviceUrl(pulsarProcessorConfig.getServiceUrl())
                .build();
    } catch (Exception e) {
        throw new IllegalStateException("Failed to create pulsar client", e);
    }
}

Handling PUBLISH messages

@Override
protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                         MqttPublishMessage msg) throws Exception {
    String topic = msg.variableHeader().topicName();
    Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);
    // Copy the payload out of the Netty buffer without moving its reader index.
    int len = msg.payload().readableBytes();
    byte[] messageBytes = new byte[len];
    msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
    switch (msg.fixedHeader().qosLevel()) {
        // QoS 0: fire and forget, only log the outcome.
        case AT_MOST_ONCE -> producer.sendAsync(messageBytes)
                .thenAccept(messageId -> log.info(
                        "clientId [{}], username [{}]. send message to pulsar success messageId: {}",
                        mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))
                .exceptionally((e) -> {
                    log.error("clientId [{}], username [{}]. send message to pulsar fail: ",
                            mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);
                    return null;
                });
        // QoS 1: reply with PUBACK only after Pulsar has accepted the message.
        case AT_LEAST_ONCE -> {
            try {
                MessageId messageId = producer.send(messageBytes);
                MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,
                        false, MqttQoS.AT_MOST_ONCE, false, 0);
                MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,
                        MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
                log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",
                        mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);
                ctx.writeAndFlush(pubAckMessage);
            } catch (PulsarClientException e) {
                log.error("clientId [{}], username [{}]. send pulsar error: {}",
                        mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());
            }
        }
        case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
                String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
    }
}

private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {
    MqttTopicKey mqttTopicKey = new MqttTopicKey();
    mqttTopicKey.setTopic(topic);
    mqttTopicKey.setMqttSessionKey(mqttSessionKey);
    // Fast path: look up an existing producer under the read lock.
    rLock.lock();
    try {
        Producer<byte[]> producer = producerMap.get(mqttTopicKey);
        if (producer != null) {
            return producer;
        }
    } finally {
        rLock.unlock();
    }
    // Slow path: check again under the write lock, then create.
    wLock.lock();
    try {
        Producer<byte[]> producer = producerMap.get(mqttTopicKey);
        if (producer == null) {
            producer = createProducer(topic);
            sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
                if (mqttTopicKeys == null) {
                    mqttTopicKeys = new ArrayList<>();
                }
                mqttTopicKeys.add(mqttTopicKey);
                return mqttTopicKeys;
            });
            producerMap.put(mqttTopicKey, producer);
        }
        return producer;
    } finally {
        wLock.unlock();
    }
}

protected Producer<byte[]> createProducer(String topic) throws Exception {
    return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
}

Handling SUBSCRIBE messages

@Override
protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                           MqttSubscribePayload subscribePayload) throws Exception {
    for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
        subscribe(ctx, mqttSessionKey, topicSubscription.topicName());
    }
}

protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, String topic)
        throws Exception {
    MqttTopicKey mqttTopicKey = new MqttTopicKey();
    mqttTopicKey.setTopic(topic);
    mqttTopicKey.setMqttSessionKey(mqttSessionKey);
    wLock.lock();
    try {
        Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);
        if (consumer == null) {
            consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);
            sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
                if (mqttTopicKeys == null) {
                    mqttTopicKeys = new ArrayList<>();
                }
                mqttTopicKeys.add(mqttTopicKey);
                return mqttTopicKeys;
            });
            consumerMap.put(mqttTopicKey, consumer);
        }
    } finally {
        wLock.unlock();
    }
}

protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username, String topic)
        throws Exception {
    BoundInt boundInt = new BoundInt(65535);
    try {
        PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);
        log.info("topic {} partitioned stats {}", topic, partitionedStats);
    } catch (PulsarAdminException.NotFoundException nfe) {
        log.info("topic {} not found", topic);
        pulsarAdmin.topics().createPartitionedTopic(topic, 1);
    }
    // The listener pushes each Pulsar message to the MQTT client; no polling thread is needed.
    return pulsarClient.newConsumer(Schema.BYTES).topic(topic)
            .messageListener((consumer, msg) -> {
                log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());
                MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
                        MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());
                ctx.writeAndFlush(mqttPublishMessage);
            })
            .subscriptionName(username).subscribe();
}

Test cases

Robust software should come with matching test cases. Only two basic pub/sub cases are written here; a real production-ready project needs more elaborate tests that cover all kinds of failure scenarios. There is a saying I very much agree with: "Unit tests are instant encouragement for developers."

Kafka

Starting a Kafka test broker

The embedded-kafka-java project can start a Kafka broker for unit tests. Add the dependency with the following coordinates:

<dependency>
    <groupId>io.github.embedded-middleware</groupId>
    <artifactId>embedded-kafka-core</artifactId>
    <version>0.0.2</version>
    <scope>test</scope>
</dependency>

With that in place, the following code starts a Kafka-backed MQTT broker:

@Slf4j
public class MqttKafkaTestUtil {

    public static MqttServer setupMqttKafka() throws Exception {
        EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();
        new Thread(() -> {
            try {
                embeddedKafkaServer.start();
            } catch (Exception e) {
                log.error("kafka broker started exception ", e);
            }
        }).start();
        Thread.sleep(5_000);
        MqttServerConfig mqttServerConfig = new MqttServerConfig();
        mqttServerConfig.setPort(0);
        mqttServerConfig.setProcessorType(ProcessorType.KAFKA);
        KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
        kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));
        mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);
        MqttServer mqttServer = new MqttServer(mqttServerConfig);
        new Thread(() -> {
            try {
                mqttServer.start();
            } catch (Exception e) {
                log.error("mqsar broker started exception ", e);
            }
        }).start();
        Thread.sleep(5000L);
        return mqttServer;
    }
}

The Kafka end-to-end test case is simple: publish one message through an MQTT client, then consume it.

@Log4j2
public class MqttKafkaPubSubTest {

    @Test
    public void pubSubTest() throws Exception {
        MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();
        String topic = UUID.randomUUID().toString();
        String content = "test-msg";
        String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
        String clientId = UUID.randomUUID().toString();
        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(UUID.randomUUID().toString());
        connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
        connOpts.setCleanSession(true);
        log.info("Mqtt connecting to broker");
        sampleClient.connect(connOpts);
        CompletableFuture<String> future = new CompletableFuture<>();
        log.info("Mqtt subscribing");
        sampleClient.subscribe(topic, (s, mqttMessage) -> {
            log.info("messageArrived");
            future.complete(mqttMessage.toString());
        });
        log.info("Mqtt subscribed");
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(1);
        log.info("Mqtt message publishing");
        sampleClient.publish(topic, message);
        log.info("Mqtt message published");
        TimeUnit.SECONDS.sleep(3);
        sampleClient.disconnect();
        String msg = future.get(5, TimeUnit.SECONDS);
        Assertions.assertEquals(content, msg);
    }
}

Pulsar

The embedded-pulsar-java project can start a Pulsar broker for unit tests. Add the dependency with the following coordinates:

<dependency>
    <groupId>io.github.embedded-middleware</groupId>
    <artifactId>embedded-pulsar-core</artifactId>
    <version>0.0.2</version>
    <scope>test</scope>
</dependency>

With that in place, the following code starts a Pulsar-backed MQTT broker:

@Slf4j
public class MqttPulsarTestUtil {

    public static MqttServer setupMqttPulsar() throws Exception {
        EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();
        embeddedPulsarServer.start();
        MqttServerConfig mqttServerConfig = new MqttServerConfig();
        mqttServerConfig.setPort(0);
        mqttServerConfig.setProcessorType(ProcessorType.PULSAR);
        PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
        pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));
        pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));
        mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);
        MqttServer mqttServer = new MqttServer(mqttServerConfig);
        new Thread(() -> {
            try {
                mqttServer.start();
            } catch (Exception e) {
                log.error("mqsar broker started exception ", e);
            }
        }).start();
        Thread.sleep(5000L);
        return mqttServer;
    }
}

The Pulsar end-to-end test case is just as simple: publish one message through an MQTT client, then consume it.

@Log4j2
public class MqttPulsarPubSubTest {

    @Test
    public void pubSubTest() throws Exception {
        MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();
        String topic = UUID.randomUUID().toString();
        String content = "test-msg";
        String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
        String clientId = UUID.randomUUID().toString();
        MemoryPersistence persistence = new MemoryPersistence();
        MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(UUID.randomUUID().toString());
        connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
        connOpts.setCleanSession(true);
        log.info("Mqtt connecting to broker");
        sampleClient.connect(connOpts);
        CompletableFuture<String> future = new CompletableFuture<>();
        log.info("Mqtt subscribing");
        sampleClient.subscribe(topic, (s, mqttMessage) -> {
            log.info("messageArrived");
            future.complete(mqttMessage.toString());
        });
        log.info("Mqtt subscribed");
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(1);
        log.info("Mqtt message publishing");
        sampleClient.publish(topic, message);
        log.info("Mqtt message published");
        TimeUnit.SECONDS.sleep(3);
        sampleClient.disconnect();
        String msg = future.get(5, TimeUnit.SECONDS);
        Assertions.assertEquals(content, msg);
    }
}

Performance optimization

This section briefly describes a few optimization points. Parameter tuning such as thread counts and buffer sizes is not covered here, because those settings have to be decided by actual load testing.

Use the Epoll network model on Linux

public class EventLoopUtil {

    /**
     * @return an EventLoopGroup suitable for the current platform
     */
    public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(nThreads, threadFactory);
        } else {
            return new NioEventLoopGroup(nThreads, threadFactory);
        }
    }

    public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
        if (eventLoopGroup instanceof EpollEventLoopGroup) {
            return EpollServerSocketChannel.class;
        } else {
            return NioServerSocketChannel.class;
        }
    }
}

Epoll.isAvailable() decides which event loop group to create, and when the channel type is specified, the matching channel class is chosen by checking the type of the group.

EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("mqtt-acceptor"));
EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("mqtt-worker"));
b.group(acceptorGroup, workerGroup)
        // key point
        .channel(EventLoopUtil.getServerSocketChannelClass(workerGroup))
        .option(ChannelOption.SO_BACKLOG, 100)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                // decoder
                p.addLast(new MqttDecoder());
                p.addLast(MqttEncoder.INSTANCE);
                p.addLast(new MqttHandler(processor(mqttServerConfig)));
            }
        });

Disable TCP keepalive
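Because the MQTT protocol has its own heartbeat mechanism, TCP keepalive can be turned off and the protocol-level heartbeat relied on instead, which saves resources when the number of connections is massive. Set ChannelOption.SO_KEEPALIVE to false. On a ServerBootstrap, options for accepted client connections are set with childOption (option only configures the listening channel):

.childOption(ChannelOption.SO_KEEPALIVE, false)

Shorten timeouts

By default, the timeouts of the MQTT client used in the unit tests and the produce timeouts of the Pulsar and Kafka producers are all fairly long (typically 30s). For deployments inside an internal network, they can be lowered to 5s.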
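As a rough illustration of the 5s suggestion for a Kafka producer, the sketch below collects the relevant settings as plain properties, so it runs without the Kafka client on the classpath. The keys request.timeout.ms, delivery.timeout.ms, max.block.ms and linger.ms are standard Kafka producer settings; the class name ShortTimeouts and the exact values are assumptions for illustration and should be confirmed by load testing.

```java
import java.util.Properties;

/** Sketch only: shortened Kafka producer timeouts, expressed as plain properties. */
final class ShortTimeouts {
    /** Illustrative 5-second budget for intranet deployments. */
    static final int TIMEOUT_MS = 5_000;

    static Properties kafkaProducerTimeouts() {
        Properties props = new Properties();
        // Max wait for the broker's response to a single request (Kafka default: 30 s).
        props.put("request.timeout.ms", String.valueOf(TIMEOUT_MS));
        // Kafka requires delivery.timeout.ms >= linger.ms + request.timeout.ms,
        // so linger.ms is pinned to 0 here to keep the total at 5 s.
        props.put("linger.ms", "0");
        // Upper bound on reporting success or failure of a send (Kafka default: 120 s).
        props.put("delivery.timeout.ms", String.valueOf(TIMEOUT_MS));
        // Max time send() may block on metadata or a full buffer (Kafka default: 60 s).
        props.put("max.block.ms", String.valueOf(TIMEOUT_MS));
        return props;
    }

    public static void main(String[] args) {
        kafkaProducerTimeouts().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These entries would be merged into the Properties passed to the KafkaProducer constructor. For Pulsar, the comparable knob is sendTimeout on the producer builder, and the Paho client used in the tests exposes setConnectionTimeout on MqttConnectOptions.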
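Shorter timeouts avoid pointless waiting.

Use multiple KafkaProducers to improve performance

A single KafkaProducer can hit the bandwidth limit of its TCP connection. When request volume is massive and latency on the Kafka produce path stands out, consider starting multiple KafkaProducers. Given the traffic pattern of MQTT (many connections, low QPS per connection), the hash of mqttSessionKey decides which KafkaProducer sends a message.

Add the following setting to KafkaProcessorConfig, the number of producers, which defaults to 1:

private int producerNum = 1;

During initialization, create an array of producers instead of a single Producer:

this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()];
for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) {
    producerArray[i] = createProducer();
}

Wrap the lookup in a method that returns the producer for a session:

private Producer<String, byte[]> getProducer(MqttSessionKey mqttSessionKey) {
    return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())];
}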
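
The selection step can be sketched in isolation. The following is a minimal, generic version that assumes nothing about Kafka: the ProducerPool class and its pick method are hypothetical names. It uses Math.floorMod instead of Math.abs on the remainder. Both keep the index in range, but floorMod removes the risk of someone later rewriting the expression as Math.abs(hash) % n, which returns a negative index for Integer.MIN_VALUE.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical sketch: a fixed pool whose members are chosen by the hash of a session key. */
final class ProducerPool<P> {
    private final List<P> producers;

    ProducerPool(int size, Supplier<P> factory) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be >= 1");
        }
        List<P> created = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            created.add(factory.get());
        }
        this.producers = Collections.unmodifiableList(created);
    }

    /** The same key always maps to the same producer, so per-session ordering is preserved. */
    P pick(Object sessionKey) {
        // floorMod always yields a value in [0, size), even for negative hash codes.
        return producers.get(Math.floorMod(sessionKey.hashCode(), producers.size()));
    }

    public static void main(String[] args) {
        ProducerPool<StringBuilder> pool = new ProducerPool<>(4, StringBuilder::new);
        System.out.println(pool.pick("session-a") == pool.pick("session-a")); // true
    }
}
```

Because a session is pinned to one producer, messages from a single MQTT connection keep their relative order while the load is spread across TCP connections.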
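
Conclusion

All the code in this article has been uploaded to GitHub. Only the basic MQTT connect, publish, and subscribe functions are implemented here; even pausing and unsubscribing are not supported. A mature, commercial-grade MQTT gateway additionally needs user isolation, broader protocol support, reliability, operability, flow control, security, and more.

If you need commercial, production-grade MQTT but cannot build a mature MQTT gateway quickly, Huawei Cloud's IoTDA service is an option: it provides a stable and reliable MQTT service, supports connecting massive numbers of devices to the cloud, and offers two-way messaging between devices and the cloud.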