V2EX = way to explore
V2EX 是一个关于分享和探索的地方
Sign Up Now
For Existing Member  Sign In
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
craftx
V2EX  ›  Python

有 N 个 asyncio task,要求并发处理,并发量不超过 k,有什么优雅的写法?

  •  
  •   craftx · May 30, 2023 · 2100 views
    This topic created in 1064 days ago, the information mentioned may be changed or developed.
    15 replies    2023-06-05 13:36:20 +08:00
    wliansheng
        1
    wliansheng  
       May 30, 2023
    不知道,楼主可以“抛砖引玉”?
    NessajCN
        2
    NessajCN  
       May 30, 2023
    啥叫并发量不超过 k ?
    await asyncio.gather([小于 k 个 coro])
    你是说这样?
    seers
        3
    seers  
       May 30, 2023 via Android
    for 一下?
    alexsunxl
        4
    alexsunxl  
       May 30, 2023
    k 数量大小的线程池?
    完成一个 task 就回收到池子里。
    思路应该是比较简单的吧
    BBCCBB
        5
    BBCCBB  
       May 30, 2023
    asyncio 就是并发的?

    再加个 Semaphore. 来控制同时最大请求数.
    centralpark
        6
    centralpark  
       May 30, 2023
    asyncio.Semaphore 。不过这种问题适合问 ChatGPT 吧……
    Trim21
        7
    Trim21  
       May 30, 2023
    Semaphore
    jonathanchoo
        8
    jonathanchoo  
       May 30, 2023
    您可以使用 asyncio 的 asyncio.gather() 方法来实现这个功能。您可以将所有的 asyncio task 放在一个 list 中,然后在 asyncio.gather() 方法中指定 concurrency 参数为 k ,即可实现并发量不超过 k 的并发处理。

    以下是示例代码:

    ```python
    import asyncio

    async def task1():
    # Your code here

    async def task2():
    # Your code here

    # Put all tasks in a list
    tasks = [task1(), task2(), ...]

    async def main():
    # Use asyncio.gather to run tasks concurrently with a maximum concurrency of k
    await asyncio.gather(*tasks, return_exceptions=True, concurrency=k)

    # Run the main function
    asyncio.run(main())
    ```
    craftx
        9
    craftx  
    OP
       May 30, 2023
    @jonathanchoo
    你好。在官方最新文档中,asyncio.gather 没有 concurrency 这个参数。
    执行抛出异常:TypeError: gather() got an unexpected keyword argument 'concurrency'
    https://docs.python.org/3/library/asyncio-task.html
    iorilu
        10
    iorilu  
       May 30, 2023
    可以建立若干 worker ,N 个 worker 就控制并发 N 了把

    我记得有这个模式
    zzl22100048
        11
    zzl22100048  
       May 31, 2023
    创建一个
    sem = asyncio.Semaphore(k)

    async def task():
    async with sem:
    ....

    async def main():
    await asyncio.gather(*[ task() for _ in range(n) ])

    asyncio.run(main())



    如果更喜欢 pipeline 语法,可以用 aiostream
    from aiostream import pipe, stream
    async def main():
    await (
    stream.iterate(range(n))
    | pipe.map(task,task_limit=k)
    )
    ruanimal
        12
    ruanimal  
       May 31, 2023
    @craftx 他用 gpt 生成的答案吧,可能是 gpt 编的
    photon006
        13
    photon006  
       May 31, 2023
    node.js 有个 promise 库 bluebird ,map()方法可以传一个参数 concurrency 控制并发量。

    http://bluebirdjs.com/docs/api/promise.map.html
    zyxbcde
        14
    zyxbcde  
       May 31, 2023
    @craftx 我是习惯把任务列表切成若干个不大于 k 的子列表,然后顺序去 gather 这些子列表,每跑完一个子列表打印个进度,好歹知道自己跑到哪了吧。
    gather 里面从来就没有 concurrency 这个参数,这人用 gpt 生成个错误答案故意恶心人吧。
    craftx
        15
    craftx  
    OP
       Jun 5, 2023
    @zzl22100048 采用了 aiostream 的办法。谢谢
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   3622 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 75ms · UTC 11:13 · PVG 19:13 · LAX 04:13 · JFK 07:13
    ♥ Do have faith in what you're doing.