我开发了一个基于 Beancount 的账本托管服务 HostedBeans,欢迎大家来了解纯文本复式记账并试用我的服务。
查看源代码

设计基于 Redis 的定时任务系统(ZSET + Scripting)

这篇文章会带领大家实现一个基于 Redis 的分布式高可用定时任务系统,其中的 worker 以 Node.js 为例,但其实可以使用任何语言来实现。这篇文章不会给出完整的代码,更侧重于探索的过程。

高可用设计

首先我们希望让 worker 是无状态的,这样会大幅减少对于 worker 的高可用需求,也不涉及到 worker 之间的数据同步或者选举。我们将所有的状态集中到 Redis 上,Redis 可以用 Master-Slave + Sentinel 的方式达到一个相对较高的可用性。

为什么不使用选举 Master 的方式?

有一种很常见的做法是在单个实例上完成所有的工作,这样甚至连 Redis 都不需要了。但为了保证高可用,往往会同时启动多个实例,然后引入一个「选举」的过程来决定谁是那个完成所有工作的人(称为 master),在 master 失效后,则需要重新进行选举,选出另外一个 master。

简单来说我觉得这种做法不够「分布式」,只有一个 master 在工作,其他实例只是热备而已,同时选举和对 master 失效的监测也是一个非常麻烦的事情。而在我们的方案中所有的 worker 都是平等的,都在执行任务,可以在任意时候创建或移除 worker。

核心循环

对于一个定时任务系统来说,核心的工作是给定时任务们按时间排序,然后找到并等待下一个需要触发的任务。Redis 刚好为我们提供了 ZSET 这种支持排序的集合类型,ZSET 中存储着若干个互不相同的 member(字符串或二进制数据),每个 member 有一个相关联的 score(数字),整个 ZSET 会按照 score 排序,基于这样的数据结构提供了若干操作。

我们将定时任务的 ID 作为 member 放在一个叫 cronjobs 的 ZSET 里,使用下次触发时间作为 score 来排序,这样便得到了一个以下次触发时间排序的列表。

我们可以这样向系统中添加任务:

redisClient.zadd('cronjobs', nextTriggerAt(cronjobId), cronjobId)

ZRANGE cronjobs 0 -1 WITHSCORES 可以看到已添加的任务:

127.0.0.1:6379> ZRANGE cronjobs 0 -1 WITHSCORES
1) "2"
2) "1565453967832"
3) "1"
4) "1565453998742"

然后我们可以在 worker 中写一个无限循环,不断地检查这个 ZSET 中 score 最小(触发时间最早)的任务是否已经超过了当前时间,如果是的话就执行这个任务,并修改 ZSET 中的 score 为下次触发时间,Node.js 的代码如下:

while (true) {
  const [cronjobId, triggerAt] = await redisClient.zrange('cronjobs', 0, 0, 'WITHSCORES')

  if (parseInt(triggerAt) < Date.now()) {
    // ZADD CH 会返回被修改 member 数量,只有当成功修改了 score 我们才会继续执行,否则说明这个任务已经被其他的 worker 执行了
    if (await redisClient.zadd('cronjobs', 'CH', nextTriggerAt(cronjobId), cronjobId)) {
      // 异步地运行任务,避免「阻塞」核心循环
      runJob(cronjobId)
    }
  } else {
    // 等待下一个任务触发,如果距离下一个任务的触发少于 10 秒,则等待下一个任务执行,否则等待 10 秒后重试。
    await bluebird.delay(triggerAt ? Math.min(parseInt(triggerAt) - Date.now(), 10000) : 10000)
  }
}

上面的循环构成了这个定时任务系统最核心的部分,后面我们会逐渐地完善他。

为什么不用 Keyspace Notifications?

在社区中有很多文章推荐简单地使用 Keyspace Notifications 来实现「定时」,即为一个 key 设置一个过期时间,然后订阅这个 key 过期的事件。但这种方式主要的问题是 Redis 的 Pub/Sub 并不保证送达,如果刚好在这个 key 过期时 worker 不在线,那么这一次触发就不会生效;如果刚好有多个 worker 在线,那么这一次触发的任务也可能被执行多次。

而我们选择的基于 ZSET 的方式,需要 worker 主动修改 ZSET 中的下次触发时间,即使 worker 暂时不可用,在恢复时也会继续执行之前剩余的任务。

这样 Redis 就变成了系统的单点?

是这样的,这个系统中几乎全部的状态都存储于 Redis 上,可以说是系统中的单点。但相比于 worker,Redis(或其他的数据库)是一个更稳定、更标准化的组件。你可以用官方的 Master-Slave + Sentinel 方案来达到一个相对较高的可用性,你也可以使用由云服务厂商提供的托管 Redis 产品,避免自己来维护它。

继续完善

CRON 表达式

前面的代码中我们并没有实现 nextTriggerAt,你可以用 cron-parser 这样的库去解析 CRON 表达式,计算下次触发时间:

const cronParser = require('cron-parser')

async function nextTriggerAt(cronjobId) {
  // 从 Redis 或其他数据库中根据 cronjobId 拉取定时任务的详情
  const cronjobInfo = await getCronjobInfo(cronjobId)
  return cronParser.parseExpression(cronjobInfo.cron).next().getTime()
}

中断任务处理

如果一个 Worker 意外退出,那么当时正在被它处理的所有任务都会永久性地丢失。为了避免这种情况,我们将正在执行的任务也存储到 Redis 中(一个叫 running 的 ZSET):

if (await redisClient.zadd('cronjobs', 'CH', nextTriggerAt(cronjobId), cronjobId)) {
    // 为每个任务生成一个随机的 uuid 以便能单独地追踪每个任务,例如打印到日志中
  await redisClient.zadd('running', Date.now() + 60000, `${cronjobId}:${uuid.v4()}`)
  runJob(cronjobId).finally( () => {
    // 在一个任务被完成时,我们还需要将它从 running 集合中取出
    redisClient.zrem('running', uniqueId)
  })
 }

如果你有一些为多实例应用编写代码的经验,那么可能会注意到这里存在一个竞态条件:对 cronjobs 和 running 的操作并不是原子的,可能会出现对 cronjobs 的操作成功了,随即 worker 意外退出,没有来得及写入 running 的情况。

因为这里我们需要对 ZADD 的返回值做判断,所以不能简单地使用 Redis 的 Pipeline 功能,而是要用到 Lua Script:

redisClient.defineCommand('startJob', {
  lua: `
    local cronjobId = ARGV[1]
    local jobName = ARGV[2]
    local nextTriggerAt = tonumber(ARGV[3])
    local timeoutAt = tonumber(ARGV[4])

    local changed = redis.call('ZADD', 'cronjobs', 'CH', nextTriggerAt, cronjobId)

    if changed ~= 0 then
      redis.call('ZADD', 'running', timeoutAt, jobName)
    end

    return changed
  `
})

经过修改后的核心循环:

const jobName = `${cronjobId}:${uuid.v4()}`

if (await redisClient.startJob(cronjobId, jobName, nextTriggerAt(cronjobId), Date.now() + 60000)) {
  runJob(cronjobId).finally( () => {
    redisClient.zrem('running', uniqueId)
  })
}

然后我们便可以添加另外一个循环,从 running 中拉取已经超时的任务进行重试或其他处理,这里不再给出具体的代码。

Lua Script

Lua Script 是 Redis 提供的一种类似事务能力,Redis 保证每个 Lua Script 都是串行执行的,中途不会有其他指令被执行,这提供了一种非常强的一致性保证。在实际的开发中,我们可以将需要一致性保证的逻辑写成 Lua Script。

平滑关闭

我们不可避免地会对 worker 进程进行新版本的部署或其他维护,因此我们需要一种平滑的方式来关闭 worker 进程,让它继续执行已经收到的任务,但不去接受新的任务,在执行完当前的任务之后,主动退出。

在 Unix 中最正统的方式是实现自定义的 SIGINT 处理器来实现这个功能,即由终端模拟器、进程管理器或容器平台向程序发送 SIGINT 信号,程序即开始进行退出前的清理工作,然后待清理工作结束后,程序主动退出。当然进程管理器也有可能等不及,再发送一个强制结束的 SIGKILL。

所以我们需要将所有正在执行的任务注册到一个全局的 Promise 数组中,然后在受到 SIGINT 时停止接受新任务,并等待所有正在执行的任务完成后主动退出:

let runningJobs = []
let shuttingDown = false

process.on('SIGTERM', () => {
  shuttingDown = true

  // 等待 runningJobs 中所有的任务完成,无论成功还是失败
  Promise.all(runningJobs.map( p => p.catch(() => {}) )).then( () => {
    process.exit(0)
  })
})

修改核心循环,在开始任务时将 runJob 返回的 Promise 存入 runningJobs,然后在任务执行完时取出:

while (true) {
  if (shuttingDown) {
    break
  }

  // ...

  if (await redisClient.zadd('cronjobs', 'CH', nextTriggerAt(cronjobId), cronjobId)) {
    // 异步地运行任务,避免「阻塞」核心循环
    const jobPromise = runJob(cronjobId)

    jobPromise.finally( () => {
      _.pull(runningJobs, jobPromise)
    ))

    runningJobs.push(jobPromise)
  }

  // ...
}

容量的横向拓展

目前这个定时任务系统中的 worker 是可以无限拓展的,但 Redis 却是整个系统中的瓶颈,每个 worker 都需要从 Redis 获取任务来执行。按照我们对于 Redis 通常 70k QPS 的估计,按每个任务需要执行 5 个命令计算,整个系统可以支持每秒 14k 次任务触发,对于绝大部分的场景其实完全够用了。

如果要继续拓展的话,我的建议是根据业务上的一些区分(例如用户、任务类型)将队列分散到不同的 Key 上面(例如 userA:cronjobsuserB:cornjobs),这样便可以利用 Redis Cluster 的分片功能来进行扩展了。

小结

我们用 ZSET 将定时任务按照触发时间排序,然后使用一个无限循环来拉取需要触发的任务,实现了一个分布式定时任务系统的核心部分,读者可以在此基础上根据自己的需要做进一步扩展。

本文甚至没有给出完整的代码,因此并不能直接地复制到你的项目中使用,更多地在于提出和讨论一种解决方案。社区中也有一些类似的开源组件可供选用,例如 Bull 是一个功能完整的任务队列,其中包括了定时任务功能,Bull 使用了和本文类似的 ZSET + Scripting 技术,使用 Redis 作为后端。

撰写评论

如希望撰写评论,请发邮件至 jysperm@gmail.com 并注明文章标题,我会挑选对读者有价值的评论附加到文章末尾。

精子生于 1995 年,英文 ID jysperm.

订阅推送

通过 Telegram Channel 订阅我的博客日志、产品和项目的动态:

王子亭的博客 @ Telegram


通过邮件订阅订阅我的博客日志、产品和项目的动态(历史邮件):

该博客使用基于  Hexo  的  simpleblock  主题。博客内容使用  CC BY-NC-ND  授权发布。最后生成于 2024-04-08.