本文共 4999 字,大约阅读时间需要 16 分钟。
1.broker启动流程
2.broker各项服务 3.broker接收消息分发消息流程 4.请求处理器介绍 5.部分问题思考1.启动的代码调用顺序
BrokerStartup->main0->createBrokerController(args)->BrokerController.start() 2.createBrokerController展开介绍final BrokerConfig brokerConfig = new BrokerConfig();//broker客户端的所有配置信息 final NettyServerConfig nettyServerConfig = new NettyServerConfig();//netty服务端配置信息 final NettyClientConfig nettyClientConfig = new NettyClientConfig();//nett客户端配置信息 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();//消息存储配置信息 final BrokerController controller = new BrokerController(//broer控制类创建 (1) brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); `boolean initResult = controller.initialize();``//broker控制类初始化 (2)
对上述(1)部分展开(只列出了主要功能):
this.brokerConfig = brokerConfig;//broker配置 this.nettyServerConfig = nettyServerConfig;//netty配置 this.nettyClientConfig = nettyClientConfig;//netty配置 this.messageStoreConfig = messageStoreConfig;//消息存储配置 this.consumerOffsetManager = new ConsumerOffsetManager(this);//消息消费指针管理 this.topicConfigManager = new TopicConfigManager(this);//topic管理 this.pullMessageProcessor = new PullMessageProcessor(this);//消费者拉取消息处理器 this.pullRequestHoldService = new PullRequestHoldService(this);//消费者long-poll请求hold服务 this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);//long-poll即时传输监听 this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);//消费者变更监听 this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);//消费者管理 this.consumerFilterManager = new ConsumerFilterManager(this);//消费者过滤管理 this.producerManager = new ProducerManager();//生产者管理 this.subscriptionGroupManager = new SubscriptionGroupManager(this);//订阅管理 this.filterServerManager = new FilterServerManager(this);//过滤服务器管理 this.slaveSynchronize = new SlaveSynchronize(this);//slave节点同步 this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());//统计信息管理
对上述(2)部分展开:
boolean result = this.topicConfigManager.load();//加载topic信息 result = result && this.consumerOffsetManager.load();//加载消费指针信息 result = result && this.subscriptionGroupManager.load();//加载订阅信息 result = result && this.consumerFilterManager.load();//加载过滤器信息 this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);//构造存储服务类 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);//统计服务 result = result && this.messageStore.load();//消息存储加载 if (result) { this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);//netty服务 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor//发送消息线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor//拉取消息线程池 this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor//查询消息线程池 this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_"));//管理端服务线程池 this.clientManageExecutor = new ThreadPoolExecutor//客户端管理线程池 this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor//心跳维持线程 this.consumerManageExecutor //消费者管理线程池 this.registerProcessor();//这一步注册请求处理器和绑定线程池**(3)** //下面都是定时任务,只摘取重要部分展示BrokerController.this.getBrokerStats().record();//统计信息BrokerController.this.consumerOffsetManager.persist();//消费指针信息持久化BrokerController.this.consumerFilterManager.persist(); //过滤器支持持久化BrokerController.this.slaveSynchronize.syncAll();//slave同步
3.start()展开说明
this.messageStore.start();//消息存储启动 this.remotingServer.start();//服务启动 this.filterServerManager.start();//过滤管理启动 BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());//往nameserver上注册broker
主要特别点一下主要服务(参照第一部分):
broker启动后有定时任务往nameserver上注册broker信息 消息队列存储服务 消费端管理服务 生产端管理服务 admin管理服务 broker统计服务 定时拉取nameserver配置服务broker接收到消息后,写入commitlog日志文件中,同时按照不同的配置进行刷新硬盘和给slave同步
异步线程reput把commotlog日志中的消息,划分到各个逻辑队列中,每条消息在逻辑队列中保存为一个20字节的结构:在commitlog中的offset(long),消息的长度(int),tagid(long),如果有长轮询链接,直接传输该消息 客户端发起pull,查询消费队列表,获取消费指针,发送消息给消费端PullMessageProcessor//消费端拉取消息处理
QueryMessageProcessor//管理端查询消息处理 SendMessageProcessor//生产端发送消息处理以及消费端回退消息处理1.nameserver动态添加后broker无法注册???
建议采用http的nameserver地址配置 2.broker动态添加会影响以前的队列和已有的主题嘛? 不影响 3.broker宕机会丢失消息吗? 建议采用同步双写保证不丢消息 4.动态加入消费者到组中,会有什么问题? 因为以前的消费者有自己的拉取消息缓存,并且队列分配时定时任务,大多数情况下是存在重复消费问题的. 5.动态加入生产者会有问题吗? 没有问题 6.对消费端和服务端有什么建议? 尽量再单个应用中采用一个实例名称,其次尽量保证单个应用部署在单独的服务器上.主要是因为rocketmq中大量采用线程池和定时线程,对服务器资源消耗严重.不可避免出现频繁的线程切换.应尽量减少线程数量和使用公用线程,其次对可配置的服务线程应进行合理的配置.转载地址:http://rbuti.baihongyu.com/