我之前写过类似的,和你的思路差不多:在外部用一个类记录运行状态,并据此判断是否停止调度。
```python
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.background import BlockingScheduler
from loguru import logger
class LimitJob:
    """Count job executions on the class and stop the scheduler at a limit.

    The counter lives on the class (not on an instance), so the scheduler can
    be handed the bound classmethod directly and every run sees the same state.
    """

    # Number of times print_job has been executed so far.
    n = 0
    # Run count at which the scheduler is shut down.
    max_runs = 5

    @classmethod
    def print_job(cls):
        """Log the current run count and shut the scheduler down at the limit.

        Relies on the module-level name ``jobs`` (bound in the ``__main__``
        guard of this file) to reach the running scheduler.
        """
        cls.n += 1
        logger.info(f"run job {cls.n} times")
        if cls.n >= cls.max_runs:
            # This runs inside a scheduled job, so the shutdown must not block
            # waiting for currently running jobs (i.e. this one) to finish.
            jobs.shutdown(wait=False)
def init_scheduler() -> BlockingScheduler:
    """Create a blocking scheduler with the print job already registered.

    The scheduler uses an in-memory job store, a thread-pool executor built
    with ``ThreadPoolExecutor(1)``, and job defaults of ``coalesce=False`` /
    ``max_instances=1``. ``LimitJob.print_job`` is added as an interval job
    firing every 3 seconds under the id ``"print_job"``.

    Returns:
        The configured scheduler; it has not been started yet.
    """
    sched = BlockingScheduler(
        jobstores={'default': MemoryJobStore()},
        executors={'default': ThreadPoolExecutor(1)},
        job_defaults={'coalesce': False, 'max_instances': 1},
    )
    sched.add_job(LimitJob.print_job, 'interval', seconds=3, id="print_job")
    logger.info("调度任务创建成功")
    # Print the registered jobs so the setup is visible before start().
    sched.print_jobs()
    return sched
if __name__ == "__main__":
    # Keep the module-level name `jobs`: LimitJob.print_job uses it to reach
    # the scheduler when it decides to shut down.
    jobs = init_scheduler()
    # Blocks the main thread until the scheduler is shut down.
    jobs.start()
```
https://pastebin.ubuntu.com/p/XQMtf4RXgp/