rabbitmq客户端自动重连

栏目: Redis · 发布时间: 5年前

内容简介:开始找解决方案:上述的解决方案是在建立连接之后对连接添加或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连
编程rookie, 如有错误请指出 ☞: 253065903@qq.com

RabbitMQ Node.js 客户端( AMQP 0-9-1 V0.5.2 )自动重连

重启策略

开始找解决方案:

  1. 通过查看AMQP的源码,发现没有reconnect的选项

  2. 然后上GitHub上看有没有人提出类似的问题 github repo ,通过输入 reconnect 搜索issue区找到题为 Support Auto-reconnectionissue ,发现这个问题是早在 2013 年就提出来的,可是现在还是 open

    的状态!

  3. 在这个Issue区发现有一个解决方案还是可以实践一下的:

    function connectRMQ() {
      amqp.connect(config.rabbitmq.URI).then(function(conn) {
        conn.on('close', function() {
          console.error('Lost connection to RMQ.  Reconnecting in 60 seconds...');
          return setTimeout(connectRMQ, 60 * 1000);
        });
        return conn.createChannel().then(function(ch) {
            var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});
            ok = ok.then(function() {
                ch.prefetch(1);
                ch.consume(config.rabbitmq.queue, doWork, {noAck: false});
            });
            return ok.then(function() {
                console.log(" [*] Waiting in %s.", config.rabbitmq.queue);
            });
    
            function doWork(msg) {
                var body = msg.content.toString();
                console.log(" [x] Received %s", body);
                setTimeout(function() {
                    ch.ack(msg);
                }, config.rabbitmq.timeout);
            }
        });
      }).then(null, function() {
         setTimeout(connectRMQ, 10 * 1000);
         return console.log('connection failed');
      });
    }
    
    connectRMQ();
    

上述的解决方案是在建立连接之后对连接添加 close 的监听事件,当 close 事件触发,

或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连

  1. 还有一种简单粗暴的方法,监听 closeerror 事件, 有错误就抛出来,然后依靠外部的守护程序将此客户端重启

实践

按照Issue区发现的解决方案进行实践,修改之前的代码

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
var ex = MQ_CONFIG.ex
var patten = MQ_CONFIG.routing_key
var exType = MQ_CONFIG.ex_type
var q = MQ_CONFIG.q || 'signals'
var cnt = 0

function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('close', e => {
        reconnect(e)
      })
      return conn
    })
    .then(conn => {
      cnt = 0
      log('connect RMQ OK')
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertQueue(q, {
        durable: true
      })
        .then(() => {
          return ch.assertExchange(ex, exType, {
            durable: true
          })
        })
        .then(() => {
          return ch.assertQueue(q, {
            durable: true
          })
        })
        .then(() => {
          return ch.bindQueue(q, ex, patten)
        })
        .then(() => {
          return ch.consume(q, (msg) => {
            pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {
              if (err) {
                log(err)
                ch.nack(msg)
              } else {
                if (reply !== 0) {
                  ch.ack(msg)
                } else {
                  ch.ack(msg)
                  saveUnSubscribeMsg(msg.content.toString())
                }
              }
            })
          })
        })
    })
    .catch(e => {
      reconnect(e)
    })
}

function reconnect(e) {
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
        connect()
    }, 10 * 1000)
}
connect()
/**
 * if signals didn't be subscribed, they would be saved to ./data dir
 * @param {string} msg
 */
function saveUnSubscribeMsg(msg) {
  let date = new Date().toLocaleDateString()
  const fs = require('fs')
  const dir = './data'
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  let path = `${dir}/${date}.txt`
  let isExist = fs.existsSync(path)
  if (isExist) {
    fs.appendFileSync(path, msg)
  } else {
    fs.writeFileSync(path, msg)
  }
}

function log(...args) {
  console.log(...args, new Date().toLocaleString())
}

然后进行测试:

通过对MQserver的重启,均正常,然后将MQserver的机器的网断掉测试,发现了close事件并没有监听到,而是报了heartbeat超时的错误,从而程序直接退出了,于是又在代码中加入对error事件的监听:

amqp
  .connect(MQ_CONFIG.url)
  .then(conn => {
    conn.on('error', e => {
      reconnect(e)
    })
    conn.on('close', e => {
      reconnect(e)
    })
    return conn
  })

这下应该不会导致程序退出了吧,然而又引入了新的问题,当重启MQserver时,报了 ECONNECTRET 的错误,两个监听事件都监听到了,所以程序重连了两次,导致一个项目在MQserver上建立了两个连接,当再一次重启MQserver时,建立了四个连接!

这是很严重的错误,然而并不是所有时候两个监听事件都能监听到,比如 heartbeat 超时就只报 error 的错误,所有需要想出一个策略,让程序始终与MQserver之间只有一个连接。

采用声明一个变量,记录是不是正在连接

var isConnecting = false

如果已经在连接了,其他的重连都不做处理

function reconnect(e) {
  if (!isConnecting) {
    isConnecting = true
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
      connect()
    }, 10 * 1000)
  }
}

连接上时将重连的标志设为 false

.then(conn => {
  cnt = 0
  log('connect RMQ OK')
  isConnecting = false

于是乎,完整代码如下:

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
var ex = MQ_CONFIG.ex
var patten = MQ_CONFIG.routing_key
var exType = MQ_CONFIG.ex_type
var q = MQ_CONFIG.q || 'signals'
var cnt = 0
var isConnecting = false

function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('error', (e) => {
        reconnect(e)
      })
      conn.on('close', e => {
        reconnect(e)
      })
      return conn
    })
    .then(conn => {
      cnt = 0
      log('connect RMQ OK')
      isConnecting = false
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertQueue(q, {
        durable: true
      })
        .then(() => {
          return ch.assertExchange(ex, exType, {
            durable: true
          })
        })
        .then(() => {
          return ch.assertQueue(q, {
            durable: true
          })
        })
        .then(() => {
          return ch.bindQueue(q, ex, patten)
        })
        .then(() => {
          return ch.consume(q, (msg) => {
            pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {
              if (err) {
                log(err)
                ch.nack(msg)
              } else {
                if (reply !== 0) {
                  ch.ack(msg)
                } else {
                  ch.ack(msg)
                  saveUnSubscribeMsg(msg.content.toString())
                }
              }
            })
          })
        })
    })
    .catch(e => {
      isConnecting = false
      reconnect(e)
    })
}

function reconnect(e) {
  if (!isConnecting) {
    isConnecting = true
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
      connect()
    }, 10 * 1000)
  }
}
connect()
/**
 * if signals didn't be subscribed, they would be saved to ./data dir
 * @param {string} msg
 */
function saveUnSubscribeMsg(msg) {
  let date = new Date().toLocaleDateString()
  const fs = require('fs')
  const dir = './data'
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  let path = `${dir}/${date}.txt`
  let isExist = fs.existsSync(path)
  if (isExist) {
    fs.appendFileSync(path, msg)
  } else {
    fs.writeFileSync(path, msg)
  }
}

function log(...args) {
  console.log(...args, new Date().toLocaleString())
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

React开发实战

React开发实战

[美] Cássio de Sousa Antonio / 杜伟、柴晓伟、涂曙光 / 清华大学出版社 / 2017-3-1 / 58.00 元

介绍如何成功构建日益复杂的前端应用程序与接口,深入分析 React库,并详述React生态系统中的其他工具与库,从而指导你创建完整的复杂应用程序。 你将全面学习React的用法以及React生态系统中的其他工具和库(如React Router和Flux 架构),并了解采用组合方式创建接口的佳实践。本书简明扼要地讲解每个主题,并呈现助你高效完成工作的细节。书中严谨深刻地讲述React中重要的功......一起来看看 《React开发实战》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

HTML 编码/解码
HTML 编码/解码

HTML 编码/解码