从 zmq 和 protobuf 谈 STF 中的消息传递

栏目: 后端 · 发布时间: 6年前

内容简介:从 zmq 和 protobuf 谈 STF 中的消息传递

一、前言

用过STF的都知道,只要用户点击使用按钮,这台手机就会被标记为占用状态,其他用户在设备列表立即就可以看到某人使用了手机,同时其他用户也不再能使用这台手机,这种即时的消息肯定不能靠接口等传统方式进行传递。事实上,在STF中,很多信息都是通过消息来传递的,这其中用到了很多工具,比如说zeromq和protobuf。下面根据我的理解讲一下STF中的消息传递过程。

二、zeromq和protobuf基础

2.1 zeromq介绍和使用

zmq号称史上最快的消息队列,当然,快是以牺牲其他方面的性能为代价的。首先看下消息队列的定义:

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

打个比方,人与人交流时可以通过讲话来完成,一个大型系统各个模块之间的通信就需要消息队列来完成了。再以STF以例,当某个设备上线后,provider可以通过消息告诉在线的用户这台设备处于可用状态,当某个人使用了设备以后,需要通过消息告诉其他人这台手机牌繁忙状态,同时通知手机开始传输屏幕图像,再通知数据库改变设备的使用状态。

当然,有很多开源的消息队列 工具 可以使用,zmq也不是最流行的一个。其他常用消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等等。

zmq也是基于socket接口进行通信的。zmq与Socket 的区别是普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M的关系。就是多个服务端可以同时向多个客户端发消息,同时会自动处理错误等细节。

2.1.1 zmq的模式

zmq具有多种工作模式:

  • request-reply
  • push-pull
  • publish-subscribe
  • dealer-router

这些模式适用于不同的场景,在STF中用到了:push-pull,publish-subscribe,dealer-router这三种模式。

push-pull:

push/pull是单向模式,消息只能由push端发出,由pull端进行拉取。一般来说pull端对消息进行处理,如果一个pull端不能及时处理,可以同时有多个pull端,这种情况下,一条消息只能被 一个pull端拉取,拉过之后其他pull端就不能再次拉取。如果没有pull端拉取,消息过多的时候可能会溢出。

从 zmq 和 protobuf 谈 STF 中的消息传递

下面以node的示例代码:

// producer.js,push端
var zmq = require('zeromq')
  , sock = zmq.socket('push');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Producer bound to port 3000');

var count = 0
setInterval(function(){
  var message = "some work"+(count++);
  console.log(message);
  sock.send(message);
}, 2000);
// worker.js,pull端
var zmq = require('zeromq')
  , sock = zmq.socket('pull');

sock.connect('tcp://127.0.0.1:3000');
console.log('Worker connected to port 3000');

sock.on('message', function(msg){
  console.log('work: %s', msg.toString());
});

worker.js代码可以同时运行多个。一个worker pull过以后其他worker不会再pull到相同的消息。

在stf中,push-pull模式可用于用户和provider之前的消息处理,因为一条消息只需要处理一次,并且可以有多个处理端,即processor。

publish-subscribe:

这属于发布订阅模式。与push-pull所不同的,pub会向所有已经连接的sub发消息,如果没有sub连接,消息会被丢弃。简单来说pub-sub就是一个像大喇叭一样的广播系统,如果此时没有听到广播,后面就不会听到了。

// pubber.js
var zmq = require('zeromq')
  , sock = zmq.socket('pub');

sock.bindSync('tcp://127.0.0.1:3000');
console.log('Publisher bound to port 3000');

setInterval(function(){
  console.log('sending a multipart message envelope');
  sock.send(['kitty', 'kitty meow!']);
  sock.send(['cats', 'cats meow!']);
}, 2000);
// subber.js
var zmq = require('zeromq')
  , sock = zmq.socket('sub');

sock.connect('tcp://127.0.0.1:3000');
sock.subscribe('kitty');
sock.subscribe('cats');
console.log('Subscriber connected to port 3000');

sock.on('message', function(topic, message) {
  //console.log('received a message related to:', topic, 'containing message:', message);
  console.log('received a message related to:', topic.toString(), 'containing message:', message.toString());
});

subber.js可以同时运行多个,每个subber都会收到相同的消息。pub和sub还可以订阅特定的关键字。比如说如果sub只订阅了cats关键字,只会收到cats meow!消息,也可以同时订阅多个关键字。

在STF中,pub-sub模式用于广播设备的变更信息,比较如某个手机上线和下线,需要通知到所有的用户还有数据库。

dealer-router:

dealer/router是路由模式,适用于有多个发送端和多个接收端的情况,这样可以实现负载均衡。

在stf中,同时有多个用户和多台手机在线,dealer-router很适用于这种情况下的消息传递。

这种模式还没理解太透,等理解透了把代码补上。

2.1.2 zmq支持的语言

zmq最开始由c/c++编写的,但是现在已经支持 java 、node、 python 等语言

node版的zmq前文已经做了demo,下面举一个java subber的例子:

import com.alibaba.fastjson.JSONObject;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class Zmq_Thread {
    public void start(String url,String subscription) {
        ZContext context = new ZContext();
        ZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);
        if (url != null) {
            subscriber.connect(url);
        }
        else {
            subscriber.connect("tcp://127.0.0.1:7350");
        }

        if (subscription == null){
            subscription = "test";
        }

        subscriber.subscribe(subscription.getBytes(ZMQ.CHARSET));

        while (true) {
            String topic = subscriber.recvStr();
            if (topic == null)
                break;
            String data = subscriber.recvStr();
            assert(topic.equals(subscription));
            JSONObject jsonObject = JSONObject.parseObject(data) ;
            System.out.println(jsonObject.toJSONString());
        }
        context.destroy();
    }
}

关于java的其他模式可以参考官方demo或者网上相关教程。这里举java subber的原因是很多公司的项目是采用java的,stf与其他项目结合是需要用到jeromq。

2.2 protobuf的使用

Protocol Buffer是Google的数据交换的格式,与protobuf类似的东西其实是json和xml,protobuff的优势在于更小的体积,这样在大量数据传输的时候节省了带宽资源。与json和xml所不同的是,protobuff自带了一个编译器,protoc,只需要用它进行编译,可以编译成JAVA、python、C++代码,简单来说,它可以生成对应语言的数据类型,比如说生成java的一个类等等。

由于stf是node语言,这里重点介绍node中protobuf的使用,protobuf使用前需要先编写一个proto文件,定义消息类型,举个protobuf.js的例子:

// user.proto
package user;
syntax = "proto2";

message username {
    string username_field = 1; 
}
//user.js
var protobuf = require("protobufjs");

protobuf.load("user.proto", function(err, root) {
    if (err) throw err;

    // Obtain a message type
    var user = root.lookup("user.username");

    // Create a new message
    var message = user.create({ usernamefield: "Tom" });

    // Encode a message
    var buffer = user.encode(message).finish();
    // ... do something with buffer

    // Or, encode a plain object
    var buffer = user.encode({ usernamefield: "jerry" }).finish();
    // ... do something with buffer

    // Decode a buffer
    var message = user.decode(buffer);
    // ... do something with message
    console.log(message)

    // If your application uses length-delimited buffers, there is also encodeDelimited and decodeDelimited.
});

上面的代码是异步的,有时候异步不是很好用,我们可以改成同步的。在STF中用的就是同步模式,有兴趣的同学可以详细看一下,在lib\wire文件夹下,说实话,关于protobuf我也没搞太清楚,不过对于STF的改造已经够用了。

三、利用zmq和protobuf增强STF的性能

前面关于zmq和protobuffer讲了那么多,可能很多人已经看晕了,讲这些并不是故弄玄虚,也不是为了显得stf多么高深,而是因为不把这些知识深入的搞清楚,根本无法理解stf的消息机制,更无法利用消息进行扩展或者与外部交互。

下面说一下stf消息的两个应用:扩展节点以及对外发布设备状态。

首先看下面的一张图,这是STF官方部署文档中一张结构图,刚接触时,我就知道这张图比较重要,但是看了很长时间也没看出所以然,直到把zmq搞懂,才基本有所理解了。

从 zmq 和 protobuf 谈 STF 中的消息传递

3.1. provider节点的扩展。

这是最常见的形式了,很多手机并不是完全插在一台provider(电脑)上,多台电脑就是provider的扩展。从上图中可以看出,provider有两个接口,push和sub,从前文中可以知道,push可以保证消息被可靠的推送成功,而且sub用来监听自己感兴趣的消息,对应的实际中,用户点击使用按钮时,发出一个GroupMessage(占用手机)命令,provider通过sub端收到这个消息后,执行一系列操作,然后通过push方式把占用成功的消息推送出去,很明显,占用广播的消息即使没有provider回应也没有关系,因为这时候表示手机占用失败,我们用stf时偶尔就会出现这个问题,但是占用成功一定要保证push成功,否则下一个再占用会造成冲突。

图中provider上面的dev我认为就是手机,手机上的STFservice可以push和pull数据,但是STFsevice是如何联网的我还不太清楚。很显然,一个provider上可以同时插多台手机。

从图中可以看出,在进行provider扩展的时候,每个provider只要连上dev-triproxy上就行了,从dev-triproxy push和sub数据。下面举一个provider的启动命令的例子:

docker run --rm \
  --name provider \
  --net host \
  openstf/stf:latest \
  stf provider \
    --name provider1 \
    --connect-sub tcp://devside.stf.example.org:7250 \
    --connect-push tcp://devside.stf.example.org:7270 \
    --storage-url http://stf.example.org/ \
    --public-ip local_ip \
    --min-port=15000 \
    --max-port=25000 \
    --heartbeat-interval 10000 \
    --screen-ws-url-pattern "ws://stf.example.org/d/provider1/<%= serial %>/<%= publicPort %>/"

从命令中可以看出,在provider中需要提供几个主要参数:

  • name 这是在stf手机列表中显示的provider的名字。
  • connect-sub 就是triproxy的pub的端口。
  • connect-push 是triproxy的pull的端口。
  • storage-url 一般就是stf主页的链接,也可以是自己配置的storage-url,在storage模块中有设置。
  • public-ip 是指provider所在的电脑的ip,这个ip会作为minicap向外传送图像的地址。
  • min-port、max-port是指每部手机向外传输图像的端口,因为一个provider可以连多台手机,每台手机传输图像的端口不一样。
  • heartbeat-interval 是指心跳时间,为了保证provider可用,每隔一段时间provider会发出一个heartbeat,reaper接收这个heartbeat,如果reaper在自己的超时时间内没有收到provider的heartbeat,会认为这个provider下线。
  • screen-ws-url-pattern是指屏幕传输图像的url类型,其中serial是指手机的串号,publicPort是min-port和max-port之前的一个,这样就能唯一确定一个图像传输的url。

在扩展provider时,只要更改一下provider的ip和名称,就可以同时上线多个provider。

3.2.websocket节点的扩展

websocket节点同样有push和sub两个端口,分别用来推送和接收消息,推送的主要消息是用户占用和取消占用的消息,接收的消息主要是设备被占用和设备改变的消息。

下面看一下websocket节点的启动命令:

docker run --rm \
  --name %p-%i \
  --link rethinkdb-proxy-28015:rethinkdb \
  -e "SECRET=YOUR_SESSION_SECRET_HERE" \
  -p %i:3000 \
  openstf/stf:latest \
  stf websocket --port 3000 \
    --storage-url https://stf.example.org/ \
    --connect-sub tcp://appside.stf.example.org:7150 \
    --connect-push tcp://appside.stf.example.org:7170

首先解释一下命令中的各个参数,%p表示模块的名称,%i表示端口号,在实际应用的需要用对应的参数替换。参数里面有一个link rethinkdb的参数,是因为websocket模块有些功能需要直接读写数据库。扩展websocket节点的时候,如果在同一台电脑,只要修改一下%i这个端口号就行了,因为同一个系统的两个进程不能监听同一端口,当然,如果没有用docker,需要修改-p 3000。

websocket有多个监听端口,怎样用这些端口呢,需要在nginx里配置一下:

...
  upstream stf_api {
    server 192.168.255.100:3700 max_fails=0;
    server 192.168.255.100:3701 max_fails=0;
    server 192.168.255.101:3700 max_fails=0;
  }
...

3.3.processor节点的扩展

processor节点的扩展就比较简单了。仿照前面websocket的扩展,直接启动多个processerr模块就可以了。processor模块扩展一般用于processor成为系统的瓶颈的情况下,不过目前为止,我还没有发现processor需要扩展的情况,一般都能处理过来。

3.4.triproxy节点的扩展

官方的框图中表示triproxy节点可以进行扩展,但是我实在找不到扩展的办法,还请懂行的人指导!

3.5. 设备状态的广播。

STF可以提供手机屏幕的实时图像以及实时操作功能,很多时候我们的手机不仅可以提供给别人使用,也会用来做自动化等事情,当手机做自动化时,肯定不希望别人来操作,当有人在使用这台手机时,也不允许做自动化,这时候自动化工具肯定希望知道设备的状态。对于这种需求,一种方式是提供查询的接口,其他应用通过接口查询手机的状态,但是如果有很多应用同时调接口,会给STF造成很大压力,同时也有些浪费。

根据前文介绍的STF消息机制,我们可以利用zmq给STF做一个广播模块,实时广播设备的状态改变,其他应用监听到设备的改变后再做对应的操作。事实上,provider一直在广播设备的改变状态,如果我们用subber.js直接收听7150端口的数据,我们可以看到设备的改变信息。

广播的格式是protobuffer,这里说一下protobuffer的一些问题,虽然protobuffer具更高的数据传输效率,但是同时牺牲了很多灵活性,如果别人想要收听这个广播,还必须拿到一个完整的proto文件,然后生成相关的类,如果广播的信息有所增删,则需要重新拉最新的proto文件,这给收听都带来很多不便。因此,我们可以利用STF的模块再做一次转换,变成普通的json,这样就灵活多了。


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Ordering Disorder

Ordering Disorder

Khoi Vinh / New Riders Press / 2010-12-03 / USD 29.99

The grid has long been an invaluable tool for creating order out of chaos for designers of all kinds—from city planners to architects to typesetters and graphic artists. In recent years, web designers......一起来看看 《Ordering Disorder》 这本书的介绍吧!

在线进制转换器
在线进制转换器

各进制数互转换器

URL 编码/解码
URL 编码/解码

URL 编码/解码