Воркер Celery не закрывает соединения с Redis
Я создал тестовое приложение на Flask, которое использует Celery для асинхронной обработки его запросов. Когда я запускаю воркер celery, то по умолчанию создается 10 соединений celery с Redis, после отправки запросов, старые соединения не переиспользуются и не закрываются,а создаются новые. Все эти соединения Redis висят без запросов до тех пор, пока я не остановлю процесс Celery.
Я пробовал ставить timeout в redis.conf, но после его истечения соединения также висят в статусе CLOSE_WAIT, также я пробовал добавлять настройки celery: backend_health_check_interval, broker_connection_timeout, max_tasks_per_child. Ничего не помогает, только завершения работы самого процесса. Как закрывать неиспользуемые соединения с Redis?
Код приложения:
import redis
import loguru
import time
import celery
from celery import Celery
from flask import Flask
app = Flask(__name__)
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_TASK_ROUTES = {'hello_func': {'queue': 'redis'}}
def make_celery(app):
celery = Celery(
app.name,
backend=CELERY_RESULT_BACKEND,
broker=CELERY_BROKER_URL,
task_routes=CELERY_TASK_ROUTES
)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
celery = make_celery(app)
app.celery = celery
@app.celery.task(name='hello_func')
def hello_func():
s = time.time()
loguru.logger.info("task_add")
time.sleep(20)
loguru.logger.info(f"task took {time.time()-s}")
@app.route('/')
def hello():
hello_func.apply_async()
return "hello"
if __name__ == '__main__':
app.run()