跳转至

service

SchedulerService 🔗

Bases: Service

GraiaScheduler 的 Launart 服务

Parameters:

Name Type Description Default
scheduler GraiaScheduler

任务计划器

required
Source code in src/graia/scheduler/service.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class SchedulerService(Service):
    """GraiaScheduler 的 Launart 服务

    Args:
        scheduler (GraiaScheduler): 任务计划器
    """

    id = "scheduler.service"

    def __init__(self, scheduler: GraiaScheduler) -> None:
        super().__init__()
        self.scheduler = scheduler

    @property
    def required(self):
        return set()

    @property
    def stages(self) -> Set[Literal["preparing", "blocking", "cleanup"]]:
        return {"preparing", "blocking", "cleanup"}

    async def launch(self, manager: Launart):
        async with self.stage("preparing"):
            pass  # Wait for preparation to complete, then we run tasks
        async with self.stage("blocking"):
            tsk = asyncio.create_task(self.scheduler.run())
            await manager.status.wait_for_sigexit()
        async with self.stage("cleanup"):
            # Stop all schedule tasks
            self.scheduler.stop()
            await self.scheduler.join()
            await tsk