- MQ概述
- AMQP和JMS
- 消息队列实现
- 消息队列常用的场景
- RocketMQ & RabbitMQ特性
- 各种消息队列优缺点
- 消息队列常使用的注意事项
- RocketMQ官网
- RocketMQ简介
- RocketMQ基本概念
- RocketMQ消息收发模型
- RocketMQ版本选择
- RocketMQ系统部署架构理解
- RocketMQ部署
- 参数配置
- 日志删除机制
- RocketMQ监控
- jvm/os性能调优
- 常见问题
MQ概述
MQ(Message Queue)是典型的生产者消费者模型,没有业务逻辑侵入,实现生产者和消费者的解耦。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。
AMQP和JMS
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
消息队列实现
- RabbitMQ 是基于AMQP 协议的 具有跨语言的特性,支持多种开发语言,基于erlang语言编写,天生具有高并发.
- rocketMQ 是基于JMS的 是阿里巴巴旗下开发的mq,现在可以多语言接入,声称可用性极高,消息从来不会丢失. 具体比较(偷来的图):
消息队列常用的场景
削峰 例如: 我们做的考试系统中,用户通过人脸识别登录系统,考虑到考试系统的特殊性,三万名考生参加考试,需要记录人脸识别登录照片。 从考试完结果上看,用户最大并发数在4000,于是我们采用rocketMq来进行异步消费用户人脸识别图片,当时统计rocketMq每秒1000消费消息。及时反馈了考生人脸识别登录成功,对数据库写操作也起到很大的缓冲功能。
诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题。
解耦 如常用ABCD系统中,BCD系统都需要从A系统中调用接口返回数据,这时候突然来了E系统,也需要A系统,又或者C系统不想要用这个接口数据了,而且A系统还得考虑,如果BCD接收不到数据,接收失败咋整之类的问题。
如果基于消息队列,这些问题就迎刃而解了。
A系统直接把数据扔到Mq中,BCDE系统直接从MQ中消费,如果消费失败,则重试消费。
异步 比如下订单系统中,会调用库存系统,会调用仓库系统,积分系统等,用户订单操作会直接返回给用户信息,提示订单完成,至于库存减少,或者仓库发货又或者积分的增加等,都是异步完成。极大的提高用户响应速度。
分布式事务一致性
交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
大数据分析
数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
分布式缓存同步
天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化。
RocketMQ & RabbitMQ特性
RocketMQ:
- NameServer:整个MQ集群提供服务协调与治理,具体就是记录维护Topic、Broker的信息,及监控Broker的运行状态,Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步,相当于注册中心.
- Broker:消息服务器,作为server提供消息核心服务,每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server;
- Producer:消息生产者,业务的发起方,负责生产消息传输给broker.
- Consumer:消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
RabbitMQ:
- Exchange:交换机的作用就是根据路由规则,将消息转发到对应的队列上。.
- Broker:消息服务器,作为server提供消息核心服务
- Channel:信道是建立在真实的TCP连接内的虚拟连接.
- Routing key:生产者将消息发送到交换机时,会在消息头上携带一个 key,这个 key就是routing key,来指定这个消息的路由规则。
- Binding key:在绑定Exchange与Queue时,一般会指定一个binding key,生产者将消息发送给Exchange时,消息头上会携带一个routing key,当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。
各种消息队列优缺点
RabbitMQ
优点:rabbitMq 几万级数据量,基于erlang语言开发,因此响应速度快些,并且社区活跃度比较活跃,可视化界面
缺点:数据吞吐量相对与小一些,并且是基于erlang语言开发,比较重的问题难以维护
RocketMq
rocketMq几十万级别数据量,基于Java开发,应对了淘宝双十一考验,并且文档十分的完善,拥有一些其他消息队列不具备的高级特性。
-
如定时推送,其他消息队列是延迟推送,如 rabbitMq 通过设置 expire 字段设置延迟推送时间。
-
如rocketmq实现分布式事务,rocketmq4.3开始支持分布式事务消息,比较可靠的。
kafka
kafka真正的大规模分布式消息队列,提供的核心功能比较少。基于zookeeper实现的分布式消息订阅。据说最新版不依赖zookeeper了
消息队列常使用的注意事项
1、如何保证系统的高可用
RocketMQ:分为多主集群结构,多主多备异步复制结构,多主多备同步复制结构
- 多Master:配置简单,性能最高,但可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性。对应的配置文件
$ROCKETMQ_HOME/conf/2m-noslave/
- 多Master多Slave异步模式:每个Master配一个Slave,有多对Master-Slave,消息写入全部是发送到Master Broker的,获取消息也可以Master获取,少了Slave Broker,会导致所有读写压力都集中在Master Broker,集群采用异步复制方式,主备有短暂消息延迟,毫秒级;性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预,但Master宕机或磁盘损坏时会有少量消息丢失;对应的配置文件
$ROCKETMQ_HOME/conf/2m-2s-async
- 多Master多Slave同步模式:每个Master配一个Slave,有多对Master-Slave,消息写入全部是发送到Master Broker的,获取消息也可以Master获取,少了Slave Broker,会导致所有读写压力都集中在Master Broker,集群采用同步双写方式,主备都写成功,向应用返回成功;优点是服务可用性与数据可用性非常高;缺点是性能比异步集群略低,当前版本主宕备不能自动切换为主。 对应的配置文件
$ROCKETMQ_HOME/conf/2m-2s-sync
RabbitMQ:分为普通集群模式、普通集群模式概念,就是用户在发送数据时候,发送到mq机器上,并且持久化磁盘,然后通过设置镜像的queue,把数的持久化地址对应表同步到另外mq机器上。这种就有效防止一台mq挂了以后,另外的mq可以直接对外提供消费功能。
-
普通集群模式:多台机器上启动多个rabbitmq实例,每个机器启动一个。但是你创建的queue,只会放在一个rabbtimq实例上,但是每个实例都同步queue的元数据。完了你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。如果那个放queue的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让rabbitmq落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个queue拉取数据。
-
镜像集群模式:创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,然后每次写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。缺点:(1)性能开销大,因为需要进行整个集群内部所有实例的数据同步;(2)无法线性扩容: 因为每一个服务器中都包含整个集群服务节点中的所有数据, 这样如果一旦单个服务器节点的容量无法容纳了。
2、如何保证消息不会丢失
RocketMQ:
- 生产者丢数据:(1)采取send()同步发消息,发送结果是同步感知的。发送失败后可以重试,设置重试次数。默认3次。(2)发送失败的消息会存储在Commitlog中。
- 消息队列丢数据:(1)消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的;(2)Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中;(3)Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
- 消费者丢失数据:(1)完全成功后发送ACK;(2)维护一个持久化的offset
RabbitMQ:
- 生产者丢数据:RabbitMQ提供transaction(事务,支持回滚)和confirm模式(ACK给生产者)来确保生产者不丢消息;
- 消息队列丢数据:开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack。
- 消费者丢失数据:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息,处理消息成功后,手动回复确认消息。
rabbitMq
就rabbitmq而言,从生产者,消费者,消息队列角度分析。生产者而言,发送消息如果失败,则定义重试次数,一般设置成五次。
两种解决方式:
1、通过设置事务,进行事务回滚重试
2、通过发送者确认模式开启
-
方式一:
channel.waitForConfirms()
普通发送方确认模式; -
方式二:
channel.waitForConfirmsOrDie()
批量确认模式; -
方式三:
channel.addConfirmListener()
异步监听发送方确认模式;
就mq本身而言,需要做队列的持久化到磁盘的操作。
1、queque队列的持久化,通过channel.queue_declare(queue='hello', durable=True);设置
2、设置消息的持久化,通过delivery_mode=2来进行设置。
mq消费者而言,开启手动ACK模式,也就是需要真正的消费者入库成功,才会进行消费成功的确认。
总结就是一句话:发送者确认模式开启,消息持久化默认开启,消费者消费开启手动ack
rocketMq
rocketMq而言,生产者发送消息,生产者默认模式
rocketMq持久化方式中,消息持久化通过如下配置
3、消费者幂等消费问题
感觉rabbitmq和rocketmq出现重复消费场景差不多
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费者的处理过程就是幂等的。
例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为100元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费100元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
- 当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
- 解决方式的话,通过messageId,作为数据库业务主键,重复插入会报错主键冲突问题。
- 或者通过redis唯一性,messageId作为key存入,去重重复的数据,在从redis中刷到数据库里面。
RocketMQ官网
RocketMQ简介
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
RocketMQ基本概念
- Topic:消息主题,一级消息类型,生产者向其发送消息。
- Message:生产者向Topic发送并最终传送给消费者的数据消息的载体。
- 消息属性:生产者可以为消息定义的属性,包含Message Key和Message Tag。
- Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。
- Message ID:消息的全局唯一标识,由消息队列RocketMQ系统自动生成,唯一标识某条消息。
- Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
- Producer:也称为消息发布者,负责生产并发送消息至Topic。
- Consumer:也称为消息订阅者,负责从Topic接收并消费消息。
- 分区:即Topic Partition,物理上的概念。每个Topic包含一个或多个分区。
- 消费位点:每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset;分区的起始位置对应的位置叫做起始位点MinOffset。
- Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
- Group ID:Group的标识。
- 队列:一个Topic下会由一到多个队列来存储消息。
- Exactly-Once投递语义:Exactly-Once投递语义是指发送到消息系统的消息只能被Consumer处理且仅处理一次,即使Producer重试消息发送导致某消息重复投递,该消息在Consumer也只被消费一次。
- 集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。
- 广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
- 定时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
- 延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
- 事务消息:RocketMQ提供类似X/Open XA的分布事务功能,通过消息队列RocketMQ的事务消息能达到分布式事务的最终一致。
- 顺序消息:RocketMQ提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序消息。
- 全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
- 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。
- 消息堆积:Producer已经将消息发送到消息队列RocketMQ的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ的服务端保存着未被消费的消息,该状态即消息堆积。
- 消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ的服务端完成。
- 消息轨迹:在一条消息从Producer发出到Consumer消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从Producer发出,经由消息队列RocketMQ服务端,投递给Consumer的完整链路,方便定位排查问题。
- 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ服务端的消息。
- 死信队列:死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中。 消息队列RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
RocketMQ消息收发模型
消息队列RocketMQ支持发布和订阅模型,消息生产者应用创建Topic并将消息发送到Topic。消费者应用创建对Topic的订阅以便从其接收消息。通信可以是一对多(扇出)、多对一(扇入)和多对多。具体通信如下图所示。
- 生产者集群:用来表示发送消息应用,一个生产者集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。 一个生产者集群可以发送多个Topic消息。发送分布式事务消息时,如果生产者中途意外宕机,消息队列RocketMQ服务端会主动回调生产者集群的任意一台机器来确认事务状态。
- 消费者集群:用来表示消费消息应用,一个消费者集群下包含多个消费者实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个消费者对象。 一个消费者集群下的多个消费者以均摊方式消费消息。如果设置的是广播方式,那么这个消费者集群下的每个实例都消费全量数据。 一个消费者集群对应一个Group ID,一个Group ID可以订阅多个Topic,如上图中的Group 2所示。Group和Topic的订阅关系可以通过直接在程序中设置即可。
RocketMQ版本选择
https://rocketmq.apache.org/zh/version
在 RocketMQ 4.5 版本之前,RocketMQ 只有 Master/Slave 一种部署方式,一组 broker 中有一个 Master ,有零到多个Slave,Slave 通过同步复制或异步复制的方式去同步 Master 数据。Master/Slave 部署模式,提供了一定的高可用性。但这样的部署模式,有一定缺陷。比如故障转移方面,如果主节点挂了,还需要人为手动进行重启或者切换,无法自动将一个从节点转换为主节点。因此,我们希望能有一个新的多副本架构,去解决这个问题。
新的多副本架构首先需要解决自动故障转移的问题,本质上来说是自动选主的问题。这个问题的解决方案基本可以分为两种:
- 利用第三方协调服务集群完成选主,比如 zookeeper 或者 etcd。这种方案会引入了重量级外部组件,加重部署,运维和故障诊断成本,比如在维护 RocketMQ 集群还需要维护 zookeeper 集群,并且 zookeeper 集群故障会影响到 RocketMQ 集群。
- 利用 raft 协议来完成一个自动选主,raft 协议相比前者的优点是不需要引入外部组件,自动选主逻辑集成到各个节点的进程中,节点之间通过通信就可以完成选主。
因此最后选择用 raft 协议来解决这个问题,而 DLedger 就是一个基于 raft 协议的 commitlog 存储库,也是 RocketMQ 实现新的高可用多副本架构的关键。
DLedger —基于 raft 协议的 commitlog 存储库
RocketMQ 4.5版本以后,依赖Dledger进行主从切换
RocketMQ 5.0 的架构中,在客户端和broker之间增加了一个 Proxy(代理层)。简单来说,就是对外屏蔽了NameServer、Broker 的概念和调用,统一收敛由 Proxy 来负责提供消息发送、接收、调度控制等功能。
而无状态的 Proxy ,其实也是将客户端(Client)和 Broker 侧的无状态的功能都抽取出来。比如,客户端原有的负载均衡机制、故障隔离、push/pop 消费模型; Broker 的访问控制、多协议适配、客户端治理以及NameServer 的消息路由能力等这些。
当然,必须确保一个宗旨的是:Proxy 所提供的必须都是无状态。这样子才能结合云原生技术,方便扩缩容部署。
RocketMQ系统部署架构理解
图中所涉及到的概念如下所述:
- Name Server:在消息队列RocketMQ版中提供命名服务。是一个很简单的 Topic 路由注册中心,支持 Broker 的动态注册和发现[根据日志可以看出每5秒扫描一次],保存 Topic 和Borker 之间的关系。通常也是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即是一个几乎无状态节点。
-
Broker:消息中转角色,负责存储消息,转发消息。分为Master Broker和Slave Broker,一个Master Broker可以对应多个Slave Broker,但是一个Slave Broker只能对应一个Master Broker。Broker启动后需要完成一次将自己的路由信息注册至每一台Name Server的操作;随后每隔30s定期向Name Server上报Topic路由信息。
-
生产者:与Name Server集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从Name Server读取Topic路由信息,并向提供Topic服务的Master Broker建立长链接,且定时向Master Broker发送心跳。可以集群部署
-
消费者:与Name Server集群中的其中一个节点(随机)建立长连接,定期从Name Server拉取Topic路由信息,并向提供Topic服务的Master Broker、Slave Broker建立长连接,且定时向Master Broker、Slave Broker发送心跳。Consumer既可以从Master Broker订阅消息,也可以从Slave Broker订阅消息,订阅规则由Broker配置决定。支持集群消费和广播消费消息。
白话文解释:
先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker 启动之后会向所有 NameServer 定期(每 30s)发送心跳包,包括:IP、Port、TopicInfo,NameServer 也会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。
这样每个 NameServer 就知道集群所有 Broker 的相关信息,此时 Producer 上线从 NameServer 就可以得知它要发送的某 Topic 消息在哪个 Broker 上,和对应的 Broker (Master 角色的)建立长连接,发送消息。
Consumer 上线也可以从 NameServer 得知它所要接收的 Topic 是哪个 Broker ,和对应的 Master、Slave 建立连接,接收消息。
NameServer
- 每个NameServer节点互相之间是独立的, 没有任何信息交互, 也就不存在任何的选主或主从切换之类的问题, 因此NameServer是很轻量级的.
- 单个NameServer节点中存储了活跃的Broker列表(包括master和slave), 这里活跃的定义是与NameServer保持有心跳.
它的特点就是轻量级,无状态。角色类似于 Zookeeper 的情况,从上面描述知道其主要的两个功能就是:Broker 管理、路由信息管理。
总体而言比较简单,我再贴一些字段,让大家有更直观的印象知道它存储了些什么。
请使用 HTTP 静态服务器寻址(默认),这样 NameServer 就能动态发现。
Broker
Broker 就比较复杂一些了,但是非常重要。大致分为以下五大模块,我们来看一下官网的图。
-
Broker上存放Topic信息, Topic由多个队列组成, 队列会平均分散在多个Broker上.
-
消息发送方发消息会自动轮询所有可以发送的Broker, 尽量平均分布到所有队列中, 最终的效果是所有消息都平均落到每个Broker上.
-
Broker是具体提供业务的服务器, 单个Broker节点与所有的NameServer节点保持长连接及心跳, 定时每隔(30秒)注册Topic信息到所有的NameServer中.
-
NameServer定时每隔(5s)扫描所有存活Broker的连接, 如果NameServer超过2分钟没有收到心跳, 则NameServer会断开与Broker的连接. 底层的通信和连接都是基于Netty实现的的.
Remoting 远程模块,处理客户请求。
Client Manager 管理客户端,维护订阅的主题。
Store Service 提供消息存储查询服务。
HA Serivce,主从同步高可用。
Index Serivce,通过指定key 建立索引,便于查询。
消息刷盘机制
RocketMQ 提供消息同步刷盘和异步刷盘两个选择,关于刷盘我们都知道效率比较低,单纯存入内存中的话效率是最高的,但是可靠性不高,影响消息可靠性的情况大致有以下几种:
- Broker 被暴力关闭,比如 kill -9
- Broker 挂了
- 操作系统挂了
- 机器断电
- 机器坏了,开不了机
- 磁盘坏了
如果都是 1-4 的情况,同步刷盘肯定没问题,异步的话就有可能丢失部分消息,5 和 6就得依靠副本机制了,如果同步双写肯定是稳的,但是性能太差,如果异步则有可能丢失部分消息。
所以需要看场景来使用同步、异步刷盘和副本双写机制。
Producer
- 与NameServer的关系
单个Producer和一台NameServer节点(随机选择) 保持长连接, 定时查询
- 与Broker的关系
单个Produce和与其关联的所有broker保持长连接, 并维持心跳. 默认情况下消息发送采用轮询方式, 会均匀发到对应Topic的所有queue中.
Producer 无非就是消息生产者,那首先它得知道消息要发往哪个 Broker ,于是每 30s 会从某台 NameServer 获取 Topic 和 Broker 的映射关系存在本地内存中,如果发现新的 Broker 就会和其建立长连接,每 30s 会发送心跳至 Broker 维护连接。
并且会轮询当前可以发送的 Broker 来发送消息,达到负载均衡的目的,在同步发送情况下如果发送失败会默认重投两次(retryTimesWhenSendFailed = 2),并且不会选择上次失败的 broker,会向其他 broker 投递。
在异步发送失败的情况下也会重试,默认也是两次 (retryTimesWhenSendAsyncFailed = 2),但是仅在同一个 Broker 上重试。
Consumer
- 与nameserver的关系
单个Consumer和一台NameServer保持长连接,定时查询topic配置信息,如果该NameServer挂掉,消费者会自动连接下一个NameServer,直到有可用连接为止,并能自动重连。与NameServer之间没有心跳。
- 与broker的关系
单个Consumer和与其关联的所有broker保持长连接,并维持心跳,失去心跳后,则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。
消费者类型 RocketMQ有2种常见的消费模式, 分别是DefaultMQPushConsumer和DefaultMQPullConsumer模式, 这2种模式字面理解一个是推送消息, 一个是拉取消息. 但无论是Push还是Pull, 其本质都是拉取消息, 只是实现机制不一样.
- push consume 推模式
其实并不是Broker主动向Consumer推送消息, 而是consumer向broker发出请求, 保持了一种长链接, broker会每4秒检测一次是否有消息, 如果有消息, 则将消息推送给consumer. 使用push模式实现消息消费, broker会主动记录消息消费的偏移量.
- pull consume 拉模式
拉模式是消费方主动去broker拉取数据, 一般会在本地使用定时任务实现, 使用它获得消息状态方便, 负载均衡性能可控, 但消息的及时性差, 而且需要手动记录消息消费的偏移量信息, 所以在工作中多数情况推荐使用Push模式.
消费模式,一般我们用的都是集群模式。
- 集群模式:在默认情况下,就是集群消费,此时消息发出去后将只有一个消费者能获取消息。[一个分组下的消费者均分消费Topic 消息。]
- 广播模式:一条消息被多个Consumer消费。消息会发给Consume Group中的每一个消费者进行消费。[一个分组下的每个消费者都会消费完整的Topic 消息。]
提高 Consumer 的消费能力
1、提高消费并行度:增加队列数和消费者数量,提高单个消费者的并行消费线程,参数consumeThreadMax
。
2、批处理消费,设置consumeMessageBatchMaxSize
参数,这样一次能拿到多条消息,然后比如一个 update语句之前要执行十次,现在一次就执行完。
3、跳过非核心的消息,当负载很重的时候,为了保住那些核心的消息,设置那些非核心的消息,例如此时消息堆积 1W 条了之后,就直接返回消费成功,跳过非核心消息。
RocketMQ部署
单机版
系统要求
- 64位操作系统,推荐 Linux/Unix/macOS
- 64位 JDK 1.8+
1.下载安装Apache RocketMQ
RocketMQ 的安装包分为两种,二进制包和源码包。
点击这里 下载 Apache RocketMQ 5.0.0的源码包。你也可以从这里 下载到二进制包。二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的,
这里以在Linux环境为例,介绍RocketMQ安装过程。
解压下载的源码包并编译构建二进制可执行文件
$ mvn -Prelease-all -DskipTests clean install -U
$ cd distribution/target/rocketmq-5.0.0/rocketmq-5.0.0
直接下载二进制文件,目前最新版是5.0.0
# wget https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
# unzip rocketmq-all-5.0.0-bin-release.zip
# tree rocketmq-all-5.0.0-bin-release -L 1
rocketmq-all-5.0.0-bin-release
├── benchmark
├── bin # 启动脚本,包括shell脚本和CMD脚本
├── conf # 实例配置文件,包括broker配置文件、logback配置文件等
├── lib # 依赖jar包,包括Netty、commons-lang、FastJSON等
├── LICENSE
├── NOTICE
└── README.md
4 directories, 3 files
配置环境变量
修改三个文件,一个是bin/runserver.sh
,一个是bin/runbroker.sh
,另一个是bin/tools.sh
在里面找到如下三行,然后将第二行和第三行都删除掉,同时将第一行的值修改为自己的JDK的主目录。
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"
注:查看JDK的目录命令:/usr/libexec/java_home -V,
如果是yum安装,默认jre jdk 安装路径是/usr/lib/jvm 下面
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-xxx # xxx是你安装的版本号
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
# 重新加载环境变量
source /etc/profile
查看变量:
echo $JAVA_HOME
echo $CLASSPATH
echo $PATH
2. 启动NameServer
安装完RocketMQ包后,我们启动NameServer
### 启动namesrv
$ nohup sh bin/mqnamesrv & #
# netstat -nutlp | grep 9876 # NameServer监听的接口默认就是9876
tcp6 0 0 :::9876 :::* LISTEN 739728/java
# ps aux| grep rocketmq
### 验证namesrv是否启动成功 [日志在哪儿? 查看 logback_namesrv.xml , {user.home} 就是你用户目录下]
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
信息
我们可以在namesrv.log
中看到 'The Name Server boot success..', 表示NameServer 已成功启动。
3. 启动Broker+Proxy
NameServer成功启动后,我们启动Broker和Proxy,5.x 版本下我们建议使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考部署教程。
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
# netstat -nutlp | grep java
tcp6 0 0 :::8081 :::* LISTEN 15858/java
tcp6 0 0 :::9876 :::* LISTEN 15721/java
tcp6 0 0 :::10909 :::* LISTEN 15858/java
tcp6 0 0 :::10911 :::* LISTEN 15858/java
tcp6 0 0 :::10912 :::* LISTEN 15858/java
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/broker_default.log
The broker[broker-a,192.169.1.2:10911] boot success
...
rocketmq-proxy startup successfully
单机部署Broker启动时,实际上会监听4个端口:8081、10909、10911、10912
8081是5.0版本中的proxy组件的端口;10909、10912两个端口是根据listenPort的值,动态计算出来的。这三个端口由Broker内部不同的组件使用,作用分别如下:
listenPort
listenPort参数是broker的监听端口号,是remotingServer服务组件使用,作为对Producer和Consumer提供服务的端口号,默认为10911,可以通过配置文件修改。
打开broker-x.conf,修改或增加listenPort参数:
# Broker 对外服务的监听端口
listenPort=10911
fastListenPort
fastListenPort参数是fastRemotingServer服务组件使用,默认为listenPort - 2,可以通过配置文件修改。
打开broker-x.conf,修改或增加fastListenPort参数
# 主要用于slave同步master
fastListenPort=10909
haListenPort
haListenPort参数是HAService服务组件使用,用于Broker的主从同步,默认为listenPort + 1,可以通过配置文件修改。
打开broker-x.conf,修改或增加haListenPort参数:
# haService中使用
haListenPort=10912
remotingServer和fastRemotingServer的区别:
Broker端:
remotingServer可以处理客户端所有请求,如:生产者发送消息的请求,消费者拉取消息的请求。
fastRemotingServer功能基本与remotingServer相同,唯一不同的是不可以处理消费者拉取消息的请求。
Broker在向NameServer注册时,只会上报remotingServer监听的listenPort端口。
客户端:
默认情况下,生产者发送消息是请求fastRemotingServer,我们也可以通过配置让其请求remotingServer;消费者拉取消息只能请求remotingServer。
原文链接:https://blog.csdn.net/weixin_33399354/article/details/113025852
信息
我们可以在 broker_default.log 中看到“The broker[brokerName,ip:port] boot success..”,这表明 broker 已成功启动。
备注
至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们可以利用脚本进行简单的消息收发。
RocketMQ 5.0 提供无状态 proxy,通过 proxy 可以很方便的扩展更多标准消息协议以及流量治理功能。无状态 proxy 也具备良好的的网络穿透能力,可以灵活应对企业在上云过程中面临复杂跨网络访问场景。
4. 工具测试消息收发
在进行工具测试消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer # 生产者生产1000条消息,运行完自动结束
[topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=7F0000016A2A085EDE7B0CC31C9903E7, offsetMsgId=0A00020400002A9F000000000003AC09, messageQueue=MessageQueue
…………
11:28:33.877 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[10.0.2.4:10911] result: true
11:28:34.146 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer # 作为消费者消费1000条消息
ConsumeMessageThread_please_rename_unique_group_name_4_11 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=241, queueOffset=98, sysFlag=0, bornTimestamp=1667446109731, bornHost=/10.0.2.4:44434, storeTimestamp=1667446109736, storeHost=/10.0.2.4:10911, msgId=0A00020400002A9F000000000001718B, commitLogOffset=94603, bodyCRC=1263260627, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=250, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1667446355085, UNIQ_KEY=7F0000016A2A085EDE7B0CC30E220189, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51, 57, 51], transactionId='null'}]]
…………
如果出现这个错误,就是没有配置java的环境变量
5. SDK测试消息收发
6. 关闭服务器
完成实验后,我们可以通过以下方式关闭服务
$ sh bin/mqshutdown broker
The mqbroker with proxy enable is running(16978)...
Send shutdown request to mqbroker with proxy enable OK(16978)
No mqbroker running.
$ sh bin/mqshutdown namesrv
The mqnamesrv(16104) is running...
Send shutdown request to mqnamesrv(16104) OK
docker部署单机版
namesrv
docker run -d -v /srv/rocketmq/mqnamesrv494/logs:/home/rocketmq/logs \
--name rmqnamesrv \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 9999:9876 \
apache/rocketmq:4.9.4 \
sh mqnamesrv
#安装实际情况修改-p 9999:9876
broker
# broker配置文件
cat > /srv/rocketmq/broker.conf << EOF
Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
#broker IP,根据实际情况修改
brokerIP1=10.28.3.114
#broker监听端口
listenPort=10911
#namesrvAddr,根据实际情况修改
namesrvAddr=10.28.3.114:9999
storePathRootDir=/home/rocketmq/store
storePathCommitLog=/home/rocketmq/store/commitlog
EOF
#启动broker
docker run -itd \
--privileged=true \
-u root \
-v /srv/rocketmq/mqbroker494/store:/home/rocketmq/store \
-v /srv/rocketmq/mqbroker494/logs:/home/rocketmq/logs \
-v /srv/rocketmq/broker.conf:/home/rocketmq/rocketmq-4.9.4/conf/broker.conf \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-e "NAMESRV_ADDR=10.28.3.114:9999" \
-p 10911:10911 -p 10912:10912 -p 10909:10909 \
--name rmqbroker \
apache/rocketmq:4.9.4 \
sh mqbroker -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf
dashboard
docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=10.28.3.114:9999" -p 7700:8080 -t apacherocketmq/rocketmq-dashboard:latest
#namesrv的IP和port,10.28.3.114:9999
访问:http://10.28.3.114:7700/
部署方式
https://rocketmq.apache.org/zh/docs/deploymentOperations/15deploy
Apache RocketMQ 5.0 版本完成基本消息收发,包括 NameServer、Broker、Proxy 组件。 在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。
- 在 Local 模式下,Broker 和 Proxy 是同进程部署,只是在原有 Broker 的配置基础上新增 Proxy 的简易配置就可以运行。
- 在 Cluster 模式下,Broker 和 Proxy 分别部署,即在原有的集群基础上,额外再部署 Proxy 即可。
Local模式部署
单组节点单副本模式
就参考单机版部署
多组节点(集群)单副本模式
一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:
- 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
启动NameServer
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同,如下:
### 首先启动NameServer
$ nohup sh bin/mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker+Proxy集群
$ export ROCKETMQ_HOME=/root/rocketmq-all-5.0.0-bin-release # 软件存放在/root下面
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties --enable-proxy &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties --enable-proxy &
...
使用以下命令检查一下RocketMQ的集群状态
sh bin/mqadmin clusterList -n 127.0.0.1:9876
# cat conf/2m-noslave/broker-a.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=30911
namesrvAddr=127.0.0.1:9876
#存储路径
storePathRootDir=/tmp/rocketmq-a
#commitLog 存储路径
storePathCommitLog=/tmp/rocketmq-a/commitlog
#消费队列存储路径
storePathConsumeQueue=/tmp/rocketmq-a/consumequeue
#消息索引存储路径
storePathIndex=/tmp/rocketmq-a/index
#checkpoint 文件存储路径
storeCheckpoint=/tmp/rocketmq-a/checkpoint
#abort 文件存储路径
abortFile=/tmp/rocketmq-a/abort
# cat conf/2m-noslave/broker-b.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876
listenPort=40911
#存储路径
storePathRootDir=/tmp/rocketmq-b
#commitLog 存储路径
storePathCommitLog=/tmp/rocketmq-b/commitlog
#消费队列存储路径
storePathConsumeQueue=/tmp/rocketmq-b/consumequeue
#消息索引存储路径
storePathIndex=/tmp/rocketmq-b/index
#checkpoint 文件存储路径
storeCheckpoint=/tmp/rocketmq-b/checkpoint
#abort 文件存储路径
abortFile=/tmp/rocketmq-b/abort
备注
如上启动命令是在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n
后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876
。
多节点(集群)多副本模式-异步复制
每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
启动NameServer
### 首先启动Name Server
$ nohup sh mqnamesrv &
### 验证Name Server 是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
启动Broker+Proxy集群
$ export ROCKETMQ_HOME=/root/rocketmq-all-5.0.0-bin-release # 软件存放在/root下面
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties --enable-proxy &
### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties --enable-proxy &
### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties --enable-proxy &
### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
$ nohup sh bin/mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties --enable-proxy &
# master与salve部署在同一台的情况下,需要新增主、从节点存储路径的配置,下面是主点新增配置示例:
# 为从节点创建存储文件
mkdir -p /tmp/store-m/{commitlog,consumequeue,index,checkpoint,abort}
# 对Slave配置文件进行修改:
listenPort=30911
namesrvAddr=127.0.0.1:9876
#===================storePath主从节点必须分开====================
#存储路径
storePathRootDir=/tmp/store-m
#commitLog 存储路径
storePathCommitLog=/tmp/store-m/commitlog
#消费队列存储路径
storePathConsumeQueue=/tmp/store-m/consumequeue
#消息索引存储路径
storePathIndex=/tmp/store-m/index
#checkpoint 文件存储路径
storeCheckpoint=/tmp/store-m/checkpoint
#abort 文件存储路径
abortFile=/tmp/store-m/abort
#===================storePath主从节点必须分开====================
可以正常测试消息收发,步骤参考单机版
接下来停止master这个节点,可以看到salve并不会自动切换到master
# 通过如下命令获取PID
# lsof -i :10911
# 然后kill调master进程
# kill -9 1874121
多节点(集群)多副本模式-同步双写
每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
- 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
提示
以上 Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。另外一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。$ROCKETMQ_HOME指的RocketMQ安装目录,需要用户自己设置此环境变量。
Cluster模式部署
RocketMQ5.0 DLedger Controller模式
https://blog.csdn.net/Huangjiazhen711/article/details/127221881
RocketMQ5.0已经发布,在RocketMQ5.0新增了一个新的高可用模式 DLedger Controller 模式。下面就来聊一下RocketMQ5.0新增的这个模式。
1. 背景
首先我们需要知道DLedger Controller 是为了解决什么问题,先来看一下之前版本的DLedger模式架构图:
在 DLedger 模式下,利用 Raft Commitlog 代替了原来的 Commitlog 了,使得 Commmitlog 具备了选举的能力,当 Master Broker 故障后,通过内部协商,从其他的 Slave Broker 中选出新的 Master,完成主备切换,同时 Raft 的算法也保证了 Commitlog 的一致性。但是存在一些缺点:
- 想要具备选举切换的能力,单组 Broker 内的副本数必须 3 副本及以上(Raft协议决定)
- 副本 ACK 需要严格遵循 Raft 协议多数派的限制,3 副本需要 2 副本 ACK 后才能返回,5 副本需要 3 副本 ACK 后才能返回,副本越多可能耗时也可能越长。(这个也是最重要的一点)
- DLedger 模式下,由于存储库使用了 OpenMessaging DLedger 存储,因此无法复用 RocketMQ 原生的存储和复制的能力(比如 transientStorePool 和零拷贝能力),且对维护造成了困难。
在RocketMQ5.0版本新增了DLedger Controller模式来解决上面对的痛点。
DLedger Controller模式架构
该文档主要介绍如何部署支持自动主从切换的 RocketMQ 集群,其架构如上图所示,主要增加支持自动主从切换的 Controller 组件,其可以独立部署也可以内嵌在 NameServer 中。
DLedger Controller模式的核心思想:将其作为一个选主组件,并且是一个 可选择、松耦合 的组件。当部署 DLedger Controller 组件后,原本 Master-Slave 部署模式下 Broker 组就拥有 Failover 能力。
DLedger Controller的部署有两种模式:
- 内嵌NameSrv
- 单独部署DLedger Controller
独立部署的配置和内嵌配置的区别就是无需配置 enableControllerInNamesrv=true
Controller 组件提供选主能力,若需要保证 Controller 具备容错能力,Controller 部署需要三副本及以上(遵循 Raft 的多数派协议)。
注意
Controller 若只部署单副本也能完成 Broker Failover,但若该单点 Controller 故障,会影响切换能力,但不会影响存量集群的正常收发。
Controller 部署有两种方式。一种是嵌入于 NameServer 进行部署,可以通过配置 enableControllerInNamesrv 打开(可以选择性打开,并不强制要求每一台 NameServer 都打开),在该模式下,NameServer 本身能力仍然是无状态的,也就是内嵌模式下若 NameServer 挂掉多数派,只影响切换能力,不影响原来路由获取等功能。另一种是独立部署,需要单独部署 Controller 组件。
DLedger Controller部署
快速启动DLedger Controller
在RocketMQ中提供了快速启动的脚本
# 内嵌NameServer快速启动
$ sh bin/controller/fast-try-namesrv-plugin.sh start
# 独立部署快速启动
$ sh bin/controller/fast-try-independent-deployment.sh start
Controller 嵌入 NameServer 部署
https://rocketmq.apache.org/zh/docs/deploymentOperations/16autoswitchdeploy#controller-%E5%B5%8C%E5%85%A5-nameserver-%E9%83%A8%E7%BD%B2
嵌入 NameServer 部署时只需要在 NameServer 的配置文件中设置 enableControllerInNamesrv=true
,并填上 Controller 的配置即可。
enableControllerInNamesrv = true
controllerDLegerGroup = group1
controllerDLegerPeers = n0-127.0.0.1:9877;n1-127.0.0.1:9878;n2-127.0.0.1:9879
controllerDLegerSelfId = n0
controllerStorePath = /home/admin/DledgerController
enableElectUncleanMaster = false
notifyBrokerRoleChanged = true
参数解释:
- enableControllerInNamesrv:Nameserver 中是否开启 controller,默认 false。
- controllerDLegerGroup:DLedger Raft Group 的名字,同一个 DLedger Raft Group 保持一致即可。
- controllerDLegerPeers:DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致。
- controllerDLegerSelfId:节点 id,必须属于 controllerDLegerPeers 中的一个;同 Group 内各个节点要唯一。
- controllerStorePath:controller 日志存储位置。controller 是有状态的,controller 重启或宕机需要依靠日志来恢复数据,该目录非常重要,不可以轻易删除。单机部署集群这个参数需要修成成不一样
- enableElectUncleanMaster:是否可以从 SyncStateSet 以外选举 Master,若为 true,可能会选取数据落后的副本作为 Master 而丢失消息,默认为 false。
- notifyBrokerRoleChanged:当 Broker 副本组上角色发生变化时是否主动通知,默认为 true。
参数设置完成后,指定配置文件启动 Nameserver 即可。
$ nohup sh bin/mqnamesrv -c namesrv.conf &
Controller 独立部署
https://rocketmq.apache.org/zh/docs/deploymentOperations/16autoswitchdeploy#controller-%E7%8B%AC%E7%AB%8B%E9%83%A8%E7%BD%B2
$ nohup sh bin/mqcontroller -c controller.conf &
mqcontroller 脚本在源码包 distribution/bin/mqcontroller,配置参数与内嵌模式相同。
注意
独立部署Controller后,仍然需要单独部署NameServer提供路由发现能力
Broker Controller DLedger模式
Broker搭配Controller DLedger模式可以实现高可用以及主备自动切换。有区别的在于需要配置开启Controller模式以及配置DLedger Controller的一些信息,新增参数如下:
- enableControllerMode:Broker controller 模式的总开关,只有该值为 true,自动主从切换模式才会打开。默认为 false。必填
- controllerAddr:controller 的地址,多个 controller 中间用分号隔开。例如
controllerAddr = 127.0.0.1:9877;127.0.0.1:9878;127.0.0.1:9879
,必填 - syncBrokerMetadataPeriod:向 controller 同步 Broker 副本信息的时间间隔。默认 5000(5s)。
- checkSyncStateSetPeriod:检查 SyncStateSet 的时间间隔,检查 SyncStateSet 可能会 shrink SyncState。默认5000(5s)。
- syncControllerMetadataPeriod:同步 controller 元数据的时间间隔,主要是获取 active controller 的地址。默认10000(10s)。
- haMaxTimeSlaveNotCatchup:表示 Slave 没有跟上 Master 的最大时间间隔,若在 SyncStateSet 中的 slave 超过该时间间隔会将其从 SyncStateSet 移除。默认为 15000(15s)。
- storePathEpochFile:存储 epoch 文件的位置。epoch 文件非常重要,不可以随意删除。默认在 store 目录下。必填
- allAckInSyncStateSet:若该值为 true,则一条消息需要复制到 SyncStateSet 中的每一个副本才会向客户端返回成功,可以保证消息不丢失。默认为 false。必填
- syncFromLastFile:若 slave 是空盘启动,是否从最后一个文件进行复制。默认为 false。
- asyncLearner:若该值为 true,则该副本不会进入 SyncStateSet,也就是不会被选举成 Master,而是一直作为一个 learner 副本进行异步复制。默认为false。
- inSyncReplicas:需保持同步的副本组数量,默认为1,allAckInSyncStateSet=true 时该参数无效。
- minInSyncReplicas:最小需保持同步的副本组数量,若 SyncStateSet 中副本个数小于 minInSyncReplicas 则 putMessage 直接返回 PutMessageStatus.IN_SYNC_REPLICAS_NOT_ENOUGH,默认为1。
[root@nova-ad9e8811-3238-40b9-a56e-a473ded894de rocketmq-all-5.0.0-bin-release]# cat conf/2m-2s-sync/broker-a.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
#brokerId=0
deleteWhen=04
fileReservedTime=48
#brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
## 开启Broker Controller DLedger模式
enableControllerMode=true
allAckInSyncStateSet=true
controllerAddr = 127.0.0.1:9878;127.0.0.1:9868;127.0.0.1:9858
storePathEpochFile=/tmp/store-m/epocfa
namesrvAddr = 127.0.0.1:9876;127.0.0.1:9886;127.0.0.1:9896
listenPort=30911
#===================storePath主从节点必须分开====================
#存储路径
storePathRootDir=/tmp/store-m
#commitLog 存储路径
storePathCommitLog=/tmp/store-m/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/tmp/store-m/consumequeue
#消息索引存储路径
storePathIndex=/tmp/store-m/index
#checkpoint 文件存储路径
storeCheckpoint=/tmp/store-m/checkpoint
#abort 文件存储路径
abortFile=/tmp/store-m/abort
##===================storePath主从节点必须分开====================
[root@nova-ad9e8811-3238-40b9-a56e-a473ded894de rocketmq-all-5.0.0-bin-release]# cat conf/2m-2s-sync/broker-a-s.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName=DefaultCluster
brokerName=broker-a
#brokerId=-1
deleteWhen=04
fileReservedTime=48
#brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
# 开启Broker Controller DLedger模式
enableControllerMode=true
allAckInSyncStateSet=true
controllerAddr = 127.0.0.1:9878;127.0.0.1:9868;127.0.0.1:9858
storePathEpochFile=/tmp/store-m/epocfas
namesrvAddr = 127.0.0.1:9876;127.0.0.1:9886;127.0.0.1:9896
listenPort=40911
#===================storePath主从节点必须分开====================
#存储路径
storePathRootDir=/tmp/store-s
#commitLog 存储路径
storePathCommitLog=/tmp/store-s/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/tmp/store-s/consumequeue
#消息索引存储路径
storePathIndex=/tmp/store-s/index
#checkpoint 文件存储路径
storeCheckpoint=/tmp/store-s/checkpoint
#abort 文件存储路径
abortFile=/tmp/store-s/abort
#===================storePath主从节点必须分开====================
./bin/mqbroker -c conf/2m-2s-sync/broker-a.properties
./bin/mqbroker -c conf/2m-2s-sync/broker-a-s.properties
在Controller模式下,Broker配置必须设置 enableControllerMode=true
,并填写 controllerAddr
,并以下面命令启动:
$ nohup sh bin/mqbroker -c broker.conf &
注意
自动主备切换模式下Broker无需指定brokerId和brokerRole,其由Controller组件进行分配
Tips:
- Controller DLedger模式下
enableControllerMode必须为true
,默认为false - 实现消息不丢失
allAckInSyncStateSet设置为true
总结
- 若需要保证Controller具备容错能力,Controller部署需要三副本及以上(遵循Raft的多数派协议)
- Controller部署配置文件中配置参数controllerDLegerPeers 中的IP地址配置成其他节点能够访问的IP,在多机器部署的时候尤为重要。例子仅供参考需要根据实际情况进行修改调整。
- 要想实现消息不丢失需要Broker进行相对应的配置进行配合使用。
mqadmin指令
注意
- 执⾏命令⽅法:
./mqadmin {command} {args}
- ⼏乎所有命令都需要配置
-n 表⽰ NameServer 地址
,格式为ip:port
- ⼏乎所有命令都可以通过 -h 获取帮助
- 如果既有 Broker 地址(
-b
)配置项又有 clusterName(-c
)配置项,则优先以 Broker 地址执⾏命令,如果不配置 Broker 地址,则对集群中所有主机执⾏命令,只 ⽀持⼀个 Broker 地址。-b
格式为 ip:port,port 默认是 10911 - 在 tools 下可以看到很多命令,但并不是所有命令都能使⽤,只有在 MQAdminStartup 中初始化的命令才能使⽤,你也可以修改这个类,增加或⾃定义 命令
- 由于版本更新问题,少部分命令可能未及时更新,遇到错误请直接阅读相关命令源码
首先进入 RocketMQ 工程,进入/RocketMQ/bin
在该目录下有个 mqadmin 脚本
.
查看帮助: 在 mqadmin
下可以查看有哪些命令
a: 查看具体命令的使用 : sh mqadmin
b: sh mqadmin help 命令名称
例如,查看 updateTopic 的使用
sh mqadmin help updateTopic
-
关闭nameserver和所有的broker: 进入到bin下:
shell sh mqshutdown namesrv sh mqshutdown broker
-
查看所有消费组group:
shell sh mqadmin consumerProgress -n 127.0.0.1:9876
-
查看指定消费组下的所有topic数据堆积情况:
shell sh mqadmin consumerProgress -n 127.0.0.1:9876 -g please_rename_unique_group_name_4
-
查看所有topic :
shell sh mqadmin topicList -n 127.0.0.1:9876
-
查看topic信息列表详情统计
shell sh mqadmin topicstatus -n 127.0.0.1:9876 -t TopicTest
-
新增topic
shell sh mqadmin updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t topicWarning
-
删除topic
shell sh mqadmin deleteTopic -n 127.0.0.1:9876 -c DefaultCluster -t topicWarning
-
查询集群消息
shell sh mqadmin clusterList -n 127.0.0.1:9876
基于DLedger高可用自动切换部署
前言
DLedger是一套基于Raft协议的分布式日志存储组件,部署 RocketMQ 时可以根据需要选择使用DLeger来替换原生的副本存储机制。本文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的 RocketMQ 集群。
RocketMQ 4.5版本以后,依赖Dledger进行主从切换
基于Dledger自动容灾切换的最小集群
执行下面的命令进行快速RocketMQ集群的启动
sh bin/dledger/fast-try.sh start
这个命令会在当前这台机器上启动一个NameServer和三个Broker,三个Broker其中一个是Master,另外两个是Slave,瞬间就可以组成一个最小的Rocker集群
使用以下命令检查一下RocketMQ的集群状态
sh bin/mqadmin clusterList -n 127.0.0.1:9876
看到如上图就说明启动成功了,(BID 为 0 的表示 Master,其余都是 Follower)。
启动成功,现在可以向集群收发消息,并进行容灾切换测试了。
容灾切换-> 尝试slave自动切换为master
部署成功,杀掉 Leader 之后(在上面的例子中,杀掉端口 30931 所在的进程),等待约 10s 左右,用 clusterList 命令查看集群,就会发现 Leader 切换到另一个节点了。
## 找到master的进程号
lsof -i :30931
kill -9 932301 ##干掉它
再次查看集群状态详情
sh bin/mqadmin clusterList -n 127.0.0.1:9876
关闭快速集群,可以执行:
$ sh bin/dledger/fast-try.sh stop
快速部署,默认配置在 conf/dledger 里面,默认的存储路径在/tmp/rmqstore
注:第一个Broker的配置文件是broker-n0.conf,第二个broker的配置文件可以是broker-n1.conf,第三broker 的配置文件可以是broker-n2.conf。 对broker-n0.conf配置文件的说明:
brokerClusterName = RaftCluster
这个是集群的名称,你整个broker集群都可以用这个名称
brokerName=RaftNode00
这是Broker的名称,比如`你有一个Master和两个Slave,那么他们的Broker名称必须是一样的`,因为他们三个是一个分组;如果你有另外一组Master和两个Slave,你可以给他们起个别的名字,比如说RaftNode01
listenPort=30911
这个就是你的Broker监听的端口号,如果每台机器上就部署一个Broker,可以考虑就用这个端口号,不用修改
namesrvAddr=127.0.0.1:9876
这里是配置NameServer的地址,如果你有很多个NameServer的话,可以在这里写入多个NameServer的地址,用`分号隔开`
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
上面两个目录是存放Broker数据的地方,你可以换成别的目录,类似于是/usr/local/rocketmq/node00之类的
enableDLegerCommitLog=true
这个是`非常关键的一个配置`,就是是否启用DLeger技术,这个`必须是true`。即是否启用 RocketMQ 主从切换,默认值为 false。如果需要开启主从切换,则该值需要设置为 true 。
dLegerGroup=RaftNode00
节点所属的 raft 组,这个一般建议和Broker名字保持一致,一个Master加两个Slave会组成一个Group
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
这个`很关键`,这里的`端口用作 dledger 内部通信`。DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致
dLegerSelfId=n0
当前节点id,这个是代表了一个Broker在组里的id,一般就是n0、n1、n2之类的,这个你得跟上面的dLegerPeers中的n0、n1、n2相匹配。并且特别需要强调,`只能第一个字符为英文,其他字符需要配置成数字`。
sendMessageThreadPoolNums=16
这个是发送消息的线程数量,一般建议你配置成`跟你的CPU核数一样`,比如我们的机器假设是24核的,那么这里就修改成24核
#限制的消息大小,客户端限制的消息大小,超过报错,同时服务端也会限制,所以需要跟服务端配合使用。默认为4M。1024 * 1024 * 4
maxMessageSize=65536
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE salve
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#删除文件时间点,默认凌晨 4点.表明在几点做消息删除动作,与fileReservedTime写作
deleteWhen=04
#文件保留时间,默认 48 小时,自动删除超时的消息
fileReservedTime=120
#RocketMQ有一个默认磁盘的使用率,超过这个使用率,这就报这个错。
diskMaxUsedSpaceRatio=80
#删除的文件被引用时,不会马上被删除,最大的存活时间
destroyMapedFileIntervalForcibly=120000
brokerIP1:当前broker监听的IP。
brokerIP2:broker为master-slave模式时,broker的从节点通过brokerIP2和主节点进行连接。
其实最关键的是,你的Broker是分为多组的,每一组是三个Broker,一个Master和两个Slave。对每一组Broker,他们的Broker名称、Group名称都是一样的,然后你得给他们配置好一样的dLegerPeers(里面是组内三台Broker的地址)然后他们得配置好对应的NameServer的地址,最后还有就是每个Broker有自己的ID,在组内是唯一的就可以了,比如说不同的组里都有一个ID为n0的broker,这个是可以的。
Dledger集群搭建
https://rocketmq.apache.org/zh/docs/bestPractice/16dledger#dledger%E9%9B%86%E7%BE%A4%E6%90%AD%E5%BB%BA
RocketMQ-on-DLedger Group 是指一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。 RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。 RocketMQ-on-DLedger Group 是可以水平扩展的,也即可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。
1. 新集群部署
1.1 编写配置
每个 RocketMQ-on-DLedger Group 至少准备三台机器(本文假设为 3)。 编写 3 个配置文件,建议参考 conf/dledger 目录下的配置文件样例。 关键配置介绍:
name | 含义 | 举例 |
---|---|---|
enableDLegerCommitLog | 是否启动 DLedger | true |
dLegerGroup | DLedger Raft Group的名字,建议和 brokerName 保持一致 | RaftNode00 |
dLegerPeers | DLedger Group 内各节点的端口信息,同一个 Group 内的各个节点配置必须要保证一致 | n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913 |
dLegerSelfId | 节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一 | n0 |
sendMessageThreadPoolNums | 发送线程个数,建议配置成 Cpu 核数 | 16 |
这里贴出 conf/dledger/broker-n0.conf 的配置举例。
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=127.0.0.1:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
1.2 启动 Broker
与老版本的启动方式一致。
$ nohup sh bin/mqbroker -c conf/dledger/xxx-n0.conf &
$ nohup sh bin/mqbroker -c conf/dledger/xxx-n1.conf &
$ nohup sh bin/mqbroker -c conf/dledger/xxx-n2.conf &
RocketMQ可视化界面安装
docker 镜像安装-x86
① 安装docker,拉取 rocketmq-dashboard
镜像
$ docker pull apacherocketmq/rocketmq-dashboard:latest
② docker 容器中运行 rocketmq-dashboard
$ docker run -d --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.137.7:9876" -p 8080:8080 -t apacherocketmq/rocketmq-dashboard:latest
访问地址: http://192.168.137.7:8080
运维这块儿就两个功能:
- 设置Nameserver
- 打开/关闭vipchannnel
设置nameserver:可以添加多个nameserver地址到输入框内,默认读取的是DashBoard这个springboot启动配置里面的nameserver配置。如果rockermq集群里有加入新的nameserver节点,可以在这里动态配置后更新生效。
打开/关闭vipchannel: 这里默认为false就好,vipchannnel针对的是topic的优先级,相当于在消息处理的时候,有些topic可以走vipchannel,可以优先被处理,这个除了电商场景用的一般不多。
提示
namesrv.addr:port
替换为 rocketmq
中配置的 nameserver 地址:端口号
开放端口号:8080,9876,10911,11011 端口
- 云服务器:设置安全组访问规则
- 本地虚拟机:关闭防火墙,或
-add-port
源码部署
https://rocketmq.apache.org/zh/docs/deploymentOperations/17Dashboard
源码地址:apache/rocketmq-dashboard
下载并解压,切换至源码目录 rocketmq-dashboard-master/
① 编译 rocketmq-dashboard
$ mvn clean package -Dmaven.test.skip=true
② 运行 rocketmq-dashboard
$ java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
$ java -Drocketmq.namesrv.addr=192.168.137.7:9876 -Dserver.port=8080 -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar # 指定namesrv地址
提示:Started App in x.xxx seconds (JVM running for x.xxx)
启动成功
浏览器页面访问:namesrv.addr:8080
关闭rocketmq-dashboard : ctrl + c
再次启动:执行 ②
tips:下载后的源码需要上传到 Linux 系统上编译,本地编译可能会报错。
解决内存不足,修改runbroker.sh,runserver.sh 脚本
修改启动参数,分别对 bin 目录下的runserver.sh
和 runbroker.sh
进行修改。
1、runserver.sh 修改 默认配置:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改为(可根据实际情况修改):
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
2、runbroker.sh 修改 默认配置:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
修改为(可根据实际情况修改):
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
3、修改tools.sh文件
默认配置:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
修改为(可根据实际情况修改):
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
参数配置
https://rocketmq.apache.org/zh/docs/4.x/parameterConfiguration/24server
客户端配置
服务端配置
NameServer配置
名称 | 描述 | 参数类型 | 默认值 | 有效值 | 重要性 |
---|---|---|---|---|---|
rocketmqHome | RocketMQ主目录,默认用户主目录 | String | |||
namesrvAddr | NameServer地址 | String | |||
kvConfigpath | kv配置文件路径,包含顺序消息主题的配置信息 | String | |||
configStorePath | NameServer配置文件路径,建议使用-c指定NameServer配置文件路径 | String | |||
clusterTest | 是否支持集群测试,默认为false | boolean | |||
orderMessageEnable | 是否支持顺序消息,默认为false | boolean | |||
网络配置属性
名称 | 描述 | 参数类型 | 默认值 | 有效值 | 重要性 |
---|---|---|---|---|---|
accessMessageInMemorymaxRatio | 访问消息在内存中的比率 | int | 40(%) | ||
adminBrokerThreadPoolNums | 服务端处理控制台管理命令线程池线程数量 | int | 16 | ||
autoCreateSubscriptionGroup | 是否自动创建消费组 | boolean | true | true/false | |
autoCreateTopicEnable | 是否自动创建主题 | boolean | |||
bitMapLengthConsumeQueueExt | ConsumeQueue扩展过滤bitmap大小 | int | 112 | ||
brokerClusterName | Broker集群名称 | String | TestCluster | ||
brokerFastFailureEnable | 是否支持broker快速失败 如果为true表示会立即清除发送消息线程池,消息拉取线程池中排队任务 ,直接返回系统错误 | boolean | true | ||
brokerId | brokerID 0表示主节点 大于0表示从节点 | int | 0 | ||
brokerIP1 | Broker服务地址 | String | |||
brokerIP2 | BrokerHAIP地址,供slave同步消息的地址 | String | |||
brokerName | Broker服务器名称morning服务器hostname | String | broker-a | ||
brokerPermission | Broker权限 默认为6表示可读可写 | int | 6 | ||
brokerRole | broker角色,分为 ASYNC_MASTER SYNC_MASTER, SLAVE | enum | ASYNC_MASTER | ||
brokerTopicEnable | broker名称是否可以用做主体使用 | boolean | |||
channelNotActiveInterval | long | ||||
checkCRCOnRecover | 文件恢复时是否校验CRC | boolean | |||
cleanFileForciblyEnable | 是否支持强行删除过期文件 | boolean | |||
cleanResourceInterval | 清除过期文件线程调度频率 | int | |||
clientAsyncSemaphoreValue | 客户端对invokeAsyncImpl方法的调用频率 | int | |||
clientCallbackExecutorThreads | 客户端执行回调线程数 | int | |||
clientChannelMaxIdleTimeSeconds | 客户端每个channel最大等待时间 | int | |||
clientCloseSocketIfTimeout | 客户端关闭socket是否需要等待 | boolean | false | ||
clientManagerThreadPoolQueueCapacity | 客户端管理线程池任务队列初始大小 | int | 1000000 | ||
clientManageThreadPoolNums | 服务端处理客户端管理(心跳 注册 取消注册线程数量) | int | 32 | ||
clientOnewaySemaphoreValue | 客户端对invokeOnewayImpl方法的调用控制 | int | |||
clientPooledByteBufAllocatorEnable | 客户端池化内存是否开启 | boolean | |||
clientSocketRcvBufSize | 客户端socket接收缓冲区大小 | long | |||
clientSocketSndBufSize | 客户端socket发送缓冲区大小 | long | |||
clientWorkerThreads | worker线程数 | int | |||
clusterTopicEnable | 集群名称是否可用在主题使用 | boolean | |||
commercialBaseCount | |||||
commercialBigCount | |||||
commercialEnable | |||||
commercialTimerCount | |||||
commitCommitLogLeastPages | 一次提交至少需要脏页的数量,默认4页,针对 commitlog文件 | int | |||
commitCommitLogThoroughInterval | Commitlog两次提交的最大间隔,如果超过该间隔,将忽略commitCommitLogLeastPages直接提交 | int | 200 | ||
commitIntervalCommitLog | commitlog提交频率 | int | 200 | ||
compressedRegister | 是否开启消息压缩 | boolean | |||
connectTimeoutMillis | 链接超时时间 | long | 3000 | ||
consumerFallbehindThreshold | 消息消费堆积阈值默认16GB在disableConsumeifConsumeIfConsumerReadSlowly为true时生效 | long | 17179869184 | ||
consumerManagerThreadPoolQueueCapacity | 消费管理线程池任务队列大小 | int | 1000000 | ||
consumerManageThreadPoolNums | 服务端处理消费管理 获取消费者列表 更新消费者进度查询消费进度等 | int | 32 | ||
debugLockEnable | 是否支持 PutMessage Lock锁打印信息 | boolean | false | ||
defaultQueryMaxNum | 查询消息默认返回条数,默认为32 | int | 32 | ||
defaultTopicQueueNums | 主体在一个broker上创建队列数量 | int | 8 | ||
deleteCommitLogFilesInterval | 删除commitlog文件的时间间隔,删除一个文件后等一下再删除一个文件 | int | 100 | ||
deleteConsumeQueueFilesInterval | 删除consumequeue文件时间间隔 | int | 100 | ||
deleteWhen | 磁盘文件空间充足情况下,默认每天什么时候执行删除过期文件,默认04表示凌晨4点 | string | 04 | ||
destroyMapedFileIntervalForcibly | 销毁MappedFile被拒绝的最大存活时间,默认120s。清除过期文件线程在初次销毁mappedfile时,如果该文件被其他线程引用,引用次数大于0.则设置MappedFile的可用状态为false,并设置第一次删除时间,下一次清理任务到达时,如果系统时间大于初次删除时间加上本参数,则将ref次数一次减1000,知道引用次数小于0,则释放物理资源 | int | 120000 | ||
disableConsumeIfConsumerReadSlowly | 如果消费组消息消费堆积是否禁用该消费组继续消费消息 | boolean | false | ||
diskFallRecorded | 是否统计磁盘的使用情况,默认为true | boolean | true | ||
diskMaxUsedSpaceRatio | commitlog目录所在分区的最大使用比例,如果commitlog目录所在的分区使用比例大于该值,则触发过期文件删除 | int | 75 | ||
duplicationEnable | 是否允许重复复制,默认为 false | boolean | false | ||
enableCalcFilterBitMap | 是否开启比特位映射 | boolean | false | ||
enableConsumeQueueExt | 是否启用ConsumeQueue扩展属性 | boolean | false | ||
enablePropertyFilter | 是否支持根据属性过滤 如果使用基于标准的sql92模式过滤消息则改参数必须设置为true | boolean | false | ||
endTransactionPoolQueueCapacity | 处理提交和回滚消息线程池线程队列大小 | int | |||
endTransactionThreadPoolNums | 处理提交和回滚消息线程池 | int | 24 | ||
expectConsumerNumUseFilter | 布隆过滤器参数 | int | 32 | ||
fastFailIfNoBufferInStorePool | 从 transientStorepool中获取 ByteBuffer是否支持快速失败 | boolean | false | ||
fetchNamesrvAddrByAddressServer | 是否支持从服务器获取nameServer | boolean | false | ||
fileReservedTime | 文件保留时间,默认72小时,表示非当前写文件最后一次更新时间加上filereservedtime小与当前时间,该文件将被清理 | String | 120 | ||
filterDataCleanTimeSpan | 清除过滤数据的时间间隔 | long | 86400000 | ||
filterServerNums | broker服务器过滤服务器数量 | int | 0 | ||
filterSupportRetry | 消息过滤是否支持重试 | boolean | false | ||
flushCommitLogLeastPages | 一次刷盘至少需要脏页的数量,针对commitlog文件 | int | 4 | ||
flushCommitLogTimed | 表示await方法等待FlushIntervalCommitlog,如果为true表示使用Thread.sleep方法等待 | boolean | false | ||
flushConsumeQueueLeastPages | 一次刷盘至少需要脏页的数量,默认2页,针对 Consume文件 | int | 2 | ||
flushConsumeQueueThoroughInterval | Consume两次刷盘的最大间隔,如果超过该间隔,将忽略 | int | 60000 | ||
flushConsumerOffsetHistoryInterval | fushConsumeQueueLeastPages直接刷盘 | int | 60000 | ||
flushConsumerOffsetInterval | 持久化消息消费进度 consumerOffse.json文件的频率ms | int | 5000 | ||
flushDelayOffsetInterval | 延迟队列拉取进度刷盘间隔。默认10s | long | 10000 | ||
flushDiskType | 刷盘方式,默认为 ASYNC_FLUSH(异步刷盘),可选值SYNC_FLUSH(同步刷盘) | enum | ASYNC_FLUSH | ||
flushIntervalCommitLog | commitlog刷盘频率 | int | 500 | ||
flushIntervalConsumeQueue | consumuQueue文件刷盘频率 | int | 1000 | ||
flushLeastPagesWhenWarmMapedFile | 用字节0填充整个文件的,每多少页刷盘一次。默认4096页,异步刷盘模式生效 |
日志删除机制
对于过期文件 1)通过设置删除过期文件的时间,会在这个小时内去删除文件,每次删除10个。
相关配置参数:
#删除文件时间点,默认是凌晨4点,24小时制,可以通过;分隔配置多个
deleteWhen=04
fileReservedTime=72 #文件保留时间,默认72小时
2)通过设置磁盘存储空间,达到了阈值就会删除过期的文件。
相关配置参数:
diskMaxUsedSpaceRatio=75 默认75%
fileReservedTime=72 文件保留时间,默认48小时
3.2对于没有过期的文件 1)磁盘存储空间达到强制清理阈值,(通过启动命令设置)
-Drocketmq.broker.diskSpaceCleanForciblyRatio=0.85
强制清理,默认85%
destroyMapedFileIntervalForcibly= 1000 * 120 ms
删除的文件被引用时,不会马上被删除,最大的存活时间
2)磁盘存储空间达到预警线,(通过启动命令设置)
-Drocketmq.broker.diskSpaceWarningLevelRatio=0.90
禁止写入,并清理,默认90%
destroyMapedFileIntervalForcibly= 1000 * 120 ms
删除的文件被引用时,不会马上被删除,最大的存活时间
RocketMQ监控
RocketMQ-Exporter 有如下的运行选项
选项 | 默认值 | 含义 |
---|---|---|
rocketmq.config.namesrvAddr | 127.0.0.1:9876 | MQ集群的nameSrv地址 |
rocketmq.config.webTelemetryPath | /metrics | 指标搜集路径 |
server.port | 5557 | HTTP服务暴露端口 |
以上的运行选项既可以在下载代码后在配置文件中更改,也可以通过命令行来设置。
docker pull slpcat/rocketmq-exporter:latest
java -jar rocketmq-exporter-0.0.2-SNAPSHOT.jar --rocketmq.config.namesrvAddr='192.168.102.171:9876;192.168.102.16:9876;192.168.102.100:9876' --server.port='5557'
https://grafana.com/grafana/dashboards/10477-rocketmq-dashboard/?tab=reviews
- job_name: 'rocketmqexport'
static_configs:
- targets: ['192.168.102.12:5557']
- 监控指标
监控指标 | 含义 |
---|---|
rocketmq_broker_tps | broker每秒生产消息数量 |
rocketmq_broker_qps | broker每秒消费消息数量 |
rocketmq_producer_tps | 某个topic每秒生产的消息数量 |
rocketmq_producer_put_size | 某个topic每秒生产的消息大小(字节) |
rocketmq_producer_offset | 某个topic的生产消息的进度 |
rocketmq_consumer_tps | 某个消费组每秒消费的消息数量 |
rocketmq_consumer_get_size | 某个消费组每秒消费的消息大小(字节) |
rocketmq_consumer_offset | 某个消费组的消费消息的进度 |
rocketmq_group_get_latency_by_storetime | 某个消费组的消费延时时间 |
rocketmq_message_accumulation (rocketmq_producer_offset-rocketmq_consumer_offset) | 消息堆积量(生产进度-消费进度),需要计算不能直接用 |
rocketmq_message_accumulation 是一个聚合指标,需要根据其它上报指标聚合生成。
- 告警指标
告警指标 | 含义 |
---|---|
sum(rocketmq_producer_tps) by (cluster) >= 10 | 集群发送tps太高(根据cluster分组求和) |
sum(rocketmq_producer_tps) by (cluster) < 1 | 集群发送tps太低 |
sum(rocketmq_consumer_tps) by (cluster) >= 10 | 集群消费tps太高 |
sum(rocketmq_consumer_tps) by (cluster) < 1 | 集群消费tps太低 |
rocketmq_group_get_latency_by_storetime rocketmq_group_get_latency_by_storetime / 1000 / 60 >= 1 | 集群消费延时告警(大于1分钟) |
sort_desc(sum(rocketmq_group_get_latency_by_storetime) by (broker,group,topic)) and (sum(rocketmq_group_get_latency_by_storetime) by (broker,group,topic) > 0 ) | 集群消费延时告警(大于0毫秒) |
rocketmq_message_accumulation > value | 消费堆积告警 |
sum(rocketmq_producer_message_size) by (topic) | 生产者每秒消息大小 |
sum(rocketmq_consumer_message_size) by (topic) | 消费者每秒消息大小 |
sum(rocketmq_brokeruntime_commitlog_disk_ratio) by (brokerIP) * 100 > 80 | 磁盘利用率大于80%(master) |
消费者堆积告警指标也是一个聚合指标,它根据消费堆积的聚合指标生成,value 这个阈值对每个消费者是不固定的,当前是根据过去 5 分钟生产者生产的消息数量来定,用户也可以根据实际情况自行设定该阈值。 告警指标设置的值只是个阈值只是象征性的值,用户可根据在实际使用 RocketMQ 的情况下自行设定。这里重点介绍一下消费者堆积告警指标,在以往的监控系统中,由于没有像 Prometheus 那样有强大的 PromQL 语言,在处理消费者告警问题时势必需要为每个消费者设置告警,那这样就需要 RocketMQ 系统的维护人员为每个消费者添加,要么在系统后台检测到有新的消费者创建时自动添加。在 Prometheus 中,这可以通过一条如下的语句来实现:
(sum(rocketmq_producer_offset) by (topic) - on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic)) - ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)*5*60 > 0
[root@file-04 rules]# cat rocketmq.yml #rocketmq的告警规则
groups:
- name: RocketmqAlert
rules:
- alert: rocketmq_producer_tps_h_Alert
expr: sum(rocketmq_producer_tps) by (cluster) >= 10
for: 1m
labels:
severity: emergency
annotations:
summary: 'ROCKETMQ_CLUSTER {{ $labels.cluster }} too high; value: "{{$value}}"'
description: '{{ $labels.cluster }} 集群发送tps太高'
- alert: rocketmq_producer_tps_l_Alert
expr: sum(rocketmq_producer_tps) by (cluster) < 1
for: 1m
labels:
severity: emergency
annotations:
summary: 'ROCKETMQ_CLUSTER {{ $labels.cluster }} too low; value: "{{$value}}"'
description: '{{ $labels.cluster }} 集群发送tps太低'
- alert: rocketmq_consumer_tps_h_Alert
expr: sum(rocketmq_consumer_tps) by (cluster) >= 10
for: 1m
labels:
severity: emergency
annotations:
summary: 'ROCKETMQ_CLUSTER {{ $labels.cluster }} too high; value: "{{$value}}"'
description: '{{ $labels.cluster }} 集群消费tps太高'
- alert: rocketmq_consumer_tps_l_Alert
expr: sum(rocketmq_consumer_tps) by (cluster) < 1
for: 1m
labels:
severity: emergency
annotations:
summary: 'ROCKETMQ_CLUSTER {{ $labels.cluster }} too low; value: "{{$value}}"'
description: '{{ $labels.cluster }} 集群消费tps太低'
- alert: rocketmq_group_get_latency_by_storetime_Alert
expr: rocketmq_group_get_latency_by_storetime / 1000 / 60 >= 1
for: 1m
labels:
severity: emergency
annotations:
summary: 'ROCKETMQ_CLUSTER {{ $labels.cluster }} too high; value: "{{$value}}" min'
description: '{{ $labels.cluster }} 集群消费延时太高'
- alert: rocketmq_message_accumulation_Alert
expr: (sum(rocketmq_producer_offset) by (topic) - on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic)) - ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)*5*60 > 0
for: 1m
labels:
severity: emergency
annotations:
summary: 'ROCKETMQ_CLUSTER {{ $labels.cluster }} too high; value: "{{$value}}"'
description: '{{ $labels.cluster }} 消费堆积太多了'
- alert: rocketmq_brokeruntime_commitlog_disk_ratio_Alert
expr: sum(rocketmq_brokeruntime_commitlog_disk_ratio) by (brokerIP) * 100 > 70
for: 1m
labels:
severity: emergency
annotations:
summary: 'ROCKETMQ_CLUSTER {{ $labels.brokerIP }} too high; value: "{{$value}}"'
description: '{{ $labels.brokerIP }} 磁盘利用率百分比'
Monitor RocketMQ by Prometheus RocketMQ Exporter
dashboard for rocketmq_exporter.
{
"__inputs": [
{
"name": "DS_PROMETHEUS",
"label": "Prometheus",
"description": "",
"type": "datasource",
"pluginId": "prometheus",
"pluginName": "Prometheus"
}
],
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "5.4.3"
},
{
"type": "panel",
"id": "graph",
"name": "Graph",
"version": "5.0.0"
},
{
"type": "datasource",
"id": "prometheus",
"name": "Prometheus",
"version": "5.0.0"
}
],
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"gnetId": 10477,
"graphTooltip": 0,
"id": 1,
"links": [],
"panels": [
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 6,
"w": 7,
"x": 0,
"y": 0
},
"id": 12,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_broker_tps",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "rocketmq_broker_qps",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "broker tps & broker qps",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 6,
"w": 9,
"x": 7,
"y": 0
},
"id": 8,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_consumer_offset",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "rocketmq_consumer_offset",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 6,
"w": 8,
"x": 16,
"y": 0
},
"id": 16,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_consumer_message_size",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "rocketmq_consumer_message_size",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"decimals": null,
"fill": 1,
"gridPos": {
"h": 7,
"w": 7,
"x": 0,
"y": 6
},
"id": 6,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_producer_offset",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "rocketmq_producer_offset",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_PROMETHEUS}",
"description": "消费tps",
"fill": 1,
"gridPos": {
"h": 7,
"w": 9,
"x": 7,
"y": 6
},
"id": 4,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_consumer_tps",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "rocketmq_consumer_tps",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 7,
"w": 8,
"x": 16,
"y": 6
},
"id": 14,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_producer_message_size",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "rocketmq_producer_message_size",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_PROMETHEUS}",
"fill": 1,
"gridPos": {
"h": 6,
"w": 7,
"x": 0,
"y": 13
},
"id": 2,
"legend": {
"alignAsTable": false,
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_producer_tps",
"format": "time_series",
"hide": false,
"instant": false,
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "rocketmq_producer_tps",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 6,
"w": 9,
"x": 7,
"y": 13
},
"id": 10,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_group_get_latency_by_storetime",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "rocketmq_group_get_latency_by_storetime",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 6,
"w": 8,
"x": 16,
"y": 13
},
"id": 18,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(rocketmq_producer_offset) by (topic) - on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic)",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "rocketmq_message_accumulation",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 7,
"w": 8,
"x": 0,
"y": 19
},
"id": 20,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_brokeruntime_pmdt_0ms",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "rocketmq_brokeruntime_pmdt_0to10ms",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
},
{
"expr": "rocketmq_brokeruntime_pmdt_10to50ms",
"format": "time_series",
"intervalFactor": 1,
"refId": "C"
},
{
"expr": "rocketmq_brokeruntime_pmdt_50to100ms",
"format": "time_series",
"intervalFactor": 1,
"refId": "D"
},
{
"expr": "rocketmq_brokeruntime_pmdt_100to200ms",
"format": "time_series",
"intervalFactor": 1,
"refId": "E"
},
{
"expr": "rocketmq_brokeruntime_pmdt_200to500ms",
"format": "time_series",
"intervalFactor": 1,
"refId": "F"
},
{
"expr": "rocketmq_brokeruntime_pmdt_500to1s",
"format": "time_series",
"intervalFactor": 1,
"refId": "G"
},
{
"expr": "rocketmq_brokeruntime_pmdt_1to2s",
"format": "time_series",
"intervalFactor": 1,
"refId": "H"
},
{
"expr": "rocketmq_brokeruntime_pmdt_2to3s",
"format": "time_series",
"intervalFactor": 1,
"refId": "I"
},
{
"expr": "rocketmq_brokeruntime_pmdt_3to4s",
"format": "time_series",
"intervalFactor": 1,
"refId": "J"
},
{
"expr": "rocketmq_brokeruntime_pmdt_4to5s",
"format": "time_series",
"intervalFactor": 1,
"refId": "K"
},
{
"expr": "rocketmq_brokeruntime_pmdt_5to10s",
"format": "time_series",
"intervalFactor": 1,
"refId": "L"
},
{
"expr": "rocketmq_brokeruntime_pmdt_10stomore",
"format": "time_series",
"intervalFactor": 1,
"refId": "M"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "PutMessageDistributeTime",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": null,
"format": "short",
"label": "",
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 7,
"w": 8,
"x": 8,
"y": 19
},
"id": 28,
"legend": {
"alignAsTable": false,
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_brokeruntime_pull_threadpoolqueue_headwait_timemills",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "rocketmq_brokeruntime_query_threadpoolqueue_headwait_timemills",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
},
{
"expr": "rocketmq_brokeruntime_send_threadpoolqueue_headwait_timemills",
"format": "time_series",
"intervalFactor": 1,
"refId": "C"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "threadpoolqueue_headwait_timemills",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 7,
"w": 8,
"x": 16,
"y": 19
},
"id": 30,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_client_consume_fail_msg_count",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "rocketmq_client_consume_fail_msg_tps",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
},
{
"expr": "rocketmq_client_consume_ok_msg_tps",
"format": "time_series",
"intervalFactor": 1,
"refId": "C"
},
{
"expr": "rocketmq_client_consume_rt",
"format": "time_series",
"intervalFactor": 1,
"refId": "D"
},
{
"expr": "rocketmq_client_consumer_pull_rt",
"format": "time_series",
"intervalFactor": 1,
"refId": "E"
},
{
"expr": "rocketmq_client_consumer_pull_tps",
"format": "time_series",
"intervalFactor": 1,
"refId": "F"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "consume client info",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 8,
"w": 8,
"x": 0,
"y": 26
},
"id": 26,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_brokeruntime_getfound_tps10",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "rocketmq_brokeruntime_gettotal_tps10",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
},
{
"expr": "rocketmq_brokeruntime_gettransfered_tps10",
"format": "time_series",
"intervalFactor": 1,
"refId": "C"
},
{
"expr": "rocketmq_brokeruntime_getmiss_tps10",
"format": "time_series",
"intervalFactor": 1,
"refId": "D"
},
{
"expr": "rocketmq_brokeruntime_put_tps10",
"format": "time_series",
"intervalFactor": 1,
"refId": "E"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "runtime tps",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 8,
"w": 8,
"x": 8,
"y": 26
},
"id": 24,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_brokeruntime_commitlog_disk_ratio",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "rocketmq_brokeruntime_consumequeue_disk_ratio",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
},
{
"expr": "rocketmq_brokeruntime_commitlogdir_capacity_free",
"format": "time_series",
"intervalFactor": 1,
"refId": "C"
},
{
"expr": "rocketmq_brokeruntime_commitlogdir_capacity_total",
"format": "time_series",
"intervalFactor": 1,
"refId": "D"
},
{
"expr": "rocketmq_brokeruntime_commitlog_maxoffset",
"format": "time_series",
"intervalFactor": 1,
"refId": "E"
},
{
"expr": "rocketmq_brokeruntime_commitlog_minoffset",
"format": "time_series",
"intervalFactor": 1,
"refId": "F"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "disk space",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"fill": 1,
"gridPos": {
"h": 8,
"w": 8,
"x": 16,
"y": 26
},
"id": 22,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "rocketmq_brokeruntime_msg_put_total_today_now",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
},
{
"expr": "rocketmq_brokeruntime_msg_gettotal_today_now",
"format": "time_series",
"intervalFactor": 1,
"refId": "B"
},
{
"expr": "rocketmq_brokeruntime_dispatch_behind_bytes",
"format": "time_series",
"intervalFactor": 1,
"refId": "C"
},
{
"expr": "rocketmq_brokeruntime_put_message_size_total",
"format": "time_series",
"intervalFactor": 1,
"refId": "D"
},
{
"expr": "rocketmq_brokeruntime_put_message_average_size",
"format": "time_series",
"intervalFactor": 1,
"refId": "E"
},
{
"expr": "rocketmq_brokeruntime_msg_gettotal_yesterdaymorning",
"format": "time_series",
"intervalFactor": 1,
"refId": "F"
},
{
"expr": "rocketmq_brokeruntime_msg_puttotal_yesterdaymorning",
"format": "time_series",
"intervalFactor": 1,
"refId": "G"
},
{
"expr": "rocketmq_brokeruntime_msg_gettotal_todaymorning",
"format": "time_series",
"intervalFactor": 1,
"refId": "H"
},
{
"expr": "rocketmq_brokeruntime_msg_puttotal_todaymorning",
"format": "time_series",
"intervalFactor": 1,
"refId": "I"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "broker runtime info",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"refresh": false,
"schemaVersion": 16,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-1h",
"to": "now"
},
"timepicker": {
"refresh_intervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
],
"time_options": [
"5m",
"15m",
"1h",
"6h",
"12h",
"24h",
"2d",
"7d",
"30d"
]
},
"timezone": "",
"title": "Rocketmq_dashboard",
"uid": "zkVx1w_iz",
"version": 24
}
jvm/os性能调优
JVM选项,以下抄自官网:
https://rocketmq.apache.org/zh/docs/4.x/bestPractice/19JVMOS
推荐使用最新发布的 JDK 1.8 版本。通过设置相同的 Xms 和 Xmx 值来防止 JVM 调整堆大小以获得更好的性能。生产环境 JVM 配置如下所示:
-server -Xms8g -Xmx8g -Xmn4g
当 JVM 是默认 8 字节对齐,建议配置最大堆内存不要超过 32 G,否则会影响 JVM 的指针压缩技术,浪费内存。
如果不关心 RocketMQ Broker的启动时间,通过"预触摸" Java 堆以确保在 JVM 初始化期间每个页面都将被分配。
那些不关心启动时间的人可以启用它:
-XX:+AlwaysPreTouch
禁用偏置锁定可能会减少 JVM 暂停:
-XX:-UseBiasedLocking
垃圾回收,建议使用 JDK 1.8 自带的 G1 收集器:
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
这些 GC 选项看起来有点激进,但事实证明它在我们的生产环境中具有良好的性能。
另外不要把-XX:MaxGCPauseMillis
的值设置太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的minor GC,所以建议使用rolling GC日志文件:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m
如果写入 GC 文件会增加代理的延迟,可以考虑将 GC 日志文件重定向到内存文件系统:
-Xloggc:/dev/shm/mq_gc_%p.log123
Linux内核参数,以下抄自官网:
os.sh
脚本在 bin 文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。下面的参数需要注意,更多细节请参考/proc/sys/vm/*
的 文档
vm.extra_free_kbytes
告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)vm.min_free_kbytes
如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。vm.max_map_count
限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness --> aggressiveness)vm.swappiness
定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。File descriptor limits
RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。Disk scheduler
RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟。
常见问题
https://help.aliyun.com/document_detail/165006.html
免责声明: 本文部分内容转自网络文章,转载此文章仅为个人收藏,分享知识,如有侵权,请联系博主进行删除。