celery分布式框架实践celery简介
Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。Worker是Celery提供的任务执行的单元,worker并发的运行在分布式的系统节点中。Task result store用来存储Worker执行的任务的结果,Celery支持以不同方式存储任务的结果,包括AMQP, redis,memcached, mongodb,SQLAlchemy, Django ORM,Apache Cassandra, IronCache 等。本例中broker和backend都选择redis。celery的架构如下:
celery和redis安装celery安装
pip install -U Celeryredis安装
$ wget 目录下:
下面启动redis服务.
$ cd src$ ./redis-server注意这种方式启动redis 使用的是默认配置。也可以通过启动参数告诉redis使用指定配置文件使用下面命令启动。
$ cd src$ ./redis-server ../redis.confredis.conf 是一个默认的配置文件。我们可以根据需要使用自己的配置文件。
启动redis服务进程后,就可以使用测试客户端程序redis-cli和redis服务交互了。 比如:
$ cd src$ ./redis-cliredis> set foo barOKredis> get foo"bar"最后安装python中的redis库,否则celery提示会报错
pip install rediscelery实现简单计算启动redis server服务在redis/src目录下按以上步骤启动即可编写任务代码tasks.pybackend和broker都用redis
from celery import Celery# 我们这里案例使用redis作为broker和backendapp = Celery('demo', backend='redis://127.0.0.1:6379/2', broker='redis://127.0.0.1:6379/1')# 创建任务函数@app.taskdef my_task(a, b): print("任务函数正在执行....") return a + b注意:这里没有使用redis密码,因为redis server默认启动是不带密码的,否则会出现Client sent AUTH, but no password is set的错误将tasks模块加入worker
celery -A tasks worker --loglevel=info可以看到打印以下信息,任务成功加入
- ** ---------- .> transport: redis://127.0.0.1:6379/1- ** ---------- .> results: redis://127.0.0.1:6379/2- *** --- * --- .> concurrency: 8 (prefork)-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)--- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.my_task[2019-11-19 23:35:35,537: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1[2019-11-19 23:35:35,544: INFO/MainProcess] mingle: searching for neighbors[2019-11-19 23:35:36,559: INFO/MainProcess] mingle: all alone[2019-11-19 23:35:36,574: INFO/MainProcess] celery@huzhenghui-desktop ready.执行任务,并将任务结果写入redis中:
>>> from tasks import my_task>>> ret = my_task.delay(1,2)>>> ret.result3>>>