golang监听rabbitmq消息队列任务断线自动重连接
阅读原文时间:2022年03月03日阅读:1

需求背景:

goalng常驻内存任务脚本监听rbmq执行任务

任务脚本由supervisor来管理

当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态

假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了

如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员

如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。

代码实现一:

消费者:

package main

import (
"fmt"
"github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq"
)

type RecvPro struct {

}

//// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db
/*
返回值 error 为nil 则表示该消息消费成功
否则消息会进入ttl延时队列 重复尝试消费3次
3次后消息如果还是失败 消息就执行失败 进入告警 FailAction
*/
func (t *RecvPro) Consumer(dataByte []byte) error {
//time.Sleep(500*time.Microsecond)
//return errors.New("顶顶顶顶")
fmt.Println(string(dataByte))
//time.Sleep(1*time.Second)
//return errors.New("顶顶顶顶")
return nil
}

//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败 此处可以根据情况 对消息进行告警提醒 或者 补偿 入库db 钉钉告警等等
*/
func (t *RecvPro) FailAction(err error,dataByte []byte) error {
fmt.Println(string(dataByte))
fmt.Println(err)
fmt.Println("任务处理失败了,我要进入db日志库了")
fmt.Println("任务处理失败了,发送钉钉消息通知主人")
return nil
}

func main() {
t := &RecvPro{}

//rabbitmq.Recv(rabbitmq.QueueExchange{  
//    "a\_test\_0001",  
//    "a\_test\_0001",  
//    "",  
//    "",  
//    "amqp://guest:guest@192.168.2.232:5672/",  
//},t,5)

/\*  
    runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了  
 \*/  
err := rabbitmq.Recv(rabbitmq.QueueExchange{  
    "a\_test\_0001",  
    "a\_test\_0001",  
    "hello\_go",  
    "direct",  
    "amqp://guest:guest@192.168.1.169:5672/",  
},t,4)

if(err != nil){  
    fmt.Println(err)  
}

}

rabbitmq代码

package rabbitmq

import (
"errors"
"strconv"
"time"

//"errors"  
"fmt"  
"github.com/streadway/amqp"  
"log"  

)

// 定义全局变量,指针类型
var mqConn *amqp.Connection
var mqChan *amqp.Channel

// 定义生产者接口
type Producer interface {
MsgContent() string
}

// 定义生产者接口
type RetryProducer interface {
MsgContent() string
}

// 定义接收者接口
type Receiver interface {
Consumer([]byte) error
FailAction(error , []byte) error
}

// 定义RabbitMQ对象
type RabbitMQ struct {
connection *amqp.Connection
Channel *amqp.Channel
dns string
QueueName string // 队列名称
RoutingKey string // key名称
ExchangeName string // 交换机名称
ExchangeType string // 交换机类型
producerList []Producer
retryProducerList []RetryProducer
receiverList []Receiver
}

// 定义队列交换机对象
type QueueExchange struct {
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
Dns string //链接地址
}

// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){

mqConn, err = amqp.Dial(r.dns)  
r.connection = mqConn   // 赋值给RabbitMQ对象

if err != nil {  
    fmt.Printf("rbmq链接失败  :%s \\n", err)  
}

return  

}

// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){

err = r.connection.Close()  
if err != nil{  
    fmt.Printf("关闭mq链接失败  :%s \\n", err)  
}  
return  

}

// 链接rabbitMQ
func (r *RabbitMQ)MqOpenChannel() (err error){
mqConn := r.connection
r.Channel, err = mqConn.Channel()
//defer mqChan.Close()
if err != nil {
fmt.Printf("MQ打开管道失败:%s \n", err)
}
return err
}

// 链接rabbitMQ
func (r *RabbitMQ)CloseMqChannel() (err error){
r.Channel.Close()
if err != nil {
fmt.Printf("关闭mq链接失败 :%s \n", err)
}
return err
}

// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
return RabbitMQ{
QueueName:q.QuName,
RoutingKey:q.RtKey,
ExchangeName: q.ExName,
ExchangeType: q.ExType,
dns:q.Dns,
}
}

func (mq *RabbitMQ) sendMsg (body string) (err error) {
err = mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}

defer mq.Channel.Close()  
if mq.ExchangeName != "" {  
    if mq.ExchangeType == ""{  
        mq.ExchangeType = "direct"  
    }  
    err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)  
    if err != nil {  
        log.Printf("ExchangeDeclare err  :%s \\n", err)  
    }  
}

// 用于检查队列是否存在,已经存在不需要重复声明  
\_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)  
if err != nil {  
    log.Printf("QueueDeclare err :%s \\n", err)  
}  
// 绑定任务  
if mq.RoutingKey != "" && mq.ExchangeName != "" {  
    err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)  
    if err != nil {  
        log.Printf("QueueBind err :%s \\n", err)  
    }  
}

if mq.ExchangeName != "" && mq.RoutingKey != ""{  
    err = mq.Channel.Publish(  
        mq.ExchangeName,     // exchange  
        mq.RoutingKey, // routing key  
        false,  // mandatory  
        false,  // immediate  
        amqp.Publishing {  
            ContentType: "text/plain",  
            Body:        \[\]byte(body),  
        })  
}else{  
    err = mq.Channel.Publish(  
        "",     // exchange  
        mq.QueueName, // routing key  
        false,  // mandatory  
        false,  // immediate  
        amqp.Publishing {  
            ContentType: "text/plain",  
            Body:        \[\]byte(body),  
        })  
}  
return

}

/*
发送延时消息
*/
func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
err =mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()

if mq.ExchangeName != "" {  
    if mq.ExchangeType == ""{  
        mq.ExchangeType = "direct"  
    }  
    err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)  
    if err != nil {  
        return  
    }  
}

if ttl <= 0{  
    return errors.New("发送延时消息,ttl参数是必须的")  
}

table := make(map\[string\]interface{},3)  
table\["x-dead-letter-routing-key"\] = mq.RoutingKey  
table\["x-dead-letter-exchange"\] = mq.ExchangeName  
table\["x-message-ttl"\] = ttl\*1000

//fmt.Printf("%+v",table)  
//fmt.Printf("%+v",mq)  
// 用于检查队列是否存在,已经存在不需要重复声明  
ttlstring := strconv.FormatInt(ttl,10)  
queueName := fmt.Sprintf("%s\_delay\_%s",mq.QueueName ,ttlstring)  
routingKey := fmt.Sprintf("%s\_delay\_%s",mq.QueueName ,ttlstring)  
\_, err = ch.QueueDeclare(queueName, true, false, false, false, table)  
if err != nil {  
    return  
}  
// 绑定任务  
if routingKey != "" && mq.ExchangeName != "" {  
    err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)  
    if err != nil {  
        return  
    }  
}

header := make(map\[string\]interface{},1)

header\["retry\_nums"\] = 0

var ttl\_exchange string  
var ttl\_routkey string

if(mq.ExchangeName != "" ){  
    ttl\_exchange = mq.ExchangeName  
}else{  
    ttl\_exchange = ""  
}

if mq.RoutingKey != "" && mq.ExchangeName != ""{  
    ttl\_routkey = routingKey  
}else{  
    ttl\_routkey = queueName  
}

err = mq.Channel.Publish(  
    ttl\_exchange,     // exchange  
    ttl\_routkey, // routing key  
    false,  // mandatory  
    false,  // immediate  
    amqp.Publishing {  
        ContentType: "text/plain",  
        Body:        \[\]byte(body),  
        Headers:header,  
    })  
if err != nil {  
    return

}  
return  

}

func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args …string) {
err :=mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()

if mq.ExchangeName != "" {  
    if mq.ExchangeType == ""{  
        mq.ExchangeType = "direct"  
    }  
    err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)  
    if err != nil {  
        log.Printf("ExchangeDeclare err  :%s \\n", err)  
    }  
}

//原始路由key  
oldRoutingKey := args\[0\]  
//原始交换机名  
oldExchangeName := args\[1\]

table := make(map\[string\]interface{},3)  
table\["x-dead-letter-routing-key"\] = oldRoutingKey  
if oldExchangeName != "" {  
    table\["x-dead-letter-exchange"\] = oldExchangeName  
}else{  
    mq.ExchangeName = ""  
    table\["x-dead-letter-exchange"\] = ""  
}

table\["x-message-ttl"\] = int64(20000)

//fmt.Printf("%+v",table)  
//fmt.Printf("%+v",mq)  
// 用于检查队列是否存在,已经存在不需要重复声明  
\_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)  
if err != nil {  
    log.Printf("QueueDeclare err :%s \\n", err)  
}  
// 绑定任务  
if mq.RoutingKey != "" && mq.ExchangeName != "" {  
    err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)  
    if err != nil {  
        log.Printf("QueueBind err :%s \\n", err)  
    }  
}

header := make(map\[string\]interface{},1)

header\["retry\_nums"\] = retry\_nums + int32(1)

var ttl\_exchange string  
var ttl\_routkey string

if(mq.ExchangeName != "" ){  
    ttl\_exchange = mq.ExchangeName  
}else{  
    ttl\_exchange = ""  
}

if mq.RoutingKey != "" && mq.ExchangeName != ""{  
    ttl\_routkey = mq.RoutingKey  
}else{  
    ttl\_routkey = mq.QueueName  
}

//fmt.Printf("ttl\_exchange:%s,ttl\_routkey:%s \\n",ttl\_exchange,ttl\_routkey)  
err = mq.Channel.Publish(  
    ttl\_exchange,     // exchange  
    ttl\_routkey, // routing key  
    false,  // mandatory  
    false,  // immediate  
    amqp.Publishing {  
        ContentType: "text/plain",  
        Body:        \[\]byte(body),  
        Headers:header,  
    })  
if err != nil {  
    fmt.Printf("MQ任务发送失败:%s \\n", err)

}

}

// 监听接收者接收任务 消费者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {
err :=mq.MqOpenChannel()
ch := mq.Channel
if err != nil{
log.Printf("Channel err :%s \n", err)
}
defer mq.Channel.Close()
if mq.ExchangeName != "" {
if mq.ExchangeType == ""{
mq.ExchangeType = "direct"
}
err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
if err != nil {
log.Printf("ExchangeDeclare err :%s \n", err)
}
}

// 用于检查队列是否存在,已经存在不需要重复声明  
\_, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)  
if err != nil {  
    log.Printf("QueueDeclare err :%s \\n", err)  
}  
// 绑定任务  
if mq.RoutingKey != "" && mq.ExchangeName != "" {  
    err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)  
    if err != nil {  
        log.Printf("QueueBind err :%s \\n", err)  
    }  
}  
// 获取消费通道,确保rabbitMQ一个一个发送消息  
err =  ch.Qos(1, 0, false)  
msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)  
if err != nil {  
    log.Printf("Consume err :%s \\n", err)  
}  
for msg := range msgList {  
    retry\_nums,ok := msg.Headers\["retry\_nums"\].(int32)  
    if(!ok){  
        retry\_nums = int32(0)  
    }  
    // 处理数据  
    err := receiver.Consumer(msg.Body)  
    if err!=nil {  
        //消息处理失败 进入延时尝试机制  
        if retry\_nums < 3{  
            fmt.Println(string(msg.Body))  
            fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 \\n")  
            retry\_msg(msg.Body,retry\_nums,QueueExchange{  
                    mq.QueueName,  
                    mq.RoutingKey,  
                    mq.ExchangeName,  
                    mq.ExchangeType,  
                    mq.dns,  
                })  
        }else{  
            //消息失败 入库db  
            fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \\n")  
            receiver.FailAction(err,msg.Body)  
        }  
        err = msg.Ack(true)  
        if err != nil {  
            fmt.Printf("确认消息未完成异常:%s \\n", err)  
        }  
    }else {  
        // 确认消息,必须为false  
        err = msg.Ack(true)

        if err != nil {  
            fmt.Printf("消息消费ack失败 err :%s \\n", err)  
        }  
    }

}  

}

//消息处理失败之后 延时尝试
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
//原始队列名称 交换机名称
oldQName := queueExchange.QuName
oldExchangeName := queueExchange.ExName
oldRoutingKey := queueExchange.RtKey
if oldRoutingKey == "" || oldExchangeName == ""{
oldRoutingKey = oldQName
}

if queueExchange.QuName != "" {  
    queueExchange.QuName = queueExchange.QuName + "\_retry\_3";  
}

if queueExchange.RtKey != "" {  
    queueExchange.RtKey = queueExchange.RtKey + "\_retry\_3";  
}else{  
    queueExchange.RtKey = queueExchange.QuName + "\_retry\_3";  
}

//fmt.Printf("%+v",queueExchange)

mq := NewMq(queueExchange)  
\_ = mq.MqConnect()

defer func(){  
    \_ = mq.CloseMqConnect()  
}()  
//fmt.Printf("%+v",queueExchange)  
mq.sendRetryMsg(string(msg),retry\_nums,oldRoutingKey,oldExchangeName)

}

func Send(queueExchange QueueExchange,msg string) (err error){
mq := NewMq(queueExchange)
err = mq.MqConnect()
if err != nil{
return
}

defer func(){  
    mq.CloseMqConnect()  
}()

err = mq.sendMsg(msg)

return  

}

//发送延时消息
func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){
mq := NewMq(queueExchange)
err = mq.MqConnect()
if err != nil{
return
}
defer func(){
_ = mq.CloseMqConnect()
}()
err = mq.sendDelayMsg(msg,ttl)
return
}

/*
runNums 开启并发执行任务数量
*/
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
mq := NewMq(queueExchange)
//链接rabbitMQ
err = mq.MqConnect()
if(err != nil){
return
}
//rbmq断开链接后 协程退出释放信号
taskQuit:= make(chan struct{}, 1)
//尝试链接rbmq
tryToLinkC := make(chan struct{}, 1)
//开始执行任务
for i:=1;i<=runNums;i++{
go Recv2(mq,receiver,taskQuit);
}

//如果rbmq断开连接后 尝试重新建立链接  
var tryToLink = func() {  
    for {  
        err = mq.MqConnect()  
        if(err == nil){  
            tryToLinkC <- struct{}{}  
            break  
        }  
        time.Sleep(time.Second \* 10)  
    }  
}  
for{  
    select {  
    case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接  
         go tryToLink()  
        <-tryToLinkC //建立链接成功后 重新开启协程执行任务  
        fmt.Println("重新开启新的协程执行任务")  
        go Recv2(mq,receiver,taskQuit);  
    }  
    time.Sleep(time.Millisecond\*100)  
}  

}

func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
defer func() {
fmt.Println("rbmq链接失败,协程任务退出~~~~")
taskQuit <- struct{}{}
return
}()
// 验证链接是否正常
err := mq.MqOpenChannel()
if(err != nil){
return
}
mq.ListenReceiver(receiver)
}

type retryPro struct {
msgContent string
}

实现重连方式很多,下面实现方式比较简单

  1. Recv方法创建ampq链接

  2. 启动协程开始执行任务

      3,协程中捕获异常发送消息到 taskQuit <- struct{}{}

      4,主进程监听taskQuit管道 开始尝试重新链接amqp  直到链接成功

      5,重新链接成功后启动新的协程处理任务

主要代码分析:

/*
runNums 开启并发执行任务数量
*/
func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){
mq := NewMq(queueExchange)
//链接rabbitMQ
err = mq.MqConnect()
if(err != nil){
return
}
//rbmq断开链接后 协程退出释放信号
taskQuit:= make(chan struct{}, 1)
//尝试链接rbmq
tryToLinkC := make(chan struct{}, 1)
//开始执行任务
for i:=1;i<=runNums;i++{
go Recv2(mq,receiver,taskQuit);
}

//如果rbmq断开连接后 尝试重新建立链接  
var tryToLink = func() {  
    for {  
        err = mq.MqConnect()  
        if(err == nil){  
            tryToLinkC <- struct{}{}  
            break  
        }  
        time.Sleep(time.Second \* 10)  
    }  
}  
for{  
    select {  
    case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接  
         go tryToLink()  
        <-tryToLinkC //建立链接成功后 重新开启协程执行任务  
        fmt.Println("重新开启新的协程执行任务")  
        go Recv2(mq,receiver,taskQuit);  
    }  
    time.Sleep(time.Millisecond\*100)  
}  

}

func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
defer func() {
fmt.Println("rbmq链接失败,协程任务退出~~~~")
taskQuit <- struct{}{}
return
}()
// 验证链接是否正常
err := mq.MqOpenChannel()
if(err != nil){
return
}
mq.ListenReceiver(receiver)
}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器