A-A+

python celery异步任务队列(redis + supervisor)

2015年11月15日 Python 评论 1 条 阅读 17,203 views 次

 

 

PDF下载:python-celery.pdf

 

celery简介:

celery是一个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。 celery是用Python编写的,但该协议可以在任何语言实现。更多简介的请自己在网上搜索

 

本文目的是用python使用celery做异步任务,在centos 6.4上安装celery,并用supervisor来管理celery进程,celery采用redis做中间件的消息传输。现实中可以celery做异步请求,如发送邮箱,发送消息,请求URL等场景

 

环境部署

 

环境

系统:centos 6.4 64位

python版本:2.6.6

 

创建相关的目录

  1. mkdir -p /data/{redis,logs,www}

(我会把python脚本都放在/data/www中)

 

安装celery

  1. yum install python-devel python-setuptools
  2. easy_install pip
  3. pip install celery
  4. pip install redis   #(python中的redis模块)

 

安装 supervisor

  1. pip install supervisor
  2. echo_supervisord_conf > /etc/supervisord.conf

如果在执行echo_supervisord_conf > /etc/supervisord.conf时报pkg_resources.DistributionNotFound: meld3>=0.6.5错误的话,找到supervisor-3.1.3-py2.6.egg-info/requires.txt,把文件里面meld3 >= 0.6.5注释掉,然后再执行echo_supervisord_conf > /etc/supervisord.conf就好了

 

查找方法:

  1. find / | grep requires.txt

 

配置

  1. vim /etc/supervisord.conf

 

在配置后面添加以下参数

  1. [program:celery]
  2. command=/usr/bin/celery worker -A tasks
  3. directory=/data/www
  4. stdout_logfile=/data/logs/celery.log
  5. autostart=true
  6. autorestart=true
  7. redirect_stderr=true
  8. stopsignal=QUIT

注释

;program:celery 要管理的进程名,你自己随便定义,我这定义了叫celery

;command  是启动celery的命令

;directory  是程序目录 ,因为我要启动celery,需要进入/data/www目录中才能生效的,所以这里在启动命令时,会切换到这个目录里

;autorstart  自动重启celery

;stdout_logfile  存放celery的日志路径

 

以上命令的大概意思就是:

进行到/data/www目录,然后执行/usr/bin/celery worker -A tasks,并把输出的日志保存到/data/logs/celery.log中,这是指定了worker模式,如果不指定,默认为prefork模式,一般你机器有几核,系统就开启几个worker进程,如果有异常,记得查看日志/data/logs/celery.log

 

安装redis

  1. cd /usr/local/src
  2. wget http://download.redis.io/releases/redis-3.0.5.tar.gz
  3. tar xf redis-3.0.5.tar.gz
  4. cd redis-3.0.5
  5. make
  6. make install
  7. #(可用make PREFIX=/usr/local/redis install 安装到指定的路径下面 )  
  8. cp utils/redis_init_script /etc/init.d/redis
  9. chmod a+x /etc/init.d/redis
  10. mkdir /etc/redis
  11. cp redis.conf /etc/redis/

 

简单修改redis.conf(前面数字是行号)

  1. 42 daemonize yes                                                  #进程转入后台运行
  2. 50 port 22222                                                        #修改端口,不用默认的6379
  3. 69 bind 127.0.0.1                                                   #绑定IP,只能本地连接
  4. 192 dir /data/redis                                                #修改redis文件存放路径
  5. 396 requirepass DILSi3_dflka_f3i_LFKDkdf!idf      #设置密码
  6. 453 maxmemory 256M                                       #redis最大使用256M内存,我的是虚拟机,所以内存设置小

注释:

requirepass DILSi3_dflka_f3i_LFKDkdf!idf  是配置redis密码,客户端连接和关闭redis时需要此密码

 

修改/etc/init.d/redis(前面数字是行号)

  1. 6 REDISPORT=22222
  2. 7 EXEC=/usr/local/bin/redis-server
  3. 8 CLIEXEC=/usr/local/bin/redis-cli
  4. PASSWD="DILSi3_dflka_f3i_LFKDkdf!idf"        #增加行
  5. 10 PIDFILE=/var/run/redis.pid
  6. 11 CONF="/etc/redis/redis.conf"
  7. 30                 $CLIEXEC -p $REDISPORT -a $PASSWD  shutdown

注释:

关闭redis时,需要带上redis密码,否则失败,如果没有设置密码,则不需要带密码关闭

 

启动redis

  1. service redis start

 

查看redis进程

  1. ps -ef |grep redis
  2. root     28631     1  0 04:30 ?        00:00:00 /usr/local/bin/redis-server 127.0.0.1:22222
  3. root     28635  1542  0 04:31 pts/0    00:00:00 grep redis

 

编写celery脚本

  1. cd /data/www
  2. vim tasks.py
  1. #!/usr/bin/env python
  2. #coding:utf-8
  3. import time
  4. #导入celery相关模块、方法
  5. from celery import Celery
  6. from celery import platforms
  7. #因为supervisord默认是用root运行的,必须设置以下参数为True才能允许celery用root运行
  8. platforms.C_FORCE_ROOT = True
  9. #配置celery,连接redis,DILSi3_dflka_f3i_LFKDkdf!idf是redis的密码,22222是端口 10是库(redis有16个库,这取第11个,即10)
  10. config={}
  11. config['CELERY_BROKER_URL'] = 'redis://:DILSi3_dflka_f3i_LFKDkdf!idf@127.0.0.1:22222/10'
  12. config['CELERY_RESULT_BACKEND'] = 'redis://:DILSi3_dflka_f3i_LFKDkdf!idf@127.0.0.1:22222/10'
  13. #不需要返回任务状态,即设置以下参数为True
  14. config['CELERY_IGNORE_RESULT'] = True
  15. app = Celery("tasks", broker=config['CELERY_BROKER_URL'])
  16. app.conf.update(config)
  17. @app.task
  18. def output(num):
  19. """输出传进来的数字,在输出前等待2秒"""
  20.     time.sleep(2)
  21.     print num

 

启动supervisord

  1. /usr/bin/supervisord

 

查看是否启动成功

  1. [root@drfdai www]# ps -ef |grep supervisor
  2. root     28656     1  0 04:52 ?        00:00:00 /usr/bin/python /usr/bin/supervisord
  3. root     28692  1542  0 04:53 pts/0    00:00:00 grep supervisor

 

启动celery:

  1. supervisorctl start celery

 

查看celery是否连接成功redis

  1. tailf /data/logs/celery.log

能看到[2015-11-16 05:05:27,938: WARNING/MainProcess] celery@drfdai ready.说明连接成功

 

测试

在终端【终端1】中打开实时日志:

  1. [root@drfdai www]# tailf /data/logs/celery.log

 

打开另一终端【终端2】测试:

  1. [root@drfdai ~]# cd /data/www
  2. [root@drfdai www]# python
  3. Python 2.6.6 (r266:84292, Jul 23 2015, 15:22:56)
  4. [GCC 4.4.7 20120313 (Red Hat 4.4.7-11)] on linux2
  5. Type "help", "copyright", "credits" or "license" for more information.
  6. >>> from tasks import output
  7. >>> for i in xrange(20):
  8. ...     output.apply_async(args=[i])
  9. ...
  10. <AsyncResult: 38a14673-3ce6-4244-b262-d488aa637ff9>
  11. <AsyncResult: 3ff1b2c0-4b37-4c70-a543-d7c58111cb99>
  12. <AsyncResult: 355c232f-c419-43d5-bf8f-afa4d7084136>
  13. <AsyncResult: c3089d05-d430-4fd4-bfba-112d6dd5b635>
  14. <AsyncResult: 842db1ad-5490-456d-82d8-e81e60db244b>
  15. <AsyncResult: fe0dd54a-8bad-43e2-a8b8-b87be158cf16>
  16. <AsyncResult: edd95275-a3b3-46a5-8e21-b88c7faedbe9>
  17. <AsyncResult: c21b9c62-644d-4171-bb68-9a371af60283>
  18. <AsyncResult: 29d38896-9d4d-4b97-8e27-6a7209f06dc9>
  19. <AsyncResult: 7c3ba209-01fd-44d3-9645-f9c85d799eaf>
  20. <AsyncResult: b08bbb7a-341b-436c-8ef4-be954b1fcbc5>
  21. <AsyncResult: b03193a2-c6d6-49ff-ac27-b453593050c4>
  22. <AsyncResult: 1be6394c-09c3-4f48-9ed4-1114fd1af637>
  23. <AsyncResult: b496cdd3-3032-41ad-939e-414198c9056a>
  24. <AsyncResult: 0dd365ea-383b-4c93-9ed0-406bcff6f41b>
  25. <AsyncResult: ce60892e-d640-4589-8a8b-3a7f18e8c6d3>
  26. <AsyncResult: df4fd3c4-d9e8-474b-b60f-c6e11283564a>
  27. <AsyncResult: 83e7d36c-fc42-4f3b-8941-df40fa13a517>
  28. <AsyncResult: 9b25f7f7-c3bc-4bb0-8a1f-27ceec59cf0c>
  29. <AsyncResult: f0d7e201-032b-41a0-8e7a-c02681c31947>

 

切换到【终端1】可以看到,每隔2秒就会打印一行日志

  1. warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
  2. [2015-11-16 05:58:09,687: WARNING/MainProcess] celery@drfdai ready.
  3. [2015-11-16 05:59:18,816: WARNING/Worker-1] 0
  4. [2015-11-16 05:59:19,826: WARNING/Worker-1] 1
  5. [2015-11-16 05:59:20,830: WARNING/Worker-1] 2
  6. [2015-11-16 05:59:21,836: WARNING/Worker-1] 3
  7. [2015-11-16 05:59:22,841: WARNING/Worker-1] 4
  8. [2015-11-16 05:59:23,848: WARNING/Worker-1] 5
  9. [2015-11-16 05:59:24,853: WARNING/Worker-1] 6
  10. [2015-11-16 05:59:25,858: WARNING/Worker-1] 7
  11. [2015-11-16 05:59:26,864: WARNING/Worker-1] 8
  12. [2015-11-16 05:59:27,873: WARNING/Worker-1] 9
  13. [2015-11-16 05:59:28,884: WARNING/Worker-1] 10
  14. [2015-11-16 05:59:29,892: WARNING/Worker-1] 11
  15. [2015-11-16 05:59:30,899: WARNING/Worker-1] 12
  16. [2015-11-16 05:59:31,908: WARNING/Worker-1] 13
  17. [2015-11-16 05:59:32,912: WARNING/Worker-1] 14
  18. [2015-11-16 05:59:33,920: WARNING/Worker-1] 15
  19. [2015-11-16 05:59:34,926: WARNING/Worker-1] 16
  20. [2015-11-16 05:59:35,933: WARNING/Worker-1] 17
  21. [2015-11-16 05:59:36,939: WARNING/Worker-1] 18
  22. [2015-11-16 05:59:37,944: WARNING/Worker-1] 19

 

当我们在【终端2】中调用此函数后,执行并没有受到time.sleep(2)的影响,执行后马上就结束了,而【终端1】每隔2秒就输出1行日志,这日志是函数output输出来的。

 

实用代码片断:

vim common/tasks.py

  1. #导入发送邮件模块
  2. from email.mime.text import MIMEText
  3. import smtplib
  4. import json
  5. import datetime
  6. from celery import Celery
  7. from celery import platforms
  8. import requests
  9. #导入配置文件
  10. from config import setting
  11. #发送邮箱配置
  12. smtp = setting.smtp
  13. user = setting.user
  14. passwd = setting.passwd
  15. 此处省略……
  16. @celery.task
  17. def sendMail(mail):
  18.     '''
  19.     to_list 接收者邮件,每个邮件地址用","分隔,str格式
  20.     subuect 邮件主题,str格式
  21.     context 邮件内容,str格式
  22.     '''
  23.     to_list = mail.get('to_list')
  24.     subject = mail.get('subject')
  25.     context = mail.get('context')
  26.     msg = MIMEText(context,'html','utf-8')
  27.     msg['Subject'] = subject
  28.     msg['From'] = "linux系统运维<%s>" % user
  29.     to_list = list(to_list.split(','))
  30.     msg['To'] = ",".join((to_list))
  31.     try:
  32.         s = smtplib.SMTP()
  33.         s.connect(smtp)
  34.         s.login(user,passwd)
  35.         s.sendmail(user,to_list,msg.as_string())
  36.         s.close()
  37.         return json.dumps({"redid":0,"redmsg":"发送成功"})
  38.     except Exception,e:
  39.         print 'Error: %s' % e
  40.         return json.dumps({"redid":-1,"redmsg":"发送失败"})

 

在实际项目中调用此方法,做异步邮箱发送,发送后不需要等待返回的结果

  1. #/usr/bin/env python
  2. #coding:utf-8
  3. from common.tasks import sendMail
  4. sendMail.delay(dict=(to_list ='xxxxxxxx@qq.com',subject ='邮件发送测试',context ='<h1>邮件收到</h1><br />测试没问题'))
关键词:

1 条留言  访客:1 条  博主:0 条

  1. avatar สามเณร

    我使用supervisor来管理celery进程,celery采用redis做中间件的消息传输,异步发送报警邮件,运行一段时间后,经常发送一些延时邮件(报警已经解决了,但是邮件过一定时间又在发送之前的报警邮件),请问这是什么原因?

给我留言

Copyright © linux系统运维 保留所有权利.   Theme  Ality 粤ICP备13023035号-1

用户登录

分享到: