序
Celery可以使用分布式,让不同的机器处理优先级不同的任务,在这里就做以下Celery分布式的小结
配置与实例化
- Celery配置类
from celery import Celery from kombu import Queue,Exchange class Config: CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER='json' CELERY_ACCEPT_CONTENT=['json'] CELERY_RESULT_SERIALIZER='json' CELERY_DEFAULT_EXCHANGE = 'agent' CELERY_DEFAULT_EXCHANGE_TYPE = 'direct' CELERY_DEFAULT_QUEUE = 'm0' CELERT_QUEUES = ( Queue('m0',exchange='agent',routing_key='m0'), Queue('m1',exchange='agent',routing_key='m1'), Queue('m2',exchange='agent',routing_key='m2'), )
- 实例化
app = Celery('syj',broker='redis://localhost:6379/0',backend='redis://localhost:6379/0',)
# 第二台机子需要修改broker和backend
app.config_from_object(Config)
定义任务
- 定义任务:
@app.task def sendmail(mail): print 'aaaaaaaaaaaaaa' print('sending mail to %s...' % mail)
启动worker
-
分别启动两个worker
-
celery -A syj worker -l info -Q m1
-
celery -A syj worker -l info -Q m2
-
发布任务
-
发布任务到不同的redis queue
-
sendmail.apply_async(args=[‘syj’],queue=’m1’,routing_key=’m1’) #发送至m1
-
sendmail.apply_async(args=[‘syj’],queue=’m2’,routing_key=’m2’) #发送至m2
另一种方法
-
也可以通过定义celery routers来直接指派任务的queue
CELERY_ROUTES = {'syj.cansyj':{'queue':'m1'}, 'syj.canall':{'queue':'m2'},} @app.task() def cansyj(): print 'aaaaaaaaaaaaaa' @app.task() def canall(): print 'bbbbbbbbbbbbbb'