引入消息队列可以方便地实现系统解耦、削峰填谷等作用。但是消息队列使用不当,可能会引起消息丢失,在一些消息敏感的业务场景下,这是不允许的。今天我们来聊一聊 RocketMQ 怎么做能确保消息不丢失。
1 RocketMQ 简介
RocketMQ 是阿里巴巴开源的分布式消息中间件,整体架构如下图:
(资料图片)
RocketMQ 主要包括 Producer、Consumer 和 Broker,同时 Name Server 进行集群注册管理和保存元数据。
2 消息不丢失
要想保证消息不丢失,需要从以下几个方面考虑:
Producer 发送消息 Broker 保存消息 Consumer 消费消息 Broker 主从切换维度 1 :同步发送,代码如下:
public void send() throws Exception { String message = \"test producer\"; Message sendMessage = new Message(\"topic1\", \"tag1\", message.getBytes()); sendMessage.putUserProperty(\"name1\",\"value1\"); SendResult sendResult = null; DefaultMQProducer producer = new DefaultMQProducer(\"testGroup\"); producer.setNamesrvAddr(\"localhost:9876\"); producer.setRetryTimesWhenSendFailed(3); try { sendResult = producer.send(sendMessage); } catch (Exception e) { e.printStackTrace(); } if (sendResult != null) { System.out.println(sendResult.getSendStatus()); }}同步发送会返回 4 个状态码:
SEND_OK:消息发送成功。 需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功。 FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。 FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。 SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。
消息重试时,消费端一定要做好幂等处理。
维度 2 :异步发送,代码如下:
public void sendAsync() throws Exception { String message = \"test producer\"; Message sendMessage = new Message(\"topic1\", \"tag1\", message.getBytes()); sendMessage.putUserProperty(\"name1\",\"value1\"); DefaultMQProducer producer = new DefaultMQProducer(\"testGroup\"); producer.setNamesrvAddr(\"localhost:9876\"); producer.setRetryTimesWhenSendFailed(3); producer.send(sendMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable e) { // TODO 可以在这里加入重试逻辑 } });}异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。
维度 3 :刷盘策略
异步刷盘:默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。 同步刷盘:消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:flushDiskType=SYNC_FLUSH
维度 4 :Broker 多副本和高可用
Broker 为了保证高可用,采用一主多从的方式部署。如下图:
消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。
这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:
brokerRole=SYNC_MASTER
改为同步复制后,消息复制流程如下:
slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset; master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave; slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset; master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。维度 5 :消息确认
Consumer 消费消息的代码如下:
public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(\"testGroup\"); consumer.setNamesrvAddr(\"localhost:9876\"); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe(\"topic1\", \"tag1\"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->{ try{ System.out.printf(\"Receive New Messages: %s\", msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }catch (Exception e){ e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start();}如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。
维度 6 :Consumer 重试
Consumer 消费失败,这里有 3 种情况:
返回 RECONSUME_LATER 返回 null 抛出异常Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。
注意:
Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。 重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。 Consumer 端一定要做好幂等处理。其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地,给 Broker 返回CONSUME_SUCCESS 来结束重试。代码如下:
int count = ((MessageExt) msgs).getReconsumeTimes();if (count >2) { //TODO 把消息写入本地存储 System.out.println(\"重试次数超过3次\"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}维度7 :事务消息
RocketMQ支持事务消息,整体流程如下图:
Producer 发送 half 消息; Broker 先把消息写入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的队列,之后给 Producer 返回成功; Producer 执行本地事务,成功后给 Broker 发送 commit 命令(本地事务执行失败则发送 rollback); Broker 收到 commit 请求后把消息状态更改为成功并把消息推到真正的 topic; Consumer 拉取消息进行消费。代码如下:
public class ProducerTransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { /** * 这里执行本地事务,执行成功返回LocalTransactionState.COMMIT_MESSAGE,执行失败返回 * LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW, * Broker会回来查询,所以需要记录事务执行状态 */ return LocalTransactionState.COMMIT_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { /** * 这里查询事务执行状态,根据事务状态返回LocalTransactionState.COMMIT_MESSAGE或 * LocalTransactionState.ROLLBACK_MESSAGE,如果没有查询到返回LocalTransactionState.UNKNOW, * Broker会再次查询,可以记录查询次数,超过次数后返回ROLLBACK_MESSAGE */ return LocalTransactionState.UNKNOW; }}点击查看原文,获取更多福利!
https://developer.aliyun.com/article/1111186?utm_content=g_1000366920
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
