Module textual.message_pump

A message pump is a class that processes messages.

It is a base class for the App, Screen, and Widgets.

Expand source code
"""

A message pump is a class that processes messages.

It is a base class for the App, Screen, and Widgets.

"""
from __future__ import annotations

import asyncio
import inspect
from asyncio import CancelledError, Queue, QueueEmpty, Task
from functools import partial
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Iterable
from weakref import WeakSet

from . import Logger, events, log, messages
from ._callback import invoke
from ._context import NoActiveAppError, active_app
from ._time import time
from .case import camel_to_snake
from .errors import DuplicateKeyHandlers
from .events import Event
from .message import Message
from .reactive import Reactive
from .timer import Timer, TimerCallback

if TYPE_CHECKING:
    from .app import App


class CallbackError(Exception):
    pass


class MessagePumpClosed(Exception):
    pass


class MessagePumpMeta(type):
    """Metaclass for message pump. This exists to populate a Message inner class of a Widget with the
    parent classes' name.

    """

    def __new__(
        cls,
        name: str,
        bases: tuple[type, ...],
        class_dict: dict[str, Any],
        **kwargs,
    ):
        namespace = camel_to_snake(name)
        isclass = inspect.isclass
        for value in class_dict.values():
            if isclass(value) and issubclass(value, Message):
                if not value.namespace:
                    value.namespace = namespace
        class_obj = super().__new__(cls, name, bases, class_dict, **kwargs)
        return class_obj


class MessagePump(metaclass=MessagePumpMeta):
    def __init__(self, parent: MessagePump | None = None) -> None:
        self._message_queue: Queue[Message | None] = Queue()
        self._parent = parent
        self._running: bool = False
        self._closing: bool = False
        self._closed: bool = False
        self._disabled_messages: set[type[Message]] = set()
        self._pending_message: Message | None = None
        self._task: Task | None = None
        self._timers: WeakSet[Timer] = WeakSet()
        self._last_idle: float = time()
        self._max_idle: float | None = None
        self._mounted_event = asyncio.Event()

    @property
    def task(self) -> Task:
        assert self._task is not None
        return self._task

    @property
    def has_parent(self) -> bool:
        return self._parent is not None

    @property
    def app(self) -> "App":
        """
        Get the current app.

        Returns:
            App: The current app.

        Raises:
            NoActiveAppError: if no active app could be found for the current asyncio context
        """
        try:
            return active_app.get()
        except LookupError:
            raise NoActiveAppError()

    @property
    def is_parent_active(self) -> bool:
        return bool(
            self._parent and not self._parent._closed and not self._parent._closing
        )

    @property
    def is_running(self) -> bool:
        return self._running

    @property
    def log(self) -> Logger:
        """Get a logger for this object.

        Returns:
            Logger: A logger.
        """
        return self.app._logger

    def _attach(self, parent: MessagePump) -> None:
        """Set the parent, and therefore attach this node to the tree.

        Args:
            parent (MessagePump): Parent node.
        """
        self._parent = parent

    def _detach(self) -> None:
        """Set the parent to None to remove the node from the tree."""
        self._parent = None

    def check_message_enabled(self, message: Message) -> bool:
        return type(message) not in self._disabled_messages

    def disable_messages(self, *messages: type[Message]) -> None:
        """Disable message types from being processed."""
        self._disabled_messages.update(messages)

    def enable_messages(self, *messages: type[Message]) -> None:
        """Enable processing of messages types."""
        self._disabled_messages.difference_update(messages)

    async def _get_message(self) -> Message:
        """Get the next event on the queue, or None if queue is closed.

        Returns:
            Optional[Event]: Event object or None.
        """
        if self._closed:
            raise MessagePumpClosed("The message pump is closed")
        if self._pending_message is not None:
            try:
                return self._pending_message
            finally:
                self._pending_message = None
        message = await self._message_queue.get()
        if message is None:
            self._closed = True
            raise MessagePumpClosed("The message pump is now closed")
        return message

    def _peek_message(self) -> Message | None:
        """Peek the message at the head of the queue (does not remove it from the queue),
        or return None if the queue is empty.

        Returns:
            Optional[Message]: The message or None.
        """
        if self._pending_message is None:
            try:
                message = self._message_queue.get_nowait()
            except QueueEmpty:
                pass
            else:
                if message is None:
                    self._closed = True
                    raise MessagePumpClosed("The message pump is now closed")
                self._pending_message = message

        if self._pending_message is not None:
            return self._pending_message
        return None

    def set_timer(
        self,
        delay: float,
        callback: TimerCallback | None = None,
        *,
        name: str | None = None,
        pause: bool = False,
    ) -> Timer:
        """Make a function call after a delay.

        Args:
            delay (float): Time to wait before invoking callback.
            callback (TimerCallback | None, optional): Callback to call after time has expired. Defaults to None.
            name (str | None, optional): Name of the timer (for debug). Defaults to None.
            pause (bool, optional): Start timer paused. Defaults to False.

        Returns:
            Timer: A timer object.
        """
        timer = Timer(
            self,
            delay,
            self,
            name=name or f"set_timer#{Timer._timer_count}",
            callback=callback,
            repeat=0,
            pause=pause,
        )
        timer.start()
        self._timers.add(timer)
        return timer

    def set_interval(
        self,
        interval: float,
        callback: TimerCallback | None = None,
        *,
        name: str | None = None,
        repeat: int = 0,
        pause: bool = False,
    ) -> Timer:
        """Call a function at periodic intervals.

        Args:
            interval (float): Time between calls.
            callback (TimerCallback | None, optional): Function to call. Defaults to None.
            name (str | None, optional): Name of the timer object. Defaults to None.
            repeat (int, optional): Number of times to repeat the call or 0 for continuous. Defaults to 0.
            pause (bool, optional): Start the timer paused. Defaults to False.

        Returns:
            Timer: A timer object.
        """
        timer = Timer(
            self,
            interval,
            self,
            name=name or f"set_interval#{Timer._timer_count}",
            callback=callback,
            repeat=repeat or None,
            pause=pause,
        )
        timer.start()
        self._timers.add(timer)
        return timer

    def call_later(self, callback: Callable, *args, **kwargs) -> None:
        """Schedule a callback to run after all messages are processed and the screen
        has been refreshed. Positional and keyword arguments are passed to the callable.

        Args:
            callback (Callable): A callable.
        """
        # We send the InvokeLater message to ourselves first, to ensure we've cleared
        # out anything already pending in our own queue.
        message = messages.InvokeLater(self, partial(callback, *args, **kwargs))
        self.post_message_no_wait(message)

    def _on_invoke_later(self, message: messages.InvokeLater) -> None:
        # Forward InvokeLater message to the Screen
        self.app.screen._invoke_later(message.callback)

    def _close_messages_no_wait(self) -> None:
        """Request the message queue to exit."""
        self._message_queue.put_nowait(None)

    async def _close_messages(self) -> None:
        """Close message queue, and optionally wait for queue to finish processing."""
        if self._closed or self._closing:
            return
        self._closing = True
        stop_timers = list(self._timers)
        for timer in stop_timers:
            await timer.stop()
        self._timers.clear()
        await self._message_queue.put(None)
        if self._task is not None and asyncio.current_task() != self._task:
            # Ensure everything is closed before returning
            await self._task

    def _start_messages(self) -> None:
        """Start messages task."""
        self._task = asyncio.create_task(self._process_messages())

    async def _process_messages(self) -> None:
        self._running = True

        await self._pre_process()

        try:
            await self._process_messages_loop()
        except CancelledError:
            pass
        finally:
            self._running = False
            for timer in list(self._timers):
                await timer.stop()

    async def _pre_process(self) -> None:
        """Procedure to run before processing messages."""
        # Dispatch compose and mount messages without going through loop
        # These events must occur in this order, and at the start.
        try:
            await self._dispatch_message(events.Compose(sender=self))
            await self._dispatch_message(events.Mount(sender=self))
        finally:
            # This is critical, mount may be waiting
            self._mounted_event.set()
        Reactive._initialize_object(self)

    async def _process_messages_loop(self) -> None:
        """Process messages until the queue is closed."""
        _rich_traceback_guard = True

        while not self._closed:
            try:
                message = await self._get_message()
            except MessagePumpClosed:
                break
            except CancelledError:
                raise
            except Exception as error:
                raise error from None

            # Combine any pending messages that may supersede this one
            while not (self._closed or self._closing):
                try:
                    pending = self._peek_message()
                except MessagePumpClosed:
                    break
                if pending is None or not message.can_replace(pending):
                    break
                try:
                    message = await self._get_message()
                except MessagePumpClosed:
                    break

            try:
                await self._dispatch_message(message)
            except CancelledError:
                raise
            except Exception as error:
                self._mounted_event.set()
                self.app._handle_exception(error)
                break
            finally:

                self._message_queue.task_done()
                current_time = time()

                # Insert idle events
                if self._message_queue.empty() or (
                    self._max_idle is not None
                    and current_time - self._last_idle > self._max_idle
                ):
                    self._last_idle = current_time
                    if not self._closed:
                        event = events.Idle(self)
                        for _cls, method in self._get_dispatch_methods(
                            "on_idle", event
                        ):
                            try:
                                await invoke(method, event)
                            except Exception as error:
                                self.app._handle_exception(error)
                                break

        log("CLOSED", self)

    async def _dispatch_message(self, message: Message) -> None:
        """Dispatch a message received from the message queue.

        Args:
            message (Message): A message object
        """
        _rich_traceback_guard = True
        if message.no_dispatch:
            return

        # Allow apps to treat events and messages separately
        if isinstance(message, Event):
            await self.on_event(message)
        else:
            await self._on_message(message)

    def _get_dispatch_methods(
        self, method_name: str, message: Message
    ) -> Iterable[tuple[type, Callable[[Message], Awaitable]]]:
        """Gets handlers from the MRO

        Args:
            method_name (str): Handler method name.
            message (Message): Message object.

        """
        private_method = f"_{method_name}"
        for cls in self.__class__.__mro__:
            if message._no_default_action:
                break
            method = cls.__dict__.get(private_method) or cls.__dict__.get(method_name)
            if method is not None:
                yield cls, method.__get__(self, cls)

    async def on_event(self, event: events.Event) -> None:
        """Called to process an event.

        Args:
            event (events.Event): An Event object.
        """
        await self._on_message(event)

    async def _on_message(self, message: Message) -> None:
        """Called to process a message.

        Args:
            message (Message): A Message object.
        """
        _rich_traceback_guard = True
        handler_name = message._handler_name

        # Look through the MRO to find a handler
        for cls, method in self._get_dispatch_methods(handler_name, message):
            log.event.verbosity(message.verbose)(
                message,
                ">>>",
                self,
                f"method=<{cls.__name__}.{handler_name}>",
            )
            await invoke(method, message)

        # Bubble messages up the DOM (if enabled on the message)
        if message.bubble and self._parent and not message._stop_propagation:
            if message.sender == self._parent:
                # parent is sender, so we stop propagation after parent
                message.stop()
            if self.is_parent_active and not self._parent._closing:
                await message._bubble_to(self._parent)

    def check_idle(self) -> None:
        """Prompt the message pump to call idle if the queue is empty."""
        if self._message_queue.empty():
            self.post_message_no_wait(messages.Prompt(sender=self))

    async def post_message(self, message: Message) -> bool:
        """Post a message or an event to this message pump.

        Args:
            message (Message): A message object.

        Returns:
            bool: True if the messages was posted successfully, False if the message was not posted
                (because the message pump was in the process of closing).
        """

        if self._closing or self._closed:
            return False
        if not self.check_message_enabled(message):
            return True
        await self._message_queue.put(message)
        return True

    # TODO: This may not be needed, or may only be needed by the timer
    # Consider removing or making private
    async def _post_priority_message(self, message: Message) -> bool:
        """Post a "priority" messages which will be processes prior to regular messages.

        Note that you should rarely need this in a regular app. It exists primarily to allow
        timer messages to skip the queue, so that they can be more regular.

        Args:
            message (Message): A message.

        Returns:
            bool: True if the messages was processed, False if it wasn't.
        """
        # TODO: Allow priority messages to jump the queue
        if self._closing or self._closed:
            return False
        if not self.check_message_enabled(message):
            return False
        await self._message_queue.put(message)
        return True

    def post_message_no_wait(self, message: Message) -> bool:
        """Posts a message on the queue.

        Args:
            message (Message): A message (or Event).

        Returns:
            bool: True if the messages was processed, False if it wasn't.
        """
        if self._closing or self._closed:
            return False
        if not self.check_message_enabled(message):
            return False
        self._message_queue.put_nowait(message)
        return True

    async def _post_message_from_child(self, message: Message) -> bool:
        if self._closing or self._closed:
            return False
        return await self.post_message(message)

    def _post_message_from_child_no_wait(self, message: Message) -> bool:
        if self._closing or self._closed:
            return False
        return self.post_message_no_wait(message)

    async def on_callback(self, event: events.Callback) -> None:
        await invoke(event.callback)

    def emit_no_wait(self, message: Message) -> bool:
        """Send a message to the _parent_, non async version.

        Args:
            message (Message): A message object.

        Returns:
            bool: True if the message was posted successfully.
        """
        if self._parent:
            return self._parent._post_message_from_child_no_wait(message)
        else:
            return False

    async def emit(self, message: Message) -> bool:
        """Send a message to the _parent_.

        Args:
            message (Message): A message object.

        Returns:
            bool: True if the message was posted successfully.
        """
        if self._parent:
            return await self._parent._post_message_from_child(message)
        else:
            return False

    # TODO: Does dispatch_key belong on message pump?
    async def dispatch_key(self, event: events.Key) -> bool:
        """Dispatch a key event to method.

        This method will call the method named 'key_<event.key>' if it exists.
        Some keys have aliases. The first alias found will be invoked if it exists.
        If multiple handlers exist that match the key, an exception is raised.

        Args:
            event (events.Key): A key event.

        Returns:
            bool: True if key was handled, otherwise False.

        Raises:
            DuplicateKeyHandlers: When there's more than 1 handler that could handle this key.
        """

        def get_key_handler(pump: MessagePump, key: str) -> Callable | None:
            """Look for the public and private handler methods by name on self."""
            public_handler_name = f"key_{key}"
            public_handler = getattr(pump, public_handler_name, None)

            private_handler_name = f"_key_{key}"
            private_handler = getattr(pump, private_handler_name, None)

            return public_handler or private_handler

        handled = False
        invoked_method = None
        key_name = event.key_name
        if not key_name:
            return False

        for key_alias in event.key_aliases:
            key_method = get_key_handler(self, key_alias)
            if key_method is not None:
                if invoked_method:
                    _raise_duplicate_key_handlers_error(
                        key_name, invoked_method.__name__, key_method.__name__
                    )
                # If key handlers return False, then they are not considered handled
                # This allows key handlers to do some conditional logic
                handled = (await invoke(key_method, event)) != False
                invoked_method = key_method

        return handled

    async def on_timer(self, event: events.Timer) -> None:
        event.prevent_default()
        event.stop()
        if event.callback is not None:
            try:
                await invoke(event.callback)
            except Exception as error:
                raise CallbackError(
                    f"unable to run callback {event.callback!r}; {error}"
                )


def _raise_duplicate_key_handlers_error(
    key_name: str, first_handler: str, second_handler: str
) -> None:
    """Raise exception for case where user presses a key and there are multiple candidate key handler methods for it."""
    raise DuplicateKeyHandlers(
        f"Multiple handlers for key press {key_name!r}.\n"
        f"We found both {first_handler!r} and {second_handler!r}, "
        f"and didn't know which to call.\n"
        f"Consider combining them into a single handler.",
    )

Classes

class CallbackError (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class CallbackError(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class MessagePump (parent: MessagePump | None = None)
Expand source code
class MessagePump(metaclass=MessagePumpMeta):
    def __init__(self, parent: MessagePump | None = None) -> None:
        self._message_queue: Queue[Message | None] = Queue()
        self._parent = parent
        self._running: bool = False
        self._closing: bool = False
        self._closed: bool = False
        self._disabled_messages: set[type[Message]] = set()
        self._pending_message: Message | None = None
        self._task: Task | None = None
        self._timers: WeakSet[Timer] = WeakSet()
        self._last_idle: float = time()
        self._max_idle: float | None = None
        self._mounted_event = asyncio.Event()

    @property
    def task(self) -> Task:
        assert self._task is not None
        return self._task

    @property
    def has_parent(self) -> bool:
        return self._parent is not None

    @property
    def app(self) -> "App":
        """
        Get the current app.

        Returns:
            App: The current app.

        Raises:
            NoActiveAppError: if no active app could be found for the current asyncio context
        """
        try:
            return active_app.get()
        except LookupError:
            raise NoActiveAppError()

    @property
    def is_parent_active(self) -> bool:
        return bool(
            self._parent and not self._parent._closed and not self._parent._closing
        )

    @property
    def is_running(self) -> bool:
        return self._running

    @property
    def log(self) -> Logger:
        """Get a logger for this object.

        Returns:
            Logger: A logger.
        """
        return self.app._logger

    def _attach(self, parent: MessagePump) -> None:
        """Set the parent, and therefore attach this node to the tree.

        Args:
            parent (MessagePump): Parent node.
        """
        self._parent = parent

    def _detach(self) -> None:
        """Set the parent to None to remove the node from the tree."""
        self._parent = None

    def check_message_enabled(self, message: Message) -> bool:
        return type(message) not in self._disabled_messages

    def disable_messages(self, *messages: type[Message]) -> None:
        """Disable message types from being processed."""
        self._disabled_messages.update(messages)

    def enable_messages(self, *messages: type[Message]) -> None:
        """Enable processing of messages types."""
        self._disabled_messages.difference_update(messages)

    async def _get_message(self) -> Message:
        """Get the next event on the queue, or None if queue is closed.

        Returns:
            Optional[Event]: Event object or None.
        """
        if self._closed:
            raise MessagePumpClosed("The message pump is closed")
        if self._pending_message is not None:
            try:
                return self._pending_message
            finally:
                self._pending_message = None
        message = await self._message_queue.get()
        if message is None:
            self._closed = True
            raise MessagePumpClosed("The message pump is now closed")
        return message

    def _peek_message(self) -> Message | None:
        """Peek the message at the head of the queue (does not remove it from the queue),
        or return None if the queue is empty.

        Returns:
            Optional[Message]: The message or None.
        """
        if self._pending_message is None:
            try:
                message = self._message_queue.get_nowait()
            except QueueEmpty:
                pass
            else:
                if message is None:
                    self._closed = True
                    raise MessagePumpClosed("The message pump is now closed")
                self._pending_message = message

        if self._pending_message is not None:
            return self._pending_message
        return None

    def set_timer(
        self,
        delay: float,
        callback: TimerCallback | None = None,
        *,
        name: str | None = None,
        pause: bool = False,
    ) -> Timer:
        """Make a function call after a delay.

        Args:
            delay (float): Time to wait before invoking callback.
            callback (TimerCallback | None, optional): Callback to call after time has expired. Defaults to None.
            name (str | None, optional): Name of the timer (for debug). Defaults to None.
            pause (bool, optional): Start timer paused. Defaults to False.

        Returns:
            Timer: A timer object.
        """
        timer = Timer(
            self,
            delay,
            self,
            name=name or f"set_timer#{Timer._timer_count}",
            callback=callback,
            repeat=0,
            pause=pause,
        )
        timer.start()
        self._timers.add(timer)
        return timer

    def set_interval(
        self,
        interval: float,
        callback: TimerCallback | None = None,
        *,
        name: str | None = None,
        repeat: int = 0,
        pause: bool = False,
    ) -> Timer:
        """Call a function at periodic intervals.

        Args:
            interval (float): Time between calls.
            callback (TimerCallback | None, optional): Function to call. Defaults to None.
            name (str | None, optional): Name of the timer object. Defaults to None.
            repeat (int, optional): Number of times to repeat the call or 0 for continuous. Defaults to 0.
            pause (bool, optional): Start the timer paused. Defaults to False.

        Returns:
            Timer: A timer object.
        """
        timer = Timer(
            self,
            interval,
            self,
            name=name or f"set_interval#{Timer._timer_count}",
            callback=callback,
            repeat=repeat or None,
            pause=pause,
        )
        timer.start()
        self._timers.add(timer)
        return timer

    def call_later(self, callback: Callable, *args, **kwargs) -> None:
        """Schedule a callback to run after all messages are processed and the screen
        has been refreshed. Positional and keyword arguments are passed to the callable.

        Args:
            callback (Callable): A callable.
        """
        # We send the InvokeLater message to ourselves first, to ensure we've cleared
        # out anything already pending in our own queue.
        message = messages.InvokeLater(self, partial(callback, *args, **kwargs))
        self.post_message_no_wait(message)

    def _on_invoke_later(self, message: messages.InvokeLater) -> None:
        # Forward InvokeLater message to the Screen
        self.app.screen._invoke_later(message.callback)

    def _close_messages_no_wait(self) -> None:
        """Request the message queue to exit."""
        self._message_queue.put_nowait(None)

    async def _close_messages(self) -> None:
        """Close message queue, and optionally wait for queue to finish processing."""
        if self._closed or self._closing:
            return
        self._closing = True
        stop_timers = list(self._timers)
        for timer in stop_timers:
            await timer.stop()
        self._timers.clear()
        await self._message_queue.put(None)
        if self._task is not None and asyncio.current_task() != self._task:
            # Ensure everything is closed before returning
            await self._task

    def _start_messages(self) -> None:
        """Start messages task."""
        self._task = asyncio.create_task(self._process_messages())

    async def _process_messages(self) -> None:
        self._running = True

        await self._pre_process()

        try:
            await self._process_messages_loop()
        except CancelledError:
            pass
        finally:
            self._running = False
            for timer in list(self._timers):
                await timer.stop()

    async def _pre_process(self) -> None:
        """Procedure to run before processing messages."""
        # Dispatch compose and mount messages without going through loop
        # These events must occur in this order, and at the start.
        try:
            await self._dispatch_message(events.Compose(sender=self))
            await self._dispatch_message(events.Mount(sender=self))
        finally:
            # This is critical, mount may be waiting
            self._mounted_event.set()
        Reactive._initialize_object(self)

    async def _process_messages_loop(self) -> None:
        """Process messages until the queue is closed."""
        _rich_traceback_guard = True

        while not self._closed:
            try:
                message = await self._get_message()
            except MessagePumpClosed:
                break
            except CancelledError:
                raise
            except Exception as error:
                raise error from None

            # Combine any pending messages that may supersede this one
            while not (self._closed or self._closing):
                try:
                    pending = self._peek_message()
                except MessagePumpClosed:
                    break
                if pending is None or not message.can_replace(pending):
                    break
                try:
                    message = await self._get_message()
                except MessagePumpClosed:
                    break

            try:
                await self._dispatch_message(message)
            except CancelledError:
                raise
            except Exception as error:
                self._mounted_event.set()
                self.app._handle_exception(error)
                break
            finally:

                self._message_queue.task_done()
                current_time = time()

                # Insert idle events
                if self._message_queue.empty() or (
                    self._max_idle is not None
                    and current_time - self._last_idle > self._max_idle
                ):
                    self._last_idle = current_time
                    if not self._closed:
                        event = events.Idle(self)
                        for _cls, method in self._get_dispatch_methods(
                            "on_idle", event
                        ):
                            try:
                                await invoke(method, event)
                            except Exception as error:
                                self.app._handle_exception(error)
                                break

        log("CLOSED", self)

    async def _dispatch_message(self, message: Message) -> None:
        """Dispatch a message received from the message queue.

        Args:
            message (Message): A message object
        """
        _rich_traceback_guard = True
        if message.no_dispatch:
            return

        # Allow apps to treat events and messages separately
        if isinstance(message, Event):
            await self.on_event(message)
        else:
            await self._on_message(message)

    def _get_dispatch_methods(
        self, method_name: str, message: Message
    ) -> Iterable[tuple[type, Callable[[Message], Awaitable]]]:
        """Gets handlers from the MRO

        Args:
            method_name (str): Handler method name.
            message (Message): Message object.

        """
        private_method = f"_{method_name}"
        for cls in self.__class__.__mro__:
            if message._no_default_action:
                break
            method = cls.__dict__.get(private_method) or cls.__dict__.get(method_name)
            if method is not None:
                yield cls, method.__get__(self, cls)

    async def on_event(self, event: events.Event) -> None:
        """Called to process an event.

        Args:
            event (events.Event): An Event object.
        """
        await self._on_message(event)

    async def _on_message(self, message: Message) -> None:
        """Called to process a message.

        Args:
            message (Message): A Message object.
        """
        _rich_traceback_guard = True
        handler_name = message._handler_name

        # Look through the MRO to find a handler
        for cls, method in self._get_dispatch_methods(handler_name, message):
            log.event.verbosity(message.verbose)(
                message,
                ">>>",
                self,
                f"method=<{cls.__name__}.{handler_name}>",
            )
            await invoke(method, message)

        # Bubble messages up the DOM (if enabled on the message)
        if message.bubble and self._parent and not message._stop_propagation:
            if message.sender == self._parent:
                # parent is sender, so we stop propagation after parent
                message.stop()
            if self.is_parent_active and not self._parent._closing:
                await message._bubble_to(self._parent)

    def check_idle(self) -> None:
        """Prompt the message pump to call idle if the queue is empty."""
        if self._message_queue.empty():
            self.post_message_no_wait(messages.Prompt(sender=self))

    async def post_message(self, message: Message) -> bool:
        """Post a message or an event to this message pump.

        Args:
            message (Message): A message object.

        Returns:
            bool: True if the messages was posted successfully, False if the message was not posted
                (because the message pump was in the process of closing).
        """

        if self._closing or self._closed:
            return False
        if not self.check_message_enabled(message):
            return True
        await self._message_queue.put(message)
        return True

    # TODO: This may not be needed, or may only be needed by the timer
    # Consider removing or making private
    async def _post_priority_message(self, message: Message) -> bool:
        """Post a "priority" messages which will be processes prior to regular messages.

        Note that you should rarely need this in a regular app. It exists primarily to allow
        timer messages to skip the queue, so that they can be more regular.

        Args:
            message (Message): A message.

        Returns:
            bool: True if the messages was processed, False if it wasn't.
        """
        # TODO: Allow priority messages to jump the queue
        if self._closing or self._closed:
            return False
        if not self.check_message_enabled(message):
            return False
        await self._message_queue.put(message)
        return True

    def post_message_no_wait(self, message: Message) -> bool:
        """Posts a message on the queue.

        Args:
            message (Message): A message (or Event).

        Returns:
            bool: True if the messages was processed, False if it wasn't.
        """
        if self._closing or self._closed:
            return False
        if not self.check_message_enabled(message):
            return False
        self._message_queue.put_nowait(message)
        return True

    async def _post_message_from_child(self, message: Message) -> bool:
        if self._closing or self._closed:
            return False
        return await self.post_message(message)

    def _post_message_from_child_no_wait(self, message: Message) -> bool:
        if self._closing or self._closed:
            return False
        return self.post_message_no_wait(message)

    async def on_callback(self, event: events.Callback) -> None:
        await invoke(event.callback)

    def emit_no_wait(self, message: Message) -> bool:
        """Send a message to the _parent_, non async version.

        Args:
            message (Message): A message object.

        Returns:
            bool: True if the message was posted successfully.
        """
        if self._parent:
            return self._parent._post_message_from_child_no_wait(message)
        else:
            return False

    async def emit(self, message: Message) -> bool:
        """Send a message to the _parent_.

        Args:
            message (Message): A message object.

        Returns:
            bool: True if the message was posted successfully.
        """
        if self._parent:
            return await self._parent._post_message_from_child(message)
        else:
            return False

    # TODO: Does dispatch_key belong on message pump?
    async def dispatch_key(self, event: events.Key) -> bool:
        """Dispatch a key event to method.

        This method will call the method named 'key_<event.key>' if it exists.
        Some keys have aliases. The first alias found will be invoked if it exists.
        If multiple handlers exist that match the key, an exception is raised.

        Args:
            event (events.Key): A key event.

        Returns:
            bool: True if key was handled, otherwise False.

        Raises:
            DuplicateKeyHandlers: When there's more than 1 handler that could handle this key.
        """

        def get_key_handler(pump: MessagePump, key: str) -> Callable | None:
            """Look for the public and private handler methods by name on self."""
            public_handler_name = f"key_{key}"
            public_handler = getattr(pump, public_handler_name, None)

            private_handler_name = f"_key_{key}"
            private_handler = getattr(pump, private_handler_name, None)

            return public_handler or private_handler

        handled = False
        invoked_method = None
        key_name = event.key_name
        if not key_name:
            return False

        for key_alias in event.key_aliases:
            key_method = get_key_handler(self, key_alias)
            if key_method is not None:
                if invoked_method:
                    _raise_duplicate_key_handlers_error(
                        key_name, invoked_method.__name__, key_method.__name__
                    )
                # If key handlers return False, then they are not considered handled
                # This allows key handlers to do some conditional logic
                handled = (await invoke(key_method, event)) != False
                invoked_method = key_method

        return handled

    async def on_timer(self, event: events.Timer) -> None:
        event.prevent_default()
        event.stop()
        if event.callback is not None:
            try:
                await invoke(event.callback)
            except Exception as error:
                raise CallbackError(
                    f"unable to run callback {event.callback!r}; {error}"
                )

Subclasses

Instance variables

var app : App

Get the current app.

Returns

App
The current app.

Raises

NoActiveAppError
if no active app could be found for the current asyncio context
Expand source code
@property
def app(self) -> "App":
    """
    Get the current app.

    Returns:
        App: The current app.

    Raises:
        NoActiveAppError: if no active app could be found for the current asyncio context
    """
    try:
        return active_app.get()
    except LookupError:
        raise NoActiveAppError()
var has_parent : bool
Expand source code
@property
def has_parent(self) -> bool:
    return self._parent is not None
var is_parent_active : bool
Expand source code
@property
def is_parent_active(self) -> bool:
    return bool(
        self._parent and not self._parent._closed and not self._parent._closing
    )
var is_running : bool
Expand source code
@property
def is_running(self) -> bool:
    return self._running
var log : textual.Logger

Get a logger for this object.

Returns

Logger
A logger.
Expand source code
@property
def log(self) -> Logger:
    """Get a logger for this object.

    Returns:
        Logger: A logger.
    """
    return self.app._logger
var task : _asyncio.Task
Expand source code
@property
def task(self) -> Task:
    assert self._task is not None
    return self._task

Methods

def call_later(self, callback: Callable, *args, **kwargs) ‑> None

Schedule a callback to run after all messages are processed and the screen has been refreshed. Positional and keyword arguments are passed to the callable.

Args

callback : Callable
A callable.
Expand source code
def call_later(self, callback: Callable, *args, **kwargs) -> None:
    """Schedule a callback to run after all messages are processed and the screen
    has been refreshed. Positional and keyword arguments are passed to the callable.

    Args:
        callback (Callable): A callable.
    """
    # We send the InvokeLater message to ourselves first, to ensure we've cleared
    # out anything already pending in our own queue.
    message = messages.InvokeLater(self, partial(callback, *args, **kwargs))
    self.post_message_no_wait(message)
def check_idle(self) ‑> None

Prompt the message pump to call idle if the queue is empty.

Expand source code
def check_idle(self) -> None:
    """Prompt the message pump to call idle if the queue is empty."""
    if self._message_queue.empty():
        self.post_message_no_wait(messages.Prompt(sender=self))
def check_message_enabled(self, message: Message) ‑> bool
Expand source code
def check_message_enabled(self, message: Message) -> bool:
    return type(message) not in self._disabled_messages
def disable_messages(self, *messages: type[Message]) ‑> None

Disable message types from being processed.

Expand source code
def disable_messages(self, *messages: type[Message]) -> None:
    """Disable message types from being processed."""
    self._disabled_messages.update(messages)
async def dispatch_key(self, event: events.Key) ‑> bool

Dispatch a key event to method.

This method will call the method named 'key_' if it exists. Some keys have aliases. The first alias found will be invoked if it exists. If multiple handlers exist that match the key, an exception is raised.

Args

event : events.Key
A key event.

Returns

bool
True if key was handled, otherwise False.

Raises

DuplicateKeyHandlers
When there's more than 1 handler that could handle this key.
Expand source code
async def dispatch_key(self, event: events.Key) -> bool:
    """Dispatch a key event to method.

    This method will call the method named 'key_<event.key>' if it exists.
    Some keys have aliases. The first alias found will be invoked if it exists.
    If multiple handlers exist that match the key, an exception is raised.

    Args:
        event (events.Key): A key event.

    Returns:
        bool: True if key was handled, otherwise False.

    Raises:
        DuplicateKeyHandlers: When there's more than 1 handler that could handle this key.
    """

    def get_key_handler(pump: MessagePump, key: str) -> Callable | None:
        """Look for the public and private handler methods by name on self."""
        public_handler_name = f"key_{key}"
        public_handler = getattr(pump, public_handler_name, None)

        private_handler_name = f"_key_{key}"
        private_handler = getattr(pump, private_handler_name, None)

        return public_handler or private_handler

    handled = False
    invoked_method = None
    key_name = event.key_name
    if not key_name:
        return False

    for key_alias in event.key_aliases:
        key_method = get_key_handler(self, key_alias)
        if key_method is not None:
            if invoked_method:
                _raise_duplicate_key_handlers_error(
                    key_name, invoked_method.__name__, key_method.__name__
                )
            # If key handlers return False, then they are not considered handled
            # This allows key handlers to do some conditional logic
            handled = (await invoke(key_method, event)) != False
            invoked_method = key_method

    return handled
async def emit(self, message: Message) ‑> bool

Send a message to the parent.

Args

message : Message
A message object.

Returns

bool
True if the message was posted successfully.
Expand source code
async def emit(self, message: Message) -> bool:
    """Send a message to the _parent_.

    Args:
        message (Message): A message object.

    Returns:
        bool: True if the message was posted successfully.
    """
    if self._parent:
        return await self._parent._post_message_from_child(message)
    else:
        return False
def emit_no_wait(self, message: Message) ‑> bool

Send a message to the parent, non async version.

Args

message : Message
A message object.

Returns

bool
True if the message was posted successfully.
Expand source code
def emit_no_wait(self, message: Message) -> bool:
    """Send a message to the _parent_, non async version.

    Args:
        message (Message): A message object.

    Returns:
        bool: True if the message was posted successfully.
    """
    if self._parent:
        return self._parent._post_message_from_child_no_wait(message)
    else:
        return False
def enable_messages(self, *messages: type[Message]) ‑> None

Enable processing of messages types.

Expand source code
def enable_messages(self, *messages: type[Message]) -> None:
    """Enable processing of messages types."""
    self._disabled_messages.difference_update(messages)
async def on_callback(self, event: events.Callback) ‑> None
Expand source code
async def on_callback(self, event: events.Callback) -> None:
    await invoke(event.callback)
async def on_event(self, event: events.Event) ‑> None

Called to process an event.

Args

event : events.Event
An Event object.
Expand source code
async def on_event(self, event: events.Event) -> None:
    """Called to process an event.

    Args:
        event (events.Event): An Event object.
    """
    await self._on_message(event)
async def on_timer(self, event: events.Timer) ‑> None
Expand source code
async def on_timer(self, event: events.Timer) -> None:
    event.prevent_default()
    event.stop()
    if event.callback is not None:
        try:
            await invoke(event.callback)
        except Exception as error:
            raise CallbackError(
                f"unable to run callback {event.callback!r}; {error}"
            )
async def post_message(self, message: Message) ‑> bool

Post a message or an event to this message pump.

Args

message : Message
A message object.

Returns

bool
True if the messages was posted successfully, False if the message was not posted (because the message pump was in the process of closing).
Expand source code
async def post_message(self, message: Message) -> bool:
    """Post a message or an event to this message pump.

    Args:
        message (Message): A message object.

    Returns:
        bool: True if the messages was posted successfully, False if the message was not posted
            (because the message pump was in the process of closing).
    """

    if self._closing or self._closed:
        return False
    if not self.check_message_enabled(message):
        return True
    await self._message_queue.put(message)
    return True
def post_message_no_wait(self, message: Message) ‑> bool

Posts a message on the queue.

Args

message : Message
A message (or Event).

Returns

bool
True if the messages was processed, False if it wasn't.
Expand source code
def post_message_no_wait(self, message: Message) -> bool:
    """Posts a message on the queue.

    Args:
        message (Message): A message (or Event).

    Returns:
        bool: True if the messages was processed, False if it wasn't.
    """
    if self._closing or self._closed:
        return False
    if not self.check_message_enabled(message):
        return False
    self._message_queue.put_nowait(message)
    return True
def set_interval(self, interval: float, callback: TimerCallback | None = None, *, name: str | None = None, repeat: int = 0, pause: bool = False) ‑> Timer

Call a function at periodic intervals.

Args

interval : float
Time between calls.
callback (TimerCallback | None, optional): Function to call. Defaults to None.
name (str | None, optional): Name of the timer object. Defaults to None.
repeat : int, optional
Number of times to repeat the call or 0 for continuous. Defaults to 0.
pause : bool, optional
Start the timer paused. Defaults to False.

Returns

Timer
A timer object.
Expand source code
def set_interval(
    self,
    interval: float,
    callback: TimerCallback | None = None,
    *,
    name: str | None = None,
    repeat: int = 0,
    pause: bool = False,
) -> Timer:
    """Call a function at periodic intervals.

    Args:
        interval (float): Time between calls.
        callback (TimerCallback | None, optional): Function to call. Defaults to None.
        name (str | None, optional): Name of the timer object. Defaults to None.
        repeat (int, optional): Number of times to repeat the call or 0 for continuous. Defaults to 0.
        pause (bool, optional): Start the timer paused. Defaults to False.

    Returns:
        Timer: A timer object.
    """
    timer = Timer(
        self,
        interval,
        self,
        name=name or f"set_interval#{Timer._timer_count}",
        callback=callback,
        repeat=repeat or None,
        pause=pause,
    )
    timer.start()
    self._timers.add(timer)
    return timer
def set_timer(self, delay: float, callback: TimerCallback | None = None, *, name: str | None = None, pause: bool = False) ‑> Timer

Make a function call after a delay.

Args

delay : float
Time to wait before invoking callback.
callback (TimerCallback | None, optional): Callback to call after time has expired. Defaults to None.
name (str | None, optional): Name of the timer (for debug). Defaults to None.
pause : bool, optional
Start timer paused. Defaults to False.

Returns

Timer
A timer object.
Expand source code
def set_timer(
    self,
    delay: float,
    callback: TimerCallback | None = None,
    *,
    name: str | None = None,
    pause: bool = False,
) -> Timer:
    """Make a function call after a delay.

    Args:
        delay (float): Time to wait before invoking callback.
        callback (TimerCallback | None, optional): Callback to call after time has expired. Defaults to None.
        name (str | None, optional): Name of the timer (for debug). Defaults to None.
        pause (bool, optional): Start timer paused. Defaults to False.

    Returns:
        Timer: A timer object.
    """
    timer = Timer(
        self,
        delay,
        self,
        name=name or f"set_timer#{Timer._timer_count}",
        callback=callback,
        repeat=0,
        pause=pause,
    )
    timer.start()
    self._timers.add(timer)
    return timer
class MessagePumpClosed (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class MessagePumpClosed(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class MessagePumpMeta (*args, **kwargs)

Metaclass for message pump. This exists to populate a Message inner class of a Widget with the parent classes' name.

Expand source code
class MessagePumpMeta(type):
    """Metaclass for message pump. This exists to populate a Message inner class of a Widget with the
    parent classes' name.

    """

    def __new__(
        cls,
        name: str,
        bases: tuple[type, ...],
        class_dict: dict[str, Any],
        **kwargs,
    ):
        namespace = camel_to_snake(name)
        isclass = inspect.isclass
        for value in class_dict.values():
            if isclass(value) and issubclass(value, Message):
                if not value.namespace:
                    value.namespace = namespace
        class_obj = super().__new__(cls, name, bases, class_dict, **kwargs)
        return class_obj

Ancestors

  • builtins.type