关于rocketmq与rocketmq-mqtt之间的acl控制
前言
因业务需求需要在rocketmq中整合mqtt,期间遇到了许多问题,在此处记录
一、安装rocketmq和rocket-mqtt并能正常测试
具体参考rocketmq使用mqtt协议_rocketmq mqtt-CSDN博客
如果按照该博客流程走,是可以正常生产和订阅的,其中补充遇到的几点坑
1. mqtt并不会自动创建索引,所以autoCreateTopicEnable属性不会生效,需要手动创建,会报如图所示的错误
2. 如果遇到了只能生产不能消费,有以下几种可能
2.1 未创建通配符列表(topic命名为mqtttest,rocketmq地址为192.168.5.251:9876为例)
配置通配符列表
sh mqadmin updateKvConfig -s LMQ -k mqtttest -v mqtttest/+ -n 192.168.5.251:9876
2.2 如果还是不能生效且是rocetmq生产,mqtt消费情景,有可能是子topic未设置(以子topic命名为task为例,用rocketmq消费断点可以看见,两种方式生产的数据差异在于properties中属性的不同,特别是mqtt生产中多出的INNER_MULTI_DISPATCH属性)
具体两者差异如下
#mqtt生产者
properties={CONSUME_START_TIME=1706501170642, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A80510BC640EED1F1492CA922D0BBA, CLUSTER=DefaultCluster, INNER_MULTI_QUEUE_OFFSET=560,383, MIN_OFFSET=0, qosLevel=1, TAGS=MQTT_COMMON, TRACE_ON=true, originMqttTopic=mqtttest/task, INNER_MULTI_DISPATCH=%LMQ%mqtttest%+%,%LMQ%mqtttest%task%, IS_EMPTY_MSG=false, retryTimes=0, extData={"qosLevel":"1"}, MAX_OFFSET=191}
#rocketmq生产者
properties={CONSUME_START_TIME=1706509718866, MSG_REGION=DefaultRegion, UNIQ_KEY=C0A80C9A833873D16E93934D554F0009, CLUSTER=DefaultCluster, MIN_OFFSET=0, TAGS=task, WAIT=true, TRACE_ON=true, MAX_OFFSET=235}
具体代码参考rocketmq-mqtt源码中的RocketMQProducer
生产者关键代码:
static void sendMsg(DefaultMQProducer producer, String topic, String tag) throws Exception {
byte[] bytes = ("Hello RocketMQ " + topic + "/" + tag + DateUtil.formatDateTime(new Date())).getBytes(RemotingHelper.DEFAULT_CHARSET);
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("mqtttest","MQTT_COMMON",bytes);
String secondTopic = "/task";
setLmq(msg, new HashSet<>(Arrays.asList(TopicUtils.wrapLmq(topic, secondTopic))));
// 发送异步消息
producer.send(msg,new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果:" + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("发送异常:" + e);
}
});
}
private static void setLmq(Message msg, Set<String> queues) {
msg.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH,
StringUtils.join(
queues.stream().map(s -> StringUtils.replace(s, "/", "%")).map(s -> MixAll.LMQ_PREFIX + s).collect(Collectors.toSet()),
MixAll.MULTI_DISPATCH_QUEUE_SPLITTER));
}
消费者关键代码:
public static void main(String[] args) throws MqttException, NoSuchAlgorithmException, InvalidKeyException {
//mqtt端口
String brokerUrl = "tcp://192.168.5.251:1883";
//订阅主题
String firstTopic = "mqtttest";
//lientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示
String sendClientId = "revc"+firstTopic;
//MemoryPersistence设置clientid的保存形式,默认为以内存保存
MemoryPersistence memoryPersistence = new MemoryPersistence();
//连接属性
MqttConnectOptions mqttConnectOptions = buildMqttConnectOptions(sendClientId);
MqttClient mqttClient = new MqttClient(brokerUrl, sendClientId, memoryPersistence);
//操作完成的最长时间
mqttClient.setTimeToWait(5000L);
mqttClient.setCallback(new MqttCallbackExtended() {
/**
* 连接成功
* reconnect 是否自动重连
* serverURI 连接目标
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if(reconnect) {
System.out.println("消费者"+firstTopic+"成功重连目标mqtt:"+serverURI);
}else {
System.out.println("消费者"+firstTopic+"成功连接目标mqtt:"+serverURI);
}
try {
final String topicFilter[] = {firstTopic+"/task"};
final int[] qos = {1};
mqttClient.subscribe(topicFilter, qos);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 重连处理
* throwable 异常
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("失去连接:" + throwable.getMessage());
int times = 1;
while (!mqttClient.isConnected()) {
try {
System.out.println("重新连接, 第" + (times++) + "次");
mqttClient.reconnect();
} catch (MqttException e) {
e.printStackTrace();
System.out.println("重连失败, msg:" + e.getMessage());
}
// 每隔60秒重试一次
try {
TimeUnit.SECONDS.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 接收消息时触发
* topic 主题
* mqttMessage 消息
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
try {
String message = new String(mqttMessage.getPayload(), "UTF-8");
System.out.println("Message received: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 当消息的传递已完成并且已接收到所有确认时调用
* 对于QosO消息,消息被传递给网络后被调用
* 对于Qos1消息,当接收到PUBACK时调用
* 对于Qos2消息,当接收到PUBCOMP时调用它
* iMqttDeliveryToken 消息关联的传递令牌,与发布消息时返回的令牌相同
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
});
try {
mqttClient.connect(mqttConnectOptions);
} catch (Exception e) {
e.printStackTrace();
}
}
而后即可正常生产消费
二、修改rocketmq-mqtt源码
mqtt中与rocketmq连接的方法在org.apache.rocketmq.mqtt.ds.mq.MqFactory中
以MqProducer为例,查看后发现并未做ACL连接控制,所以需要我们自己添加,具体代码如下,其中自定义的accessKey和secretKey命名需要记住,等会我们部署后需要在对应配置文件添加
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.mqtt.ds.mq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.Properties;
public class MqProducer {
private DefaultMQProducer defaultMQProducer;
public static final String ROCKET_ACCESS_KEY = "rocketAccessKey";
public static final String ROCKET_SECRET_KEY = "rocketSecretKey";
public MqProducer(Properties properties) {
defaultMQProducer = new DefaultMQProducer();
if(properties.get(ROCKET_ACCESS_KEY) != null&&
properties.get(ROCKET_SECRET_KEY) != null) {
defaultMQProducer = new DefaultMQProducer(getAclRPCHook((String)properties.get(ROCKET_ACCESS_KEY), (String)properties.get(ROCKET_SECRET_KEY)));
System.out.println("MqProducer_rocketAccessKey:"+properties.get(ROCKET_ACCESS_KEY));
System.out.println("MqProducer_rocketSecretKey:"+properties.get(ROCKET_SECRET_KEY));
}
defaultMQProducer.setNamesrvAddr(properties.getProperty("NAMESRV_ADDR"));
defaultMQProducer.setInstanceName(buildInstanceName());
defaultMQProducer.setVipChannelEnabled(false);
}
public MqProducer(String nameSrv) {
defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setNamesrvAddr(nameSrv);
defaultMQProducer.setInstanceName(buildInstanceName());
defaultMQProducer.setVipChannelEnabled(false);
}
public MqProducer(Properties properties,String nameSrv) {
defaultMQProducer = new DefaultMQProducer();
if(properties.get(ROCKET_ACCESS_KEY) != null&&
properties.get(ROCKET_SECRET_KEY) != null) {
defaultMQProducer = new DefaultMQProducer(getAclRPCHook((String)properties.get(ROCKET_ACCESS_KEY), (String)properties.get(ROCKET_SECRET_KEY)));
System.out.println("MqProducer_rocketAccessKey:"+properties.get(ROCKET_ACCESS_KEY));
System.out.println("MqProducer_rocketSecretKey:"+properties.get(ROCKET_SECRET_KEY));
}
defaultMQProducer.setNamesrvAddr(nameSrv);
defaultMQProducer.setInstanceName(buildInstanceName());
defaultMQProducer.setVipChannelEnabled(false);
}
public String buildInstanceName() {
return Integer.toString(UtilAll.getPid())
+ "#" + System.nanoTime();
}
public DefaultMQProducer getDefaultMQProducer() {
return defaultMQProducer;
}
public void setProducerGroup(String producerGroup) {
defaultMQProducer.setProducerGroup(producerGroup);
}
public void start() {
try {
defaultMQProducer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}
}
private RPCHook getAclRPCHook(String rocketAccessKey, String rocketSecretKey) {
return new AclClientRPCHook(new SessionCredentials(rocketAccessKey, rocketSecretKey));
}
}
然后将其余三个方法都参照上面修改
三、重新打包
与官网打包命令不一样,需要额外添加-Dcheckstyle.skip跳过校验
mvn -Prelease-all -DskipTests -Dcheckstyle.skip clean install -U
四、替换lib包和修改配置文件
1 如果还未部署,可以直接将distribution\target下的zip包部署,如果已部署,只需要直接替换rocketmq-mqtt-1.0.2-SNAPSHOT\lib目录下的mqtt-ds-1.0.2-SNAPSHOT.jar
2 在mqtt的conf目录下的service.conf加上刚刚定义的属性rocketAccessKey和rocketSecretKey(与rocketmq的accessKey和secretKey一致,在rocketmq的conf目录下的plain_acl.yml文件中)
3 启动mqtt,日志可查到打印的信息(默认路径/root/logs/start_out.log)
五、验证
rocketmq消费者
mqtt消费者