今天来分享 RocketMQ 的定时任务 。通过这些定时任务,能让我们更加理解 RocketMQ 的消息处理机制和设计理念 。
【40个定时任务!这次带你彻底理解 RocketMQ 设计精髓!】从 RocketMQ 4.9.4 的源代码上看,RocketMQ 的定时任务有很多,今天主要讲解一些核心的定时任务 。
1 架构回顾
首先再来回顾一下 RocketMQ 的架构图:

文章插图
Name Server 集群部署,但是节点之间并不会同步数据,因为每个节点都会保存完整的数据 。因此单个节点挂掉,并不会对集群产生影响 。
Broker 可以采用主从集群部署,实现多副本存储和高可用 。每个 Broker 节点都要跟所有的 Name Server 节点建立长连接,定义注册 Topic 路由信息和发送心跳 。
Producer 和 Consumer 跟 Name Server 的任意一个节点建立长连接,定期从 Name Server 拉取 Topic 路由信息 。
2 Producer 和 Consumer 2.1 获取 NameServer 地址
Producer 和 Consumer 要跟 Name Server 建立连接,就必须首先获取 Name Server 地址 。Producer 和 Consumer 采用定时任务每两分钟获取 Name Server 地址并更新本地缓存 。代码如下:
//MQClientInstance类 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr(); } catch (Exception e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); 2.2 更新路由信息Producer 和 Consumer 会定时从 Name Server 获取定时订阅信息,更新本地缓存,默认间隔是 30s(可以配置) 。代码如下:
//MQclientInstance类 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); 2.3 向 Broker 发送心跳Producer 和 Consumer 会从本地缓存的 Broker 列表中定时清除离线的 Broker,并且向 Broker 发送心跳,默认间隔是 30s(可以配置) 。代码如下:
//MQClientInstance类 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); MQClientInstance.this.sendHeartbeatToAllBrokerWithlock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } } }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS); 2.4 持久化 Offset消费者需要定时持久化 MessageQueue 的偏移量,默认每 5s 更新一次(可以配置) 。
注意:集群模式需要向 Broker 发送持久化消息,因为集群模式偏移量保存在 Broker 端,而广播模式只需要把偏移量保存在消费者本地文件 。代码如下:
//MQClientInstance类 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS); 2.5 调整核心线程数对于消费者采用推模式的情况,消费者会根据未消费的消息数量,定期更新核心线程数,默认每 1m 一次 。
注意:在 4.9.4 这个版本,更新核心线程数的代码并没有实现,只是预留了接口 。代码如下:
//MQClientInstance类 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.adjustThreadPool(); } catch (Exception e) { log.error("ScheduledTask adjustThreadPool exception", e); } } }, 1, 1, TimeUnit.MINUTES); 2.6 失效过期请求Producer 和 Consumer 会定时扫描缓存在本地的请求,如果请求开始时间加超时时间(再加 1s)小于当前时间,则这个请求过期 。通过定时任务(3s 一次)让过期请求失效,并且触发回调函数 。
/.NETtyRemotingClient.JAVA this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingClient.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); 2.7 生产者 2.7.1 性能记录生产者发送消息后,会对成功失败的状态、花费时间进行记录,以此来计算吞吐量 TPS,响应时间 RT,代码如下:
//Producer.java executorService.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmark.createSnapshot()); if (snapshotList.size() > 10) { snapshotList.removeFirst(); } } }, 1000, 1000, TimeUnit.MILLISECONDS); executorService.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { doPrintStats(snapshotList, statsBenchmark, false); } } @Override public void run() { try { this.printStats(); } catch (Exception e) { e.printStackTrace(); } } }, 10000, 10000, TimeUnit.MILLISECONDS);
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 古达克副本任务在哪里接?古达克副本入口在哪?
- 想要给电脑定时关机怎么设置? 怎样设置定时关机
- 紫色曲玉任务多少级?DNF里的紫色曲玉怎么获得?
- 定时发送微信的软件——如何做到微信定时自动发送消息?
- windows 7旗舰版电脑计划任务服务的设置方法 计划任务服务
- 百川任务平台中赚的钱什么时候能提现 百川任务平台是真的吗
- 祖达克任务大全,魔兽世界 祖达克任务?
- w10定时关机在哪里,详细教程分解图一看就会
- 问下WOW怎么冲星界财团的声望啊?从0到崇敬。纳格兰是那个NPC有个循环任务是交念珠和象牙的?具体位置说下 星界财团声望速刷攻略
- 流浪地球2|《狂飙》剧情解析:老默执行任务,高启强左右为难
