任务背景:不停地去第三方服务器请求HTTP资源,根据请求到的信息做下一步动作
当前方案:启动一个单独的线程不停的监听任务列表,有任务就丢给协程去异步处理
大致代码如下:
import sys import time from functools import partial from threading import Thread from typing import Callable import anyio import httpx class Monitor: running = True tasks: list[Callable] = [] results: list = [] @classmethod async def gather(cls, async_func) -> None: cls.results.append(await async_func()) @classmethod def wait(cls, seconds: int, interval=1) -> None: for _ in range(int(seconds / interval)): if cls.tasks: time.sleep(interval) def forever_loop() -> None: async def waiting() -> None: async with anyio.create_task_group() as tg: while Monitor.running: for t in Monitor.tasks: tg.start_soon(Monitor.gather, t) Monitor.tasks.clear() await anyio.sleep(0.01) tg.cancel_scope.cancel() anyio.run(waiting) class EventLoopThread(Thread): def __init__(self) -> None: super().__init__(target=forever_loop) def close(self) -> None: Monitor.running = False @staticmethod def add_task(async_func: Callable) -> None: Monitor.tasks.append(async_func) @staticmethod def wait_until(seconds: int) -> None: Monitor.wait(seconds=seconds) async def fetch(url: str, verbose=True) -> httpx.Response: async with httpx.AsyncClient(verify=False) as client: r = await client.get(url) if verbose: print(f"{url = }") print(r.text) return r def main() -> None: total = 10 url = "https://qq.com" if sys.argv[1:]: arg = sys.argv[1] if arg.isdigit(): total = int(arg) elif arg.startswith("http"): url = arg t = EventLoopThread() t.daemon = True t.start() for _ in range(total): t.add_task(partial(fetch, url=url)) t.wait_until(seconds=10) t.close() t.join() print(f"{len(Monitor.results)=}") print("Done.") if __name__ == "__main__": main()