JAVA和Nginx 教程大全

网站首页 > 精选教程 正文

轻量级消息队列-beanstalkd 消息队列 topic

wys521 2024-10-05 05:07:11 精选教程 31 ℃ 0 评论

基本介绍

Beanstalk,一个高性能、轻量级的分布式内存队列系统

核心概念:

  1. job: 一个需要异步处理的任务,需要放在一个tube中

  2. tube:一个队列,用来存放job,相当于一个主题(topic),是producer和consumer操作的对象

  3. producer:生产者,发布任务的人,通过put命令来将一个job放到一个tube中

  4. consumer:消费者,处理任务的人,通过reserve/release/bury/delete命令来获取job或改变job的状态

流程:

producer生产出一个job,放在tube中,consumer监听着tube,一旦有job任务,就将其取出进行业务处理,然后改变job的状态

Job的四种状态:

  1. ready:生产者put一个job后,job就是ready,表示该job随时可以被消费者消费

  2. reserved:job正在被处理

  3. delayed:job被延迟,延迟时间到了之后,job将变成ready状态供消费者消费

  4. buried:job任务由于某些条件限制而被暂放一边,等条件满足再继续处理

特性

  1. 优先级:支持设置job的优先级,值越小,优先级越高,默认优先级为1024

  2. 持久化:可以通过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时可以通过读取binlog来恢复之前的job及状态。

  3. 超时控制:为了防止某个consumer长时间占用任务但不能处理的情况,Beanstalkd为reserve操作设置了timeout时间,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer执行

不足

  1. 在使用中发现一个Beanstalkd尚无提供删除一个tube的操作,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube

  2. Beanstalkd不支持客户端认证机制,开发者将应用场景定位在局域网

安装与使用

ubuntu安装beanstalkd

apt-get install beanstalkd1

启动

beanstalkd -l 127.0.0.1 -p 11300 -b /var/log/beanstalkd/binlog1

-b 参数置顶binlog文件路径,断电后重启会自动恢复任务。

也可以使用/etc/init.d/beanstalkd 来启动和查看beanstalkd进程状态等

Usage: /etc/init.d/beanstalkd {start|stop|force-stop|restart|force-reload|status}1

配置文件所在:

/etc/default/beanstalkd1

php操作beanstalkd

php操作beanstalkd可以使用pheanstalk

composer安装pheanstalk

composer require pda/pheanstalk1

github源码地址:pheanstalk

简单例子:

<?phprequire'vendor/autoload.php';

use Pheanstalk\Pheanstalk;

$pheanstalk=new Pheanstalk('127.0.0.1');

// ----------------------------------------

// 生产一个job,放入testtube这个tube中

$pheanstalk->useTube('testtube')

->put("job payload goes here\n");

// ----------------------------------------

// 消费者监听testtube这个tube,

$job=$pheanstalk->watch('testtube')

->ignore('default')

->reserve();

//此处进行业务处理

echo $job->getData();

//业务处理成功后删除job

$pheanstalk->delete($job);

在实际开发中,生产者和消费者肯定是分开的,这里只是为了简单演示所以写在一个文件中

reserve()方法是阻塞的,在实际开发中,可以在while循环中使用reserve()方法持续监听,一有任务就执行,如:

while(true){ 
 $job = $pheanstalk
 ->watch('testtube') 
 ->reserve();
 //此处进行业务处理
 }

beanstalk管理工具

beanstalk_console (github自行搜索)

部署并访问这个管理工具,如果报了nginx 502 gateway 的错误,可尝试以下解决方案:

打开lib/include.php,删除或注释 Pheanstalk_ClassLoader::register(dirname(__FILE__));

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表