jjzjj

简单易用的任务队列-beanstalkd

maxfang 2023-03-28 原文

概述

beanstalkd 是一个简单快速的分布式工作队列系统,协议基于 ASCII 编码运行在 TCP 上。其最初设计的目的是通过后台异步执行耗时任务的方式降低高容量 Web 应用的页面延时。其具有简单、轻量、易用等特点,也支持对任务优先级、延时/超时重发等控制,同时还有众多语言版本的客户端支持,这些优点使得它成为各种需要队列系统场景的一种常见选择。

beanstalkd 优点

  • 如他官网的介绍,simple&fast,使用非常简单,适合需要引入消息队列又不想引入 kafka 这类重型的 mq,维护成本低;同时,它的性能非常高,大部分场景下都可以 cover 住。
  • 支持持久化
  • 支持消息优先级,topic,延时消息,消息重试等
  • 主流语言客户端都支持,还可以根据 beanstalkd 协议自行实现。

beanstalkd 不足

  • 无最大内存控制,当业务消息极多时,服务可能会不稳定。
  • 官方没有提供集群故障切换方案(主从或哨兵等),需要自己解决。

beanstalkd 重点概念

  • job

任务,队列中的基本单元,每个 job 都会有 id 和优先级。有点类似其他消息队列中的 message 的概念。但 job 有各种状态,下文介绍生命周期部分会重点介绍。job 存放在 tube 中。

  • tube

管道,用来存储同一类型的 job。有点类似其他消息队列中的 topic 的概念。beanstalkd 通过 tube 来实现多任务队列,beanstalkd 中可以有多个管道,每个管道有自己的 producer 和 consumer,管道之间互相不影响。

  • producer

job 生产者。通过 put 命令将一个 job 放入到一个 tube 中。

  • consumer

job 消费者。通过 reserve 来获取 job,通过 delete、release、bury 来改变 job 的状态。

beanstalkd 生命周期

上文介绍到,beanstalkd 中 job 有状态区分,在整个生命周期中,job 可能有四种状态:READY, RESERVED, DELAYED, BURIED。只有处于READY状态的 job 才能被消费。下图介绍了各状态之间的流转情况。

producer 在创建 job 的时候有两种方式,put 和 put with delay(延时任务)。
如果 producer 使用 put 直接创建一个 job 时,该 job 就处于 READY 状态,等待 consumer 处理。
如果 producer 使用 put with delay 方式创建 job,该 job 的初始状态为 DELAYED 状态,等待延迟时间过后才变更为 READY 状态。
以上两种方式创建的 job 都会传入一个 TTR(超时机制),当 job 处于 RESERVED 状态时,TTR 开始倒计时,当 TTR 倒计时完,job 状态还没有改变,则会认为该 job 处理失败,会被重新放回到队列中。

consumer 获取到(reserve)一个 READY 状态的 job 之后,该 job 的状态就会变更为 RESERVED。此时,其他的 consumer 就不能再操作该 job 了。当 consumer 完成该 job 之后,可以选择 delete,release,或 bury 操作。

  • delete ,job 被删除,从 beanstalkd 中清除,以后也无法再获取到,生命周期结束。
  • release ,可以把该 job 重新变更为 READY 状态,使得其他的 consumer 可以继续获取和执行该 job,也可以使用 release with delay 延时操作,这样会先进入 DELAYED 状态,延迟时间到达后再变为 READY。
  • bury,可以将 job 休眠,等需要的时候,在将休眠的 job 通过 kick 命令变更回 READY 状态,也可以通过 delete 直接删除 BURIED 状态的 job 。

处于 BURIED 状态的 job,可以通过 kick 重回 READY 状态,也可以通过 delete 删除 job。

为什么设计这个 BURIED 状态呢?
一般我们可以用这个状态来做异常捕获,例如执行超时或者异常的 job,我们可以将其置为 BURIED 状态,这样做有几个好处:
1.可以便面这些异常的 job 直接被放回队列重试,影响正常的队列消费(这些失败一次的 job,很有可能再次失败)。如果没有这个 BURIED 状态,如果我们要单独隔离,一般我们会使用一个新的 tube 单独存放这些异常的 job,使用单独的 consumer 消费。这样就不会影响正常的新消息消费。特别是失败率比较高的时候,会占用很多的正常资源。
2.便于人工排查,上面已经讲到,可以将异常的 job 置为 BURIED 状态,这样人工排查时重点关注这个状态就可以了。

beanstalkd 特性

持久化

通过 binlog 将 job 及其状态记录到本地文件,当 beanstalkd 重启时,可以通过读取 binlog 来恢复之前的 job 状态。

分布式

在 beanstalkd 的文档中,其实是支持分布式的,其设计思想和 Memcached 类似,beanstalkd 各个 server 之间并不知道彼此的存在,是通过 client 实现分布式以及根据 tube 名称去特定的 server 上获取 job。贴一篇专门讨论 beanstalkd 分布式的文章,Beanstalkd的一种分布式方案

任务延时

天然支持延时任务,可以在创建 job 时指定延时时间,也可以当 job 被处理完后才能后,消费者使用 release with delay 将 job 再次放入队列延时执行。

任务优先级

producer 生成的 job 可以给他分配优先级,支持 0 到 2^32 的优先级,值越小,优先级越高,默认优先级为 1024。优先级高的 job 会被 consumer 优先执行。

超时机制

为了防止某个 consumer 长时间占用 job 但无法处理完成的情况,beanstalkd 的 reserve 操作支持设置 timeout 时间(TTR)。如果 consumer 不能在 TTR 内发送 delete、release 或 bury 命令改变 job 状态,那么 beanstalkd 会认为任务处理失败,会将 job 重新置为 READY 状态供其他 consumer 消费。
如果消费者已经预知可能无法在 TTR 内完成该 job,则可以发送 touch 命令,使得 beanstalkd 重新计算 TTR。

任务预留

有一个 BURIED 状态可以作为缓冲,具体特点见上文生命周期中关于 BURIED 状态的介绍。

安装及配置

以下以 ubuntu 为例,安转 beanstalkd:

sudo apt-get update
sudo apt-get install beanstalkd
vi /etc/sysconfig/beanstalkd
# 添加如下内容
BEANSTALKD_BINLOG_DIR=/data/beanstalkd/binlog

可以通过 beanstalkd 命令来运行服务,并且可以添加多种参数。命令的格式如下:

beanstalkd [OPTIONS]

 -b DIR   wal directory
 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help

如下我们启动一个 beanstalkd 服务,并开启 binlog:

nohup beanstalkd -l 0.0.0.0 -p 11300 -b /data/beanstalkd/binlog/ &

beanstalkd管理工具

官方推荐的一些管理工具:Tools
笔者常用的管理工具:https://github.com/ptrofimov/beanstalk_console
如果只是简单的操作和查看 beanstalkd,可以使用 telnet 工具,然后执行 stats,use,put,watch 等:

$ telnet 127.0.0.1 11300
stats

实际应用

beansralkd 有很多语言版本的客户端实现,官方提供了一些客户端列表beanstalkd客户端列表
如果现有的这些库不满足需求,也可以自行实现,参考 beanstalkd协议

以下以 go 为例,简单演示下 beanstalkd 常用处理操作。

go get github.com/beanstalkd/go-beanstalk

生产者

向默认的 tube 中投入 job:

id, err := conn.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {
	panic(err)
}
fmt.Println("job", id)

向指定的 tube 中投入 job:

tube := &beanstalk.Tube{Conn: conn, Name: "mytube"}
id, err := tube.Put([]byte("myjob"), 1, 0, time.Minute)
if err != nil {
	panic(err)
}
fmt.Println("job", id)

消费者

消费默认的 tube 中的 job:

id, body, err := conn.Reserve(5 * time.Second)
if err != nil {
	panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

消费指定的 tube (此处指定多个) 中的 job:

tubeSet := beanstalk.NewTubeSet(conn, "mytube1", "mytube2")
id, body, err := tubeSet.Reserve(10 * time.Hour)
if err != nil {
	panic(err)
}
fmt.Println("job", id)
fmt.Println(string(body))

beanstalkd 使用小 tips

  • 可以通过指定 tube ,在 put 的时候将 job 放入指定的 tube 中,否则会放入 default 的 tube 中。
  • beanstalkd 支持持久化,在启动时使用 -b参数来开启binlog,通过binog可以将 job 及其状态记录到文件里。当重新使用-b参数重启 beanstalkd,将读取binlog来恢复之前的 job 及状态。

参考资料

有关简单易用的任务队列-beanstalkd的更多相关文章

  1. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  2. ruby - 如何使用 RSpec::Core::RakeTask 创建 RSpec Rake 任务? - 2

    如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake

  3. ruby - 简单获取法拉第超时 - 2

    有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url

  4. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  5. ruby-on-rails - 简单的 Ruby on Rails 问题——如何将评论附加到用户和文章? - 2

    我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。

  6. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  7. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  8. postman——集合——执行集合——测试脚本——pm对象简单示例02 - 2

    //1.验证返回状态码是否是200pm.test("Statuscodeis200",function(){pm.response.to.have.status(200);});//2.验证返回body内是否含有某个值pm.test("Bodymatchesstring",function(){pm.expect(pm.response.text()).to.include("string_you_want_to_search");});//3.验证某个返回值是否是100pm.test("Yourtestname",function(){varjsonData=pm.response.json

  9. Qt Designer的简单使用 - 2

    在前面两节的例子中,主界面窗口的尺寸和标签控件显示的矩形区域等,都是用C++代码编写的。窗口和控件的尺寸都是预估的,控件如果多起来,那就不好估计每个控件合适的位置和大小了。用C++代码编写图形界面的问题就是不直观,因此Qt项目开发了专门的可视化图形界面编辑器——QtDesigner(Qt设计师)。通过QtDesigner就可以很方便地创建图形界面文件*.ui,然后将ui文件应用到源代码里面,做到“所见即所得”,大大方便了图形界面的设计。本节就演示一下QtDesigner的简单使用,学习拖拽控件和设置控件属性,并将ui文件应用到Qt程序代码里。使用QtDesigner设计界面在开始菜单中找到「Q

  10. ruby-on-rails - Rake 任务仅调用一次时执行两次 - 2

    我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里

随机推荐