跳转至

task

SchedulerTask 🔗

Source code in src/graia/scheduler/task.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class SchedulerTask:
    target: Callable[..., Any]
    timer: Timer
    task: Optional[asyncio.Task]

    broadcast: Broadcast
    dispatchers: List[T_Dispatcher]
    decorators: List[Decorator]

    cancelable: bool
    stopped: bool

    sleep_record: EnteredRecord
    run_record: EnteredRecord

    loop: asyncio.AbstractEventLoop

    @property
    def is_sleeping(self) -> bool:
        return self.sleep_record.entered

    @property
    def is_executing(self) -> bool:
        return self.run_record.entered and not self.sleep_record.entered

    def __init__(
        self,
        target: Callable[..., Any],
        timer: Timer,
        broadcast: Broadcast,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        cancelable: bool = False,
        dispatchers: Optional[List[T_Dispatcher]] = None,
        decorators: Optional[List[Decorator]] = None,
    ) -> None:
        self.target = target
        self.timer = timer
        self.broadcast = broadcast
        self.loop = loop or asyncio.get_running_loop()
        self.cancelable = cancelable
        self.task = None
        self.stopped = False
        self.dispatchers = dispatchers or []
        self.decorators = decorators or []
        self.sleep_record = EnteredRecord()
        self.run_record = EnteredRecord()

    def setup_task(self) -> asyncio.Task:
        """将本 SchedulerTask 作为 asyncio.Task 排入事件循环."""
        if self.task:
            raise AlreadyStarted("the scheduler task has been started!")
        self.task = self.loop.create_task(self.run())
        return self.task

    def sleep_interval_generator(self) -> Generator[float, None, None]:
        for next_execute_time in self.timer:
            if self.stopped:
                return
            now = datetime.now()
            if next_execute_time >= now:
                yield (next_execute_time - now).total_seconds()

    def coroutine_generator(self) -> Generator[Tuple[Awaitable[Any], bool], None, None]:
        for sleep_interval in self.sleep_interval_generator():
            yield (asyncio.sleep(sleep_interval), True)
            yield (
                self.broadcast.Executor(
                    target=ExecTarget(
                        callable=self.target,
                        inline_dispatchers=self.dispatchers,
                        decorators=self.decorators,
                    ),
                ),
                False,
            )

    @print_track_async
    async def run(self) -> None:
        if self.run_record.entered:
            raise AlreadyStarted("the scheduler task has been started!")
        with self.run_record:
            for coro, waiting in self.coroutine_generator():
                if waiting:  # 是否为 asyncio.sleep 的 coro
                    with self.sleep_record:
                        try:
                            await coro
                            continue
                        except asyncio.CancelledError:
                            return
                try:
                    await (coro if self.cancelable else asyncio.shield(coro))
                except asyncio.CancelledError:
                    if self.cancelable:
                        return
                    raise
                except (ExecutionStop, PropagationCancelled):
                    pass
                except Exception as e:
                    traceback.print_exc()
                    await self.broadcast.postEvent(ExceptionThrown(e, None))

    def stop_gen_interval(self) -> None:
        if not self.stopped:
            self.stopped = True

    async def join(self, stop: bool = False) -> None:
        """阻塞直至当前 SchedulerTask 执行完毕.

        Args:
            stop (bool, optional): 是否停止当前 SchedulerTask 下一次运行. 默认为 False.
        """
        if stop and not self.stopped:
            self.stop_gen_interval()

        if self.task:
            await self.task
        self.task = None

    def stop(self):
        """停止当前 SchedulerTask."""
        if self.task and not self.task.cancelled():
            self.task.cancel()

join(stop=False) async 🔗

阻塞直至当前 SchedulerTask 执行完毕.

Parameters:

Name Type Description Default
stop bool

是否停止当前 SchedulerTask 下一次运行. 默认为 False.

False
Source code in src/graia/scheduler/task.py
123
124
125
126
127
128
129
130
131
132
133
134
async def join(self, stop: bool = False) -> None:
    """阻塞直至当前 SchedulerTask 执行完毕.

    Args:
        stop (bool, optional): 是否停止当前 SchedulerTask 下一次运行. 默认为 False.
    """
    if stop and not self.stopped:
        self.stop_gen_interval()

    if self.task:
        await self.task
    self.task = None

setup_task() 🔗

将本 SchedulerTask 作为 asyncio.Task 排入事件循环.

Source code in src/graia/scheduler/task.py
65
66
67
68
69
70
def setup_task(self) -> asyncio.Task:
    """将本 SchedulerTask 作为 asyncio.Task 排入事件循环."""
    if self.task:
        raise AlreadyStarted("the scheduler task has been started!")
    self.task = self.loop.create_task(self.run())
    return self.task

stop() 🔗

停止当前 SchedulerTask.

Source code in src/graia/scheduler/task.py
136
137
138
139
def stop(self):
    """停止当前 SchedulerTask."""
    if self.task and not self.task.cancelled():
        self.task.cancel()