RocketMQ生产者流程篇

栏目: 后端 · 发布时间: 7年前

内容简介:生产者向消息队列里面写入消息,不同的业务场景会采用不同的写入策略,比如:同步发送,异步发送,延迟发送,事务消息等;本文首先从分析生产者发送消息的流程开始,然后再来介绍各种发送消息的策略。生产者首先需要设置namesrv,或者指定其他方式更新namesrv;然后从namesrv获取topic的路由信息,路由信息包括broker以及Message Queue等信息,同时将路由信息保存在本地内存中,方便下次使用;最后从Message Queue列表中选择合适的Queue发送消息,实现负载均衡;DefaultMQP

前言

生产者向消息队列里面写入消息,不同的业务场景会采用不同的写入策略,比如:同步发送,异步发送,延迟发送,事务消息等;本文首先从分析生产者发送消息的流程开始,然后再来介绍各种发送消息的策略。

生产者流程

1.流程概述

生产者首先需要设置namesrv,或者指定其他方式更新namesrv;然后从namesrv获取topic的路由信息,路由信息包括broker以及Message Queue等信息,同时将路由信息保存在本地内存中,方便下次使用;最后从Message Queue列表中选择合适的Queue发送消息,实现负载均衡;

2.启动过程

DefaultMQProducer实例化提供了两个参数分别是:生产者组名称以及RPCHook,RPCHook是一个接口,具体实现交由业务端实现,两个方法分别是:doBeforeRequest和doAfterResponse,表示在执行请求之前和接收返回之后分别执行相关逻辑;

接下来就是调用DefaultMQProducer的start方法,相关的初始化操作都在里面进行,内部其实调用的是DefaultMQProducerImpl的start方法,具体代码如下:

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

默认serviceState的状态为CREATE_JUST,刚进入设置为START_FAILED,初始化完成之后设置为RUNNING,再次初始化时会直接报错,下面看一下具体初始化了哪些信息;

2.1检查配置

这里的检查其实就是对producerGroup进行合法性校验;

2.2设置instanceName

如果producerGroup不等于默认的"CLIENT_INNER_PRODUCER",则设置DefaultMQProducer的instanceName为进程的pid;

2.3创建MQClientInstance对象

首先检查 ConcurrentMap<String/ clientId /, MQClientInstance> factoryTable中是否已经存在已clientId为key的MQClientInstance,如果存在则取出,不存在则实例化;clientId是已ip地址,instanceName以及unitName组成的,例如:10.13.83.7@12500

2.4注册producer

将DefaultMQProducerImpl注册到MQClientInstance中,已producerGroup作为key,注册到ConcurrentMap<String/ group /, MQProducerInner> producerTable中,如果已经存在此Group,则抛出异常;

2.5初始化TopicPublishInfo

已topic名称为"TBW102"为key,实例化TopicPublishInfo作为value,存放在ConcurrentMap<String/ topic /, TopicPublishInfo> topicPublishInfoTable中,TopicPublishInfo用来存放topic的路由信息;

2.6启动MQClientInstance

MQClientInstance启动会启动很多相关服务,具体可以看如下代码:

public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

默认serviceState的状态为CREATE_JUST,刚进入设置为START_FAILED,初始化完成之后设置为RUNNING,防止重复初始化;

2.6.1初始化NameServerAddr

首先判断DefaultMQProducer是否配置了NameServerAddr,如果没有配置会到一个地址下获取,地址默认为: http://jmenv.tbsite.net :8080/rocketmq/nsaddr,相关的逻辑在MixAll类中,代码如下:

public static String getWSAddr() {
        String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
        String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
        String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
        if (wsDomainName.indexOf(":") > 0) {
            wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
        }
        return wsAddr;
    }

正常情况下我们需要设置自己的地址,可以通过如下方式设置:

System.setProperty("rocketmq.namesrv.domain", "localhost");

这种情况下就可以不用手动设置NameServerAddr;

2.6.2初始化RemotingClient

RemotingClient是一个接口类,底层使用的通讯框架是Netty,提供了实现类NettyRemotingClient,RemotingClient在初始化的时候实例化Bootstrap,方便后续用来创建Channel;

2.6.3启动定时器任务

总共启动了5个定时器任务,分别是:定时更新NameServerAddr信息,定时更新topic的路由信息,定时清理下线的broker,定时持久化Consumer的Offset信息,定时调整线程池;

2.6.3启动服务

pullMessageService和rebalanceService被用在消费端的两个服务类,分别是:从broker拉取消息的服务和均衡消息队列服务,负责分配消费者可消费的消息队列;

3.发送消息

相关发送消息的代码在DefaultMQProducerImpl的sendDefaultImpl方法中,部分代码如下所示:

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                        ...以下代码省略...

此方法的四个参数分别是:msg为要发送的消息,communicationMode要使用的发送方式包括同步、异步、单向,sendCallback在异步情况下的回调函数,timeout发送消息的超时时间;

3.1获取topic的路由信息

通过msg中设置的topic获取路由信息,具体代码如下:

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

首先从变量ConcurrentMap<String/ topic /, TopicPublishInfo> topicPublishInfoTable中获取是否存在指定topic的路由信息,如果获取不到则使用topic去nameServer获取路由信息,如果还是获取不到则使用默认的topic名称为"TBW102"去获取路由信息,需要使用默认名称去获取的情况是没有创建topic,需要broker自动创建topic的情况;获取路由信息使用的是MQClientInstance中的updateTopicRouteInfoFromNameServer方法,此方法根据topic获取路由信息,具体连接哪台nameServer,会从列表中顺序的选择nameServer,实现负载均衡;

注:名称为"TBW102"的topic是系统自动创建的;

3.2选择MessageQueue

成功获取到路由信息之后,需要从MessageQueue列表中选择一个,在这之前获取了默认发送消息失败的重试次数,默认为3次(只有发送模式是同步的情况下),代码如下:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

以上代码在MQFaultStrategy,此类提供了选择MessageQueue的策略,相关策略可以看类变量:

private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

latencyFaultTolerance:延迟容错对象,维护brokers的延迟信息;

sendLatencyFaultEnable:延迟容错开关,默认不开启;

latencyMax:延迟级别数组;

notAvailableDuration :根据延迟级别,对应broker不可用的时长;

这样上面的这段代码就好理解了,首先判定是否开启开关,如果开启首先获取index,index初始为一个随机值,后面每次+1,这样在后面与MessageQueue长度取模的时候每个MessageQueue可以按顺序获取;获取

MessageQueue之后需要判定其对应的Broker是否可用,同时也需要和当前指定的brokerName进行匹配;如果没有获取到就选择一个延迟相对小的,pickOneAtLeast会做 排序 处理;如果都不行就直接获取一个MessageQueue,不管其他条件了;

3.3发送消息

在发送之前会做超时检测,如果前面两步已经超时了,则直接报超时,默认超时时间是3秒;部分代码如下:

private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;

此处的6个参数分别是:msg消息本身,mq要发送到的队列,communicationMode发送策略,sendCallback异步回调函数,topicPublishInfo路由信息,timeout发送超时时间(3秒-前2步消耗的时间);

首先需要获取指定broker的地址,这要才能创建channel与broker连接;然后就是一些hock处理;接下来就是准备发送的消息头SendMessageRequestHeader,最后根据不同的发送策略执行发送消息,此处就不在进入更加深入的分析;

总结

本文重点介绍了发送者的启动,以及发送消息的大概流程;关于消息的发送策略,以及消息的类型包括:顺序消息,事务消息等,将在后面的文章介绍。


以上所述就是小编给大家介绍的《RocketMQ生产者流程篇》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Algorithms to Live By

Algorithms to Live By

Brian Christian、Tom Griffiths / Henry Holt and Co. / 2016-4-19 / USD 30.00

A fascinating exploration of how insights from computer algorithms can be applied to our everyday lives, helping to solve common decision-making problems and illuminate the workings of the human mind ......一起来看看 《Algorithms to Live By》 这本书的介绍吧!

JSON 在线解析
JSON 在线解析

在线 JSON 格式化工具

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

在线进制转换器
在线进制转换器

各进制数互转换器