openresty 学习笔记五:访问RabbitMQ消息队列
阅读原文时间:2021年12月14日阅读:1

openresty 学习笔记五:访问RabbitMQ消息队列

之前通过比较选择,决定采用RabbitMQ这种消息队列来做中间件,目的舒缓是为了让整个架构的瓶颈环节。这里是做具体实施,用lua访问RabbitMQ消息队列。

RabbitMQ消息队列有几个比较重要的概念:生产者Producer,消费者Consumer,交换器Exchanges,队列Queues

我的简单理解
生产者,发布消息入队的用户。
消费者,订阅队列获取消息的用户。
交换器,消息可以不指定某个具体队列,而是发送给交换器,通过不同类型交换器的规则和router key分配到具体的队列。
队列,消息队列载体,每个消息都会被投入到一个或多个队列。

lua 要访问 RabbitMQ 目前只找到一个通过STOMP协议连接的库lua-resty-rabbitmqstomp,前面已经介绍了这个STOMP,RabbitMQ也需要安装适配器才可以。该库我查不到声明exchange和queues并进行绑定的方法,只可以实现发送消息到指定交换器和订阅指定队列。

二次封装

根据自己的业务需要,先稍微做了个二次封装,主要是封装连接步骤和send的方法,同时规范返回值

local rabbitmq = require "resty.rabbitmqstomp"

local _M = {}
_M._VERSION = '0.01'

local mt = { __index = _M }

function _M.new(self, opts)
opts = opts or {}
return setmetatable({
mq_host = opts.host or '127.0.0.1',
mq_port = opts.port or 61613,
mq_timeout = opts.timeout or 10000,
mq_user = opts.username or 'guest',
mq_password = opts.password or 'guest',
mq_vhost = opts.vhost or "/"}, mt)
end

function _M.get_connect(self)

local mq, err = rabbitmq:new({ username = self.mq\_user,  
                               password = self.mq\_password,  
                               vhost = self.mq\_vhost })

if not mq then  
    return false,err  
end

mq:set\_timeout(self.mq\_timeout)

local ok, err = mq:connect(self.mq\_host,self.mq\_port) 

if not ok then  
    return false,err  
end

return true,mq

end

function _M.send(self , destination, msg)

local ret, client = self:get\_connect()  
if not ret then  
    return false,client  
end

local send\_receipt\_id = ngx.now()\*1000

local headers = {}  
headers\["destination"\] = destination  
headers\["receipt"\] = send\_receipt\_id  
headers\["app-id"\] = "luaresty"  
headers\["persistent"\] = "true"  
headers\["content-type"\] = "application/json"

local ok, err = client:send(msg, headers)  
if not ok then  
    return false,err  
end

local \_,str\_start = string.find(ok, "receipt%-id", 1)  
local str\_end = string.find(ok, "\\n\\n", 1)  
if str\_start == nil or str\_end == nil then  
    return false,"send receipt not     receive"  
end

local receipt\_id = string.sub(ok, str\_start + 2 ,str\_end - 1)  
if receipt\_id ~= send\_receipt\_id then  
    return false,"receipt id not right"  
end

local ok, err = client:set\_keepalive(10000, 10000)

return true,send\_receipt\_id  

end

return _M

使用示例

local rabbitmq = require ("myRabbitmq")
local mq = rabbitmq:new(config)
local ok , err = mq:send(destination,data)
if not ok then
ngx.log(ngx.ERR, "failed to send mq :", err)
end