Celery を使った開発をしていて、重複した task がキューイングされると困る、というケースがある。
重複したtaskがキューイング、というのは、つまり、キューがワーカー上で実行されたタイミングで、(複数)ワーカー上の複数のプロセス上で同じ処理が同時に走ったりするケース。
なんど処理してもべき等な結果が得られる処理ならデータの整合性的には問題はないが、処理負荷が問題になるケースが有る。
例えば、
celery beat を使っていて、周期的にtaskのキューイングを行っている場合、ワーカーが想定時間でキューを消費していくのであれば問題ない。 しかし、何らかの不具合で長時間ワーカーが停止していて、ブローカーは動いていた場合、同じタスクのキューが溜まる。
この状態で、ワーカーが再スタートしたとき、堰を切ったようにキューが各ワーカーにsubscribeされ、いっきに複数の同じ処理が走ったりする。
これを防ぐために、タスクが処理中の場合、同じシグネチャのキューを入れないライブラリとして、celery_onceが便利なのですが、
のプルリクを見ると、エラー時にリトライするようなタスクだと、after_returnされない→タスクのステータスが RETRY のまま…? (FAILURE にもなってくれない) ということで、
READMEに書いてある github.com の通り、
unlock_before_run --------------------- By default, the lock is removed after the task has executed (using celery's `after_return <https://celery.readthedocs.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.after_return>`_). This behaviour can be changed setting the task's option ``unlock_before_run``. When set to ``True``, the lock will be removed just before executing the task. **Caveat**: any retry of the task won't re-enable the lock! .. code:: python @celery.task(base=QueueOnce, once={'unlock_before_run': True}) def slow_task(): sleep(30) return "Done!"
unlock_before_run
オプションを入れなければならない。
…と思ったけど、 github.com ↑のプルリクが unlock_before_run のコミットより後ってことは、retry がうまく動かない件は未解決・マージ待ち、ということのよう。 celery-onceの導入は早かったかも…
celery-once をつかわない方法
Task Cookbook — Celery 4.0.0 documentation
Celery を 3.1.23 へアップデートした
タスクの順次実行を行う、 chord を使っていたときに
[2016-09-17 16:27:46,495: ERROR/Worker-3] Chord 'd260afd3-ee61-4968-a3e4-b4f741ad1dbe' raised: "RuntimeError('OrderedDict mutated during iteration',)" Traceback (most recent call last): File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 566, in on_chord_part_return ret = j(timeout=3.0, propagate=propagate) File "/Users/peketamin/.pyenv/versions/3.5.1/lib/python3.5/contextlib.py", line 77, in __exit__ self.gen.throw(type, value, traceback) File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/result.py", line 53, in allow_join_result yield File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 566, in on_chord_part_return ret = j(timeout=3.0, propagate=propagate) File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/result.py", line 688, in join_native for task_id, meta in self.iter_native(timeout, interval, no_ack): File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 475, in get_many cache.update(r) File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/utils/functional.py", line 68, in update for item in islice(iter(data), len(data) - limit): RuntimeError: OrderedDict mutated during iteration
というエラーが出たので。
この対応かな?
Fix LRUCache.update for Python 3.5 by brakhane · Pull Request #2898 · celery/celery · GitHub
っていうかアレだ、Celery をちゃんと運用したいなら broker は Redis じゃなくて、 RabbitMQ 使ったほうが一番ちゃんと運用できそう…