使用RabbitMQ延时队列实现定时任务

"奇淫技巧"

Posted by Mzx on June 3, 2017

本文使用简单案例介绍了如何使用RabbitMQ实现一个轻量级的可自定义的定时任务模块。适合于定时任务不多,对定时任务功能要求比较单一的场景。

物料清单

RabbitMQ, NodeJS(8.x), 代码猴

step 1 准备工作

在MQ上分别新建名为”demo“的vhost,一个exchange和两个queue,配置如下:

  1. deadLetterQueue 要绑定到 deadLetterExchange, routing-key 为:“deadLetterExchange”
  2. 具体配置:
  exchange: {
    deadLetterExchange: {
      name: 'deadLetterExchange',
      conf: {type: 'direct'}
    }
  },
  queue: {
    deadLetterQueue: {
      name: 'deadLetterQueue',
      conf: {durable: true}
    },
    delayQueue: {
      name: 'delayQueue',
      conf: {
      	durable: true, deadLetterExchange: 'deadLetterExchange',
       deadLetterRoutingKey: 'deadLetterQueue'
       }
    }
  }

step 2 RabbitMQ死信(DLX)队列介绍

DLX中一旦队列头部消息达到死信条件,该消息便被push到指定队列。有三种条件会将消息置位死信。

  • 消息被拒绝
  • 消息TTL过期
  • 队列达到最大长度

其中消息TTL过期分两种情况。一种是针对队列设置的TTL过期。另一种是针对单条消息的设置的TTL过期。如果同时设置了针对队列的TTL和针对单条消息的TTL。那么过期时间以小的TTL为准。有一点需要注意的是,队列只有在消息达到头部时,才会判断该消息是否符合死信条件。如果头部消息未达到条件,这时,即使其后面的消息达到条件也不会被处理。

这里,我们需要自定义的定时任务。所以需要的是只针对单条消息设置TTL

Step 3 流程

流程解释:

  1. 当需要新添加任务时,首先取出DLX中所有未过期的消息
  2. 将新的任务加入到取出的消息列表中
  3. 对消息列表按照超时时间的大小排序,小的在前,大的在后
  4. 安装顺序重新插入到DLX

此时任务按照过期时间的远近已经顺序排列带DLX中。待DLX将头部的消息pushdeadLetterExchange时,说明达到了该任务的执行时间。一个可以自定义执行时间的定时任务就此完成。

注意:

  1. 所有加入到mq的消息都应该包含一个expiredIn的值,用于标识该消息的执行时间,针对消息的排序也是基于这个值。
  2. 在从DLX取出所有msg,排序,再重新插入时,要从新计算每一条msgexpiration

Step 4 Demo


'use strict'
/**
 * 
 */
const amqplib = require('amqplib')
const config = {
  user: 'user',
  password: 'password',
  host: '192.168.1.1',
  vhost: 'demo',
  exchange: {
    deadLetterExchange: {
      name: 'deadLetterExchange',
      conf: {type: 'direct'}
    }
  },
  queue: {
    deadLetterQueue: {
      name: 'deadLetterQueue',
      conf: {durable: true}
    },
    delayQueue: {
      name: 'delayQueue',
      conf: {durable: true, deadLetterExchange: 'deadLetterExchange', deadLetterRoutingKey: 'deadLetterQueue'}
    }
  }
}

let connection

/**
 * mq 的消息为buffer。这里将push的消息转为buffer
 */
function toBuffer(msg) {
  if (typeof msg === 'object')
    return new Buffer(JSON.stringify(msg))

  return new Buffer(msg + "")
}

function sort(list) {
  list.sort(function (a, b) {
    return a.expiredIn > b.expiredIn
  })
}


async function init() {
  connection = await amqplib.connect(`amqp://${config.user}:${config.password}@${config.host}/${config.vhost}`)
}

/**
 * 发送消息
 * @param msg e.g. {title: title, content: content, expiration: expiration, expiredIn: expiredIn}
 * @param expiration
 * @returns {Promise.<void>}
 */
async function send(msg) {
  let name = config.queue.delayQueue.name
  let conf = config.queue.delayQueue.conf
  let channel
  try {
    channel = await connection.createChannel()
    await channel.assertQueue(name, conf)
    let now = Date.now()
    let expiration = now < msg.expiredIn ? (msg.expiredIn - now) : 1 //重新计算消息过期时间,最小值为 1ms
    await channel.sendToQueue(name, toBuffer(msg), {expiration: `${expiration}`})
  } catch (e) {
    console.log(e)
  } finally {
    channel.close()
  }
}

/**
 * 批量获取消息
 * @param config
 * @returns {Promise.<*>}
 */
async function getMsgs(config) {
  let name = config.name
  let conf = config.conf
  let list = [], channel
  try {
    channel = await connection.createChannel()
    await channel.assertQueue(name, conf)
    let msg = true
    list = []
    while (msg) {
      msg = await channel.get(name, {noAck: true})
      msg && (list.push(msg))
    }
  } catch (e) {
    console.log(e)
  } finally {
    channel.close()
  }
  return list
}

async function ackAll(config, list) {
  let name = config.name
  let conf = config.conf
  let channel
  try {
    channel = await connection.createChannel()
    await channel.assertQueue(name, conf)
    channel.ackAll(list)
  } catch (e) {
    console.log(e)
  } finally {
    channel.close()
  }
}

/**
 * 接收消息
 * @param config
 * @returns {Promise.<void>}
 */
async function receive(config) {
  let name = config.name
  let conf = config.conf
  try {
    let channel = await connection.createChannel()
    await channel.assertQueue(name, conf)
    console.log(`success to connect ${name}`)
    await channel.consume(name, function (msg) {
      let body = msg.content.toString()
      console.log(body)
      channel.ack(msg)
    })
  } catch (e) {
    console.log(e)
  }
}

/**
 * 发送消息
 * @returns {Promise.<void>}
 */
async function producter() {
  await init()
  let newTask = {title: "task_5", expiration: 70000, expiredIn: Date.now() + 70000} //new task
  let list = await getMsgs(config.queue.delayQueue)
  let tasks = []
  for (let current of list) {
    tasks.push(JSON.parse(current.content.toString()))
  }
  tasks.push(newTask)
  sort(tasks)
  for (let task of tasks) {
    await send(task)
  }
  await ackAll(config.queue.delayQueue, list)
}

/**
 * 消费消息
 * @returns {Promise.<void>}
 */
async function consumer() {
  await init()
  await receive(config.queue.deadLetterQueue)
}


async function test() {
  await init()
  let tasks = [
    {title: "task_1", expiration: 60000, expiredIn: Date.now() + 60000},
    {title: "task_2", expiration: 10000, expiredIn: Date.now() + 10000},
    {title: "task_3", expiration: 30000, expiredIn: Date.now() + 30000},
    {title: "task_4", expiration: 20000, expiredIn: Date.now() + 20000}
  ]
  for (let task of tasks) {
    await send(task)
  }
}

// consumer()  //step 1
// test()  //step 2

producter()  //step 3


Result

按照顺序执行三个方法。得到结果如下:


$ node send.js
success to connect deadLetterQueue
{"title":"task_2","expiration":10000,"expiredIn":1512978191470}
{"title":"task_4","expiration":20000,"expiredIn":1512978201470}
{"title":"task_3","expiration":30000,"expiredIn":1512978211470}
{"title":"task_1","expiration":60000,"expiredIn":1512978241470}
{"title":"task_5","expiration":70000,"expiredIn":1512978255290}