关于celery的一些操作

关于celery的一些操作

1. 关于运行的问题

1.1 内存泄漏的问题

1
CELERYD_MAX_TASKS_PER_CHILD = 20 # 这表示让每个worker执行20个任务,就销毁。防止内存泄漏。

1.2 死锁问题

因为开启worker的方式,其根本是开多个线程。我们又知道,线程的内存资源是共享的,所以,可以会发生死锁现象(其实这是次要的,主要的还是它们去更新同一个数据库,这是比较常见的)。

那么如何解决死锁问题呢?

只需要加一条配置:

1
CELERYD_FORCE_EXECV = True

就完美解决这个问题了。

1.3 运行时间问题

每个任务默认是不限时的,想运行多久,就运行多久。我们也可以稍加控制:

1
CELERYD_TASK_SOFT_TIME_LIMIT = 600   # 600秒超时

此超时会抛出一个SoftTimeLimitExceeded,我们可以捕获它,再做处理。

1
2
3
4
5
6
7
8
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
try:
return do_work()
except SoftTimeLimitExceeded:
cleanup_in_a_hurry()

2. 关于结果显示的问题

2.1 配置一个结果存储的后端

配置一个backend是第一步,如果不配置AsyncResult类也就用不了。

所以在celery的配置文件里面加一条

1
CELERY_RESULT_BACKEND = 'redis://x.x.x.x:6379/2'

2.2 关于结果显示颗粒度的原因

原始的celery是不支持任务状态显示started的状态的,也就是说,一个任务由消费者给出来。先到队列里面,状态是pending,然后到消费者(worker)手上,任务状态还是pending。接着worker就是卡卡一顿操作,要么成功了,要么失败了,要么失败之后去重试了。

也就是说,任务是这么走的:

pending–>success

或者

pending–>retry–>pending–>retry–>pending–>failure

如果你想任务没到worker的时候是pending,被消费者开始执行的时候是started,那么,你需要加一条配置:

1
CELERY_TRACK_STARTED = True

有时候,你加了这个参数好像没作用,那么你可以尝试一下,把参数这么写,别问为什么,照着干,还是不行,可以发邮件给我或者评论。

1
CELERY_TASK_TRACK_STARTED = True

2.3 关于结果存放多久的问题

既然我们的task结果存到了backend里面,那么,结果保存多久呢?其实,celery的默认保存时间是一天,单位是秒,一天是多少秒呢,自己去算。

通过此配置,更改结果保存时间:

1
CELERY_TASK_RESULT_EXPIRES = 500      # 保存了500秒

3 关于celery的配置

3.1 指定一个配置文件

如果你在写django项目,你就想把django的settings.py作为你的celery配置文件,那么很简单,只要你在你的celery.py里面用celery的app(这个app是这么来的app = Celery('app_name')),这么去做:

1
app.config_from_object('django.conf:settings', namespace='CELERY')

那么此时的settings.py也就是你的celery配置文件了,不过不推荐把django的配置文件作为celery的配置文件。具体原因,请往下看。

3.2 如果你想看看这个celery具体有啥配置,那么这个命令或许会帮到你:

1
celery inspect conf --app app_name

注意:这里的app_name是app的名字,是你在注册app的时候–app = Celery(‘app_name’)的这个app_name,不是django的app

如果我们指定了django的settings.py作为配置文件,那么你会发现,celery inspect conf的结果会非常长。

再多说一句,celery inspect conf --app app_name的结果是所有历史的celery的启动的配置文件。

4. 关于celery装饰器

4.1 装饰器参数

正常使用app.task()去装饰一个函数。

骚一点的在括号里写–bind=True,那么,传的参数第一个就必须是self,在任务里面就可以拿到worker本身这个对象了,就是self,那么task的id就是self.request.id

4.2 手动指定task_id

请注意,delay的方式,是无法指定task_id的,必须是apply_async(args, kwargs, task_id="task_id")

1
函数.apply_async(args, kwargs, task_id="task_id")

结合上一小节,在函数里面,我们就可以self.request.id,去拿到这task_id了。

文章作者: 海龟先生
文章链接: http://haiguixiansheng.org.cn/2019/10/20/关于celery的一些操作/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 海龟先生
打赏
  • 微信
  • 支付宝

评论