
Multithreaded consumer exits before the queue is drained. What is the cause? Thanks!

    vegetableChick · Jan 18, 2022 · 3514 views

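    I want to consume a Redis queue with multiple threads via ThreadPoolExecutor, but the process exits before the queue has been fully consumed.

    Here is the implementation: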

    from concurrent.futures import ThreadPoolExecutor
    
    import redis
    from redis import Redis
    
    
    
    pool = redis.ConnectionPool(
        max_connections=settings.REDIS_POOL_MAX_CLIENT,
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=8,
        decode_responses=True
    )
    
    redis_con = Redis(connection_pool=pool)
    
    
    class BasicTask(object):
        def __init__(self,
                     consume_queue_name=None,
                     thread_num=50):
            self.consume_queue_name = consume_queue_name
            self.thread_num = thread_num
    
        def _consume(self):
            try:
                with ThreadPoolExecutor(max_workers=self.thread_num) as e:
                    e.map(self._do_request, range(0, self.thread_num))
            except Exception as e:
                self.logger.error(f"[consume error]: {e}")
    
        def _do_request(self, _):
            try:
                with redis_con as redis_conn:
                    while 1:
                        account_id_info = redis_conn.rpop(self.consume_queue_name)
                        if account_id_info:
                            try:
                                # django orm save db
                                ...
    
                            except Exception as e:
                                import traceback as tb
                                tb.print_exc()
                                self.logger.error(f"[consume error]: {e}. ")
    
                        else:
                            break
    
            except Exception as e:
                self.logger.error(f"[Unexpected Error: {e}]")
                import traceback as tb
                tb.print_exc()
    
        def run(self):
            self._consume()
    
    
    
    # run 
    BasicTask(consume_queue_name="base_list_queue").run()
    
    

    Where is the bug? Thanks in advance.

    Python 3.7.3

    2 replies    2022-01-18 18:52:44 +08:00
    #1 · MoYi123 · Jan 18, 2022
    The only possibility is that the break in the else branch of `if account_id_info:` was hit; otherwise there would be log output.
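
To illustrate the point above: `rpop` returns nothing as soon as the list is momentarily empty, so every worker breaks out of its loop even if a producer is still pushing. The sketch below is not the poster's code. It uses an in-memory `queue.Queue` as a stand-in for the Redis list, and the names `drain`, `run_demo` and `idle_timeout` are hypothetical. Each worker gives up only after the queue has stayed empty for a while, which is roughly what a blocking pop with a timeout would do on the Redis side. The results of `map` are wrapped in `list()` because `Executor.map` only re-raises a worker's exception when its results are iterated. An exception that escapes `_do_request` (for instance from an `except` handler itself, since `self.logger` is not defined in the snippet as posted) would otherwise never show up in any log.

```python
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def drain(q, idle_timeout=0.5):
    """Consume items until the queue has stayed empty for idle_timeout seconds."""
    handled = []
    while True:
        try:
            # Blocking get with a timeout; an immediate non-blocking pop
            # would give up the moment the queue is briefly empty.
            item = q.get(timeout=idle_timeout)
        except queue.Empty:
            break
        handled.append(item)
    return handled


def run_demo(n_items=5, n_workers=3):
    q = queue.Queue()

    def slow_producer():
        # Hypothetical producer that is slower than the consumers.
        for i in range(n_items):
            time.sleep(0.05)
            q.put(i)

    producer = threading.Thread(target=slow_producer)
    producer.start()
    with ThreadPoolExecutor(max_workers=n_workers) as ex:
        # list() forces iteration, so any exception raised in a worker
        # is re-raised here instead of being dropped silently.
        results = list(ex.map(lambda _: drain(q), range(n_workers)))
    producer.join()
    return sorted(item for batch in results for item in batch)
```

Whether this applies depends on whether the queue in the original post was still being filled while the consumers ran, which the post does not say.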
    #2 · wuwukai007 · Jan 18, 2022
    Shouldn't you brpop first and then start multiple threads to run the tasks? Why are you calling brpop from multiple threads?
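
A minimal sketch of the layout suggested in #2, under stated assumptions: a single loop pops items and hands each one to the thread pool, and every future is checked so that failures are reported rather than lost. Again `queue.Queue` stands in for the Redis list, and `handle`, `dispatch` and `idle_timeout` are hypothetical names. `handle` is only a placeholder for the Django ORM save in the original post.

```python
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed


def handle(item):
    # Placeholder for the real work (the ORM save in the original post).
    if item == "bad":
        raise ValueError(f"cannot save {item!r}")
    return item.upper()


def dispatch(q, max_workers=4, idle_timeout=0.2):
    """Pop in one thread, process in the pool, collect successes and failures."""
    saved, failed = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {}
        while True:
            try:
                # Only this loop pops from the queue; workers never touch it.
                item = q.get(timeout=idle_timeout)
            except queue.Empty:
                break
            futures[pool.submit(handle, item)] = item
        for fut in as_completed(futures):
            try:
                saved.append(fut.result())
            except Exception as exc:
                # result() re-raises whatever the worker raised.
                failed.append((futures[fut], str(exc)))
    return sorted(saved), failed
```

With this split, the question of when to stop lives in one place (the popping loop), and the pool size no longer determines how many Redis connections are in use at once.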