这是一个被我修改过的是生产 /消费者模型,由于需要消费者返回数据,所以不得已用了两个 Queue,第二个用于把结果写回去,最后在 main 函数里取出返回,下面的代码在我的 python3.7 和 python3.8 环境都可以正常运行,先上代码,请大家过目。
import asyncio
import pandas as pd
import random
import re
async def crawler(u):
"""
模拟爬虫返回结果
:param u: fake url
:return:
"""
i = int(re.search(r"\d+", u).group(0))
await asyncio.sleep(random.random())
return {'url': u, 'result': i}
async def worker(qin, qout, w):
"""
消费者异步函数
:param qin: Queue1,用于生产者写入和消费者读出
:param qout: Queue2,用于让消费者回写结果
:param w: worker id
:return:
"""
while True:
if qin.empty():
break
u = await qin.get()
print(f"Worker-{w} crawling {u}")
resp = await crawler(u)
await qout.put(resp)
qout.task_done()
async def generate_url(url, qin):
"""
生产者异步函数
:param url:
:param qin: Queue1,写出生产者产出的(这里为传入的) url
:return:
"""
await qin.put(url)
print(f"Queue size = {qin.qsize()}")
# qin.task_done()
async def main(qmax=20):
q_in = asyncio.Queue(qmax)
q_out = asyncio.Queue()
urls = [f"url{i}" for i in range(100)]
producers = [asyncio.create_task(generate_url(u, q_in)) for u in urls]
# producers = [await q_in.put(u) for u in urls]
consumers = [asyncio.create_task(worker(q_in, q_out, i)) for i in range(1, qmax // 2 + 1)]
await asyncio.gather(*consumers)
await asyncio.gather(*producers)
# await q_in.join()
await q_out.join()
for c in consumers:
c.cancel()
return [await q_out.get() for _ in range(q_out.qsize())]
if __name__ == "__main__":
result = asyncio.run(main(30))
df = pd.DataFrame(result).set_index('url')
目前有三个问题,
- 被注释掉的 qin.join,qin.task_done 是需要?还是让第二个 queue 阻塞就好了?
- main 函数中的 producers 是我写的异步列表推导式,想用于替代 generate_url 函数,但是不能正常运行
- 是否还有更优雅的实现方式?