关于celery的一些操作
1. 关于运行的问题
1.1 内存泄漏的问题
1 | CELERYD_MAX_TASKS_PER_CHILD = 20 # 这表示让每个worker执行20个任务,就销毁。防止内存泄漏。 |
1.2 死锁问题
因为开启worker的方式,其根本是开多个线程。我们又知道,线程的内存资源是共享的,所以,可以会发生死锁现象(其实这是次要的,主要的还是它们去更新同一个数据库,这是比较常见的)。
那么如何解决死锁问题呢?
只需要加一条配置:
1 | CELERYD_FORCE_EXECV = True |
就完美解决这个问题了。
1.3 运行时间问题
每个任务默认是不限时的,想运行多久,就运行多久。我们也可以稍加控制:
1 | CELERYD_TASK_SOFT_TIME_LIMIT = 600 # 600秒超时 |
此超时会抛出一个SoftTimeLimitExceeded
,我们可以捕获它,再做处理。
1 | from celery.exceptions import SoftTimeLimitExceeded |
2. 关于结果显示的问题
2.1 配置一个结果存储的后端
配置一个backend是第一步,如果不配置AsyncResult
类也就用不了。
所以在celery的配置文件里面加一条
1 | CELERY_RESULT_BACKEND = 'redis://x.x.x.x:6379/2' |
2.2 关于结果显示颗粒度的原因
原始的celery是不支持任务状态显示started的状态的,也就是说,一个任务由消费者给出来。先到队列里面,状态是pending
,然后到消费者(worker)手上,任务状态还是pending。接着worker就是卡卡一顿操作,要么成功了,要么失败了,要么失败之后去重试了。
也就是说,任务是这么走的:
pending
–>success
或者
pending
–>retry
–>pending
–>retry
–>pending
–>failure
如果你想任务没到worker的时候是pending,被消费者开始执行的时候是started
,那么,你需要加一条配置:
1 | CELERY_TRACK_STARTED = True |
有时候,你加了这个参数好像没作用,那么你可以尝试一下,把参数这么写,别问为什么,照着干,还是不行,可以发邮件给我或者评论。
1 | CELERY_TASK_TRACK_STARTED = True |
2.3 关于结果存放多久的问题
既然我们的task结果存到了backend里面,那么,结果保存多久呢?其实,celery的默认保存时间是一天,单位是秒,一天是多少秒呢,自己去算。
通过此配置,更改结果保存时间:
1 | CELERY_TASK_RESULT_EXPIRES = 500 # 保存了500秒 |
3 关于celery的配置
3.1 指定一个配置文件
如果你在写django项目,你就想把django的settings.py
作为你的celery配置文件,那么很简单,只要你在你的celery.py
里面用celery的app(这个app是这么来的app = Celery('app_name')
),这么去做:
1 | app.config_from_object('django.conf:settings', namespace='CELERY') |
那么此时的settings.py
也就是你的celery配置文件了,不过不推荐把django的配置文件作为celery的配置文件。具体原因,请往下看。
3.2 如果你想看看这个celery具体有啥配置,那么这个命令或许会帮到你:
1 | celery inspect conf --app app_name |
注意:这里的app_name是app的名字,是你在注册app的时候–app = Celery(‘app_name’)的这个app_name,不是django的app
如果我们指定了django的settings.py
作为配置文件,那么你会发现,celery inspect conf
的结果会非常长。
再多说一句,celery inspect conf --app app_name
的结果是所有历史的celery的启动的配置文件。
4. 关于celery装饰器
4.1 装饰器参数
正常使用app.task()
去装饰一个函数。
骚一点的在括号里写–bind=True
,那么,传的参数第一个就必须是self,在任务里面就可以拿到worker本身这个对象了,就是self,那么task的id就是self.request.id
。
4.2 手动指定task_id
请注意,delay的方式,是无法指定task_id的,必须是apply_async(args, kwargs, task_id="task_id")
1 | 函数.apply_async(args, kwargs, task_id="task_id") |
结合上一小节,在函数里面,我们就可以self.request.id,去拿到这task_id了。