celeryのPeriodic Taskについてのメモ

必要になったので自分用にメモ。今必要な部分だけざっくりと把握しただけなので間違ったこと書いてたらごめんなさい。

Periodic Taskとは

Periodic Tasks — Celery 3.0.9 documentation

celeryにはPeriodic Taskというタスクをある一定時間ごとに実行してくれる仕組みがある。そのスケジュールを管理してくれるのがcelery beat。

celerybeat: タスクのスケジュールを管理
celeryd: タスクの実行

periodic_taskデコレータ

定期実行タスクをスケジューラーに登録するためにdjangoのsettings.pyにCELERYBEAT_SCHEDULEを設定する必要がある。が、これとは別の方法としてperiodic_taskデコレータを使用する方法があるのでそちらを使うことにする。

from celery.task import periodic_task
import datetime.timedelta as timedelta
...

@periodic_task(run_every=timedelta(minutes=1))
def hoge():
    ...

ソースを見た感じPeriodicTaskクラスは抽象クラスという位置づけのようで、実際に使うには継承してやって具体的な実装をする。その際に必ずrun_everyという属性を設定してやらないと例外を吐くように作られている。

なので、periodic_taskデコレータを使う場合も、引数に run_every=hogehoge というように渡してやらないといけない。

crontab

引数に渡す時間は先ほどのコード例のようにtimedeltaで指定してやる方法もあるが、より柔軟な設定を簡単にするためにcrontabという関数が用意されている。詳しくはドキュメントを参照。

from celery.task import periodic_task
from celery.task.schedule import crontab

@periodic_task(run_every=crontab())  # 毎分実行
def hoge():
    ...

@periodic_task(run_every=crontab(minute="0", hour="0")) 毎日深夜0:00に実行
def fuga():
    ...

タスクの実行

celerybeatはあくまでもスケジュールを管理するだけで、実行まではしてくれない。あくまでも「xxxを実行する時間ですよ!」とcelerydの方に伝えるだけ。celerydが寝ていたら当然タスクは実行されないのでcelerydも一緒に起動しておく必要がある。

$ ./manage.py celerybeat -l DEBUG --settings=setting.xxx
$ ./manage.py celeryd -l DEBUG --settings=setting.xxx

※ タスクの内容にもよるけど、必ずしもDjangoをrunserverしておく必要は無い。

celerybeatを起動時に色々とログが出力される。その中にperiodic_taskデコレータをつけた関数が登録されているのが確認できる。で、時間が来ればそれらが実行される。

sftpserverモジュールをforkしていじった件

諸事情によりテスト用のsftpサーバーが必要になったのでsftpserverなるモジュールを見つけてきた。しかしこいつシングルスレッドなので、やりたいことのためにはmultiprocessingと併用する必要があったわけなんだけど、そうするとなんかエラーが出る。

raise AssertionError("PID check failed. RNG must be re-initialized after fork().
Hint: Try Random.atfork()")

で調べてみたところそのものズバリのエラーで困ってる人がいた。

python - multiprocess module with paramiko - Stack Overflow

どうやらparamikoのバグらしい。paramikoのforkで、pure python実装なsshモジュールだとこの問題は解決済みらしい。

ちなみにpyftpdlibはFTPSには対応しているけれど、SFTPには対応してないとのことだったので諦めた。issueに「SFTPにも対応しないの?」みたいなのもあったけど、「pyftpdlibがやるようなことじゃない」って蹴られてたので多分今後もそういう方向には行かないと思う。
あと、pyFileSystemというものがSFTPに対応していたので使ってみたけど、こいつもparamiko依存なので同じエラーが出る。

で、仕方がないのでforkしてちょこっと改良した。

kk6/sftpserver at spike · GitHub

変更点

  • 内部でのparamikoの使用をやめてsshに移行した
  • optparseをやめてargparseに移行した
  • なんとなくデーモン化したかったのでpython-daemonでデーモン起動できるようにした

今にして思えばデーモン化はちょっと余計だった気がしてpull-requestは飛ばしてない

テストで使う

unittestのsetupで、こんな感じでmultiprocessingと併用して立ち上げてやる

def setUp(self):
    from sftpserver import start_server
    from multiprocessing import Process

    args = (HOST, PORT, KEYFILE, 'DEBUG')
    self._process = Process(target=start_server, args=args)
    self._process.start()


def tearDown(self):
    self._process.terminate()

Djangoの発行する生SQLが見たい

ForeignKeyとfilterのメモ - AtAsAtAmAtArA
DjangoのORマッパーでSQL文を簡単に出力するサンプル - 十番目のムーサ

というやりとりを見て。シェルで確かめるのが楽だし便利。
django.db.connection.queriesで見られます。

おもむろにdjangoのシェルを起動

$ python manage.py shell

>>> from django.db import connection
>>> connection.queries
[]  # 最初は当然からっぽ
>>> from entry.models import Entry  # 適当にモデルをインポート
>>> Entry.objects.filter(is_active=True).order_by('-modified')
[<Entry: ぴよ>, <Entry: aa>, <Entry: ふが>, <Entry: ほげ>]
>>> connection.queries[-1]['sql']
u'SELECT "entry"."id", "entry"."del_flg", "entry"."author_id", "entry"."title", "entry"."body", "entry"."slug", "entry"."created", "entry"."modified", "entry"."publish", "entry"."is_active" FROM "entry" WHERE "entry"."is_active" = True  ORDER BY "entry"."modified" DESC LIMIT 21'
>>> connection.queries[0].keys()
['time', 'sql']  # 実行時間と生SQLが入ってる

詳しくは公式ドキュメントのFAQを参照してください。