感谢大家在前文 请教下各位 Gunicorn...提供的帮助,前段时间有些忙,没有一一回复大家。在这里感谢各位了,中秋快乐。
听取了大家的意见,由于 celery 对于我来说过于复杂,所以使用 rq 来做。
叨扰一下新的问题:
- 长耗时任务分为两个步骤
- 步骤 1 执行完,才执行步骤 2
- 步骤 2 内部需要拆分为多个 worker 并行执行。
目前遇到的问题是
- step_two 里面的 worker 变成串行了。
- 要怎么汇总 step_two 的运行结果
- 如何 close 访问 /task 后创建的任务
以下代码使用 gunicorn 默认模式运行
from datetime import datetime
from redis import Redis
import time
import rq_dashboard
from flask import Flask, request
from rq import Queue, Worker
from rq.command import send_stop_job_command
redis = Redis(host='127.0.0.1', port=6379)
queue = Queue(connection=redis, name='yixianghuitong')
app = Flask(__name__)
app.config['RQ_DASHBOARD_REDIS_URL'] = 'redis://localhost:6379'
app.config.from_object(rq_dashboard.default_settings)
app.register_blueprint(rq_dashboard.blueprint, url_prefix="/rq")
@app.route('/task')
def add_task():
user = request.args.get('user')
if user:
print('执行步骤 1')
d = datetime.now().strftime('%Y%m%d%H%M%S')
name = f"{user}_{d}"
queue.enqueue(
step_one,
args=(name,)
)
worker = Worker([queue], connection=redis, name=name)
worker.work(burst=True)
print('执行步骤 2')
step_two(name)
# TODO: 汇总上面的结果返回给网页端
return f"Task {name} complete"
return 'No value for n'
def step_one(name):
delay = 5
print(f"Running {name}, Simulating {delay} second delay")
time.sleep(delay)
f = open(f"step_one_{name}.txt", 'a')
d = datetime.now().strftime('%Y%m%d-%H%M%S')
f.write(d)
print(f"Task {name} complete")
return name
def step_two(name):
for i in range(2):
worker_name = f"{name}_{i}"
queue.enqueue(step_two_fn, worker_name)
worker = Worker([queue], connection=redis, name=name)
worker.work(burst=True)
def step_two_fn(name):
delay = 5
print(f"Running {name}, Simulating {delay} second delay")
time.sleep(delay)
f = open(f"step_two_fn_{name}.txt", 'a')
d = datetime.now().strftime('%Y%m%d-%H%M%S')
f.write(d)
print(f"Task {name} complete")
return name
@app.route('/close')
def close():
# list_jobs = queue.get_jobs()
# print(list_jobs)
id = request.args.get('id')
# job = queue.fetch_job(id)
send_stop_job_command(redis, id)
# for j in list_jobs:
# print(11, j.get_id())
# j.cancel()
return 'len(list_jobs)'