Lua Web快速开发指南(10) - 利用MQ实现异步任务、订阅/发布、消息队列

栏目: Lua · 发布时间: 1年前

来源: segmentfault.com

内容简介:本章节我们将学习如何使用

本文转载自:https://segmentfault.com/a/1190000019581455,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有。

本章节我们将学习如何使用 MQ 库.

MQ库简介

MQ 库实现了各类消息代理中间件(Message Broker)的连接协议, 目前支持: redismqttstomp 协议.

MQ 库基于上述协议实现了: 生产者 -> 消费者订阅 -> 发布 模型, 可以在不依赖其它服务的情况下独立完成任务.

API介绍

cf框架提供了多种 MQ 的封装, 当我们需要使用的时候需要根据实际的协议进行选择:

-- local MQ = require "MQ.mqtt"
-- local MQ = require "MQ.redis"
-- local MQ = require "MQ.stomp"

MQ:new(opt)

此方法将会创建一个的MQ对象实例.

opt 是一个 table 类型的参数, 可以传递如下值:

  • host - 字符串类型, 消息队列的域名或者IP地址.
  • port - int类型, 消息队列监听的端口.
  • auth/db - 字符串类型, 仅在 redis 协议下用作登录认证或者db选择(没有可以不填写).
  • username/password - 字符串类型, 仅在stomp/mqtt协议下用作登录认证(没有可以不填写).
  • vhost - 字符串类型, 仅在使用某些特定消息队列server的时候填写(例如:rabbit).
  • keepalive - int类型, 仅在使用mqtt的时候用来出发客户端主动发出心跳包的时间.

以redis broker为示例:

local MQ = require "MQ.redis"
local mq = MQ:new {
  host = "localhost",
  port = 6379,
  -- db = 0,
  -- auth = "123456789",
}

MQ:on(pattern, function)

此方法用来订阅一个指定 pattern . 当 broker 将消息传递到cf后, function 将会被调用.

MQ 库会为 function 注入一个 table 类型的参数 msg , 此参数将在断开连接的时候为 nil .

msg 根据采用的协议的不同 msg 的内容也将有所不同. 具体内容以 logging 库的打印为准.

标准使用示例:

local Log = require("logging"):new()
mq:on("/notice", function(msg)
  if not msg then
    return Log:ERROR("['/notice'] SUBSCRIBE ERROR: 连接已断开.")
  end
  Log:DEBUG(msg)
end)

开发者可以同时订阅多个 parttern .

MQ:emit(pattern, msg)

此方法用来向指定 pattern 发送消息. msg为字符串类型的消息.

使用示例:

mq:emit('/notice', '{"code":200,"data":[1,2,3,4,5,6,7,8,9,10]}')

单个 MQ 可以一直复用emit, 内部会创建一个写入队列去完成消息的顺序发送. (在多个实例中无法保证消息先后)

MQ:start()

此方法在作为独立运行服务端时候调用.

使用示例:

mq:start()

MQ:clsoe()

此方法可以关闭不再使用的MQ; 在任何情况下MQ使用完毕后都需要调用此方法来释放资源.

使用示例:

mq:close()

开始实践

为了演示更加直观, 这里仅使用redis作为broker中专消息.

1. 模拟生产者与消费者

我们模拟100个生产者向redis的 /queue 投递消息, 同时定义了一个消费者订阅 /queue 持续进行消费

代码如下:

local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

cf.fork(function ()
  local consumer = MQ:new {
    host = "localhost",
    port = 6379
  }

  local count = 0
  consumer:on("/queue", function (msg)
    if not msg then
      Log:ERROR("[/queue]连接失败", "已经消费了"..count.."个消息")
      return
    end
    count = count + 1
    Log:DEBUG("开始消费:", msg, "已经消费了"..count.."个消息")
  end)

  consumer:start() -- Websoket内部无需使用这个方法
end)

for i = 1, 100 do
  cf.fork(function()

    local producer = MQ:new {
      host = "localhost",
      port = 6379
    }

    producer:emit("/queue", json.encode({
      code = 200,
      data = {
        id = math.random(1, 1 << 32)
      },
    }))

    producer:close()
  end)
end

输出如下:

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3912595079}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了1个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2938696189}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了2个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3499397173}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了3个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1711272453}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了4个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3968420025}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了5个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1887895479}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了6个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3687986737}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了7个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2823099353}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了8个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2528190121}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了9个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":4107999865}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了10个消息
.
..
...
....
.....
[2019-06-25 16:05:36,247] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3608578767}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了100个消息

为了方便阅读. 我们这里取出前10条与最后第100条并且将msg的数据结构打印出来方便阅读.

消费者的处理方式采用同步非阻塞处理的(当前业务未处理完成是不会继续处理下个消息的), 如果不想阻塞当前消息队列事件循环可以考虑自行 fork 一个协程来处理.

2. 推送消息给某个用户

用户通过认证后接入到Server后订阅自己专属的频道, 当有用户专属消息的时候任何服务都可以利用此方法进行业务消息推送.

我们

代码实现如下:

local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

for uid = 1, 10 do
  cf.fork(function ()
    local client = MQ:new {
      host = "localhost",
      port = 6379
    }

    client:on("/user/"..uid.."/*", function (msg)
      if not msg then
        Log:ERROR("[/user/9257]连接失败")
        return
      end
      Log:DEBUG("UID:["..uid.."]接收到推送消息", msg)
    end)

    client:start() -- Websoket内部无需使用这个方法
  end)
end

local server = MQ:new {
  host = "localhost",
  port = 6379
}

cf.at(1, function (...)
  server:emit("/user/"..math.random(1, 10).."/ad", json.encode({
    code = 200,
    data = {}
  }))
end)

server:start()

运行后终端输出如下所示:

^C[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 16:20:23,506] [@script/main.lua:18] [DEBUG] : UID:[9]接收到推送消息, {["source"]="/user/9/ad", ["pattern"]="/user/9/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:24,504] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:25,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:26,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:27,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送消息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:28,506] [@script/main.lua:18] [DEBUG] : UID:[2]接收到推送消息, {["source"]="/user/2/ad", ["pattern"]="/user/2/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:29,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:30,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:31,505] [@script/main.lua:18] [DEBUG] : UID:[3]接收到推送消息, {["source"]="/user/3/ad", ["pattern"]="/user/3/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:32,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送消息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:33,506] [@script/main.lua:18] [DEBUG] : UID:[5]接收到推送消息, {["source"]="/user/5/ad", ["pattern"]="/user/5/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:34,503] [@script/main.lua:18] [DEBUG] : UID:[7]接收到推送消息, {["source"]="/user/7/ad", ["pattern"]="/user/7/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:35,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:36,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送消息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:37,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送消息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
^C[candy@MacBookPro:~/Documents/core_framework] $

这里我们可以看到, 由消息发布到 /user/9527/* 下的 topic 的时候, 我们可以通过一次 通配符 订阅就可以接收到所有下属路由消息.

3. 消息广播

在各种领域内, 消息推送已经成为了一种最常见的业务. 我们现在来尝试利用MQ实现消息推送业务.

首先, 我们将 script/main.lua 的文件写入如下代码:

-- main.lua
local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

for i = 1, 3 do
  cf.fork(function ()
    local uid = math.random(1, 1 << 32)
    local client_mq = MQ:new {
      host = "localhost", -- 主机名
      port = 6379,        -- 端口号
      -- db = nil,        -- 默认数据库
      -- auth = nil,      -- 密码
    }
    client_mq:on("/system/notice", function (msg)
      if not msg then
        Log:ERROR("['/system/notice'] SUBSCRIBE ERROR: 连接已断开.")
        return
      end
      Log:DEBUG("UID:["..uid.."]接收到消息: ", msg)
    end)

    client_mq:start()
  end)
end

local server_mq = MQ:new {
  host = "localhost", -- 主机名
  port = 6379,        -- 端口号
  -- db = nil,        -- 默认数据库
  -- auth = nil,      -- 密码
}

cf.at(3, function (args)
  server_mq:emit("/system/notice", json.encode({
    code = 200,
    msg = "系统即将关闭"
  }))
end)

server_mq:start()

这里我们用启动了3个协程来模拟用户订阅消息, 并且每个协程都使用不同的UID来打印. 然后再启动一个定时器模拟每三秒的消息推送业务.

打开终端运行 ./cfadmin 后, 输出如下:

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}

[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}

[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[candy@MacBookPro:~/Documents/core_framework] $

从终端的输出内容中可以看到, 我们确实每隔3秒就收到了一次消息推送.

4. 对基于Websocket协议的客户端实现业务推送

首先, 我们需要建立一套基于 httpd 库的 Websocket 路由. 让我们打开 script/main.lua 文件并将下面的代码写入进去.

local httpd require "httpd"

local app = httpd:new("Web")

app:ws('/ws', require "ws")

app:listen("0.0.0.0", 8080)

app:run()

Websocket 必须在建立与客户端的连接完成的同时利用 MQ 库订阅 /chat . 每当客户端发送消息过来触发 on_message 的时候, 都将会消息直接发布到 /chat 内部通过中转后实现推送聊天.

然后我们利用前面章节所学的 Websocket指南 , 编写一段简单的Websocket路由处理代码. 由于示例代码没有UID生成机制. 为了方便调试, 我们随机生成32位整数作为唯一ID标识符.

script/ws.lua 具体代码如下所示:

local MQ = require "MQ.redis"
local class = require "class"

local websocket = class("websocket")

function websocket:ctor (opt)
  self.ws = opt.ws
  self.id = math.random(1, 1 << 32)
end

function websocket:on_open ()
  self.mq = MQ:new { host = 'localhost', port = 6379 }
  self.mq:on("/chat", function (msg)
    if not msg then
      return
    end
    self.ws:send(msg.payload)
  end)
end

function websocket:on_message (data, typ)
  if self.mq then
    self.mq:emit("/chat", data)
  end
  print("客户端["..self.id.."]发送了消息:["..data.."]")
end

function websocket:on_error (error)

end

function websocket:on_close ()
  if self.mq then
    self.mq:close()
    self.mq = nil
  end
end

return websocket

注意: 我们需要记住当客户端连接断开的时候记得关闭订阅回收资源. 启动 ./cfadmin , 查看是否正常运行.

让我们下载 客户端工具 , 并且安装到我们的 Chrome 浏览器上. 提取码: cgwr

现在, 我们运行客户端 工具 在地址栏输入 localhost:8080/ws 连接我们刚刚启动的Websocket Server, 然后开始向 服务器 发送消息.

如果从终端中和客户端看到类似的输出内容, 说明我们的示例编写完成.

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019/06/25 20:11:59] [INFO] httpd正在监听: 0.0.0.0:8080
[2019/06/25 20:11:59] [INFO] httpd正在运行Web Server服务...
[2019/06/25 20:12:01] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000095/Sec
[2019/06/25 20:12:17] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000080/Sec
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
[2019/06/25 20:12:23] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000052/Sec
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[1693861773]发送了消息:[hello! 我是2]

最后

上述代码仅用 redis 协议进行模拟, 其它协议请参考 Wiki .

学习完成

至此 Lua Web开发指南 已经编写完毕. 软件开发领域内不仅仅需要师傅领进门, 个人修行也是一种能力的体现.

cf框架都内置库非常的多, 维护框架都同时还要编写使用教程. 作者不可能一个一个介绍完全. cf框架已经有了专属的QQ讨论社区: 727531854 , 点击加群 .

目前内部就作者一个人在里面. 如果您也对它比较感兴趣, 欢迎您到群里来一起交流技术.


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

关注码农网公众号

关注我们,获取更多IT资讯^_^


查看所有标签

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Design systems

Design systems

Not all design systems are equally effective. Some can generate coherent user experiences, others produce confusing patchwork designs. Some inspire teams to contribute to them, others are neglected. S......一起来看看 《Design systems》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具