关于rocketmq与rocketmq-mqtt之间的acl控制

前言

因业务需求需要在rocketmq中整合mqtt,期间遇到了许多问题,在此处记录

一、安装rocketmq和rocket-mqtt并能正常测试

具体参考rocketmq使用mqtt协议_rocketmq mqtt-CSDN博客

如果按照该博客流程走,是可以正常生产和订阅的,其中补充遇到的几点坑

1. mqtt并不会自动创建索引,所以autoCreateTopicEnable属性不会生效,需要手动创建,会报如图所示的错误

2. 如果遇到了只能生产不能消费,有以下几种可能

2.1 未创建通配符列表(topic命名为mqtttest,rocketmq地址为192.168.5.251:9876为例)

配置通配符列表
sh mqadmin updateKvConfig -s LMQ -k mqtttest -v mqtttest/+ -n 192.168.5.251:9876

2.2 如果还是不能生效且是rocetmq生产,mqtt消费情景,有可能是子topic未设置(以子topic命名为task为例,用rocketmq消费断点可以看见,两种方式生产的数据差异在于properties中属性的不同,特别是mqtt生产中多出的INNER_MULTI_DISPATCH属性)

具体两者差异如下

#mqtt生产者
properties={CONSUME_START_TIME=1706501170642, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A80510BC640EED1F1492CA922D0BBA, CLUSTER=DefaultCluster, INNER_MULTI_QUEUE_OFFSET=560,383, MIN_OFFSET=0, qosLevel=1, TAGS=MQTT_COMMON, TRACE_ON=true, originMqttTopic=mqtttest/task, INNER_MULTI_DISPATCH=%LMQ%mqtttest%+%,%LMQ%mqtttest%task%, IS_EMPTY_MSG=false, retryTimes=0, extData={"qosLevel":"1"}, MAX_OFFSET=191}

#rocketmq生产者
properties={CONSUME_START_TIME=1706509718866, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A80C9A833873D16E93934D554F0009, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=task, WAIT=true, TRACE_ON=true, MAX_OFFSET=235}

具体代码参考rocketmq-mqtt源码中的RocketMQProducer

生产者关键代码:

static void sendMsg(DefaultMQProducer producer, String topic, String tag) throws Exception {
	    byte[] bytes = ("Hello RocketMQ " + topic + "/" + tag + DateUtil.formatDateTime(new Date())).getBytes(RemotingHelper.DEFAULT_CHARSET);
        // 创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("mqtttest","MQTT_COMMON",bytes);
        
        String secondTopic = "/task";
        setLmq(msg, new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(topic, secondTopic))));
        // 发送异步消息
        producer.send(msg,new SendCallback() {
            
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送结果:" + sendResult);
            }
            
            @Override
            public void onException(Throwable e) {
                System.out.println("发送异常:" + e);
            }
        });
	}
	
	private static void setLmq(Message msg, Set<String> queues) {
        msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
                StringUtils.join(
                        queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
                        MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
    }

消费者关键代码:

public static void main(String[] args) throws MqttException, NoSuchAlgorithmException, InvalidKeyException {
    //mqtt端口
    String brokerUrl = "tcp://192.168.5.251:1883";
    //订阅主题
    String firstTopic = "mqtttest";
    
    //lientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示
    String sendClientId = "revc"+firstTopic;
    //MemoryPersistence设置clientid的保存形式,默认为以内存保存
    MemoryPersistence memoryPersistence = new MemoryPersistence();
    //连接属性
    MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);
    
    MqttClient mqttClient = new MqttClient(brokerUrl, sendClientId, memoryPersistence);
    //操作完成的最长时间
    mqttClient.setTimeToWait(5000L);
    
    mqttClient.setCallback(new MqttCallbackExtended() {
        
        /**
         * 连接成功
         * reconnect 是否自动重连
         * serverURI 连接目标
         */
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            if(reconnect) {
                System.out.println("消费者"+firstTopic+"成功重连目标mqtt:"+serverURI);
            }else {
                System.out.println("消费者"+firstTopic+"成功连接目标mqtt:"+serverURI);
            }
            try {
                final String topicFilter[] = {firstTopic+"/task"};
                final int[] qos = {1};
                mqttClient.subscribe(topicFilter, qos);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * 重连处理
         * throwable 异常
         */
        @Override
        public void connectionLost(Throwable throwable) {
            System.out.println("失去连接:" + throwable.getMessage());
            int times = 1;
            while (!mqttClient.isConnected()) {
                try {
                    System.out.println("重新连接, 第" + (times++) + "次");
                    mqttClient.reconnect();
                } catch (MqttException e) {
                    e.printStackTrace();
                    System.out.println("重连失败, msg:" + e.getMessage());
                }
                // 每隔60秒重试一次
                try {
                    TimeUnit.SECONDS.sleep(60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * 接收消息时触发
         * topic 主题
         * mqttMessage 消息
         */
        @Override
        public void messageArrived(String topic, MqttMessage mqttMessage) {
            try {
                String message = new String(mqttMessage.getPayload(), "UTF-8");
                System.out.println("Message received: " + message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * 当消息的传递已完成并且已接收到所有确认时调用
         * 对于QosO消息,消息被传递给网络后被调用
         * 对于Qos1消息,当接收到PUBACK时调用
         * 对于Qos2消息,当接收到PUBCOMP时调用它
         * iMqttDeliveryToken 消息关联的传递令牌,与发布消息时返回的令牌相同
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            
        }
    });
    
    try {
        mqttClient.connect(mqttConnectOptions);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

而后即可正常生产消费

二、修改rocketmq-mqtt源码

mqtt中与rocketmq连接的方法在org.apache.rocketmq.mqtt.ds.mq.MqFactory中

以MqProducer为例,查看后发现并未做ACL连接控制,所以需要我们自己添加,具体代码如下,其中自定义的accessKey和secretKey命名需要记住,等会我们部署后需要在对应配置文件添加

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.rocketmq.mqtt.ds.mq;


import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.Properties;


public class MqProducer   {

    private DefaultMQProducer defaultMQProducer;
    
    public static final String ROCKET_ACCESS_KEY = "rocketAccessKey";
    public static final String ROCKET_SECRET_KEY = "rocketSecretKey";

    public MqProducer(Properties properties) {
        defaultMQProducer = new DefaultMQProducer();
        if(properties.get(ROCKET_ACCESS_KEY) != null&&
                properties.get(ROCKET_SECRET_KEY) != null) {
            defaultMQProducer = new DefaultMQProducer(getAclRPCHook((String)properties.get(ROCKET_ACCESS_KEY), (String)properties.get(ROCKET_SECRET_KEY)));
            System.out.println("MqProducer_rocketAccessKey:"+properties.get(ROCKET_ACCESS_KEY));
            System.out.println("MqProducer_rocketSecretKey:"+properties.get(ROCKET_SECRET_KEY));
        }
        defaultMQProducer.setNamesrvAddr(properties.getProperty("NAMESRV_ADDR"));
        defaultMQProducer.setInstanceName(buildInstanceName());
        defaultMQProducer.setVipChannelEnabled(false);
    }
    
    public MqProducer(String nameSrv) {
        defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr(nameSrv);
        defaultMQProducer.setInstanceName(buildInstanceName());
        defaultMQProducer.setVipChannelEnabled(false);
    }

    public MqProducer(Properties properties,String nameSrv) {
        defaultMQProducer = new DefaultMQProducer();
        if(properties.get(ROCKET_ACCESS_KEY) != null&&
                properties.get(ROCKET_SECRET_KEY) != null) {
            defaultMQProducer = new DefaultMQProducer(getAclRPCHook((String)properties.get(ROCKET_ACCESS_KEY), (String)properties.get(ROCKET_SECRET_KEY)));
            System.out.println("MqProducer_rocketAccessKey:"+properties.get(ROCKET_ACCESS_KEY));
            System.out.println("MqProducer_rocketSecretKey:"+properties.get(ROCKET_SECRET_KEY));
        }
        defaultMQProducer.setNamesrvAddr(nameSrv);
        defaultMQProducer.setInstanceName(buildInstanceName());
        defaultMQProducer.setVipChannelEnabled(false);
    }

    public String buildInstanceName() {
        return Integer.toString(UtilAll.getPid())
                + "#" + System.nanoTime();
    }

    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }

    public void setProducerGroup(String producerGroup) {
        defaultMQProducer.setProducerGroup(producerGroup);
    }

    public void start() {
        try {
            defaultMQProducer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
    }
    
    private RPCHook getAclRPCHook(String rocketAccessKey, String rocketSecretKey) {
        return new AclClientRPCHook(new SessionCredentials(rocketAccessKey, rocketSecretKey));
    }

}

然后将其余三个方法都参照上面修改

三、重新打包

与官网打包命令不一样,需要额外添加-Dcheckstyle.skip跳过校验

mvn -Prelease-all -DskipTests -Dcheckstyle.skip clean install -U

四、替换lib包和修改配置文件

1 如果还未部署,可以直接将distribution\target下的zip包部署,如果已部署,只需要直接替换rocketmq-mqtt-1.0.2-SNAPSHOT\lib目录下的mqtt-ds-1.0.2-SNAPSHOT.jar

2 在mqtt的conf目录下的service.conf加上刚刚定义的属性rocketAccessKey和rocketSecretKey(与rocketmq的accessKey和secretKey一致,在rocketmq的conf目录下的plain_acl.yml文件中)

3 启动mqtt,日志可查到打印的信息(默认路径/root/logs/start_out.log)

五、验证

rocketmq消费者

mqtt消费者