Module textual.timer

Timer objects are created by [set_interval][textual.message_pump.MessagePump.set_interval] or [set_timer][textual.message_pump.MessagePump.set_timer].

Expand source code
"""

Timer objects are created by [set_interval][textual.message_pump.MessagePump.set_interval] or
    [set_timer][textual.message_pump.MessagePump.set_timer].

"""

from __future__ import annotations

import asyncio
import weakref
from asyncio import (
    CancelledError,
    Event,
    Task,
)
from typing import Awaitable, Callable, Union

from rich.repr import Result, rich_repr

from . import events
from ._callback import invoke
from ._context import active_app
from . import _clock
from ._types import MessageTarget

TimerCallback = Union[Callable[[], Awaitable[None]], Callable[[], None]]


class EventTargetGone(Exception):
    pass


@rich_repr
class Timer:
    """A class to send timer-based events.

    Args:
        event_target (MessageTarget): The object which will receive the timer events.
        interval (float): The time between timer events.
        sender (MessageTarget): The sender of the event.
        name (str | None, optional): A name to assign the event (for debugging). Defaults to None.
        callback (TimerCallback | None, optional): A optional callback to invoke when the event is handled. Defaults to None.
        repeat (int | None, optional): The number of times to repeat the timer, or None to repeat forever. Defaults to None.
        skip (bool, optional): Enable skipping of scheduled events that couldn't be sent in time. Defaults to True.
        pause (bool, optional): Start the timer paused. Defaults to False.
    """

    _timer_count: int = 1

    def __init__(
        self,
        event_target: MessageTarget,
        interval: float,
        sender: MessageTarget,
        *,
        name: str | None = None,
        callback: TimerCallback | None = None,
        repeat: int | None = None,
        skip: bool = True,
        pause: bool = False,
    ) -> None:
        self._target_repr = repr(event_target)
        self._target = weakref.ref(event_target)
        self._interval = interval
        self.sender = sender
        self.name = f"Timer#{self._timer_count}" if name is None else name
        self._timer_count += 1
        self._callback = callback
        self._repeat = repeat
        self._skip = skip
        self._active = Event()
        self._task: Task | None = None
        self._reset: bool = False
        if not pause:
            self._active.set()

    def __rich_repr__(self) -> Result:
        yield self._interval
        yield "name", self.name
        yield "repeat", self._repeat, None

    @property
    def target(self) -> MessageTarget:
        target = self._target()
        if target is None:
            raise EventTargetGone()
        return target

    def start(self) -> Task:
        """Start the timer return the task.

        Returns:
            Task: A Task instance for the timer.
        """
        self._task = asyncio.create_task(self._run_timer())
        return self._task

    def stop_no_wait(self) -> None:
        """Stop the timer."""
        if self._task is not None:
            self._task.cancel()
            self._task = None

    async def stop(self) -> None:
        """Stop the timer, and block until it exits."""
        if self._task is not None:
            self._active.set()
            self._task.cancel()
            self._task = None

    def pause(self) -> None:
        """Pause the timer.

        A paused timer will not send events until it is resumed.

        """
        self._active.clear()

    def reset(self) -> None:
        """Reset the timer, so it starts from the beginning."""
        self._active.set()
        self._reset = True

    def resume(self) -> None:
        """Resume a paused timer."""
        self._active.set()

    async def _run_timer(self) -> None:
        """Run the timer task."""
        try:
            await self._run()
        except CancelledError:
            pass

    async def _run(self) -> None:
        """Run the timer."""
        count = 0
        _repeat = self._repeat
        _interval = self._interval
        await self._active.wait()
        start = _clock.get_time_no_wait()
        while _repeat is None or count <= _repeat:
            next_timer = start + ((count + 1) * _interval)
            now = await _clock.get_time()
            if self._skip and next_timer < now:
                count += 1
                continue
            now = await _clock.get_time()
            wait_time = max(0, next_timer - now)
            if wait_time:
                await _clock.sleep(wait_time)

            count += 1
            await self._active.wait()
            if self._reset:
                start = _clock.get_time_no_wait()
                count = 0
                self._reset = False
                continue
            try:
                await self._tick(next_timer=next_timer, count=count)
            except EventTargetGone:
                break

    async def _tick(self, *, next_timer: float, count: int) -> None:
        """Triggers the Timer's action: either call its callback, or sends an event to its target"""
        if self._callback is not None:
            try:
                await invoke(self._callback)
            except Exception as error:
                app = active_app.get()
                app._handle_exception(error)
        else:
            event = events.Timer(
                self.sender,
                timer=self,
                time=next_timer,
                count=count,
                callback=self._callback,
            )
            await self.target._post_priority_message(event)

Classes

class EventTargetGone (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class EventTargetGone(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class Timer (event_target: MessageTarget, interval: float, sender: MessageTarget, *, name: str | None = None, callback: TimerCallback | None = None, repeat: int | None = None, skip: bool = True, pause: bool = False)

A class to send timer-based events.

Args

event_target : MessageTarget
The object which will receive the timer events.
interval : float
The time between timer events.
sender : MessageTarget
The sender of the event.
name (str | None, optional): A name to assign the event (for debugging). Defaults to None.
callback (TimerCallback | None, optional): A optional callback to invoke when the event is handled. Defaults to None.
repeat (int | None, optional): The number of times to repeat the timer, or None to repeat forever. Defaults to None.
skip : bool, optional
Enable skipping of scheduled events that couldn't be sent in time. Defaults to True.
pause : bool, optional
Start the timer paused. Defaults to False.
Expand source code
class Timer:
    """A class to send timer-based events.

    Args:
        event_target (MessageTarget): The object which will receive the timer events.
        interval (float): The time between timer events.
        sender (MessageTarget): The sender of the event.
        name (str | None, optional): A name to assign the event (for debugging). Defaults to None.
        callback (TimerCallback | None, optional): A optional callback to invoke when the event is handled. Defaults to None.
        repeat (int | None, optional): The number of times to repeat the timer, or None to repeat forever. Defaults to None.
        skip (bool, optional): Enable skipping of scheduled events that couldn't be sent in time. Defaults to True.
        pause (bool, optional): Start the timer paused. Defaults to False.
    """

    _timer_count: int = 1

    def __init__(
        self,
        event_target: MessageTarget,
        interval: float,
        sender: MessageTarget,
        *,
        name: str | None = None,
        callback: TimerCallback | None = None,
        repeat: int | None = None,
        skip: bool = True,
        pause: bool = False,
    ) -> None:
        self._target_repr = repr(event_target)
        self._target = weakref.ref(event_target)
        self._interval = interval
        self.sender = sender
        self.name = f"Timer#{self._timer_count}" if name is None else name
        self._timer_count += 1
        self._callback = callback
        self._repeat = repeat
        self._skip = skip
        self._active = Event()
        self._task: Task | None = None
        self._reset: bool = False
        if not pause:
            self._active.set()

    def __rich_repr__(self) -> Result:
        yield self._interval
        yield "name", self.name
        yield "repeat", self._repeat, None

    @property
    def target(self) -> MessageTarget:
        target = self._target()
        if target is None:
            raise EventTargetGone()
        return target

    def start(self) -> Task:
        """Start the timer return the task.

        Returns:
            Task: A Task instance for the timer.
        """
        self._task = asyncio.create_task(self._run_timer())
        return self._task

    def stop_no_wait(self) -> None:
        """Stop the timer."""
        if self._task is not None:
            self._task.cancel()
            self._task = None

    async def stop(self) -> None:
        """Stop the timer, and block until it exits."""
        if self._task is not None:
            self._active.set()
            self._task.cancel()
            self._task = None

    def pause(self) -> None:
        """Pause the timer.

        A paused timer will not send events until it is resumed.

        """
        self._active.clear()

    def reset(self) -> None:
        """Reset the timer, so it starts from the beginning."""
        self._active.set()
        self._reset = True

    def resume(self) -> None:
        """Resume a paused timer."""
        self._active.set()

    async def _run_timer(self) -> None:
        """Run the timer task."""
        try:
            await self._run()
        except CancelledError:
            pass

    async def _run(self) -> None:
        """Run the timer."""
        count = 0
        _repeat = self._repeat
        _interval = self._interval
        await self._active.wait()
        start = _clock.get_time_no_wait()
        while _repeat is None or count <= _repeat:
            next_timer = start + ((count + 1) * _interval)
            now = await _clock.get_time()
            if self._skip and next_timer < now:
                count += 1
                continue
            now = await _clock.get_time()
            wait_time = max(0, next_timer - now)
            if wait_time:
                await _clock.sleep(wait_time)

            count += 1
            await self._active.wait()
            if self._reset:
                start = _clock.get_time_no_wait()
                count = 0
                self._reset = False
                continue
            try:
                await self._tick(next_timer=next_timer, count=count)
            except EventTargetGone:
                break

    async def _tick(self, *, next_timer: float, count: int) -> None:
        """Triggers the Timer's action: either call its callback, or sends an event to its target"""
        if self._callback is not None:
            try:
                await invoke(self._callback)
            except Exception as error:
                app = active_app.get()
                app._handle_exception(error)
        else:
            event = events.Timer(
                self.sender,
                timer=self,
                time=next_timer,
                count=count,
                callback=self._callback,
            )
            await self.target._post_priority_message(event)

Instance variables

var target : textual._types.MessageTarget
Expand source code
@property
def target(self) -> MessageTarget:
    target = self._target()
    if target is None:
        raise EventTargetGone()
    return target

Methods

def pause(self) ‑> None

Pause the timer.

A paused timer will not send events until it is resumed.

Expand source code
def pause(self) -> None:
    """Pause the timer.

    A paused timer will not send events until it is resumed.

    """
    self._active.clear()
def reset(self) ‑> None

Reset the timer, so it starts from the beginning.

Expand source code
def reset(self) -> None:
    """Reset the timer, so it starts from the beginning."""
    self._active.set()
    self._reset = True
def resume(self) ‑> None

Resume a paused timer.

Expand source code
def resume(self) -> None:
    """Resume a paused timer."""
    self._active.set()
def start(self) ‑> _asyncio.Task

Start the timer return the task.

Returns

Task
A Task instance for the timer.
Expand source code
def start(self) -> Task:
    """Start the timer return the task.

    Returns:
        Task: A Task instance for the timer.
    """
    self._task = asyncio.create_task(self._run_timer())
    return self._task
async def stop(self) ‑> None

Stop the timer, and block until it exits.

Expand source code
async def stop(self) -> None:
    """Stop the timer, and block until it exits."""
    if self._task is not None:
        self._active.set()
        self._task.cancel()
        self._task = None
def stop_no_wait(self) ‑> None

Stop the timer.

Expand source code
def stop_no_wait(self) -> None:
    """Stop the timer."""
    if self._task is not None:
        self._task.cancel()
        self._task = None