博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rocketmq-broker解读
阅读量:4144 次
发布时间:2019-05-25

本文共 4999 字,大约阅读时间需要 16 分钟。

1.broker启动流程

2.broker各项服务
3.broker接收消息分发消息流程
4.请求处理器介绍
5.部分问题思考

1.broker启动流程

1.启动的代码调用顺序

BrokerStartup->main0->createBrokerController(args)->BrokerController.start()
2.createBrokerController展开介绍

final BrokerConfig brokerConfig = new BrokerConfig();//broker客户端的所有配置信息 final NettyServerConfig nettyServerConfig = new NettyServerConfig();//netty服务端配置信息 final NettyClientConfig nettyClientConfig = new NettyClientConfig();//nett客户端配置信息 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();//消息存储配置信息 final BrokerController controller = new BrokerController(//broer控制类创建 (1)                brokerConfig,                nettyServerConfig,                nettyClientConfig,                messageStoreConfig); `boolean initResult = controller.initialize();``//broker控制类初始化 (2)

对上述(1)部分展开(只列出了主要功能):

this.brokerConfig = brokerConfig;//broker配置        this.nettyServerConfig = nettyServerConfig;//netty配置        this.nettyClientConfig = nettyClientConfig;//netty配置        this.messageStoreConfig = messageStoreConfig;//消息存储配置        this.consumerOffsetManager = new ConsumerOffsetManager(this);//消息消费指针管理        this.topicConfigManager = new TopicConfigManager(this);//topic管理        this.pullMessageProcessor = new PullMessageProcessor(this);//消费者拉取消息处理器        this.pullRequestHoldService = new PullRequestHoldService(this);//消费者long-poll请求hold服务        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);//long-poll即时传输监听        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);//消费者变更监听        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);//消费者管理        this.consumerFilterManager = new ConsumerFilterManager(this);//消费者过滤管理        this.producerManager = new ProducerManager();//生产者管理        this.subscriptionGroupManager = new SubscriptionGroupManager(this);//订阅管理        this.filterServerManager = new FilterServerManager(this);//过滤服务器管理        this.slaveSynchronize = new SlaveSynchronize(this);//slave节点同步        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());//统计信息管理

对上述(2)部分展开:

boolean result = this.topicConfigManager.load();//加载topic信息        result = result && this.consumerOffsetManager.load();//加载消费指针信息        result = result && this.subscriptionGroupManager.load();//加载订阅信息        result = result && this.consumerFilterManager.load();//加载过滤器信息        this.messageStore =              new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,                  this.brokerConfig);//构造存储服务类                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//统计服务                       result = result && this.messageStore.load();//消息存储加载        if (result) {            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);//netty服务            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor//发送消息线程池            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor//拉取消息线程池            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor//查询消息线程池            this.adminBrokerExecutor =                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(                    "AdminBrokerThread_"));//管理端服务线程池            this.clientManageExecutor = new ThreadPoolExecutor//客户端管理线程池            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor//心跳维持线程            this.consumerManageExecutor //消费者管理线程池            this.registerProcessor();//这一步注册请求处理器和绑定线程池**(3)**           //下面都是定时任务,只摘取重要部分展示BrokerController.this.getBrokerStats().record();//统计信息BrokerController.this.consumerOffsetManager.persist();//消费指针信息持久化BrokerController.this.consumerFilterManager.persist();     //过滤器支持持久化BrokerController.this.slaveSynchronize.syncAll();//slave同步

3.start()展开说明

this.messageStore.start();//消息存储启动 this.remotingServer.start();//服务启动 this.filterServerManager.start();//过滤管理启动 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());//往nameserver上注册broker

2.broker各项服务

主要特别点一下主要服务(参照第一部分):

broker启动后有定时任务往nameserver上注册broker信息
消息队列存储服务
消费端管理服务
生产端管理服务
admin管理服务
broker统计服务
定时拉取nameserver配置服务

3.broker接收消息分发消息流程

broker接收到消息后,写入commitlog日志文件中,同时按照不同的配置进行刷新硬盘和给slave同步

异步线程reput把commotlog日志中的消息,划分到各个逻辑队列中,每条消息在逻辑队列中保存为一个20字节的结构:在commitlog中的offset(long),消息的长度(int),tagid(long),如果有长轮询链接,直接传输该消息
客户端发起pull,查询消费队列表,获取消费指针,发送消息给消费端

4.请求处理器介绍

PullMessageProcessor//消费端拉取消息处理

QueryMessageProcessor//管理端查询消息处理
SendMessageProcessor//生产端发送消息处理以及消费端回退消息处理

5.部分问题思考

1.nameserver动态添加后broker无法注册???

建议采用http的nameserver地址配置
2.broker动态添加会影响以前的队列和已有的主题嘛?
不影响
3.broker宕机会丢失消息吗?
建议采用同步双写保证不丢消息
4.动态加入消费者到组中,会有什么问题?
因为以前的消费者有自己的拉取消息缓存,并且队列分配时定时任务,大多数情况下是存在重复消费问题的.
5.动态加入生产者会有问题吗?
没有问题
6.对消费端和服务端有什么建议?
尽量再单个应用中采用一个实例名称,其次尽量保证单个应用部署在单独的服务器上.主要是因为rocketmq中大量采用线程池和定时线程,对服务器资源消耗严重.不可避免出现频繁的线程切换.应尽量减少线程数量和使用公用线程,其次对可配置的服务线程应进行合理的配置.

转载地址:http://rbuti.baihongyu.com/

你可能感兴趣的文章
Template模式
查看>>
Observer模式
查看>>
高性能服务器设计
查看>>
性能扩展问题要趁早
查看>>
MySQL-数据库、数据表结构操作(SQL)
查看>>
OpenLDAP for Windows 安装手册(2.4.26版)
查看>>
图文介绍openLDAP在windows上的安装配置
查看>>
Pentaho BI开源报表系统
查看>>
Pentaho 开发: 在eclipse中构建Pentaho BI Server工程
查看>>
JSP的内置对象及方法
查看>>
android中SharedPreferences的简单例子
查看>>
android中使用TextView来显示某个网址的内容,使用<ScrollView>来生成下拉列表框
查看>>
andorid里关于wifi的分析
查看>>
Spring MVC和Struts2的比较
查看>>
Hibernate和IBatis对比
查看>>
Spring MVC 教程,快速入门,深入分析
查看>>
Android 的source (需安装 git repo)
查看>>
Commit our mod to our own repo server
查看>>
LOCAL_PRELINK_MODULE和prelink-linux-arm.map
查看>>
Simple Guide to use the gdb tool in Android environment
查看>>