コミット時にpdb消し忘れてないかチェックするhg hook

チェック用のスクリプト

こんな感じのスクリプト作成

#!/usr/bin/env python

import os
import sys


COMMAND = r"find . -name '*.py' | xargs grep -lr '{pattern}'"
INFO = '{filename}: pdb found.'


def check(pattern):
    return os.popen(COMMAND.format(pattern=pattern)).readlines()


def display_warning(lines):
    for line in lines:
        print >> sys.stderr, INFO.format(filename=line.strip('\n'))


def check_pdb(*args, **kwargs):
    lines = check('pdb.set_trace')
    if lines:
        display_warning(lines)
        sys.exit(1)

~/.hgext/hookpdb/check.py みたいな感じで配置。

.hgrcに追記

[hook]
pretxncommit.pdb = python:~/.hgext/hookpdb/check.py:check_pdb

試してみる

pdb仕込みっぱなしのファイルがある状態でコミットしようとすると…

$ hg ci
./hoge.py: pdb found.
transaction abort!
rollback completed
note: commit message saved in .hg/last-message.txt

うん、できてる。けど、ファイル多いともっさりがキニナルような…

追記

ありがとうございますありがとうございます!
描き直してみました。

#!/usr/bin/env python

import os
import sys
from fnmatch import fnmatch


COMMAND = r"grep -l {pattern} {file}"
INFO = '{filename}: debugger found.'
PATTERNS = ['pdb.set_trace', 'import debug']


def check(repo, file, patterns):
    patterns = ['-e "{ptn}"'.format(ptn=pattern) for pattern in patterns]
    pattern = ' '.join(patterns)
    file = os.path.join(repo.root, file)
    return os.popen(COMMAND.format(pattern=pattern, file=file)).read()


def display_warning(lines):
    for line in lines:
        print >> sys.stderr, INFO.format(filename=line.strip('\n'))


def get_pyfiles(repo, node):
    return [f for f in repo[node].files() if fnmatch(f, '*.py')]


def check_pdb(ui, repo, hooktype, node=None, source=None, **kwargs):
    lines = [f for f in get_pyfiles(repo, node) if check(repo, f, PATTERNS)]
    if lines:
        display_warning(lines)
        sys.exit(1)

さらに修正

リポジトリルートで実行しないとgrep時にnot foundになってしまっていたので絶対パスgrepするように修正しました。

さらにさらに修正

ゆとりな僕はpdbよりもdebugモジュールを使いがちなのでパターンを複数受け取れるように修正しました。もうGistでやれよと思います。

インスタンスに動的にメソッドを追加する

追記:
記事を公開してからおんなじような記事あったりするかなと思ってググったら2年前にIanさんがほとんど同じ内容書いてた。
Pythonでメソッドをクラスまたはインスタンスに動的に追加する - Ian Lewis
しかも僕その記事はてブしてたよ…

Interactive Shell で色々試してる時なんかにインスタンスにメソッドを追加したくなる時がある。

>>> class Person(object):
...     def __init__(self, name):
...         self._name = name
...
>>> alice = Person("Alice")
>>> bob = Person("Bob")

こんなクラスがあって、幾つかインスタンスを作っているとする。で、self._nameを取得するメソッドが欲しくなったとしよう。

>>> def get_name(self):
...     return self._name

これを単純にインスタンスのメンバーとして代入してもうまくいかない

>>> alice.get_name = get_name
>>> alice.get_name()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: get_name() takes exactly 1 argument (0 given)

これはalice.get_nameがただの関数だから。インスタンスメソッドではない。

>>> type(alice.get_name)
<type 'function'>

コレジャナイので一旦消します。

>>> del alice.get_name

さて、じゃあどうすればいいかというと

>>> Person.get_name = get_name

というようにクラスの方に追加してやる。ただ、これだと当然ながら全てのインスタンスに影響が出る

>>> alice.get_name()
'Alice'
>>> bob.get_name()
'Bob'

影響が出るというか、実はaliceもbobもget_nameなんて持ってない。持ってないんだけど、クラスの方を見に行ったらそっちにあったからそれを呼び出してるだけ。

>>> del alice.get_name
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: get_name
>>> alice.__dict__
{'_name': 'Alice'}

というわけでまあこれでいいんだけど、もしある特定のインスタンスにだけ設定したいなんて場合にはどうすればいいか…というのが今回の本題。結論としては、types.MethodTypeを使う。

>>> del Person.get_name
>>> import types
>>> alice.get_name = types.MethodType(get_name, alice)
>>> alice.get_name()
'Alice'
>>> bob.get_name()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'Person' object has no attribute 'get_name'

第一引数にメソッドとして登録したい関数、第二引数にインスタンスを渡してやると、それらを紐付けてくれる。これでaliceはget_nameメソッドを持ってるけど、bobは持っていないという状態にすることができる。用途が思いつかないけど気になったから調べてみただけ。

celeryのPeriodic Taskについてのメモ

必要になったので自分用にメモ。今必要な部分だけざっくりと把握しただけなので間違ったこと書いてたらごめんなさい。

Periodic Taskとは

Periodic Tasks — Celery 3.0.9 documentation

celeryにはPeriodic Taskというタスクをある一定時間ごとに実行してくれる仕組みがある。そのスケジュールを管理してくれるのがcelery beat。

celerybeat: タスクのスケジュールを管理
celeryd: タスクの実行

periodic_taskデコレータ

定期実行タスクをスケジューラーに登録するためにdjangoのsettings.pyにCELERYBEAT_SCHEDULEを設定する必要がある。が、これとは別の方法としてperiodic_taskデコレータを使用する方法があるのでそちらを使うことにする。

from celery.task import periodic_task
import datetime.timedelta as timedelta
...

@periodic_task(run_every=timedelta(minutes=1))
def hoge():
    ...

ソースを見た感じPeriodicTaskクラスは抽象クラスという位置づけのようで、実際に使うには継承してやって具体的な実装をする。その際に必ずrun_everyという属性を設定してやらないと例外を吐くように作られている。

なので、periodic_taskデコレータを使う場合も、引数に run_every=hogehoge というように渡してやらないといけない。

crontab

引数に渡す時間は先ほどのコード例のようにtimedeltaで指定してやる方法もあるが、より柔軟な設定を簡単にするためにcrontabという関数が用意されている。詳しくはドキュメントを参照。

from celery.task import periodic_task
from celery.task.schedule import crontab

@periodic_task(run_every=crontab())  # 毎分実行
def hoge():
    ...

@periodic_task(run_every=crontab(minute="0", hour="0")) 毎日深夜0:00に実行
def fuga():
    ...

タスクの実行

celerybeatはあくまでもスケジュールを管理するだけで、実行まではしてくれない。あくまでも「xxxを実行する時間ですよ!」とcelerydの方に伝えるだけ。celerydが寝ていたら当然タスクは実行されないのでcelerydも一緒に起動しておく必要がある。

$ ./manage.py celerybeat -l DEBUG --settings=setting.xxx
$ ./manage.py celeryd -l DEBUG --settings=setting.xxx

※ タスクの内容にもよるけど、必ずしもDjangoをrunserverしておく必要は無い。

celerybeatを起動時に色々とログが出力される。その中にperiodic_taskデコレータをつけた関数が登録されているのが確認できる。で、時間が来ればそれらが実行される。