RocketMQ入门-RocketMQ批量消息

RocketMQ批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

Producer

生产者发送消息调用的方法是:send(Collection<Message> msgs),与发送同步消息的区别是批量消息发送传入了一个消息的集合。

待发送的消息有如下的限制条件:

  1. 相同的topic
  2. 相同的waitStoreMsgOK
  3. 不能是延时消息
  4. 消息列表总字节数不可大与4MB
public class BatchProducer {

    public static void main(String[] args) throws Exception {
        // 1.关联生产者组
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer(MyRocketMqConstant.BatchLearn.BATCH_LEARN_PRODUCER_GROUP);
        // 2.关联注册中心
        defaultMQProducer.setNamesrvAddr(MyRocketMqConstant.NAME_SRV);
        // 3.启动生产者
        defaultMQProducer.start();

        // 构造消息
        List<Message> messages = buildMessages();

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // 4.发送消息
        SendResult sendResult = defaultMQProducer.send(messages);
        System.out.println(String.format("SendResult status:%s,queueId:%d,body:%s"
                ,sendResult.getSendStatus()
                ,sendResult.getMessageQueue().getQueueId()
                ,"批量消息体....."+sdf.format(new Date())));
        // 5.关闭生产者
        defaultMQProducer.shutdown();
    }


    public static List<Message> buildMessages() {
        List<Message> messages = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            String str = "Hello World " + i;
            Message message = new Message(MyRocketMqConstant.BatchLearn.BATCH_LEARN_TOPIC
                    , MyRocketMqConstant.BatchLearn.BATCH_LEARN_TAG_A
                    , "KEY " + i
                    , str.getBytes(StandardCharsets.UTF_8));
            messages.add(message);
        }
        return messages;
    }
}

Consumer

public class BatchConsumer {
    public static void main(String[] args) throws Exception {
        // 1.关联消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MyRocketMqConstant.BatchLearn.BATCH_LEARN_CONSUMER_GROUP);
        // 2.关联注册中心
        consumer.setNamesrvAddr(MyRocketMqConstant.NAME_SRV);
        // 3.订阅消息
        consumer.subscribe(MyRocketMqConstant.BatchLearn.BATCH_LEARN_TOPIC, MyRocketMqConstant.BatchLearn.BATCH_LEARN_TAG_A);
        // 4.注册监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            Random random = new Random();

            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    Date msgDate = new Date(msg.getStoreTimestamp());
                    Date now = new Date();
                    System.out.println("[consumerThread=" + Thread.currentThread().getName()
                            + "] [broker=" + msg.getBrokerName() + msg.getStoreHost()
                            + "] [queueId=" + msg.getQueueId()
                            + "] [now" + sdf.format(now) + " storeDate" + sdf.format(msgDate)
                            + "] [content=" + new String(msg.getBody()) + "]");
                }

                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 5.启动消费者
        System.out.println("消费者启动");
        consumer.start();
    }
}

测试结果

消费者输出

我将生产者启动了两次,一次是09:51:47,一次是09:55:09。能够发现两次批量消息放入的消息队列id不相同,且在各自的队列中是有序的。

[consumerThread=ConsumeMessageThread_3] [broker=broker-a/192.168.15.15:10911] [queueId=1] [now2021-04-19 09:55:12 storeDate2021-04-19 09:51:47] [content=Hello World 46]
[consumerThread=ConsumeMessageThread_3] [broker=broker-a/192.168.15.15:10911] [queueId=1] [now2021-04-19 09:55:14 storeDate2021-04-19 09:51:47] [content=Hello World 47]
[consumerThread=ConsumeMessageThread_4] [broker=broker-a/192.168.15.15:10911] [queueId=3] [now2021-04-19 09:55:14 storeDate2021-04-19 09:55:09] [content=Hello World 0]
[consumerThread=ConsumeMessageThread_3] [broker=broker-a/192.168.15.15:10911] [queueId=1] [now2021-04-19 09:55:19 storeDate2021-04-19 09:51:47] [content=Hello World 48]
[consumerThread=ConsumeMessageThread_4] [broker=broker-a/192.168.15.15:10911] [queueId=3] [now2021-04-19 09:55:22 storeDate2021-04-19 09:55:09] [content=Hello World 1]
[consumerThread=ConsumeMessageThread_5] [broker=broker-a/192.168.15.15:10911] [queueId=1] [now2021-04-19 09:55:28 storeDate2021-04-19 09:51:47] [content=Hello World 49]

特殊情况

摘录RocketMQ示例中的描述:

复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下

我们需要知道其中的大小限制是什么?

  1. 当使用发送批量消息时,要求消息列表的消息大小不能超过4MB。否则会报错org.apache.rocketmq.client.exception.MQClientException: CODE: 13 DESC: the message body size over max value, MAX: 4194304
  2. 这个大小限制对非批量发送消息时不生效,即对send(Message message)方法无效
public class ListSplitter implements Iterator<List<Message>> {
    private final int SIZE_LIMIT = 1024 * 1024 *4;
    private final List<Message> messages;
    private int currIndex;

    public ListSplitter(List<Message> messages) {
        this.messages = messages;
    }

    @Override
    public boolean hasNext() {
        return currIndex < messages.size();
    }

    @Override
    public List<Message> next() {
        // 获取开始切割的消息索引(消息字节小于4MB的消息索引)
        int startIndex = getStartIndex();
        // 切割结束位置
        int nextIndex = startIndex;
        // 临时存放切割出的消息列表的总字节数
        int totalSize = 0;
        // 切割消息列表,从起始位置切割,直到切割出的消息列表的总字节数超过4MB为止
        for (; nextIndex < messages.size() ; nextIndex++) {
            // 计算遍历的消息的字节并累加,当超过4MB时退出循环,记录nextIndex;用于切分list
            Message message = messages.get(nextIndex);
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT){
                break;
            }else {
                totalSize += tmpSize;
            }
        }
        List<Message> subList = this.messages.subList(startIndex, nextIndex);
        // 存放切割结束位置
        currIndex = nextIndex;
        return subList;
    }

    /**
     * 计算开始切割的位置,切割的对象是消息列表,最小粒度就是一个消息对象(一条消息的字节大于4MB不会对该消息对象进行切割)
     * 获取起始切割位置就是找到单条消息字节小于4MB的消息索引。对大于4MB的消息跳过。
     */
    private int getStartIndex(){
        // 计算开始索引的消息的字节数
        Message currMessage = messages.get(currIndex);
        int tmpSize = calcMessageSize(currMessage);
        // 下一个开始位置的消息的字节数不能大于4MB
        while (tmpSize > SIZE_LIMIT) {
            currIndex ++;
            Message message = messages.get(currIndex);
            tmpSize = calcMessageSize(message);
        }
        return currIndex;
    }

    private int calcMessageSize(Message message){
        int tmpSize = message.getTopic().length() + message.getBody().length;
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tmpSize += entry.getKey().length() + entry.getValue().length();
        }
        // 增加日志的开销20字节
        tmpSize = tmpSize+20;
        return tmpSize;
    }
}