読者です 読者をやめる 読者になる 読者になる

peketaminの日記

その辺のプログラマーのチラ裏です。

celery-once の retry タスクにおける注意点

Celery を使った開発をしていて、重複した task がキューイングされると困る、というケースがある。

重複したtaskがキューイング、というのは、つまり、キューがワーカー上で実行されたタイミングで、(複数)ワーカー上の複数のプロセス上で同じ処理が同時に走ったりするケース。

なんど処理してもべき等な結果が得られる処理ならデータの整合性的には問題はないが、処理負荷が問題になるケースが有る。

例えば、

celery beat を使っていて、周期的にtaskのキューイングを行っている場合、ワーカーが想定時間でキューを消費していくのであれば問題ない。 しかし、何らかの不具合で長時間ワーカーが停止していて、ブローカーは動いていた場合、同じタスクのキューが溜まる。

この状態で、ワーカーが再スタートしたとき、堰を切ったようにキューが各ワーカーにsubscribeされ、いっきに複数の同じ処理が走ったりする。

これを防ぐために、タスクが処理中の場合、同じシグネチャのキューを入れないライブラリとして、celery_onceが便利なのですが、

github.com

のプルリクを見ると、エラー時にリトライするようなタスクだと、after_returnされない→タスクのステータスが RETRY のまま…? (FAILURE にもなってくれない) ということで、

READMEに書いてある github.com の通り、

unlock_before_run
---------------------

By default, the lock is removed after the task has executed (using celery's `after_return <https://celery.readthedocs.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.after_return>`_). This behaviour can be changed setting the task's option ``unlock_before_run``. When set to ``True``, the lock will be removed just before executing the task.

**Caveat**: any retry of the task won't re-enable the lock!

.. code:: python

    @celery.task(base=QueueOnce, once={'unlock_before_run': True})
    def slow_task():
        sleep(30)
        return "Done!"

unlock_before_run オプションを入れなければならない。

…と思ったけど、 github.com ↑のプルリクが unlock_before_run のコミットより後ってことは、retry がうまく動かない件は未解決・マージ待ち、ということのよう。 celery-onceの導入は早かったかも…


celery-once をつかわない方法

Task Cookbook — Celery 4.0.0 documentation


Celery を 3.1.23 へアップデートした

タスクの順次実行を行う、 chord を使っていたときに

[2016-09-17 16:27:46,495: ERROR/Worker-3] Chord 'd260afd3-ee61-4968-a3e4-b4f741ad1dbe' raised: "RuntimeError('OrderedDict mutated during iteration',)"
Traceback (most recent call last):
  File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 566, in on_chord_part_return
    ret = j(timeout=3.0, propagate=propagate)
  File "/Users/peketamin/.pyenv/versions/3.5.1/lib/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/result.py", line 53, in allow_join_result
    yield
  File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 566, in on_chord_part_return
    ret = j(timeout=3.0, propagate=propagate)
  File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/result.py", line 688, in join_native
    for task_id, meta in self.iter_native(timeout, interval, no_ack):
  File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 475, in get_many
    cache.update(r)
  File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/utils/functional.py", line 68, in update
    for item in islice(iter(data), len(data) - limit):
RuntimeError: OrderedDict mutated during iteration

というエラーが出たので。

この対応かな?

Fix LRUCache.update for Python 3.5 by brakhane · Pull Request #2898 · celery/celery · GitHub


っていうかアレだ、Celery をちゃんと運用したいなら broker は Redis じゃなくて、 RabbitMQ 使ったほうが一番ちゃんと運用できそう…