主要问题:
1、订阅(sub)时收不到 retain 消息;发布(pub)时 retain 标志一直是 false。同样的操作换成 VerneMQ 就可以读到 retain 标志。
2、通过 proxy 订阅(sub)经常失败,报空指针异常(NullPointerException);直连 mqtt listener 则是正常的。相关日志如下:
03:17:40.396 [mqtt-redirect-io-44-1] INFO io.streamnative.pulsar.handlers.mqtt.proxy.ProxyHandler - channel read: MqttConnAckMessage[fixedHeader=MqttFixedHeader[messageType=CONNACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], variableHeader=MqttConnAckVariableHeader[connectReturnCode=CONNECTION_ACCEPTED, sessionPresent=false], payload=]
java.lang.NullPointerException
at io.streamnative.pulsar.handlers.mqtt.proxy.ProxyHandler$ProxyBackendHandler.channelRead(ProxyHandler.java:122)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
03:18:34.785 [pulsar-io-23-1] ERROR io.streamnative.pulsar.handlers.mqtt.support.ProtocolMethodProcessorImpl - [null] Failed to process MQTT subscribe.
java.util.concurrent.CompletionException: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:673) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683) ~[?:1.8.0_372]
at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010) ~[?:1.8.0_372]
at io.streamnative.pulsar.handlers.mqtt.utils.PulsarTopicUtils.getOrCreateSubscription(PulsarTopicUtils.java:47) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.support.ProtocolMethodProcessorImpl.processSubscribe(ProtocolMethodProcessorImpl.java:271) ~[?:?]
at io.streamnative.pulsar.handlers.mqtt.MQTTInboundHandler.channelRead(MQTTInboundHandler.java:61) ~[?:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.68.Final.jar:4.1.68.Final]
pulsar版本 branch-2.7
mop版本 branch-2.7.4.8
主要配置
messagingProtocols=mqtt
protocolHandlerDirectory=./protocols
mqttListeners=mqtt://10.0.0.36:1884
mqttProxyPort=1883
mqttProxyEnable=true
advertisedAddress=10.0.0.36
集群信息
bin/pulsar initialize-cluster-metadata \
--cluster pulsar-cluster-1 \
--zookeeper localhost:2181 \
--configuration-store localhost:2181 \
--web-service-url http://10.0.0.187:8080,10.0.0.36:8080,10.0.0.250:8080 \
--web-service-url-tls https://10.0.0.187:8443,10.0.0.36:8443,10.0.0.250:8443 \
--broker-service-url pulsar://10.0.0.187:6650,10.0.0.36:6650,10.0.0.250:6650 \
--broker-service-url-tls pulsar+ssl://10.0.0.187:6651,10.0.0.36:6651,10.0.0.250:6651
zkServers=10.0.0.187:2181,10.0.0.36:2181,10.0.0.250:2181
zookeeperServers=10.0.0.187:2181,10.0.0.36:2181,10.0.0.250:2181
configurationStoreServers=10.0.0.187:2181,10.0.0.36:2181,10.0.0.250:2181
clusterName=pulsar-cluster-1