Python新起一个线程来不停地跑协程任务

任务背景:不停地去第三方服务器请求HTTP资源,根据请求到的信息做下一步动作

当前方案:启动一个单独的线程不停地监听任务列表,有任务就丢给协程去异步处理

大致代码如下:

import sys
import time
from functools import partial
from threading import Thread
from typing import Callable

import anyio
import httpx


class Monitor:
    """Class-level state shared by the submitting thread and the loop thread.

    Everything lives on the class itself (never on instances), so all users
    see the same queue, result list and flags.
    """

    # Set to False to ask the polling loop to stop.
    running = True
    # Zero-argument callables returning an awaitable, waiting to be scheduled.
    tasks: list[Callable] = []
    # Return values of finished tasks, in completion order.
    results: list = []
    # Number of ``gather`` calls that have started but not yet finished.
    # Only ``gather`` modifies it; ``wait`` just reads it.
    in_flight: int = 0
    # Seconds ``wait`` lingers before trusting an "idle" observation.
    handoff_grace: float = 0.05

    @classmethod
    async def gather(cls, async_func) -> None:
        """Await ``async_func()`` and append its return value to ``results``.

        Args:
            async_func: Zero-argument callable returning an awaitable.

        An exception raised by ``async_func`` (including cancellation) is
        not caught here; nothing is appended in that case, but the
        in-flight counter is still released.
        """
        cls.in_flight += 1
        try:
            cls.results.append(await async_func())
        finally:
            cls.in_flight -= 1

    @classmethod
    def wait(cls, seconds: int, interval=1) -> None:
        """Block until all submitted work has finished or the time is up.

        Work counts as outstanding while ``tasks`` is non-empty or any
        ``gather`` call is still running. Checking the queue alone is not
        enough: it is emptied as soon as the tasks are scheduled, long
        before they complete.

        Args:
            seconds: Maximum total time to block.
            interval: Pause between checks while work is outstanding.
        """
        deadline = time.monotonic() + seconds
        while (remaining := deadline - time.monotonic()) > 0:
            if cls.tasks or cls.in_flight:
                time.sleep(min(interval, remaining))
                continue
            # Looks idle, but a task that was just taken off the queue is not
            # counted in ``in_flight`` until its coroutine starts running.
            # Look again after a short grace period before believing it.
            time.sleep(min(cls.handoff_grace, remaining))
            if not (cls.tasks or cls.in_flight):
                return


def forever_loop() -> None:
    """Run an event loop that schedules queued tasks until asked to stop.

    Every 10 ms the loop moves whatever is queued in ``Monitor.tasks`` into
    a task group, wrapping each callable in ``Monitor.gather``. It keeps
    doing so while ``Monitor.running`` is true. Once the flag is cleared,
    tasks that are still running are cancelled and the function returns.
    Blocks the calling thread for the whole lifetime of the loop.
    """

    async def waiting() -> None:
        async with anyio.create_task_group() as tg:
            while Monitor.running:
                # Snapshot the queue, then delete exactly that many leading
                # entries. A blanket ``clear()`` would also drop tasks that
                # another thread appended after the snapshot was taken.
                batch = Monitor.tasks[:]
                for job in batch:
                    tg.start_soon(Monitor.gather, job)
                del Monitor.tasks[: len(batch)]
                await anyio.sleep(0.01)
            # Stop requested: abort whatever has not finished yet so the
            # task group can exit instead of waiting for it.
            tg.cancel_scope.cancel()

    anyio.run(waiting)


class EventLoopThread(Thread):
    """Thread that runs ``forever_loop`` and forwards requests to ``Monitor``."""

    def __init__(self) -> None:
        """Create the thread with ``forever_loop`` as its target."""
        Thread.__init__(self, target=forever_loop)

    @staticmethod
    def add_task(async_func: Callable) -> None:
        """Queue ``async_func`` by appending it to ``Monitor.tasks``."""
        Monitor.tasks.append(async_func)

    @staticmethod
    def wait_until(seconds: int) -> None:
        """Delegate to ``Monitor.wait`` with the given time budget."""
        Monitor.wait(seconds=seconds)

    def close(self) -> None:
        """Clear ``Monitor.running`` so the polling loop stops."""
        Monitor.running = False


async def fetch(url: str, verbose=True) -> httpx.Response:
    """GET ``url`` with a throwaway client and return the response.

    Args:
        url: Address to request.
        verbose: When true, print the URL and the response text.

    Returns:
        The ``httpx.Response`` for the request.

    NOTE(review): the client is created with ``verify=False``; confirm that
    skipping certificate verification is intended outside of a demo.
    """
    async with httpx.AsyncClient(verify=False) as session:
        response = await session.get(url)
    if not verbose:
        return response
    print(f"{url = }")
    print(response.text)
    return response


def main() -> None:
    """Queue a batch of fetches on a background loop thread and report.

    The first command-line argument is optional: an all-digit value sets the
    number of requests (default 10), a value starting with ``http`` sets the
    URL (default ``https://qq.com``). Anything else is ignored.
    """
    total, url = 10, "https://qq.com"
    cli_args = sys.argv[1:]
    if cli_args:
        first = cli_args[0]
        if first.isdigit():
            total = int(first)
        elif first.startswith("http"):
            url = first

    worker = EventLoopThread()
    worker.daemon = True
    worker.start()

    # Every queued entry is the same call, so one partial can be reused.
    job = partial(fetch, url=url)
    for _ in range(total):
        worker.add_task(job)

    worker.wait_until(seconds=10)
    worker.close()
    worker.join()
    print(f"{len(Monitor.results)=}")
    print("Done.")


# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()