celeryのPeriodic Taskについてのメモ

必要になったので自分用にメモ。今必要な部分だけざっくりと把握しただけなので間違ったこと書いてたらごめんなさい。

Periodic Taskとは

Periodic Tasks — Celery 3.0.9 documentation

celeryにはPeriodic Taskというタスクをある一定時間ごとに実行してくれる仕組みがある。そのスケジュールを管理してくれるのがcelery beat。

celerybeat: タスクのスケジュールを管理
celeryd: タスクの実行

periodic_taskデコレータ

定期実行タスクをスケジューラーに登録するためにdjangoのsettings.pyにCELERYBEAT_SCHEDULEを設定する必要がある。が、これとは別の方法としてperiodic_taskデコレータを使用する方法があるのでそちらを使うことにする。

from celery.task import periodic_task
import datetime.timedelta as timedelta
...

@periodic_task(run_every=timedelta(minutes=1))
def hoge():
    ...

ソースを見た感じPeriodicTaskクラスは抽象クラスという位置づけのようで、実際に使うには継承してやって具体的な実装をする。その際に必ずrun_everyという属性を設定してやらないと例外を吐くように作られている。

なので、periodic_taskデコレータを使う場合も、引数に run_every=hogehoge というように渡してやらないといけない。

crontab

引数に渡す時間は先ほどのコード例のようにtimedeltaで指定してやる方法もあるが、より柔軟な設定を簡単にするためにcrontabという関数が用意されている。詳しくはドキュメントを参照。

from celery.task import periodic_task
from celery.task.schedule import crontab

@periodic_task(run_every=crontab())  # 毎分実行
def hoge():
    ...

@periodic_task(run_every=crontab(minute="0", hour="0")) 毎日深夜0:00に実行
def fuga():
    ...

タスクの実行

celerybeatはあくまでもスケジュールを管理するだけで、実行まではしてくれない。あくまでも「xxxを実行する時間ですよ!」とcelerydの方に伝えるだけ。celerydが寝ていたら当然タスクは実行されないのでcelerydも一緒に起動しておく必要がある。

$ ./manage.py celerybeat -l DEBUG --settings=setting.xxx
$ ./manage.py celeryd -l DEBUG --settings=setting.xxx

※ タスクの内容にもよるけど、必ずしもDjangoをrunserverしておく必要は無い。

celerybeatを起動時に色々とログが出力される。その中にperiodic_taskデコレータをつけた関数が登録されているのが確認できる。で、時間が来ればそれらが実行される。