Celery を使った開発をしていて、重複した task がキューイングされると困る、というケースがある。
重複したtaskがキューイング、というのは、つまり、キューがワーカー上で実行されたタイミングで、(複数)ワーカー上の複数のプロセス上で同じ処理が同時に走ったりするケース。
なんど処理してもべき等な結果が得られる処理ならデータの整合性的には問題はないが、処理負荷が問題になるケースが有る。
例えば、
celery beat を使っていて、周期的にtaskのキューイングを行っている場合、ワーカーが想定時間でキューを消費していくのであれば問題ない。
しかし、何らかの不具合で長時間ワーカーが停止していて、ブローカーは動いていた場合、同じタスクのキューが溜まる。
この状態で、ワーカーが再スタートしたとき、堰を切ったようにキューが各ワーカーにsubscribeされ、いっきに複数の同じ処理が走ったりする。
これを防ぐために、タスクが処理中の場合、同じシグネチャのキューを入れないライブラリとして、celery_onceが便利なのですが、
github.com
のプルリクを見ると、エラー時にリトライするようなタスクだと、after_returnされない→タスクのステータスが RETRY のまま…?
(FAILURE にもなってくれない)
ということで、
READMEに書いてある github.com の通り、
unlock_before_run
---------------------
By default, the lock is removed after the task has executed (using celery's `after_return <https://celery.readthedocs.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.after_return>`_). This behaviour can be changed setting the task's option ``unlock_before_run``. When set to ``True``, the lock will be removed just before executing the task.
**Caveat**: any retry of the task won't re-enable the lock!
.. code:: python
@celery.task(base=QueueOnce, once={'unlock_before_run': True})
def slow_task():
sleep(30)
return "Done!"
unlock_before_run
オプションを入れなければならない。
…と思ったけど、
github.com
↑のプルリクが unlock_before_run のコミットより後ってことは、retry がうまく動かない件は未解決・マージ待ち、ということのよう。
celery-onceの導入は早かったかも…
celery-once をつかわない方法
Task Cookbook — Celery 4.0.0 documentation
Celery を 3.1.23 へアップデートした
タスクの順次実行を行う、 chord を使っていたときに
[2016-09-17 16:27:46,495: ERROR/Worker-3] Chord 'd260afd3-ee61-4968-a3e4-b4f741ad1dbe' raised: "RuntimeError('OrderedDict mutated during iteration',)"
Traceback (most recent call last):
File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 566, in on_chord_part_return
ret = j(timeout=3.0, propagate=propagate)
File "/Users/peketamin/.pyenv/versions/3.5.1/lib/python3.5/contextlib.py", line 77, in __exit__
self.gen.throw(type, value, traceback)
File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/result.py", line 53, in allow_join_result
yield
File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 566, in on_chord_part_return
ret = j(timeout=3.0, propagate=propagate)
File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/result.py", line 688, in join_native
for task_id, meta in self.iter_native(timeout, interval, no_ack):
File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/backends/base.py", line 475, in get_many
cache.update(r)
File "/Users/peketamin/.pyenv/versions/xxx/lib/python3.5/site-packages/celery/utils/functional.py", line 68, in update
for item in islice(iter(data), len(data) - limit):
RuntimeError: OrderedDict mutated during iteration
というエラーが出たので。
この対応かな?
Fix LRUCache.update for Python 3.5 by brakhane · Pull Request #2898 · celery/celery · GitHub
っていうかアレだ、Celery をちゃんと運用したいなら broker は Redis じゃなくて、 RabbitMQ 使ったほうが一番ちゃんと運用できそう…