发帖队列管理
限流平台的发帖排队工
Oban是Elixir生态中基于PostgreSQL的后台作业处理库,提供可靠的异步任务执行、队列管理、定时调度和错误恢复能力。该技能覆盖Worker设计、队列配置、重试策略、唯一性约束及Cron调度等核心功能。
Elixir应用需处理邮件发送、数据同步等异步任务时,常面临任务丢失、重复执行或队列拥塞风险。该方案基于PostgreSQL实现持久化队列,提供可靠的任务调度、失败重试及唯一性控制,保障核心业务后台流程的稳定运行。
落地案例:电商平台订单支付后需触发库存扣减、物流通知等多个下游操作。团队配置Oban Worker将任务分发至高优先级队列,设置指数退避重试策略应对第三方接口超时;通过唯一性约束防止用户重复点击导致的重复发货,确保交易链路最终一致性。
步骤1:添加依赖
# mix.exs
defp deps do
[{:oban, "~> 2.18"}]
end
步骤2:配置Oban
# config/config.exs
config :my_app, Oban,
repo: MyApp.Repo,
queues: [default: 10, mailers: 20, webhooks: 50],
plugins: [
Oban.Plugins.Pruner,
{Oban.Plugins.Cron, crontab: [
{"0 2 * * *", MyApp.Workers.DailyCleanup}
]}
]
# application.ex children列表中添加
{Oban, Application.fetch_env!(:my_app, Oban)}
步骤3:生成迁移文件
mix ecto.gen.migration add_oban_jobs_table
编辑生成的迁移:
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration
def up, do: Oban.Migration.up(version: 12)
def down, do: Oban.Migration.down(version: 1)
end
执行mix ecto.migrate完成数据库表创建。
步骤4:创建Worker
defmodule MyApp.Workers.SendEmail do
use Oban.Worker, queue: :mailers, max_attempts: 5
@impl Oban.Worker
def perform(%Oban.Job{args: %{"to" => to, "template" => template}}) do
case MyApp.Mailer.deliver(to, template) do
{:ok, _} -> :ok
{:error, :temporary} -> {:error, "retry later"}
{:error, :permanent} -> {:cancel, "invalid address"}
end
end
end
步骤5:插入任务
%{user_id: 123, template: "welcome"}
|> MyApp.Workers.SendEmail.new(priority: 0)
|> Oban.insert()
见下方输入与输出表格。
| 项目 | 内容 |
|---|---|
| 输入 | Elixir项目结构、Ecto Repo配置、PostgreSQL数据库、任务业务逻辑、队列并发需求、Cron规则 |
| 输出 | Worker模块代码、队列配置文件、数据库迁移、任务插入代码、重试策略配置、测试环境配置 |
| 适用人群 | Elixir后端开发者、需异步处理邮件/webhook的团队、有定时任务需求的系统架构师 |
| 不包含 | Redis/RabbitMQ集成、Oban Pro企业功能、K8s原生Job调度、非PostgreSQL适配 |
{:snooze, seconds}不计入尝试次数,需防范无限延迟循环Oban.Migration至目标版本,否则队列行为异常原始链接:https://github.com/openclaw/skills/tree/main/skills/gchapim/oban/SKILL.md
来源类型:GitHub仓库