Add worker_eta_task_limit configuration to manage ETA task memory usage #9853
Force-pushed 8f2e60f to e81eb4a.
Force-pushed e81eb4a to ba314c9.
Please check the failing unit tests.
Interesting PR. How did you test it (aside from the unit tests)?
```
==================================== ERRORS ====================================
________________ ERROR collecting t/unit/worker/test_request.py ________________
.tox/3.13-unit/lib/python3.13/site-packages/_pytest/python.py:493: in importtestmodule
    mod = import_path(
.tox/3.13-unit/lib/python3.13/site-packages/_pytest/pathlib.py:587: in import_path
    importlib.import_module(module_name)
/opt/hostedtoolcache/Python/3.13.2/x64/lib/python3.13/importlib/__init__.py:88: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
<frozen importlib._bootstrap>:1387: in _gcd_import
    ???
<frozen importlib._bootstrap>:1360: in _find_and_load
    ???
<frozen importlib._bootstrap>:1331: in _find_and_load_unlocked
    ???
<frozen importlib._bootstrap>:935: in _load_unlocked
    ???
.tox/3.13-unit/lib/python3.13/site-packages/_pytest/assertion/rewrite.py:185: in exec_module
    exec(co, module.__dict__)
t/unit/worker/test_request.py:22: in <module>
    from celery.worker import strategy
E     File "/home/runner/_work/celery/celery/celery/worker/strategy.py", line 205
E       nonlocal eta_task_count
E       ^^^^^^^^^^^^^^^^^^^^^^^
E   SyntaxError: name 'eta_task_count' is used prior to nonlocal declaration
_______________ ERROR collecting t/unit/worker/test_strategy.py ________________
.tox/3.13-unit/lib/python3.13/site-packages/_pytest/python.py:493: in importtestmodule
    mod = import_path(
.tox/3.13-unit/lib/python3.13/site-packages/_pytest/pathlib.py:587: in import_path
    importlib.import_module(module_name)
/opt/hostedtoolcache/Python/3.13.2/x64/lib/python3.13/importlib/__init__.py:88: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
<frozen importlib._bootstrap>:1387: in _gcd_import
    ???
<frozen importlib._bootstrap>:1360: in _find_and_load
    ???
<frozen importlib._bootstrap>:1331: in _find_and_load_unlocked
    ???
<frozen importlib._bootstrap>:935: in _load_unlocked
    ???
.tox/3.13-unit/lib/python3.13/site-packages/_pytest/assertion/rewrite.py:185: in exec_module
    exec(co, module.__dict__)
t/unit/worker/test_strategy.py:15: in <module>
    from celery.worker.strategy import default as default_strategy
E     File "/home/runner/_work/celery/celery/celery/worker/strategy.py", line 205
E       nonlocal eta_task_count
E       ^^^^^^^^^^^^^^^^^^^^^^^
E   SyntaxError: name 'eta_task_count' is used prior to nonlocal declaration
=============================== warnings summary ===============================
```
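Both collection errors come from one Python scoping rule: inside a nested function, a `nonlocal` statement must appear before any read or write of that name. The actual code at `strategy.py` line 205 isn't shown here, so the sketch below assumes the counter was read before being declared. It reproduces the error with `compile()` and shows the fix of moving the declaration to the top of the inner function. The names `make_handler` and `handle` are hypothetical.

```python
"""Reproduce and fix 'name ... is used prior to nonlocal declaration'."""

# Hypothetical broken closure: the counter is read before `nonlocal`.
BROKEN = """
def make_handler(limit):
    eta_task_count = 0

    def handle():
        if eta_task_count >= limit:  # read before the nonlocal statement
            return False
        nonlocal eta_task_count
        eta_task_count += 1
        return True

    return handle
"""


def check_broken() -> str:
    """Compile the broken source and return the SyntaxError message, if any."""
    try:
        compile(BROKEN, "<broken>", "exec")
    except SyntaxError as exc:
        return exc.msg
    return "compiled without error"


def make_handler(limit: int):
    """Fixed version: the nonlocal declaration precedes every use."""
    eta_task_count = 0

    def handle() -> bool:
        nonlocal eta_task_count  # must come before any read or write
        if eta_task_count >= limit:
            return False  # limit reached: the caller should reject the task
        eta_task_count += 1
        return True

    return handle


if __name__ == "__main__":
    print(check_broken())
    handler = make_handler(limit=2)
    print([handler() for _ in range(3)])  # [True, True, False]
```

This counter only ever increases. A real limit also needs a matching decrement when a held ETA task runs or is discarded.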
Use `tox -e lint` to check for lint issues locally.
I tested the changes by enqueuing a large batch of mock ETA tasks, each with an ETA of about 30 minutes, and then manually setting a low memory limit for the worker process. Without the fix, the worker kept fetching ETA messages from the Redis broker and holding them in memory. Once memory usage hit the limit, the worker crashed, and the remaining ETA tasks were picked up by other workers, which also crashed once they exceeded the memory limit. With the new configuration in place, the worker rejects new ETA tasks once the limit is hit, preventing memory exhaustion and worker crashes. I validated this by monitoring the workers: they no longer crashed under heavy ETA task load, and the rejected tasks were safely re-queued as expected.
Changed to draft until all of the issues are addressed.
Force-pushed 8f798e7 to 45b4c41.
Are these tests running on stale code? I have already fixed the relevant issue.
Force-pushed 45b4c41 to 8c4e5f2.
Some comments about the implementation approach
Force-pushed acbbe08 to dac304a.
Codecov Report: ✅ All modified and coverable lines are covered by tests.

```diff
@@           Coverage Diff           @@
##             main    #9853   +/-   ##
=======================================
  Coverage   78.63%   78.64%           
=======================================
  Files         153      153           
  Lines       19222    19223    +1     
  Branches     2555     2555           
=======================================
+ Hits        15115    15117    +2     
  Misses       3810     3810           
+ Partials      297      296    -1     
```

Flags with carried forward coverage won't be shown.
Thank you for fixing everything.
The failing integration tests are caused by an issue on main.
We're trying to fix it ASAP so that new contributions get a clean CI run, FYI.
Force-pushed 0d57db9 to 6b61e4d.
I'll release it in the next 24h; OOO for today :)
Force-pushed 6b61e4d to f9362eb.
Let's wait a couple of days for another round of review and the CI fix.
Thank you for working on this! I'll take a look as well when I have some time.
LGTM. One minor comment on the docs
Unfortunately, this one is trickier than expected. I'm planning to give it another shot tomorrow, FYI.
@Nusnus Do you mean the PR or the CI fix?
The CI. Many contributions are blocked because we can't confirm that their changes are valid.
Force-pushed 8acc6ff to fba15f3.
- Introduced `worker_eta_task_limit` to limit the number of ETA/countdown tasks a worker can hold in memory, preventing memory exhaustion.
- Updated the task execution strategy to reject new ETA tasks when the limit is reached.
- Added documentation for the new configuration option.
- Implemented unit tests to validate the behavior of the ETA task limit.
Co-authored-by: Tomer Nosrati <tomer.nosrati@gmail.com>
- Introduced `worker_eta_task_limit` setting to limit the number of ETA tasks a worker can hold in memory.
- Implemented `ETATaskTracker` class to track and enforce the ETA task limit.
- Updated `default` strategy to reject new ETA tasks when the limit is reached.
- Added unit tests to verify the behavior of the ETA task limit and tracker.
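The commit message mentions an `ETATaskTracker` class, but its code isn't part of this conversation. The following is a minimal sketch of what such a tracker could look like, assuming a thread-safe counter where `limit=None` (or 0) disables the check. The method names `try_acquire`/`release` and the `on_eta_message` hook are hypothetical, not the PR's actual API.

```python
"""Hypothetical ETA task tracker sketch (not Celery's actual implementation)."""
import threading
from typing import List, Optional


class ETATaskTracker:
    """Counts ETA tasks held in memory and enforces an optional limit."""

    def __init__(self, limit: Optional[int] = None) -> None:
        self.limit = limit  # assumed convention: None or 0 means "no limit"
        self._count = 0
        self._lock = threading.Lock()

    @property
    def count(self) -> int:
        return self._count

    def try_acquire(self) -> bool:
        """Reserve a slot for a new ETA task; False means it should be rejected."""
        with self._lock:
            if self.limit and self._count >= self.limit:
                return False
            self._count += 1
            return True

    def release(self) -> None:
        """Free a slot once a held ETA task has run or been discarded."""
        with self._lock:
            if self._count > 0:
                self._count -= 1


def on_eta_message(tracker: ETATaskTracker, message_id: str,
                   held: List[str], rejected: List[str]) -> None:
    """Hypothetical strategy hook: hold the task, or reject it for redelivery."""
    if tracker.try_acquire():
        held.append(message_id)
    else:
        # A real strategy would reject the message so the broker can
        # redeliver it, instead of keeping it in worker memory.
        rejected.append(message_id)


if __name__ == "__main__":
    tracker = ETATaskTracker(limit=2)
    held, rejected = [], []
    for mid in ["a", "b", "c"]:
        on_eta_message(tracker, mid, held, rejected)
    print(held, rejected)  # ['a', 'b'] ['c']
    tracker.release()      # task "a" ran, freeing a slot
    on_eta_message(tracker, "d", held, rejected)
    print(held)            # ['a', 'b', 'd']
```

Keeping the counter in a small class rather than a closure variable avoids the `nonlocal` pitfall from the earlier CI failure and makes the limit easy to unit-test on its own.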
…g attribute gracefully
…g in task consumer
Force-pushed fba15f3 to 25f4a38.
This PR addresses issue #9849:
BUG:
Workers run out of memory (OOM) when they fetch a large number of ETA/countdown tasks, because there is no configurable limit on how many ETA tasks a worker can hold in memory. The worker crashes, its tasks go back to the queue, and other workers fetch them and run out of memory in turn, and so on.
FIX:
- Introduced `worker_eta_task_limit` to limit the number of ETA/countdown tasks a worker can hold in memory, preventing memory exhaustion.

Fixes #9849
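A minimal configuration sketch, assuming the new setting is read like other `worker_*` options from a Celery config module loaded with `app.config_from_object("celeryconfig")`. The module name and the value 1000 are illustrative, not a recommended default.

```python
# celeryconfig.py (hypothetical module name)
# Cap how many ETA/countdown tasks a worker holds in memory. Per the PR
# description, once the cap is reached new ETA tasks are rejected and
# returned to the broker instead of being kept in worker memory.
worker_eta_task_limit = 1000
```

Setting it directly on the app, e.g. `app.conf.worker_eta_task_limit = 1000`, should be equivalent. A sensible value depends on the size of the task payloads and the memory available to each worker.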