# Module textual.app
#
# Expand source code
from __future__ import annotations

import asyncio
import inspect
import io
import os
import platform
import sys
import unicodedata
import warnings
from contextlib import redirect_stderr, redirect_stdout
from datetime import datetime
from pathlib import Path, PurePath
from time import perf_counter
from typing import Any, Generic, Iterable, Type, TYPE_CHECKING, TypeVar, cast, Union
from weakref import WeakSet, WeakValueDictionary

from ._ansi_sequences import SYNC_END, SYNC_START
from ._path import _make_path_object_relative

import nanoid
import rich
import rich.repr
from rich.console import Console, RenderableType
from rich.protocol import is_renderable
from rich.segment import Segment, Segments
from rich.traceback import Traceback

from . import Logger, LogGroup, LogVerbosity, actions, events, log, messages
from ._animator import Animator, DEFAULT_EASING, Animatable, EasingFunction
from ._callback import invoke
from ._context import active_app
from ._event_broker import NoHandler, extract_handler_actions
from ._filter import LineFilter, Monochrome
from .binding import Binding, Bindings
from .css.query import NoMatches
from .css.stylesheet import Stylesheet
from .design import ColorSystem
from .dom import DOMNode
from .driver import Driver
from .drivers.headless_driver import HeadlessDriver
from .features import FeatureFlag, parse_features
from .file_monitor import FileMonitor
from .geometry import Offset, Region, Size
from .keys import REPLACED_KEYS
from .messages import CallbackType
from .reactive import Reactive
from .renderables.blank import Blank
from .screen import Screen
from .widget import AwaitMount, Widget

if TYPE_CHECKING:
    from .devtools.client import DevtoolsClient


# Name of the operating system as reported by `platform.system()`.
PLATFORM = platform.system()
# True when running on Windows; used to pick the Windows driver.
WINDOWS = PLATFORM == "Windows"

# asyncio will warn against resources not being cleared
# (this makes every ResourceWarning visible, not just the first occurrence).
warnings.simplefilter("always", ResourceWarning)

# `asyncio.get_event_loop()` is deprecated since Python 3.10:
# this flag decides how `App.run` starts the event loop.
_ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED = sys.version_info >= (3, 10, 0)

# NOTE(review): this is a plain string rather than a real type alias — confirm it is only used in annotations.
LayoutDefinition = "dict[str, Any]"

# Default color systems, keyed by mode name ("dark" / "light").
# Both entries use the same palette and differ only in the `dark` flag.
# `App.get_css_variables` picks one of them according to `App.dark`.
DEFAULT_COLORS = {
    "dark": ColorSystem(
        primary="#004578",
        secondary="#ffa62b",
        warning="#ffa62b",
        error="#ba3c5b",
        success="#4EBF71",
        accent="#0178D4",
        dark=True,
    ),
    "light": ColorSystem(
        primary="#004578",
        secondary="#ffa62b",
        warning="#ffa62b",
        error="#ba3c5b",
        success="#4EBF71",
        accent="#0178D4",
        dark=False,
    ),
}

# Return type of `compose` methods: an iterable of widgets.
ComposeResult = Iterable[Widget]
# Return type of `render` methods: any Rich renderable.
RenderResult = RenderableType


class AppError(Exception):
    """An error concerning the application."""


class ActionError(Exception):
    """An error concerning an action."""


class ScreenError(Exception):
    """An error concerning screens, e.g. a screen that can't be installed."""


class ScreenStackError(ScreenError):
    """Raised when an operation is not valid for the current state of the screen stack.

    This covers attempting to pop the last screen from the stack, requesting the
    current screen when the stack is empty, and uninstalling a screen that is
    still in the stack.
    """


# Type of the value passed to `App.exit` and returned from `App.run`.
ReturnType = TypeVar("ReturnType")


class _NullFile:
    """A minimal file-like object which discards everything written to it."""

    def write(self, text: str) -> None:
        """Accept ``text`` and throw it away."""
        return None

    def flush(self) -> None:
        """Do nothing; there is never anything buffered."""
        return None


# Accepted forms for a CSS path: a string, a path object, or None for no CSS file.
CSSPathType = Union[str, PurePath, None]


@rich.repr.auto
class App(Generic[ReturnType], DOMNode):
    """The base class for Textual Applications.

    Args:
        driver_class (Type[Driver] | None, optional): Driver class or ``None`` to auto-detect. Defaults to None.
        title (str | None, optional): Title of the application. If ``None``, the title is set to the name of the ``App`` subclass. Defaults to ``None``.
        css_path (str | PurePath | None, optional): Path to CSS or ``None`` for no CSS file. Defaults to None.
        watch_css (bool, optional): Watch CSS for changes. Defaults to False.
    """

    # Inline CSS for quick scripts (generally css_path should be preferred.)
    CSS = ""

    # Default (lowest priority) CSS
    DEFAULT_CSS = """
    App {
        background: $background;
        color: $text;
    }
    """

    SCREENS: dict[str, Screen] = {}
    _BASE_PATH: str | None = None
    CSS_PATH: CSSPathType = None
    TITLE: str | None = None
    SUB_TITLE: str | None = None

    title: Reactive[str] = Reactive("")
    sub_title: Reactive[str] = Reactive("")
    dark: Reactive[bool] = Reactive(True)

    def __init__(
        self,
        driver_class: Type[Driver] | None = None,
        css_path: CSSPathType = None,
        watch_css: bool = False,
    ):
        """Set up the app's consoles, driver class, stylesheet, screens and devtools client.

        Args:
            driver_class (Type[Driver] | None, optional): Driver class, or None to use
                the result of ``get_driver_class``. Defaults to None.
            css_path (str | PurePath | None, optional): Path to a CSS file, or None to fall
                back to the ``CSS_PATH`` class variable. Defaults to None.
            watch_css (bool, optional): Monitor the CSS file for changes. The file is also
                monitored when the "debug" feature is enabled. Defaults to False.
        """
        # N.B. This must be done *before* we call the parent constructor, because MessagePump's
        # constructor instantiates a `asyncio.PriorityQueue` and in Python versions older than 3.10
        # this will create some first references to an asyncio loop.
        _init_uvloop()

        super().__init__()
        # Feature flags are read from the TEXTUAL environment variable.
        self.features: frozenset[FeatureFlag] = parse_features(os.getenv("TEXTUAL", ""))

        self._filter: LineFilter | None = None
        # Work on a copy of the environment so NO_COLOR can be removed before
        # the mapping is handed to the Console.
        environ = dict(os.environ)
        no_color = environ.pop("NO_COLOR", None)
        if no_color is not None:
            # NO_COLOR is set (to any value): use the monochrome line filter.
            self._filter = Monochrome()
        self.console = Console(
            # Headless apps write to a null file rather than the real stdout.
            file=(_NullFile() if self.is_headless else sys.__stdout__),
            markup=False,
            highlight=False,
            emoji=False,
            legacy_windows=False,
            _environ=environ,
        )
        self.error_console = Console(markup=False, stderr=True)
        self.driver_class = driver_class or self.get_driver_class()
        self._screen_stack: list[Screen] = []
        self._sync_available = False

        self.mouse_over: Widget | None = None
        self.mouse_captured: Widget | None = None
        self._driver: Driver | None = None
        self._exit_renderables: list[RenderableType] = []

        self._action_targets = {"app", "screen"}
        self._animator = Animator(self)
        self._animate = self._animator.bind(self)
        self.mouse_position = Offset(0, 0)
        # The title defaults to the name of the App subclass when TITLE is not set.
        self.title = (
            self.TITLE if self.TITLE is not None else f"{self.__class__.__name__}"
        )
        self.sub_title = self.SUB_TITLE if self.SUB_TITLE is not None else ""

        self._logger = Logger(self._log)

        self._bindings.bind("ctrl+c", "quit", show=False, universal=True)
        self._refresh_required = False

        self.design = DEFAULT_COLORS

        self.stylesheet = Stylesheet(variables=self.get_css_variables())
        self._require_stylesheet_update: set[DOMNode] = set()

        # We want the CSS path to be resolved from the location of the App subclass
        css_path = css_path or self.CSS_PATH
        if css_path is not None:
            if isinstance(css_path, str):
                css_path = Path(css_path)
            css_path = _make_path_object_relative(css_path, self) if css_path else None

        self.css_path = css_path

        self._registry: WeakSet[DOMNode] = WeakSet()

        # Installed screens are held by weak reference; the SCREENS class variable seeds the mapping.
        self._installed_screens: WeakValueDictionary[
            str, Screen
        ] = WeakValueDictionary()
        self._installed_screens.update(**self.SCREENS)

        self.devtools: DevtoolsClient | None = None
        if "devtools" in self.features:
            try:
                from .devtools.client import DevtoolsClient
            except ImportError:
                # Dev dependencies not installed
                pass
            else:
                self.devtools = DevtoolsClient()

        self._return_value: ReturnType | None = None

        # Only monitor the CSS file when asked to (or in debug mode) and a path is known.
        self.css_monitor = (
            FileMonitor(self.css_path, self._on_css_change)
            if ((watch_css or self.debug) and self.css_path)
            else None
        )
        # SVG data stored by `run` when a screenshot is requested.
        self._screenshot: str | None = None

    def animate(
        self,
        attribute: str,
        value: float | Animatable,
        *,
        final_value: object = ...,
        duration: float | None = None,
        speed: float | None = None,
        delay: float = 0.0,
        easing: EasingFunction | str = DEFAULT_EASING,
        on_complete: CallbackType | None = None,
    ) -> None:
        """Animate an attribute of the app.

        Every argument is forwarded unchanged to the app's bound animator.

        Args:
            attribute (str): Name of the attribute to animate.
            value (float | Animatable): The value to animate to.
            final_value (object, optional): The final value of the animation. Defaults to `value` if not set.
            duration (float | None, optional): The duration of the animation. Defaults to None.
            speed (float | None, optional): The speed of the animation. Defaults to None.
            delay (float, optional): A delay (in seconds) before the animation starts. Defaults to 0.0.
            easing (EasingFunction | str, optional): An easing method. Defaults to ``DEFAULT_EASING``.
            on_complete (CallbackType | None, optional): A callable to invoke when the animation is finished. Defaults to None.

        """
        animation_options = {
            "final_value": final_value,
            "duration": duration,
            "speed": speed,
            "delay": delay,
            "easing": easing,
            "on_complete": on_complete,
        }
        self._animate(attribute, value, **animation_options)

    @property
    def debug(self) -> bool:
        """Check if debug mode is enabled.

        Debug mode is on when "debug" is one of the app's feature flags.

        Returns:
            bool: True if debug mode is enabled.

        """
        return "debug" in self.features

    @property
    def is_headless(self) -> bool:
        """Check if the app is running in 'headless' mode.

        Headless mode is on when "headless" is one of the app's feature flags
        (``run(headless=True)`` adds that flag).

        Returns:
            bool: True if the app is in headless mode.

        """
        return "headless" in self.features

    @property
    def screen_stack(self) -> list[Screen]:
        """Get a snapshot of the screen stack.

        The returned list is a new list; changing it does not change the app's stack.

        Returns:
            list[Screen]: The screens currently on the stack (the last item is the current screen).

        """
        return list(self._screen_stack)

    def exit(self, result: ReturnType | None = None) -> None:
        """Exit the app, and return the supplied result.

        The result is stored so that ``run`` can return it, then the app's
        message processing is asked to close (without waiting).

        Args:
            result (ReturnType | None, optional): Return value. Defaults to None.
        """
        self._return_value = result
        self._close_messages_no_wait()

    @property
    def focused(self) -> Widget | None:
        """Get the widget that is focused on the currently active screen.

        Returns:
            Widget | None: The current screen's ``focused`` value.
        """
        return self.screen.focused

    @property
    def namespace_bindings(self) -> dict[str, tuple[DOMNode, Binding]]:
        """Get the current key bindings, together with the node each one belongs to.

        The binding chain is walked in reverse, so when the same key appears more
        than once the entry nearest the start of the chain wins.

        Returns:
            dict[str, tuple[DOMNode, Binding]]: A mapping of key to a (namespace, binding) pair.
        """
        return {
            key: (namespace, binding)
            for namespace, bindings in reversed(self._binding_chain)
            for key, binding in bindings.keys.items()
        }

    def _set_active(self) -> None:
        """Set this app to be the currently active app.

        Stores this instance in the ``active_app`` context variable.
        """
        active_app.set(self)

    def compose(self) -> ComposeResult:
        """Yield child widgets for a container.

        This default implementation is a generator that yields nothing.
        """
        yield from ()

    def get_css_variables(self) -> dict[str, str]:
        """Get a mapping of variables used to pre-populate CSS.

        The variables are generated from the "dark" or "light" entry of the
        app's design, depending on the ``dark`` attribute.

        Returns:
            dict[str, str]: A mapping of variable name to value.
        """
        mode = "dark" if self.dark else "light"
        return self.design[mode].generate()

    def watch_dark(self, dark: bool) -> None:
        """Watches the dark bool.

        Sets the ``-dark-mode`` / ``-light-mode`` classes to match, then refreshes the CSS.

        Args:
            dark (bool): The new value of ``dark``.
        """
        mode_classes = ((dark, "-dark-mode"), (not dark, "-light-mode"))
        for enabled, class_name in mode_classes:
            self.set_class(enabled, class_name)
        self.refresh_css()

    def get_driver_class(self) -> Type[Driver]:
        """Get a driver class for this platform.

        Called by the constructor when no driver class is supplied. The driver
        modules are imported here, so only the one that is needed gets imported.

        Returns:
            Type[Driver]: ``WindowsDriver`` on Windows, otherwise ``LinuxDriver``.
        """
        if WINDOWS:
            from .drivers.windows_driver import WindowsDriver

            return WindowsDriver

        from .drivers.linux_driver import LinuxDriver

        return LinuxDriver

    def __rich_repr__(self) -> rich.repr.Result:
        """Yield the fields that make up the Rich repr of the app.

        The title and id are always yielded (the id with a default of None);
        name, classes and pseudo classes are yielded only when non-empty.
        """
        yield "title", self.title
        yield "id", self.id, None
        name = self.name
        if name:
            yield "name", name
        classes = self.classes
        if classes:
            yield "classes", set(classes)
        pseudo_classes = self.pseudo_classes
        if pseudo_classes:
            yield "pseudo_classes", set(pseudo_classes)

    @property
    def is_transparent(self) -> bool:
        """Check if the app is transparent.

        Returns:
            bool: Always True.
        """
        return True

    @property
    def animator(self) -> Animator:
        """Get the animator created for this app in the constructor.

        Returns:
            Animator: The app's animator.
        """
        return self._animator

    @property
    def screen(self) -> Screen:
        """Get the current screen, i.e. the one on top of the screen stack.

        Raises:
            ScreenStackError: If there are no screens on the stack.

        Returns:
            Screen: The currently active screen.
        """
        stack = self._screen_stack
        if stack:
            return stack[-1]
        raise ScreenStackError("No screens on stack") from None

    @property
    def size(self) -> Size:
        """Get the size of the terminal, as reported by the app's console.

        Returns:
            Size: Size of the terminal
        """
        console_dimensions = self.console.size
        return Size(*console_dimensions)

    @property
    def log(self) -> Logger:
        """Get the app's logger.

        Returns:
            Logger: The logger created in the constructor, which writes via ``_log``.
        """
        return self._logger

    def _log(
        self,
        group: LogGroup,
        verbosity: LogVerbosity,
        _textual_calling_frame: inspect.FrameInfo,
        *objects: Any,
        **kwargs,
    ) -> None:
        """Write to devtools, if a devtools client is connected.

        A single positional argument (with no keyword arguments) is passed on
        as-is. Otherwise the positional arguments are converted to strings and
        joined with spaces, followed by ``key=value`` pairs for the keyword arguments.

        Example:
            ```python
            data = [1,2,3]
            self.log("Hello, World", state=data)
            self.log(self.tree)
            self.log(locals())
            ```

        Args:
            group (LogGroup): Log group, passed through to devtools.
            verbosity (LogVerbosity): Verbosity of the message. Messages above
                ``LogVerbosity.NORMAL`` are dropped unless devtools is verbose.
            _textual_calling_frame (inspect.FrameInfo): Frame of the caller, attached to the log.
            *objects (Any): Objects to log.
            **kwargs: Values to log, prefixed with their key.
        """

        devtools = self.devtools
        if devtools is None or not devtools.is_connected:
            return

        if verbosity.value > LogVerbosity.NORMAL.value and not devtools.verbose:
            return

        try:
            from .devtools.client import DevtoolsLog

            if len(objects) == 1 and not kwargs:
                # A lone object is sent unformatted.
                payload = objects
            else:
                payload = " ".join(str(arg) for arg in objects)
                if kwargs:
                    key_values = " ".join(
                        f"{key}={value!r}" for key, value in kwargs.items()
                    )
                    payload = f"{payload} {key_values}" if payload else key_values
            devtools.log(
                DevtoolsLog(payload, caller=_textual_calling_frame),
                group,
                verbosity,
            )
        except Exception as error:
            self._handle_exception(error)

    def action_toggle_dark(self) -> None:
        """Action to toggle dark mode.

        Flips the ``dark`` attribute.
        """
        self.dark = not self.dark

    def action_screenshot(self, filename: str | None = None, path: str = "./") -> None:
        """Save an SVG "screenshot". This action will save an SVG file containing the current contents of the screen.

        Args:
            filename (str | None, optional): Filename of screenshot, or None to auto-generate. Defaults to None.
            path (str, optional): Path to directory. Defaults to "./".
        """
        self.save_screenshot(filename, path)

    def export_screenshot(self, *, title: str | None = None) -> str:
        """Export an SVG screenshot of the current screen.

        The current screen is rendered in full to a recording console with the
        same dimensions as the app's console, and that recording is exported as SVG.

        Args:
            title (str | None, optional): The title of the exported screenshot or None
                to use app title. Defaults to None.

        Returns:
            str: The SVG data.

        """
        app_console = self.console
        recorder = Console(
            width=app_console.width,
            height=app_console.height,
            file=io.StringIO(),
            force_terminal=True,
            color_system="truecolor",
            record=True,
            legacy_windows=False,
        )
        recorder.print(self.screen._compositor.render(full=True))
        svg_title = title or self.title
        return recorder.export_svg(title=svg_title)

    def save_screenshot(
        self,
        filename: str | None = None,
        path: str = "./",
        time_format: str = "%Y-%m-%d %X %f",
    ) -> str:
        """Save an SVG screenshot of the current screen.

        Args:
            filename (str | None, optional): Filename of SVG screenshot, or None to auto-generate
                a filename with the date and time. Defaults to None.
            path (str, optional): Path to directory for output. A leading ``~`` is expanded.
                Defaults to current working directory.
            time_format (str, optional): Time format to use if filename is None. Defaults to "%Y-%m-%d %X %f".

        Returns:
            str: Filename of screenshot.
        """
        if filename is None:
            svg_filename = (
                f"{self.title.lower()} {datetime.now().strftime(time_format)}.svg"
            )
            # The title may contain path separators, which are not valid in a filename.
            svg_filename = svg_filename.replace("/", "_").replace("\\", "_")
        else:
            svg_filename = filename
        svg_path = os.path.expanduser(os.path.join(path, svg_filename))
        screenshot_svg = self.export_screenshot()
        # Always write UTF-8: the screenshot can contain any character shown on
        # screen, and the platform default encoding may not be able to encode it.
        with open(svg_path, "w", encoding="utf-8") as svg_file:
            svg_file.write(screenshot_svg)
        return svg_path

    def bind(
        self,
        keys: str,
        action: str,
        *,
        description: str = "",
        show: bool = True,
        key_display: str | None = None,
    ) -> None:
        """Bind a key to an action.

        The binding is added to the app's own bindings.

        Args:
            keys (str): A comma separated list of keys.
            action (str): Action to bind to.
            description (str, optional): Short description of action. Defaults to "".
            show (bool, optional): Show key in UI. Defaults to True.
            key_display (str, optional): Replacement text for key, or None to use default. Defaults to None.
        """
        self._bindings.bind(
            keys, action, description, show=show, key_display=key_display
        )

    def run(
        self,
        *,
        quit_after: float | None = None,
        headless: bool = False,
        press: Iterable[str] | None = None,
        screenshot: bool = False,
        screenshot_title: str | None = None,
    ) -> ReturnType | None:
        """The main entry point for apps.

        Runs the app's message processing on an asyncio event loop, and returns
        once that has finished.

        Args:
            quit_after (float | None, optional): Quit after a given number of seconds, or None
                to run forever. Defaults to None.
            headless (bool, optional): Run in "headless" mode (don't write to stdout).
            press (Iterable[str] | None, optional): An iterable of keys to simulate being pressed.
                A key of "_" pauses for 50ms, and "wait:<ms>" pauses for the given number of
                milliseconds. The app is shut down after the keys have been sent. Defaults to None.
            screenshot (bool, optional): Take a screenshot after pressing keys (svg data stored in self._screenshot).
                Only applies when ``press`` is given. Defaults to False.
            screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None.

        Returns:
            ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called.
        """

        if headless:
            # Headless mode is recorded as a feature flag (see `is_headless`).
            self.features = cast(
                "frozenset[FeatureFlag]", self.features.union({"headless"})
            )

        async def run_app() -> None:
            if quit_after is not None:
                self.set_timer(quit_after, self.shutdown)
            if press is not None:
                app = self

                async def press_keys() -> None:
                    """A task to send key events."""
                    # NOTE(review): an empty `press` iterable (e.g. []) fails this assert
                    # inside the background task — confirm that is intended.
                    assert press
                    driver = app._driver
                    assert driver is not None
                    await asyncio.sleep(0.01)
                    for key in press:
                        if key == "_":
                            # "_" is a fixed 50ms pause rather than a key press.
                            print("(pause 50ms)")
                            await asyncio.sleep(0.05)
                        elif key.startswith("wait:"):
                            # "wait:<ms>" pauses for the given number of milliseconds.
                            _, wait_ms = key.split(":")
                            print(f"(pause {wait_ms}ms)")
                            await asyncio.sleep(float(wait_ms) / 1000)
                        else:
                            if len(key) == 1 and not key.isalnum():
                                # Single non-alphanumeric characters are converted to a key
                                # name derived from their unicode name.
                                key = (
                                    unicodedata.name(key)
                                    .lower()
                                    .replace("-", "_")
                                    .replace(" ", "_")
                                )
                            original_key = REPLACED_KEYS.get(key, key)
                            try:
                                # Look for a character with a unicode name matching the key.
                                char = unicodedata.lookup(
                                    original_key.upper().replace("_", " ")
                                )
                            except KeyError:
                                char = key if len(key) == 1 else None
                            print(f"press {key!r} (char={char!r})")
                            key_event = events.Key(self, key, char)
                            driver.send_event(key_event)
                            await asyncio.sleep(0.01)

                    # Let animations finish before (optionally) taking the screenshot.
                    await app._animator.wait_for_idle()

                    if screenshot:
                        self._screenshot = self.export_screenshot(
                            title=screenshot_title
                        )
                    await self.shutdown()

                async def press_keys_task():
                    """Press some keys in the background."""
                    asyncio.create_task(press_keys())

                await self._process_messages(ready_callback=press_keys_task)
            else:
                await self._process_messages()

        if _ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED:
            # N.B. This doesn't work with Python<3.10, as we end up with 2 event loops:
            asyncio.run(run_app())
        else:
            # However, this works with Python<3.10:
            event_loop = asyncio.get_event_loop()
            event_loop.run_until_complete(run_app())

        return self._return_value

    async def _on_css_change(self) -> None:
        """Called when the CSS changes (if watch_css is True).

        The CSS file is read and parsed into a copy of the current stylesheet.
        If that fails the error is logged, the bell sounds, and the existing
        stylesheet is kept. Otherwise the copy replaces the current stylesheet
        and styles are re-applied.
        """
        css_path = self.css_path
        if css_path is None:
            return

        try:
            started = perf_counter()
            new_stylesheet = self.stylesheet.copy()
            new_stylesheet.read(css_path)
            new_stylesheet.parse()
            elapsed = (perf_counter() - started) * 1000
            self.log.system(
                f"<stylesheet> loaded {css_path!r} in {elapsed:.0f} ms"
            )
        except Exception as error:
            # TODO: Catch specific exceptions
            self.log.error(error)
            self.bell()
            return

        self.stylesheet = new_stylesheet
        self.reset_styles()
        self.stylesheet.update(self)
        self.screen.refresh(layout=True)

    def render(self) -> RenderableType:
        """Render the app.

        Returns:
            RenderableType: A ``Blank`` renderable created from the app's background style.
        """
        return Blank(self.styles.background)

    def get_child(self, id: str) -> DOMNode:
        """Shorthand for ``self.screen.get_child(id)``.

        Returns the first child (immediate descendant) of the current screen
        with the given ID.

        Args:
            id (str): The ID of the node to search for.

        Returns:
            DOMNode: The first child of the current screen with the specified ID.

        Raises:
            NoMatches: if no children could be found for this ID
        """
        return self.screen.get_child(id)

    def update_styles(self, node: DOMNode | None = None) -> None:
        """Request update of styles.

        Should be called whenever CSS classes / pseudo classes change.

        Args:
            node (DOMNode | None, optional): The node that needs its styles updating,
                or None for the current screen. Defaults to None.

        """
        target = self.screen if node is None else node
        self._require_stylesheet_update.add(target)
        self.check_idle()

    def mount(self, *anon_widgets: Widget, **widgets: Widget) -> AwaitMount:
        """Mount widgets on the current screen.

        Widgets may be given as positional args, or keyword args. If supplied
        as keyword args they will be assigned an id of the key.

        Returns:
            AwaitMount: An awaitable object that waits for widgets to be mounted.

        """
        return AwaitMount(self._register(self.screen, *anon_widgets, **widgets))

    def mount_all(self, widgets: Iterable[Widget]) -> AwaitMount:
        """Mount widgets from an iterable.

        Each widget is registered, one at a time, with the current screen.

        Args:
            widgets (Iterable[Widget]): An iterable of widgets.

        Returns:
            AwaitMount: An awaitable object that waits for widgets to be mounted.
        """
        # Take a list so the iterable is consumed only once.
        to_mount = list(widgets)
        for new_widget in to_mount:
            self._register(self.screen, new_widget)
        return AwaitMount(to_mount)

    def is_screen_installed(self, screen: Screen | str) -> bool:
        """Check if a given screen has been installed.

        Args:
            screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

        Returns:
            bool: True if the screen is currently installed.
        """
        installed = self._installed_screens
        if isinstance(screen, str):
            # Names are the keys of the installed screens mapping.
            return screen in installed
        return screen in installed.values()

    def get_screen(self, screen: Screen | str) -> Screen:
        """Get a screen, looking it up by name if necessary.

        If the screen isn't running, it is registered with the app before it is returned.

        Args:
            screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

        Raises:
            KeyError: If the named screen doesn't exist.

        Returns:
            Screen: A screen instance.
        """
        resolved = screen
        if isinstance(screen, str):
            try:
                resolved = self._installed_screens[screen]
            except KeyError:
                raise KeyError(f"No screen called {screen!r} installed") from None
        if not resolved.is_running:
            self._register(self, resolved)
        return resolved

    def _replace_screen(self, screen: Screen) -> Screen:
        """Handle a screen that has just been replaced as the current screen.

        The screen is sent a ``ScreenSuspend`` event. It is also removed, unless
        it is installed or still somewhere in the screen stack.

        Args:
            screen (Screen): A screen object.

        Returns:
            Screen: The screen that was replaced.

        """
        screen.post_message_no_wait(events.ScreenSuspend(self))
        self.log.system(f"{screen} SUSPENDED")
        still_needed = self.is_screen_installed(screen) or screen in self._screen_stack
        if not still_needed:
            screen.remove()
            self.log.system(f"{screen} REMOVED")
        return screen

    def push_screen(self, screen: Screen | str) -> None:
        """Push a new screen on the screen stack, making it the current screen.

        The pushed screen is sent a ``ScreenResume`` event.

        Args:
            screen (Screen | str): A Screen instance or the name of an installed screen.

        """
        self._screen_stack.append(self.get_screen(screen))
        current = self.screen
        current.post_message_no_wait(events.ScreenResume(self))
        self.log.system(f"{current} is current (PUSHED)")

    def switch_screen(self, screen: Screen | str) -> None:
        """Switch to another screen by replacing the top of the screen stack with a new screen.

        Does nothing if ``screen`` is the very screen object that is already current.

        Args:
            screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

        Raises:
            KeyError: If the named screen doesn't exist. The screen stack is left unchanged.
            ScreenStackError: If there are no screens on the stack.

        """
        if self.screen is screen:
            return
        # Resolve the new screen *before* touching the stack, so that an unknown
        # name doesn't leave the app with its current screen popped and suspended.
        next_screen = self.get_screen(screen)
        self._replace_screen(self._screen_stack.pop())
        self._screen_stack.append(next_screen)
        self.screen.post_message_no_wait(events.ScreenResume(self))
        self.log.system(f"{self.screen} is current (SWITCHED)")

    def install_screen(self, screen: Screen, name: str | None = None) -> str:
        """Install a screen.

        The screen is stored under the given name, and registered with the app
        if it isn't already running.

        Args:
            screen (Screen): Screen to install.
            name (str | None, optional): Unique name of screen or None to auto-generate.
                Defaults to None.

        Raises:
            ScreenError: If the name is already in use, or the screen has already been installed.

        Returns:
            str: The name of the screen
        """
        if name is None:
            name = nanoid.generate()
        if name in self._installed_screens:
            raise ScreenError(f"Can't install screen; {name!r} is already installed")
        if screen in self._installed_screens.values():
            # This must be an f-string, otherwise the literal text "{screen!r}"
            # ends up in the error message.
            raise ScreenError(
                f"Can't install screen; {screen!r} has already been installed"
            )
        self._installed_screens[name] = screen
        self.get_screen(name)  # Ensures screen is running
        self.log.system(f"{screen} INSTALLED name={name!r}")
        return name

    def uninstall_screen(self, screen: Screen | str) -> str | None:
        """Uninstall a screen. If the screen was not previously installed then this
        method does nothing.

        Args:
            screen (Screen | str): The screen to uninstall or the name of an installed screen.

        Raises:
            ScreenStackError: If the screen is currently in the screen stack.

        Returns:
            str | None: The name of the screen that was uninstalled, or None if no screen was uninstalled.
        """
        installed_screens = self._installed_screens

        if isinstance(screen, str):
            if screen not in installed_screens:
                return None
            target = installed_screens[screen]
            if target in self._screen_stack:
                raise ScreenStackError("Can't uninstall screen in screen stack")
            del installed_screens[screen]
            self.log.system(f"{target} UNINSTALLED name={screen!r}")
            return screen

        if screen in self._screen_stack:
            raise ScreenStackError("Can't uninstall screen in screen stack")
        # Find the name this screen object was installed under (if any).
        for name, installed_screen in installed_screens.items():
            if installed_screen is screen:
                break
        else:
            return None
        installed_screens.pop(name)
        self.log.system(f"{screen} UNINSTALLED name={name!r}")
        return name

    def pop_screen(self) -> Screen:
        """Pop the current screen from the stack, and switch to the previous screen.

        The screen that becomes current is resized to the terminal size and
        sent a ``ScreenResume`` event.

        Raises:
            ScreenStackError: If popping would leave no screens on the stack.

        Returns:
            Screen: The screen that was replaced.
        """
        stack = self._screen_stack
        if len(stack) < 2:
            raise ScreenStackError(
                "Can't pop screen; there must be at least one screen on the stack"
            )
        replaced = self._replace_screen(stack.pop())
        now_current = self.screen
        now_current._screen_resized(self.size)
        now_current.post_message_no_wait(events.ScreenResume(self))
        self.log.system(f"{now_current} is active")
        return replaced

    def set_focus(self, widget: Widget | None, scroll_visible: bool = True) -> None:
        """Focus (or unfocus) a widget. A focused widget will receive key events first.

        This is forwarded to the current screen.

        Args:
            widget (Widget | None): Widget to focus, or None to unfocus.
            scroll_visible (bool, optional): Scroll widget in to view. Defaults to True.
        """
        self.screen.set_focus(widget, scroll_visible)

    async def _set_mouse_over(self, widget: Widget | None) -> None:
        """Called when the mouse is over another widget.

        Sends ``Leave`` to the widget the mouse was previously over and ``Enter``
        to the new one, then records the new widget in ``mouse_over`` (even if
        sending an event raises).

        Args:
            widget (Widget | None): Widget under mouse, or None for no widgets.
        """
        previous = self.mouse_over

        if widget is None:
            # The mouse is no longer over any widget.
            if previous is not None:
                try:
                    await previous.post_message(events.Leave(self))
                finally:
                    self.mouse_over = None
            return

        if previous is widget:
            # Still over the same widget; nothing to do.
            return

        try:
            if previous is not None:
                await previous._forward_event(events.Leave(self))
            await widget._forward_event(events.Enter(self))
        finally:
            self.mouse_over = widget

    def capture_mouse(self, widget: Widget | None) -> None:
        """Send all mouse events to the given widget, or disable mouse capture.

        The widget that previously had the mouse captured (if any) is sent a
        ``MouseRelease`` event, and the new widget (if any) a ``MouseCapture`` event.
        Nothing happens if ``widget`` already has the mouse captured.

        Args:
            widget (Widget | None): If a widget, capture mouse event, or None to end mouse capture.
        """
        previous = self.mouse_captured
        if widget == previous:
            return
        if previous is not None:
            previous.post_message_no_wait(
                events.MouseRelease(self, self.mouse_position)
            )
        self.mouse_captured = widget
        if widget is None:
            return
        widget.post_message_no_wait(events.MouseCapture(self, self.mouse_position))

    def panic(self, *renderables: RenderableType) -> None:
        """Queue renderables to be shown on exit, then close the app.

        Each renderable is rendered to segments immediately, using the app's
        console and its current options, and stored for display after exit.

        Args:
            *renderables (RenderableType, optional): Rich renderables to display on exit.
        """

        assert all(
            is_renderable(renderable) for renderable in renderables
        ), "Can only call panic with strings or Rich renderables"

        pre_rendered = []
        for renderable in renderables:
            segments = list(self.console.render(renderable, self.console.options))
            pre_rendered.append(Segments(segments))

        self._exit_renderables.extend(pre_rendered)
        self._close_messages_no_wait()

    def _handle_exception(self, error: Exception) -> None:
        """Called with an unhandled exception.

        Args:
            error (Exception): An exception instance.
        """
        if not hasattr(error, "__rich__"):
            # No custom rendering available: fall back to the default traceback.
            self.fatal_error()
            return

        # The exception knows how to render itself, so display it directly.
        self.panic(error)

    def fatal_error(self) -> None:
        """Exits the app after an unhandled exception.

        Rings the bell, renders a Rich traceback and queues it for display on
        exit, then closes the app's message queue.
        """
        self.bell()
        # No explicit trace is passed to Traceback here.
        # NOTE(review): presumably Rich then uses the exception currently being
        # handled - confirm against the Rich Traceback documentation.
        trace = Traceback(
            show_locals=True, width=None, locals_max_length=5, suppress=[rich]
        )
        rendered = Segments(self.console.render(trace, self.console.options))
        self._exit_renderables.append(rendered)
        self._close_messages_no_wait()

    def _print_error_renderables(self) -> None:
        """Print every queued exit renderable to the error console, then empty the queue."""
        queued = self._exit_renderables
        for item in queued:
            self.error_console.print(item)
        queued.clear()

    async def _process_messages(
        self, ready_callback: CallbackType | None = None
    ) -> None:
        """Run the app: load CSS, start the driver and process messages until exit.

        Args:
            ready_callback (CallbackType | None, optional): Awaited after ``_ready``
                and before the message loop starts. Defaults to None.
        """
        self._set_active()

        # Connecting to devtools is best-effort; a failed connection is only logged.
        if self.devtools is not None:
            from .devtools.client import DevtoolsConnectionError

            try:
                await self.devtools.connect()
                self.log.system(f"Connected to devtools ( {self.devtools.url} )")
            except DevtoolsConnectionError:
                self.log.system(f"Couldn't connect to devtools ( {self.devtools.url} )")

        self.log.system("---")

        self.log.system(driver=self.driver_class)
        self.log.system(loop=asyncio.get_running_loop())
        self.log.system(features=self.features)

        # Load stylesheets: the CSS file (if any), default CSS, then inline CSS.
        # Any failure here is reported and aborts start-up before a driver exists.
        try:
            if self.css_path is not None:
                self.stylesheet.read(self.css_path)
            for path, css, tie_breaker in self.get_default_css():
                self.stylesheet.add_source(
                    css, path=path, is_default_css=True, tie_breaker=tie_breaker
                )
            if self.CSS:
                try:
                    app_css_path = (
                        f"{inspect.getfile(self.__class__)}:{self.__class__.__name__}"
                    )
                except TypeError:
                    # inspect.getfile raised TypeError; fall back to the class name alone.
                    app_css_path = f"{self.__class__.__name__}"
                self.stylesheet.add_source(
                    self.CSS, path=app_css_path, is_default_css=False
                )
        except Exception as error:
            self._handle_exception(error)
            self._print_error_renderables()
            return

        if self.css_monitor:
            # Poll the CSS monitor four times a second.
            self.set_interval(0.25, self.css_monitor, name="css monitor")
            self.log.system("[b green]STARTED[/]", self.css_monitor)

        async def run_process_messages():
            """Compose and mount the app, run the message loop, then tear down."""

            try:
                await self._dispatch_message(events.Compose(sender=self))
                await self._dispatch_message(events.Mount(sender=self))
            finally:
                # Set even when compose / mount raised, so the event always fires.
                self._mounted_event.set()

            Reactive._initialize_object(self)

            self.stylesheet.update(self)
            self.refresh()

            await self.animator.start()
            await self._ready()
            if ready_callback is not None:
                await ready_callback()

            self._running = True

            try:
                await self._process_messages_loop()
            except asyncio.CancelledError:
                # Cancellation is swallowed so the teardown below still runs.
                pass
            finally:
                self._running = False
                # Iterate over a copy of the timers while stopping them.
                for timer in list(self._timers):
                    await timer.stop()

            await self.animator.stop()
            await self._close_all()

        self._running = True
        try:
            load_event = events.Load(sender=self)
            await self._dispatch_message(load_event)

            driver: Driver
            # Headless mode always uses HeadlessDriver, whatever driver_class is set.
            driver_class = cast(
                "type[Driver]",
                HeadlessDriver if self.is_headless else self.driver_class,
            )
            driver = self._driver = driver_class(self.console, self)

            driver.start_application_mode()
            try:
                if self.is_headless:
                    await run_process_messages()
                else:
                    # While the app runs, stdout / stderr are redirected: to
                    # devtools when available, otherwise to a _NullFile.
                    if self.devtools is not None:
                        devtools = self.devtools
                        assert devtools is not None
                        from .devtools.redirect_output import StdoutRedirector

                        redirector = StdoutRedirector(devtools)
                        with redirect_stderr(redirector):
                            with redirect_stdout(redirector):  # type: ignore
                                await run_process_messages()
                    else:
                        # NOTE(review): _NullFile is defined elsewhere; presumably it
                        # discards everything written to it - confirm.
                        null_file = _NullFile()
                        with redirect_stderr(null_file):
                            with redirect_stdout(null_file):
                                await run_process_messages()

            finally:
                # Always leave application mode, even if the run failed.
                driver.stop_application_mode()
        except Exception as error:
            self._handle_exception(error)
        finally:
            self._running = False
            # Error output is printed only after the driver has left application mode.
            self._print_error_renderables()
            if self.devtools is not None and self.devtools.is_connected:
                await self._disconnect_devtools()

    async def _pre_process(self) -> None:
        """Do nothing; this implementation is intentionally empty."""

    async def _ready(self) -> None:
        """Called immediately prior to processing messages.

        May be used as a hook for any operations that should run first.

        When the ``TEXTUAL_SCREENSHOT`` environment variable holds a non-zero
        number, a timer is set for that many seconds; when it fires a screenshot
        is exported (titled from ``TEXTUAL_SCREENSHOT_TITLE``) and the app shuts down.
        """
        raw_delay = os.environ.get("TEXTUAL_SCREENSHOT", "0")
        try:
            delay = float(raw_delay)
        except ValueError:
            # Not a number: treat the screenshot feature as disabled.
            return

        title = os.environ.get("TEXTUAL_SCREENSHOT_TITLE")

        if not delay:
            return

        async def on_screenshot():
            """Used by docs plugin."""
            self._screenshot = self.export_screenshot(title=title)  # type: ignore
            await self.shutdown()

        self.set_timer(delay, on_screenshot, name="screenshot timer")

    async def _on_compose(self) -> None:
        """Collect everything yielded by ``compose`` and mount it all."""
        await self.mount_all(list(self.compose()))

    def _on_idle(self) -> None:
        """Perform actions when there are no messages in the queue.

        Applies the stylesheet (with animation) to every node yielded by
        ``walk_children`` for the nodes flagged as requiring an update.
        """
        pending = self._require_stylesheet_update
        if not pending:
            return

        nodes: set[DOMNode] = set()
        for node in pending:
            nodes.update(node.walk_children())

        pending.clear()
        self.stylesheet.update_nodes(nodes, animate=True)

    def _register_child(self, parent: DOMNode, child: Widget) -> bool:
        """Attach a child to a parent and start its messages, unless already registered.

        Args:
            parent (DOMNode): Node that will own the child.
            child (Widget): Widget to register.

        Returns:
            bool: True if the child was newly registered, False if it already was.
        """
        if child in self._registry:
            return False

        parent.children._append(child)
        self._registry.add(child)
        child._attach(parent)
        child._post_register(self)
        child._start_messages()
        return True

    def _register(
        self, parent: DOMNode, *anon_widgets: Widget, **widgets: Widget
    ) -> list[Widget]:
        """Register widget(s) so they may receive events.

        Widgets passed by keyword have their ``id`` set to the keyword name.
        Children of a newly registered widget are registered recursively before
        the stylesheet is applied to that widget.

        Args:
            parent (DOMNode): Parent node.
            *anon_widgets (Widget): Widgets to register without changing their id.
            **widgets (Widget): Widgets to register, keyed by the id to assign.

        Returns:
            list[Widget]: Every widget passed in, whether or not it was already registered.

        Raises:
            AppError: If any argument is not a Widget instance.
        """
        if not (anon_widgets or widgets):
            return []

        name_widgets: list[tuple[str | None, Widget]]
        name_widgets = [(None, anon_widget) for anon_widget in anon_widgets]
        name_widgets.extend(widgets.items())
        apply_stylesheet = self.stylesheet.apply

        for widget_id, widget in name_widgets:
            if not isinstance(widget, Widget):
                raise AppError(f"Can't register {widget!r}; expected a Widget instance")
            if widget in self._registry:
                continue
            if widget_id is not None:
                widget.id = widget_id
            self._register_child(parent, widget)
            if widget.children:
                self._register(widget, *widget.children)
            apply_stylesheet(widget)

        return [widget for _, widget in name_widgets]

    def _unregister(self, widget: Widget) -> None:
        """Unregister a widget.

        Resets the widget's focus, detaches it from its parent when that parent
        is a Widget, and drops it from the registry.

        Args:
            widget (Widget): A Widget to unregister
        """
        widget.reset_focus()
        owner = widget._parent
        if isinstance(owner, Widget):
            owner.children._remove(widget)
            widget._detach()
        self._registry.discard(widget)

    async def _disconnect_devtools(self):
        """Disconnect the devtools client, if one exists."""
        client = self.devtools
        if client is None:
            return
        await client.disconnect()

    def _start_widget(self, parent: Widget, widget: Widget) -> None:
        """Start a widget (run its task) so that it can receive messages.

        Args:
            parent (Widget): The parent of the Widget.
            widget (Widget): The Widget to start.
        """
        # Attach first, then start messages.
        for step, arguments in ((widget._attach, (parent,)), (widget._start_messages, ())):
            step(*arguments)

    def is_mounted(self, widget: Widget) -> bool:
        """Check if a widget is mounted.

        Args:
            widget (Widget): A widget.

        Returns:
            bool: True if the widget is in the app's registry.
        """
        registry = self._registry
        return widget in registry

    async def _close_all(self) -> None:
        """Empty the registry, closing the messages of each node taken from it."""
        while self._registry:
            await self._registry.pop()._close_messages()

    async def shutdown(self):
        """Disconnect devtools, disable driver input (if a driver exists) and close messages."""
        await self._disconnect_devtools()
        if self._driver is not None:
            self._driver.disable_input()
        await self._close_messages()

    def refresh(self, *, repaint: bool = True, layout: bool = False) -> None:
        """Refresh the active screen (when one exists), then check for idle.

        Args:
            repaint (bool, optional): Forwarded to the screen's refresh. Defaults to True.
            layout (bool, optional): Forwarded to the screen's refresh. Defaults to False.
        """
        has_screen = bool(self._screen_stack)
        if has_screen:
            self.screen.refresh(repaint=repaint, layout=layout)
        self.check_idle()

    def refresh_css(self, animate: bool = True) -> None:
        """Refresh CSS.

        Re-reads the CSS variables, reparses the stylesheet, applies it to the
        app and performs a full layout refresh of the active screen.

        Args:
            animate (bool, optional): Also execute CSS animations. Defaults to True.
        """
        sheet = self.app.stylesheet
        variables = self.get_css_variables()
        sheet.set_variables(variables)
        sheet.reparse()
        sheet.update(self.app, animate=animate)
        self.screen._refresh_layout(self.size, full=True)

    def _display(self, screen: Screen, renderable: RenderableType | None) -> None:
        """Display a renderable within a sync.

        Nothing is printed unless ``screen`` is the active screen, a renderable
        was given, and the app is running, not closed and not headless.

        Args:
            screen (Screen): Screen instance
            renderable (RenderableType | None): A Rich renderable.
        """
        if screen is not self.screen or renderable is None:
            return
        if not self._running or self._closed or self.is_headless:
            return

        console = self.console
        self._begin_update()
        try:
            console.print(renderable)
        except Exception as error:
            self._handle_exception(error)
        finally:
            # The update is always closed, whether or not printing succeeded.
            self._end_update()
        console.file.flush()

    def get_widget_at(self, x: int, y: int) -> tuple[Widget, Region]:
        """Get the widget under the given coordinates.

        The lookup is delegated to the active screen.

        Args:
            x (int): X Coord.
            y (int): Y Coord.

        Returns:
            tuple[Widget, Region]: The widget and the widget's screen region.
        """
        found = self.screen.get_widget_at(x, y)
        return found

    def bell(self) -> None:
        """Play the console 'bell' (does nothing in headless mode)."""
        if self.is_headless:
            return
        self.console.bell()

    @property
    def _binding_chain(self) -> list[tuple[DOMNode, Bindings]]:
        """Get a chain of nodes and bindings to consider.

        With no focused widget the chain is the active screen followed by the
        app. Otherwise it is built from the ``ancestors`` of the focused widget.

        Returns:
            list[tuple[DOMNode, Bindings]]: List of DOM nodes and their bindings.
        """
        focused = self.focused
        if focused is not None:
            return [(node, node._bindings) for node in focused.ancestors]
        return [
            (self.screen, self.screen._bindings),
            (self, self._bindings),
        ]

    async def check_bindings(self, key: str, universal: bool = False) -> bool:
        """Handle a key press.

        Runs the action of the first binding in the binding chain that matches
        the key and the requested ``universal`` flag.

        Args:
            key (str): A key
            universal (bool): Check universal keys if True, otherwise non-universal keys.

        Returns:
            bool: True if the key was handled by a binding, otherwise False
        """
        for namespace, bindings in self._binding_chain:
            binding = bindings.keys.get(key)
            if binding is None or binding.universal != universal:
                continue
            await self.action(binding.action, default_namespace=namespace)
            return True
        return False

    async def on_event(self, event: events.Event) -> None:
        """Route an event: set up the default screen, forward input, or defer to the parent class.

        Args:
            event (events.Event): The event to process.
        """
        if isinstance(event, events.Compose):
            # Register the default screen and push it before the event is handled.
            default_screen = Screen(id="_default")
            self._register(self, default_screen)
            self._screen_stack.append(default_screen)
            await super().on_event(event)
            return

        # Handle input events that haven't been forwarded
        # If the event has been forwarded it may have bubbled up back to the App
        if isinstance(event, events.InputEvent) and not event.is_forwarded:
            if isinstance(event, events.MouseEvent):
                # Record current mouse position on App
                self.mouse_position = Offset(event.x, event.y)
                await self.screen._forward_event(event)
            elif isinstance(event, events.Key):
                handled = await self.check_bindings(event.key, universal=True)
                if not handled:
                    # Keys go to the focused widget, or to the screen when nothing is focused.
                    target = self.focused or self.screen
                    await target._forward_event(event)
            else:
                await self.screen._forward_event(event)
            return

        if isinstance(event, events.Paste):
            # A paste with no focused widget is dropped.
            if self.focused is not None:
                await self.focused._forward_event(event)
            return

        await super().on_event(event)

    async def action(
        self,
        action: str | tuple[str, tuple[str, ...]],
        default_namespace: object | None = None,
    ) -> bool:
        """Perform an action.

        Args:
            action (str | tuple[str, tuple[str, ...]]): Action encoded in a string, or an
                already parsed ``(target, params)`` tuple.
            default_namespace (object | None): Namespace to use if not provided in the action,
                or None to use app. Defaults to None.

        Returns:
            bool: True if the action was handled.

        Raises:
            ActionError: If the action names a namespace that is not a known action target.
        """
        if isinstance(action, str):
            target, params = actions.parse(action)
        else:
            target, params = action

        # NOTE(review): explicit "namespace.action" targets also fall back to the
        # app, exactly like implicit ones. Confirm whether an explicit destination
        # should skip the fallback before changing this flag.
        implicit_destination = True
        if "." in target:
            destination, action_name = target.split(".", 1)
            if destination not in self._action_targets:
                raise ActionError(f"Action namespace {destination} is not known")
            action_target = getattr(self, destination)
        else:
            action_target = default_namespace or self
            action_name = target

        handled = await self._dispatch_action(action_target, action_name, params)
        if not handled and implicit_destination and not isinstance(action_target, App):
            # Give the app itself a chance when the first target did not handle it.
            handled = await self.app._dispatch_action(self.app, action_name, params)
        return handled

    async def _dispatch_action(
        self, namespace: object, action_name: str, params: Any
    ) -> bool:
        """Invoke the action method for ``action_name`` on a namespace.

        ``_action_<name>`` is tried first, then ``action_<name>``.

        Args:
            namespace (object): Object to look the action methods up on.
            action_name (str): Name of the action, without any prefix.
            params (Any): Positional parameters to unpack into the method call.

        Returns:
            bool: True if a callable method was found and invoked, otherwise False.
        """
        log(
            "<action>",
            namespace=namespace,
            action_name=action_name,
            params=params,
        )
        _rich_traceback_guard = True

        public_method_name = f"action_{action_name}"
        private_method_name = f"_{public_method_name}"

        private_method = getattr(namespace, private_method_name, None)
        public_method = getattr(namespace, public_method_name, None)

        if private_method is None and public_method is None:
            log(
                f"<action> {action_name!r} has no target. Couldn't find methods {public_method_name!r} or {private_method_name!r}"
            )

        # The private method takes precedence over the public one.
        for candidate in (private_method, public_method):
            if callable(candidate):
                await invoke(candidate, *params)
                return True

        return False

    async def _broker_event(
        self, event_name: str, event: events.Event, default_namespace: object | None
    ) -> bool:
        """Allow the app an opportunity to dispatch events to action system.

        Args:
            event_name (str): Name passed to ``extract_handler_actions`` together with
                the metadata of the event's style.
            event (events.Event): An event object.
            default_namespace (object | None): Default namespace handed to ``action`` when
                the extracted action is a string or tuple.

        Returns:
            bool: True if an action was processed.
        """
        try:
            style = event.style
        except AttributeError:
            # Events without a style cannot carry an action.
            return False

        try:
            _modifiers, action = extract_handler_actions(event_name, style.meta)
        except NoHandler:
            return False

        # An action was found, so the event stops here even if it can't be run.
        event.stop()

        if isinstance(action, (str, tuple)):
            await self.action(action, default_namespace=default_namespace)
            return True
        if callable(action):
            await action()
            return True
        return False

    async def _on_update(self, message: messages.Update) -> None:
        """Stop an Update message that has reached the app."""
        message.stop()

    async def _on_layout(self, message: messages.Layout) -> None:
        """Stop a Layout message that has reached the app."""
        message.stop()

    async def _on_key(self, event: events.Key) -> None:
        """Handle a key: tab / shift+tab move focus, other keys go to bindings.

        A key that no binding handles is passed on to ``dispatch_key``.

        Args:
            event (events.Key): The key event.
        """
        key = event.key
        if key == "tab":
            self.screen.focus_next()
        elif key == "shift+tab":
            self.screen.focus_previous()
        elif not await self.check_bindings(key):
            await self.dispatch_key(event)

    async def _on_shutdown_request(self, event: events.ShutdownRequest) -> None:
        """Log the shutdown request, then close the app's messages."""
        log("shutdown request")
        await self._close_messages()

    async def _on_resize(self, event: events.Resize) -> None:
        """Stop the resize event at the app and post it to the active screen."""
        event.stop()
        active_screen = self.screen
        await active_screen.post_message(event)

    async def _on_remove(self, event: events.Remove) -> None:
        """Remove a widget and its descendants from the app.

        Focus is reset first if the focused widget is among those being
        removed; each widget then has its messages closed and is unregistered,
        and finally the parent is asked to refresh its layout.

        Args:
            event (events.Remove): Event carrying the widget to remove.
        """
        widget = event.widget
        parent = widget.parent

        # The walk result is traversed up to three times below (membership test,
        # comprehension, loop). Materialise it so a lazy iterator can't be
        # exhausted before the close / unregister loop runs.
        remove_widgets = list(
            widget.walk_children(
                Widget, with_self=True, method="depth", reverse=True
            )
        )

        if self.screen.focused in remove_widgets:
            self.screen._reset_focus(
                self.screen.focused,
                [to_remove for to_remove in remove_widgets if to_remove.can_focus],
            )

        for child in remove_widgets:
            await child._close_messages()
            self._unregister(child)
        if parent is not None:
            parent.refresh(layout=True)

    async def action_check_bindings(self, key: str) -> None:
        """Check the bindings for a key, discarding the result.

        Args:
            key (str): The key to check.
        """
        _handled = await self.check_bindings(key)

    async def action_quit(self) -> None:
        """Quit the app as soon as possible, by awaiting ``shutdown``."""
        shutting_down = self.shutdown()
        await shutting_down

    async def action_bang(self) -> None:
        """Raise a ZeroDivisionError on purpose."""
        # NOTE(review): presumably a debugging aid for exercising the error path - confirm.
        1 / 0

    async def action_bell(self) -> None:
        """Play the terminal 'bell' by calling ``bell``."""
        ring = self.bell
        ring()

    async def action_focus(self, widget_id: str) -> None:
        """Focus the given widget.

        Nothing happens when no node has that ID, or when the first match is
        not a Widget.

        Args:
            widget_id (str): ID of widget to focus.
        """
        try:
            node = self.query(f"#{widget_id}").first()
        except NoMatches:
            return
        if isinstance(node, Widget):
            self.set_focus(node)

    async def action_switch_screen(self, screen: str) -> None:
        """Switches to another screen, via ``switch_screen``.

        Args:
            screen (str): Name of the screen.
        """
        switch = self.switch_screen
        switch(screen)

    async def action_push_screen(self, screen: str) -> None:
        """Pushes a screen on to the screen stack and makes it active.

        Args:
            screen (str): Name of the screen.
        """
        push = self.push_screen
        push(screen)

    async def action_pop_screen(self) -> None:
        """Removes the topmost screen and makes the new topmost screen active.

        The screen returned by ``pop_screen`` is discarded.
        """
        _popped = self.pop_screen()

    async def action_back(self) -> None:
        """Pop the current screen, silently doing nothing if it can't be popped."""
        try:
            _popped = self.pop_screen()
        except ScreenStackError:
            # Nothing to go back to; deliberately ignored.
            return

    async def action_add_class_(self, selector: str, class_name: str) -> None:
        """Add a CSS class to everything on the active screen matching a selector.

        Args:
            selector (str): Selector used to query the screen.
            class_name (str): Name of the class to add.
        """
        matches = self.screen.query(selector)
        matches.add_class(class_name)

    async def action_remove_class_(self, selector: str, class_name: str) -> None:
        """Remove a CSS class from everything on the active screen matching a selector.

        Args:
            selector (str): Selector used to query the screen.
            class_name (str): Name of the class to remove.
        """
        matches = self.screen.query(selector)
        matches.remove_class(class_name)

    async def action_toggle_class(self, selector: str, class_name: str) -> None:
        """Toggle a CSS class on everything on the active screen matching a selector.

        Args:
            selector (str): Selector used to query the screen.
            class_name (str): Name of the class to toggle.
        """
        matches = self.screen.query(selector)
        matches.toggle_class(class_name)

    def _on_terminal_supports_synchronized_output(
        self, message: messages.TerminalSupportsSynchronizedOutput
    ) -> None:
        """Record that the terminal supports synchronized output.

        Once set, ``_begin_update`` and ``_end_update`` write the sync sequences.

        Args:
            message (messages.TerminalSupportsSynchronizedOutput): The received message.
        """
        log.system("[b green]SynchronizedOutput mode is supported")
        self._sync_available = True

    def _begin_update(self) -> None:
        """Write the sync-start sequence to the console file, if sync is available."""
        if not self._sync_available:
            return
        self.console.file.write(SYNC_START)

    def _end_update(self) -> None:
        """Write the sync-end sequence to the console file, if sync is available."""
        if not self._sync_available:
            return
        self.console.file.write(SYNC_END)


# Set once _init_uvloop has run, whether or not uvloop could be imported.
_uvloop_init_done: bool = False


def _init_uvloop() -> None:
    """
    Import and install the `uvloop` asyncio policy, if available.
    This is done only once, even if the function is called multiple times.
    """
    global _uvloop_init_done

    if not _uvloop_init_done:
        try:
            import uvloop
        except ImportError:
            # uvloop could not be imported; carry on without it.
            pass
        else:
            uvloop.install()

        _uvloop_init_done = True

Classes

class ActionError (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ActionError(Exception):
    """Raised when an action refers to a namespace that is not known."""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class App (driver_class: Type[Driver] | None = None, css_path: CSSPathType = None, watch_css: bool = False)

The base class for Textual Applications.

Args

driver_class (Type[Driver] | None, optional): Driver class or None to auto-detect. Defaults to None.
title (str | None, optional): Title of the application. If None, the title is set to the name of the App subclass. Defaults to None.
css_path (str | PurePath | None, optional): Path to CSS or None for no CSS file. Defaults to None.
watch_css (bool, optional): Watch CSS for changes. Defaults to False.
Expand source code
class App(Generic[ReturnType], DOMNode):
    """The base class for Textual Applications.

    The initial title is taken from the ``TITLE`` class attribute, falling back to
    the name of the ``App`` subclass; the sub-title is taken from ``SUB_TITLE``.

    Args:
        driver_class (Type[Driver] | None, optional): Driver class or ``None`` to auto-detect. Defaults to None.
        css_path (str | PurePath | None, optional): Path to CSS or ``None`` for no CSS file. Defaults to None.
        watch_css (bool, optional): Watch CSS for changes. Defaults to False.
    """

    # Inline CSS for quick scripts (generally css_path should be preferred.)
    CSS = ""

    # Default (lowest priority) CSS
    DEFAULT_CSS = """
    App {
        background: $background;
        color: $text;
    }
    """

    # Screens installed on every instance, keyed by name.
    SCREENS: dict[str, Screen] = {}
    _BASE_PATH: str | None = None
    # CSS path used when no css_path is passed to the constructor.
    CSS_PATH: CSSPathType = None
    # Initial title; None means the class name is used.
    TITLE: str | None = None
    # Initial sub-title; None means an empty string is used.
    SUB_TITLE: str | None = None

    title: Reactive[str] = Reactive("")
    sub_title: Reactive[str] = Reactive("")
    # Selects the "dark" or "light" entry of the design when generating CSS variables.
    dark: Reactive[bool] = Reactive(True)

    def __init__(
        self,
        driver_class: Type[Driver] | None = None,
        css_path: CSSPathType = None,
        watch_css: bool = False,
    ):
        """Initialise the app's consoles, driver class, stylesheet and registries.

        Args:
            driver_class (Type[Driver] | None, optional): Driver class, or None to use
                the result of ``get_driver_class``. Defaults to None.
            css_path (CSSPathType, optional): Path to a CSS file; falls back to the
                ``CSS_PATH`` class attribute. Defaults to None.
            watch_css (bool, optional): Monitor the CSS file for changes (also enabled
                in debug mode). Defaults to False.
        """
        # N.B. This must be done *before* we call the parent constructor, because MessagePump's
        # constructor instantiates a `asyncio.PriorityQueue` and in Python versions older than 3.10
        # this will create some first references to an asyncio loop.
        _init_uvloop()

        super().__init__()
        # Feature flags are read from the TEXTUAL environment variable.
        self.features: frozenset[FeatureFlag] = parse_features(os.getenv("TEXTUAL", ""))

        self._filter: LineFilter | None = None
        # Work on a copy of the environment: NO_COLOR is removed from what the
        # console sees and enables the Monochrome filter instead.
        environ = dict(os.environ)
        no_color = environ.pop("NO_COLOR", None)
        if no_color is not None:
            self._filter = Monochrome()
        self.console = Console(
            file=(_NullFile() if self.is_headless else sys.__stdout__),
            markup=False,
            highlight=False,
            emoji=False,
            legacy_windows=False,
            _environ=environ,
        )
        # Separate console, writing to stderr, used for error output on exit.
        self.error_console = Console(markup=False, stderr=True)
        self.driver_class = driver_class or self.get_driver_class()
        self._screen_stack: list[Screen] = []
        self._sync_available = False

        self.mouse_over: Widget | None = None
        self.mouse_captured: Widget | None = None
        self._driver: Driver | None = None
        # Renderables printed to the error console when the app exits.
        self._exit_renderables: list[RenderableType] = []

        # Namespaces that may prefix an action, e.g. "app.quit".
        self._action_targets = {"app", "screen"}
        self._animator = Animator(self)
        self._animate = self._animator.bind(self)
        self.mouse_position = Offset(0, 0)
        self.title = (
            self.TITLE if self.TITLE is not None else f"{self.__class__.__name__}"
        )
        self.sub_title = self.SUB_TITLE if self.SUB_TITLE is not None else ""

        self._logger = Logger(self._log)

        self._bindings.bind("ctrl+c", "quit", show=False, universal=True)
        self._refresh_required = False

        self.design = DEFAULT_COLORS

        self.stylesheet = Stylesheet(variables=self.get_css_variables())
        self._require_stylesheet_update: set[DOMNode] = set()

        # We want the CSS path to be resolved from the location of the App subclass
        css_path = css_path or self.CSS_PATH
        if css_path is not None:
            if isinstance(css_path, str):
                css_path = Path(css_path)
            css_path = _make_path_object_relative(css_path, self) if css_path else None

        self.css_path = css_path

        # Weak references: the registry does not keep nodes alive by itself.
        self._registry: WeakSet[DOMNode] = WeakSet()

        self._installed_screens: WeakValueDictionary[
            str, Screen
        ] = WeakValueDictionary()
        self._installed_screens.update(**self.SCREENS)

        self.devtools: DevtoolsClient | None = None
        if "devtools" in self.features:
            try:
                from .devtools.client import DevtoolsClient
            except ImportError:
                # Dev dependencies not installed
                pass
            else:
                self.devtools = DevtoolsClient()

        self._return_value: ReturnType | None = None

        # A CSS monitor is only created when there is a CSS file to watch.
        self.css_monitor = (
            FileMonitor(self.css_path, self._on_css_change)
            if ((watch_css or self.debug) and self.css_path)
            else None
        )
        self._screenshot: str | None = None

    def animate(
        self,
        attribute: str,
        value: float | Animatable,
        *,
        final_value: object = ...,
        duration: float | None = None,
        speed: float | None = None,
        delay: float = 0.0,
        easing: EasingFunction | str = DEFAULT_EASING,
        on_complete: CallbackType | None = None,
    ) -> None:
        """Animate an attribute.

        All arguments are passed unchanged to the animator bound to this app.

        Args:
            attribute (str): Name of the attribute to animate.
            value (float | Animatable): The value to animate to.
            final_value (object, optional): The final value of the animation. Defaults to `value` if not set.
            duration (float | None, optional): The duration of the animate. Defaults to None.
            speed (float | None, optional): The speed of the animation. Defaults to None.
            delay (float, optional): A delay (in seconds) before the animation starts. Defaults to 0.0.
            easing (EasingFunction | str, optional): An easing method. Defaults to "in_out_cubic".
            on_complete (CallbackType | None, optional): A callable to invoke when the animation is finished. Defaults to None.

        """
        options = dict(
            final_value=final_value,
            duration=duration,
            speed=speed,
            delay=delay,
            easing=easing,
            on_complete=on_complete,
        )
        self._animate(attribute, value, **options)

    @property
    def debug(self) -> bool:
        """Check if debug mode is enabled.

        Returns:
            bool: True if "debug" is among the app's features.

        """
        enabled_features = self.features
        return "debug" in enabled_features

    @property
    def is_headless(self) -> bool:
        """Check if the app is running in 'headless' mode.

        Returns:
            bool: True if "headless" is among the app's features.

        """
        enabled_features = self.features
        return "headless" in enabled_features

    @property
    def screen_stack(self) -> list[Screen]:
        """Get a *copy* of the screen stack.

        Changing the returned list does not affect the app's own stack.

        Returns:
            list[Screen]: List of screens.

        """
        snapshot = self._screen_stack.copy()
        return snapshot

    def exit(self, result: ReturnType | None = None) -> None:
        """Exit the app, and return the supplied result.

        The result is stored on the app before the message queue is closed.

        Args:
            result (ReturnType | None, optional): Return value. Defaults to None.
        """
        self._return_value = result
        close = self._close_messages_no_wait
        close()

    @property
    def focused(self) -> Widget | None:
        """Get the widget that is focused on the currently active screen."""
        active_screen = self.screen
        return active_screen.focused

    @property
    def namespace_bindings(self) -> dict[str, tuple[DOMNode, Binding]]:
        """Get the current bindings, merged across the binding chain.

        The chain is walked in reverse, so for a key bound more than once the
        entry nearest the start of the chain is the one kept.

        Returns:
            dict[str, tuple[DOMNode, Binding]]: Map of key to the node and binding
                that handle it.
        """
        merged: dict[str, tuple[DOMNode, Binding]] = {}
        for namespace, bindings in reversed(self._binding_chain):
            merged.update(
                (key, (namespace, binding)) for key, binding in bindings.keys.items()
            )
        return merged

    def _set_active(self) -> None:
        """Set this app to be the currently active app.

        The result of ``active_app.set`` is discarded.
        """
        _token = active_app.set(self)

    def compose(self) -> ComposeResult:
        """Yield child widgets for a container.

        This implementation is a generator that yields nothing.
        """
        yield from ()

    def get_css_variables(self) -> dict[str, str]:
        """Build the variables used to pre-populate CSS.

        Picks the "dark" or "light" entry of `self.design`, depending on
        `self.dark`, and returns whatever its `generate()` produces.

        Returns:
            dict[str, str]: A mapping of variable name to value.
        """
        theme_name = "dark" if self.dark else "light"
        return self.design[theme_name].generate()

    def watch_dark(self, dark: bool) -> None:
        """React to a new value of `dark`.

        Sets the "-dark-mode" and "-light-mode" classes so that exactly one of
        them matches the new value, then refreshes the CSS.

        Args:
            dark (bool): The new value of the dark flag.
        """
        mode_classes = ((dark, "-dark-mode"), (not dark, "-light-mode"))
        for enabled, class_name in mode_classes:
            self.set_class(enabled, class_name)
        self.refresh_css()

    def get_driver_class(self) -> Type[Driver]:
        """Pick the driver class for the current platform.

        The driver module is imported lazily, so only the one that matches the
        platform is loaded.

        Returns:
            Type[Driver]: `WindowsDriver` when running on Windows, otherwise
                `LinuxDriver`.
        """
        if WINDOWS:
            from .drivers.windows_driver import WindowsDriver

            return WindowsDriver

        from .drivers.linux_driver import LinuxDriver

        return LinuxDriver

    def __rich_repr__(self) -> rich.repr.Result:
        """Yield the fields that make up this app's Rich repr.

        `title` and `id` are always yielded (`id` with None given as its
        default); `name`, `classes` and `pseudo_classes` only when truthy.
        """
        yield "title", self.title
        yield "id", self.id, None
        app_name = self.name
        if app_name:
            yield "name", app_name
        # Both class collections are reported as plain sets, and only if non-empty.
        for attribute in ("classes", "pseudo_classes"):
            values = getattr(self, attribute)
            if values:
                yield attribute, set(values)

    @property
    def is_transparent(self) -> bool:
        """Report this node as transparent; always True for this class."""
        return True

    @property
    def animator(self) -> Animator:
        """Get the `Animator` instance held in `self._animator`."""
        return self._animator

    @property
    def screen(self) -> Screen:
        """The screen on top of the screen stack.

        Raises:
            ScreenStackError: If there are no screens on the stack.

        Returns:
            Screen: The currently active screen.
        """
        try:
            top_screen = self._screen_stack[-1]
        except IndexError:
            # Hide the IndexError; callers only care that the stack is empty.
            raise ScreenStackError("No screens on stack") from None
        return top_screen

    @property
    def size(self) -> Size:
        """The terminal size, taken from the console.

        Returns:
            Size: Size of the terminal
        """
        width, height = self.console.size
        return Size(width, height)

    @property
    def log(self) -> Logger:
        """Get the `Logger` instance held in `self._logger`."""
        return self._logger

    def _log(
        self,
        group: LogGroup,
        verbosity: LogVerbosity,
        _textual_calling_frame: inspect.FrameInfo,
        *objects: Any,
        **kwargs,
    ) -> None:
        """Send a log entry to devtools, if devtools is connected.

        A single positional object with no keyword args is passed through
        as-is. Otherwise the positional args are joined into one string,
        followed by the keyword args rendered as `key=value`.

        Example:
            ```python
            data = [1,2,3]
            self.log("Hello, World", state=data)
            self.log(self.tree)
            self.log(locals())
            ```

        Args:
            group (LogGroup): Log group, forwarded to devtools.
            verbosity (LogVerbosity): Verbosity of the entry. Entries above
                NORMAL are dropped unless devtools is verbose.
            _textual_calling_frame (inspect.FrameInfo): Frame recorded as the
                caller of the log entry.
            *objects (Any): Objects to log.
            **kwargs: Values to log, each prefixed with its key.
        """

        devtools = self.devtools
        if devtools is None or not devtools.is_connected:
            return

        too_verbose = verbosity.value > LogVerbosity.NORMAL.value
        if too_verbose and not devtools.verbose:
            return

        try:
            from .devtools.client import DevtoolsLog

            if len(objects) == 1 and not kwargs:
                # Pass the objects tuple through untouched.
                payload = objects
            else:
                payload = " ".join(str(arg) for arg in objects)
                if kwargs:
                    key_values = " ".join(
                        f"{key}={value!r}" for key, value in kwargs.items()
                    )
                    payload = f"{payload} {key_values}" if payload else key_values
            devtools.log(
                DevtoolsLog(payload, caller=_textual_calling_frame),
                group,
                verbosity,
            )
        except Exception as error:
            self._handle_exception(error)

    def action_toggle_dark(self) -> None:
        """Action that flips the `dark` flag to its opposite value."""
        was_dark = self.dark
        self.dark = not was_dark

    def action_screenshot(self, filename: str | None = None, path: str = "./") -> None:
        """Save an SVG "screenshot". This action will save an SVG file containing the current contents of the screen.

        Delegates to `save_screenshot`.

        Args:
            filename (str | None, optional): Filename of screenshot, or None to auto-generate. Defaults to None.
            path (str, optional): Path to directory. Defaults to "./".
        """
        self.save_screenshot(filename, path)

    def export_screenshot(self, *, title: str | None = None) -> str:
        """Render the current screen to SVG text.

        The screen is printed to a throw-away recording console with the same
        dimensions as the app's console, and the recording is exported as SVG.

        Args:
            title (str | None, optional): The title of the exported screenshot or None
                to use app title. Defaults to None.

        Returns:
            str: The SVG produced by the recording console.
        """
        recording_console = Console(
            width=self.console.width,
            height=self.console.height,
            file=io.StringIO(),
            force_terminal=True,
            color_system="truecolor",
            record=True,
            legacy_windows=False,
        )
        recording_console.print(self.screen._compositor.render(full=True))
        svg_title = title or self.title
        return recording_console.export_svg(title=svg_title)

    def save_screenshot(
        self,
        filename: str | None = None,
        path: str = "./",
        time_format: str = "%Y-%m-%d %X %f",
    ) -> str:
        """Save an SVG screenshot of the current screen.

        Args:
            filename (str | None, optional): Filename of SVG screenshot, or None to auto-generate
                a filename with the date and time. Defaults to None.
            path (str, optional): Path to directory for output. Defaults to current working directory.
            time_format (str, optional): Time format to use if filename is None. Defaults to "%Y-%m-%d %X %f".

        Returns:
            str: Filename of screenshot.
        """
        if filename is None:
            svg_filename = (
                f"{self.title.lower()} {datetime.now().strftime(time_format)}.svg"
            )
            # The title and the formatted time may contain characters that are
            # not allowed in file names on every platform (the default "%X"
            # yields ":" separators, which Windows rejects). Replace them all
            # so the generated name is portable. A caller-supplied filename is
            # left untouched.
            svg_filename = svg_filename.translate(
                str.maketrans(dict.fromkeys('<>:"/\\|?*', "_"))
            )
        else:
            svg_filename = filename
        svg_path = os.path.expanduser(os.path.join(path, svg_filename))
        screenshot_svg = self.export_screenshot()
        # The SVG can contain non-ASCII text (e.g. box drawing characters), so
        # write UTF-8 explicitly rather than relying on the platform default.
        with open(svg_path, "w", encoding="utf-8") as svg_file:
            svg_file.write(screenshot_svg)
        return svg_path

    def bind(
        self,
        keys: str,
        action: str,
        *,
        description: str = "",
        show: bool = True,
        key_display: str | None = None,
    ) -> None:
        """Bind a key to an action.

        The binding is added to this app's own bindings (`self._bindings`).

        Args:
            keys (str): A comma separated list of keys.
            action (str): Action to bind to.
            description (str, optional): Short description of action. Defaults to "".
            show (bool, optional): Show key in UI. Defaults to True.
            key_display (str, optional): Replacement text for key, or None to use default. Defaults to None.
        """
        self._bindings.bind(
            keys, action, description, show=show, key_display=key_display
        )

    def run(
        self,
        *,
        quit_after: float | None = None,
        headless: bool = False,
        press: Iterable[str] | None = None,
        screenshot: bool = False,
        screenshot_title: str | None = None,
    ) -> ReturnType | None:
        """The main entry point for apps.

        Blocks until the app has finished processing messages.

        Entries in `press` are normally key names, with two special forms:
        "_" pauses for 50 ms, and "wait:N" pauses for N milliseconds.

        Args:
            quit_after (float | None, optional): Quit after a given number of seconds, or None
                to run forever. Defaults to None.
            headless (bool, optional): Run in "headless" mode (don't write to stdout).
            press (Iterable[str] | None, optional): An iterable of keys to simulate being pressed.
            screenshot (bool, optional): Take a screenshot after pressing keys (svg data stored in self._screenshot). Defaults to False.
            screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None.

        Returns:
            ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called.
        """

        if headless:
            # Add the "headless" flag to whatever features are already enabled.
            self.features = cast(
                "frozenset[FeatureFlag]", self.features.union({"headless"})
            )

        async def run_app() -> None:
            if quit_after is not None:
                self.set_timer(quit_after, self.shutdown)
            if press is not None:
                app = self

                async def press_keys() -> None:
                    """A task to send key events."""
                    # NOTE(review): an empty (but not None) `press` fails this
                    # assert inside the background task — confirm that is intended.
                    assert press
                    driver = app._driver
                    assert driver is not None
                    await asyncio.sleep(0.01)
                    for key in press:
                        if key == "_":
                            print("(pause 50ms)")
                            await asyncio.sleep(0.05)
                        elif key.startswith("wait:"):
                            _, wait_ms = key.split(":")
                            print(f"(pause {wait_ms}ms)")
                            await asyncio.sleep(float(wait_ms) / 1000)
                        else:
                            # A single non-alphanumeric character is converted to
                            # its lower-cased, underscore-separated Unicode name.
                            if len(key) == 1 and not key.isalnum():
                                key = (
                                    unicodedata.name(key)
                                    .lower()
                                    .replace("-", "_")
                                    .replace(" ", "_")
                                )
                            original_key = REPLACED_KEYS.get(key, key)
                            # Try to recover the character for the key from its
                            # Unicode name; fall back to the key itself when it is
                            # a single character, else no character.
                            try:
                                char = unicodedata.lookup(
                                    original_key.upper().replace("_", " ")
                                )
                            except KeyError:
                                char = key if len(key) == 1 else None
                            print(f"press {key!r} (char={char!r})")
                            key_event = events.Key(self, key, char)
                            driver.send_event(key_event)
                            await asyncio.sleep(0.01)

                    # Wait for the animator to become idle before the optional
                    # screenshot and shutdown.
                    await app._animator.wait_for_idle()

                    if screenshot:
                        self._screenshot = self.export_screenshot(
                            title=screenshot_title
                        )
                    await self.shutdown()

                async def press_keys_task():
                    """Press some keys in the background."""
                    asyncio.create_task(press_keys())

                await self._process_messages(ready_callback=press_keys_task)
            else:
                await self._process_messages()

        if _ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED:
            # N.B. This doesn't work with Python<3.10, as we end up with 2 event loops:
            asyncio.run(run_app())
        else:
            # However, this works with Python<3.10:
            event_loop = asyncio.get_event_loop()
            event_loop.run_until_complete(run_app())

        return self._return_value

    async def _on_css_change(self) -> None:
        """Reload the CSS file and apply it (called when the CSS changes).

        The file is read and parsed into a copy of the current stylesheet, so
        a failure leaves the active stylesheet untouched. On failure the error
        is logged and the bell is rung. On success the copy becomes the active
        stylesheet and the screen is refreshed with a new layout.
        """
        css_path = self.css_path
        if css_path is None:
            return

        try:
            started = perf_counter()
            candidate = self.stylesheet.copy()
            candidate.read(css_path)
            candidate.parse()
            elapsed = (perf_counter() - started) * 1000
            self.log.system(f"<stylesheet> loaded {css_path!r} in {elapsed:.0f} ms")
        except Exception as error:
            # TODO: Catch specific exceptions
            self.log.error(error)
            self.bell()
        else:
            self.stylesheet = candidate
            self.reset_styles()
            self.stylesheet.update(self)
            self.screen.refresh(layout=True)

    def render(self) -> RenderableType:
        """Return a `Blank` renderable built from the app's background style."""
        return Blank(self.styles.background)

    def get_child(self, id: str) -> DOMNode:
        """Shorthand for self.screen.get_child(id: str)

        Returns the first child (immediate descendant) of the current screen
        with the given ID.

        Args:
            id (str): The ID of the node to search for.

        Returns:
            DOMNode: The first child of the current screen with the specified ID.

        Raises:
            NoMatches: if no children could be found for this ID
        """
        return self.screen.get_child(id)

    def update_styles(self, node: DOMNode | None = None) -> None:
        """Queue a node for a stylesheet update.

        Should be called whenever CSS classes / pseudo classes change.

        Args:
            node (DOMNode | None, optional): The node to update, or None to
                update the current screen. Defaults to None.

        """
        target = self.screen if node is None else node
        self._require_stylesheet_update.add(target)
        self.check_idle()

    def mount(self, *anon_widgets: Widget, **widgets: Widget) -> AwaitMount:
        """Mount widgets. Widgets specified as positional args, or keywords args. If supplied
        as keyword args they will be assigned an id of the key.

        The widgets are registered with the current screen as their parent.

        Args:
            *anon_widgets (Widget): Widgets to mount without assigning an id.
            **widgets (Widget): Widgets to mount; each key becomes the widget's id.

        Returns:
            AwaitMount: An awaitable object that waits for widgets to be mounted.

        """
        mounted_widgets = self._register(self.screen, *anon_widgets, **widgets)
        return AwaitMount(mounted_widgets)

    def mount_all(self, widgets: Iterable[Widget]) -> AwaitMount:
        """Mount widgets from an iterable.

        Each widget is registered with the current screen as its parent.

        Args:
            widgets (Iterable[Widget]): An iterable of widgets.

        Returns:
            AwaitMount: An awaitable object wrapping the list of widgets.
        """
        # Materialize first: the iterable is used twice and may be a generator.
        mounted_widgets = list(widgets)
        for widget in mounted_widgets:
            self._register(self.screen, widget)
        return AwaitMount(mounted_widgets)

    def is_screen_installed(self, screen: Screen | str) -> bool:
        """Check if a given screen has been installed.

        A string is looked up among the installed names; anything else is
        looked up among the installed screen objects.

        Args:
            screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

        Returns:
            bool: True if the screen is currently installed.
        """
        installed = self._installed_screens
        candidates = installed if isinstance(screen, str) else installed.values()
        return screen in candidates

    def get_screen(self, screen: Screen | str) -> Screen:
        """Resolve a screen and make sure it is registered.

        A name is looked up among the installed screens; a Screen object is
        used as given. If the resulting screen isn't running, it is registered
        with the app as its parent.

        Args:
            screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

        Raises:
            KeyError: If the named screen doesn't exist.

        Returns:
            Screen: A screen instance.
        """
        next_screen = screen
        if isinstance(screen, str):
            try:
                next_screen = self._installed_screens[screen]
            except KeyError:
                raise KeyError(f"No screen called {screen!r} installed") from None
        if not next_screen.is_running:
            self._register(self, next_screen)
        return next_screen

    def _replace_screen(self, screen: Screen) -> Screen:
        """Handle the replaced screen.

        Posts a `ScreenSuspend` event to the screen. If the screen is neither
        installed nor still present on the screen stack, it is also removed.

        Args:
            screen (Screen): A screen object.

        Returns:
            Screen: The screen that was replaced.

        """
        screen.post_message_no_wait(events.ScreenSuspend(self))
        self.log.system(f"{screen} SUSPENDED")
        # Installed screens, and screens still on the stack, are kept.
        if not self.is_screen_installed(screen) and screen not in self._screen_stack:
            screen.remove()
            self.log.system(f"{screen} REMOVED")
        return screen

    def push_screen(self, screen: Screen | str) -> None:
        """Push a new screen on the screen stack.

        The screen becomes the current screen and is sent a `ScreenResume`
        event.

        Args:
            screen (Screen | str): A Screen instance or the name of an installed screen.

        Raises:
            KeyError: If `screen` is a name and no such screen is installed
                (raised by `get_screen`).

        """
        # get_screen also registers the screen if it is not already running.
        next_screen = self.get_screen(screen)
        self._screen_stack.append(next_screen)
        # self.screen is now the screen that was just pushed.
        self.screen.post_message_no_wait(events.ScreenResume(self))
        self.log.system(f"{self.screen} is current (PUSHED)")

    def switch_screen(self, screen: Screen | str) -> None:
        """Switch to another screen by replacing the top of the screen stack with a new screen.

        Does nothing when `screen` is the very object that is already current.

        Args:
            screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

        Raises:
            ScreenStackError: If there are no screens on the stack (raised by
                the `screen` property).
            KeyError: If `screen` is a name and no such screen is installed
                (raised by `get_screen`).

        """
        # NOTE(review): this is an identity check against the argument, so a
        # name (str) never matches; the current screen is then replaced even
        # if it is the screen installed under that name — confirm intended.
        if self.screen is not screen:
            self._replace_screen(self._screen_stack.pop())
            next_screen = self.get_screen(screen)
            self._screen_stack.append(next_screen)
            self.screen.post_message_no_wait(events.ScreenResume(self))
            self.log.system(f"{self.screen} is current (SWITCHED)")

    def install_screen(self, screen: Screen, name: str | None = None) -> str:
        """Install a screen.

        The screen is stored under `name` and, via `get_screen`, registered if
        it is not already running.

        Args:
            screen (Screen): Screen to install.
            name (str | None, optional): Unique name of screen or None to auto-generate.
                Defaults to None.

        Raises:
            ScreenError: If the name is already in use, or the screen object
                has already been installed.

        Returns:
            str: The name of the screen
        """
        if name is None:
            name = nanoid.generate()
        if name in self._installed_screens:
            raise ScreenError(f"Can't install screen; {name!r} is already installed")
        if screen in self._installed_screens.values():
            # Interpolate the screen's repr so the error says which screen was
            # rejected.
            raise ScreenError(
                f"Can't install screen; {screen!r} has already been installed"
            )
        self._installed_screens[name] = screen
        self.get_screen(name)  # Ensures screen is running
        self.log.system(f"{screen} INSTALLED name={name!r}")
        return name

    def uninstall_screen(self, screen: Screen | str) -> str | None:
        """Uninstall a screen. If the screen was not previously installed then this
        method is a null-op.

        Args:
            screen (Screen | str): The screen to uninstall or the name of a installed screen.

        Raises:
            ScreenStackError: If the screen is currently on the screen stack.

        Returns:
            str | None: The name of the screen that was uninstalled, or None if no screen was uninstalled.
        """
        installed = self._installed_screens
        if isinstance(screen, str):
            if screen not in installed:
                return None
            name, target = screen, installed[screen]
        else:
            # The name is looked up only after the stack check below.
            name, target = None, screen

        if target in self._screen_stack:
            raise ScreenStackError("Can't uninstall screen in screen stack")

        if name is None:
            name = next(
                (key for key, candidate in installed.items() if candidate is target),
                None,
            )
            if name is None:
                return None

        del installed[name]
        self.log.system(f"{target} UNINSTALLED name={name!r}")
        return name

    def pop_screen(self) -> Screen:
        """Pop the current screen from the stack, and switch to the previous screen.

        Raises:
            ScreenStackError: If popping would leave the stack empty.

        Returns:
            Screen: The screen that was replaced.
        """
        screen_stack = self._screen_stack
        if len(screen_stack) <= 1:
            raise ScreenStackError(
                "Can't pop screen; there must be at least one screen on the stack"
            )
        previous_screen = self._replace_screen(screen_stack.pop())
        # self.screen is now the screen that was beneath the popped one; give
        # it the current terminal size before it is resumed.
        self.screen._screen_resized(self.size)
        self.screen.post_message_no_wait(events.ScreenResume(self))
        self.log.system(f"{self.screen} is active")
        return previous_screen

    def set_focus(self, widget: Widget | None, scroll_visible: bool = True) -> None:
        """Focus (or unfocus) a widget. A focused widget will receive key events first.

        Delegates to the current screen's `set_focus`.

        Args:
            widget (Widget | None): Widget to focus, or None to unfocus.
            scroll_visible (bool, optional): Scroll widget in to view. Defaults to True.
        """
        self.screen.set_focus(widget, scroll_visible)

    async def _set_mouse_over(self, widget: Widget | None) -> None:
        """Record which widget the mouse is over and notify the widgets involved.

        `self.mouse_over` is updated even if sending an event raises.

        Args:
            widget (Widget | None): Widget under mouse, or None for no widgets.
        """
        previous = self.mouse_over

        if widget is None:
            if previous is None:
                return
            # The mouse left the previous widget and is over nothing now.
            try:
                await previous.post_message(events.Leave(self))
            finally:
                self.mouse_over = None
            return

        if previous is widget:
            return

        # The mouse moved from one widget (if any) to another.
        try:
            if previous is not None:
                await previous._forward_event(events.Leave(self))
            await widget._forward_event(events.Enter(self))
        finally:
            self.mouse_over = widget

    def capture_mouse(self, widget: Widget | None) -> None:
        """Send all mouse events to the given widget, or disable mouse capture.

        Does nothing if `widget` already equals the capturing widget. Otherwise
        the previous capturing widget (if any) is sent a `MouseRelease` event
        and the new one (if any) a `MouseCapture` event.

        Args:
            widget (Widget | None): If a widget, capture mouse event, or None to end mouse capture.
        """
        previous = self.mouse_captured
        if widget == previous:
            return
        if previous is not None:
            release_event = events.MouseRelease(self, self.mouse_position)
            previous.post_message_no_wait(release_event)
        self.mouse_captured = widget
        if widget is not None:
            capture_event = events.MouseCapture(self, self.mouse_position)
            widget.post_message_no_wait(capture_event)

    def panic(self, *renderables: RenderableType) -> None:
        """Exits the app then displays a message.

        Every renderable is rendered to segments up front and queued in
        `_exit_renderables` before the message queue is closed.

        Args:
            *renderables (RenderableType, optional): Rich renderables to display on exit.
        """

        assert all(
            is_renderable(renderable) for renderable in renderables
        ), "Can only call panic with strings or Rich renderables"

        # Build the whole list before extending, so a rendering failure queues
        # nothing.
        pre_rendered = [
            Segments(list(self.console.render(renderable, self.console.options)))
            for renderable in renderables
        ]
        self._exit_renderables.extend(pre_rendered)
        self._close_messages_no_wait()

    def _handle_exception(self, error: Exception) -> None:
        """Route an unhandled exception to the right exit path.

        Args:
            error (Exception): An exception instance.
        """
        if not hasattr(error, "__rich__"):
            # No custom rendering available: use default exception rendering.
            self.fatal_error()
            return
        # The exception has a rich method, so defer to that for the rendering.
        self.panic(error)

    def fatal_error(self) -> None:
        """Exits the app after an unhandled exception.

        Rings the bell, renders a Rich traceback to segments, queues them in
        `_exit_renderables` and closes the message queue. The queued output is
        printed by `_print_error_renderables`.
        """
        self.bell()
        # No exception is passed in; presumably Rich picks up the exception
        # currently being handled — NOTE(review): confirm against Rich docs.
        traceback = Traceback(
            show_locals=True, width=None, locals_max_length=5, suppress=[rich]
        )
        self._exit_renderables.append(
            Segments(self.console.render(traceback, self.console.options))
        )
        self._close_messages_no_wait()

    def _print_error_renderables(self) -> None:
        """Print every queued exit renderable to the error console, then empty the queue."""
        pending = self._exit_renderables
        for queued in pending:
            self.error_console.print(queued)
        pending.clear()

    async def _process_messages(
        self, ready_callback: CallbackType | None = None
    ) -> None:
        """Run the app: load CSS, start the driver and process messages until closed.

        Steps, in order: mark this app as active, try to connect devtools,
        load the stylesheet sources, dispatch the Load event, start the driver
        in application mode, then run the message loop with stdout and stderr
        redirected (unless headless). Any exception is passed to
        `_handle_exception`, and queued error renderables are printed before
        returning.

        Args:
            ready_callback (CallbackType | None, optional): Awaited once, after
                `_ready` and just before the message loop starts. Defaults to None.
        """
        self._set_active()

        if self.devtools is not None:
            from .devtools.client import DevtoolsConnectionError

            try:
                await self.devtools.connect()
                self.log.system(f"Connected to devtools ( {self.devtools.url} )")
            except DevtoolsConnectionError:
                # A failed devtools connection is logged, not treated as fatal.
                self.log.system(f"Couldn't connect to devtools ( {self.devtools.url} )")

        self.log.system("---")

        self.log.system(driver=self.driver_class)
        self.log.system(loop=asyncio.get_running_loop())
        self.log.system(features=self.features)

        # Stylesheet sources are added in this order: the CSS file, the
        # default CSS, then the app's own CSS attribute.
        try:
            if self.css_path is not None:
                self.stylesheet.read(self.css_path)
            for path, css, tie_breaker in self.get_default_css():
                self.stylesheet.add_source(
                    css, path=path, is_default_css=True, tie_breaker=tie_breaker
                )
            if self.CSS:
                try:
                    app_css_path = (
                        f"{inspect.getfile(self.__class__)}:{self.__class__.__name__}"
                    )
                except TypeError:
                    # inspect.getfile could not find a file for the class; fall
                    # back to the class name alone.
                    app_css_path = f"{self.__class__.__name__}"
                self.stylesheet.add_source(
                    self.CSS, path=app_css_path, is_default_css=False
                )
        except Exception as error:
            # CSS could not be loaded: report the error and do not start the app.
            self._handle_exception(error)
            self._print_error_renderables()
            return

        if self.css_monitor:
            self.set_interval(0.25, self.css_monitor, name="css monitor")
            self.log.system("[b green]STARTED[/]", self.css_monitor)

        async def run_process_messages():
            # Compose and mount the app, then run the message loop until it ends.

            try:
                await self._dispatch_message(events.Compose(sender=self))
                await self._dispatch_message(events.Mount(sender=self))
            finally:
                # Always set, so anything waiting on the event is released even
                # if compose or mount raised.
                self._mounted_event.set()

            Reactive._initialize_object(self)

            self.stylesheet.update(self)
            self.refresh()

            await self.animator.start()
            await self._ready()
            if ready_callback is not None:
                await ready_callback()

            self._running = True

            try:
                await self._process_messages_loop()
            except asyncio.CancelledError:
                pass
            finally:
                self._running = False
                # Iterate over a copy of the timers while stopping them.
                for timer in list(self._timers):
                    await timer.stop()

            await self.animator.stop()
            await self._close_all()

        self._running = True
        try:
            load_event = events.Load(sender=self)
            await self._dispatch_message(load_event)

            driver: Driver
            # Headless mode overrides the configured driver class.
            driver_class = cast(
                "type[Driver]",
                HeadlessDriver if self.is_headless else self.driver_class,
            )
            driver = self._driver = driver_class(self.console, self)

            driver.start_application_mode()
            try:
                if self.is_headless:
                    await run_process_messages()
                else:
                    # While the app runs, stdout and stderr are sent to devtools
                    # when it is available, otherwise to a null file.
                    if self.devtools is not None:
                        devtools = self.devtools
                        assert devtools is not None
                        from .devtools.redirect_output import StdoutRedirector

                        redirector = StdoutRedirector(devtools)
                        with redirect_stderr(redirector):
                            with redirect_stdout(redirector):  # type: ignore
                                await run_process_messages()
                    else:
                        null_file = _NullFile()
                        with redirect_stderr(null_file):
                            with redirect_stdout(null_file):
                                await run_process_messages()

            finally:
                driver.stop_application_mode()
        except Exception as error:
            self._handle_exception(error)
        finally:
            self._running = False
            # Printed after application mode has been stopped (inner finally).
            self._print_error_renderables()
            if self.devtools is not None and self.devtools.is_connected:
                await self._disconnect_devtools()

    async def _pre_process(self) -> None:
        """Do nothing; this implementation has no pre-processing step."""
        pass

    async def _ready(self) -> None:
        """Called immediately prior to processing messages.

        May be used as a hook for any operations that should run first.

        If the TEXTUAL_SCREENSHOT environment variable holds a non-zero number,
        a timer is set for that many seconds. When it fires, a screenshot is
        stored in `self._screenshot` (titled from TEXTUAL_SCREENSHOT_TITLE, if
        set) and the app shuts down. A value that is not a number is ignored.

        """
        raw_delay = os.environ.get("TEXTUAL_SCREENSHOT", "0")
        try:
            screenshot_timer = float(raw_delay)
        except ValueError:
            return
        if not screenshot_timer:
            return

        screenshot_title = os.environ.get("TEXTUAL_SCREENSHOT_TITLE")

        async def on_screenshot():
            """Used by docs plugin."""
            self._screenshot = self.export_screenshot(  # type: ignore
                title=screenshot_title
            )
            await self.shutdown()

        self.set_timer(screenshot_timer, on_screenshot, name="screenshot timer")

    async def _on_compose(self) -> None:
        """Mount every widget yielded by `compose` and wait for the mount to finish."""
        composed = list(self.compose())
        mount_awaitable = self.mount_all(composed)
        await mount_awaitable

    def _on_idle(self) -> None:
        """Apply any stylesheet updates that have been requested.

        Collects the nodes yielded by `walk_children()` for every node queued
        in `_require_stylesheet_update`, empties the queue, and updates the
        collected nodes with animation enabled.
        """
        pending = self._require_stylesheet_update
        if not pending:
            return
        nodes: set[DOMNode] = set()
        for node in pending:
            nodes.update(node.walk_children())
        pending.clear()
        self.stylesheet.update_nodes(nodes, animate=True)

    def _register_child(self, parent: DOMNode, child: Widget) -> bool:
        """Add a child to its parent and to the registry, unless already registered.

        Args:
            parent (DOMNode): The node that receives the child.
            child (Widget): The widget to register.

        Returns:
            bool: True if the child was newly registered, False if it was
                already in the registry.
        """
        registry = self._registry
        if child in registry:
            return False
        parent.children._append(child)
        registry.add(child)
        child._attach(parent)
        child._post_register(self)
        child._start_messages()
        return True

    def _register(
        self, parent: DOMNode, *anon_widgets: Widget, **widgets: Widget
    ) -> list[Widget]:
        """Register widget(s) so they may receive events.

        Widgets already in the registry are skipped. A newly registered widget
        has its children registered recursively and the stylesheet applied.

        Args:
            parent (Widget): Parent Widget.
            *anon_widgets (Widget): Widgets to register without assigning an id.
            **widgets (Widget): Widgets to register; each key becomes the widget's id.

        Raises:
            AppError: If any of the given objects is not a Widget.

        Returns:
            list[Widget]: All the widgets passed in, positional ones first.

        """
        if not (anon_widgets or widgets):
            return []

        name_widgets: list[tuple[str | None, Widget]] = [
            *((None, widget) for widget in anon_widgets),
            *widgets.items(),
        ]
        apply_stylesheet = self.stylesheet.apply

        for widget_id, widget in name_widgets:
            if not isinstance(widget, Widget):
                raise AppError(f"Can't register {widget!r}; expected a Widget instance")
            if widget in self._registry:
                continue
            if widget_id is not None:
                widget.id = widget_id
            self._register_child(parent, widget)
            if widget.children:
                self._register(widget, *widget.children)
            apply_stylesheet(widget)

        return [widget for _, widget in name_widgets]

    def _unregister(self, widget: Widget) -> None:
        """Unregister a widget.

        Resets the widget's focus, detaches it from its parent (only when the
        parent is a Widget) and drops it from the registry.

        Args:
            widget (Widget): A Widget to unregister
        """
        widget.reset_focus()
        if isinstance(widget._parent, Widget):
            widget._parent.children._remove(widget)
            widget._detach()
        # discard() does not raise if the widget was never registered.
        self._registry.discard(widget)

    async def _disconnect_devtools(self):
        """Disconnect from devtools, if a devtools client is present."""
        devtools = self.devtools
        if devtools is None:
            return
        await devtools.disconnect()

    def _start_widget(self, parent: Widget, widget: Widget) -> None:
        """Start a widget (run its task) so that it can receive messages.

        The widget is attached to the parent before its messages are started.

        Args:
            parent (Widget): The parent of the Widget.
            widget (Widget): The Widget to start.
        """
        widget._attach(parent)
        widget._start_messages()

    def is_mounted(self, widget: Widget) -> bool:
        """Check if a widget is mounted.

        A widget counts as mounted when it is in the app's registry.

        Args:
            widget (Widget): A widget.

        Returns:
            bool: True if the widget is mounted.
        """
        return widget in self._registry

    async def _close_all(self) -> None:
        """Empty the registry, closing the messages of each widget removed from it."""
        registry = self._registry
        while registry:
            await registry.pop()._close_messages()

    async def shutdown(self):
        """Shut the app down.

        Disconnects devtools, disables input on the driver (when there is one),
        then closes the app's own message processing.
        """
        await self._disconnect_devtools()
        active_driver = self._driver
        if active_driver is not None:
            active_driver.disable_input()
        await self._close_messages()

    def refresh(self, *, repaint: bool = True, layout: bool = False) -> None:
        """Refresh the current screen (if any) and check for idle.

        Args:
            repaint (bool, optional): Passed through to the screen's refresh. Defaults to True.
            layout (bool, optional): Passed through to the screen's refresh. Defaults to False.
        """
        has_screen = bool(self._screen_stack)
        if has_screen:
            current_screen = self.screen
            current_screen.refresh(repaint=repaint, layout=layout)
        self.check_idle()

    def refresh_css(self, animate: bool = True) -> None:
        """Re-parse the stylesheet and re-apply it to the app.

        Args:
            animate (bool, optional): Also execute CSS animations. Defaults to True.
        """
        css = self.app.stylesheet
        variables = self.get_css_variables()
        css.set_variables(variables)
        css.reparse()
        css.update(self.app, animate=animate)
        # Finish with a full layout refresh of the current screen.
        self.screen._refresh_layout(self.size, full=True)

    def _display(self, screen: Screen, renderable: RenderableType | None) -> None:
        """Print a renderable to the console between begin/end update calls.

        Nothing is printed unless `screen` is the current screen, a renderable
        was given, and the app is running, not closed and not headless.

        Args:
            screen (Screen): The screen requesting the display.
            renderable (RenderableType | None): A Rich renderable, or None for nothing.
        """
        if screen is not self.screen or renderable is None:
            return
        can_draw = self._running and not self._closed and not self.is_headless
        if not can_draw:
            return

        console = self.console
        self._begin_update()
        try:
            console.print(renderable)
        except Exception as error:
            # Rendering errors are handed to the app's exception handler.
            self._handle_exception(error)
        finally:
            self._end_update()
        console.file.flush()

    def get_widget_at(self, x: int, y: int) -> tuple[Widget, Region]:
        """Look up the widget under the given coordinates on the current screen.

        Args:
            x (int): X coordinate.
            y (int): Y coordinate.

        Returns:
            tuple[Widget, Region]: The widget and the widget's screen region.
        """
        current_screen = self.screen
        return current_screen.get_widget_at(x, y)

    def bell(self) -> None:
        """Play the console 'bell' (skipped when the app is headless)."""
        if self.is_headless:
            return
        self.console.bell()

    @property
    def _binding_chain(self) -> list[tuple[DOMNode, Bindings]]:
        """Get the chain of nodes and bindings to consider for a key press.

        With no focused widget the chain is the current screen followed by the
        app. Otherwise it is built from the focused widget's `ancestors`.

        Returns:
            list[tuple[DOMNode, Bindings]]: List of DOM nodes and their bindings.
        """
        focused = self.focused
        if focused is not None:
            return [(node, node._bindings) for node in focused.ancestors]
        current_screen = self.screen
        return [
            (current_screen, current_screen._bindings),
            (self, self._bindings),
        ]

    async def check_bindings(self, key: str, universal: bool = False) -> bool:
        """Run the action bound to a key, if any binding in the chain matches.

        The binding chain is searched in order; the first binding for `key`
        whose `universal` flag equals the requested one is run.

        Args:
            key (str): A key.
            universal (bool): Check universal keys if True, otherwise non-universal keys.

        Returns:
            bool: True if the key was handled by a binding, otherwise False.
        """
        for namespace, bindings in self._binding_chain:
            binding = bindings.keys.get(key)
            if binding is None or binding.universal != universal:
                continue
            await self.action(binding.action, default_namespace=namespace)
            return True
        return False

    async def on_event(self, event: events.Event) -> None:
        """Route an event received by the app.

        * Compose: create, register and push the "_default" screen, then defer
          to the base class.
        * Input events that have not been forwarded: mouse events record the
          mouse position and go to the screen; key events try universal bindings
          first and otherwise go to the focused widget (or the screen); any other
          input event goes to the screen.
        * Paste: forwarded to the focused widget, if there is one.
        * Anything else is handled by the base class.

        Args:
            event (events.Event): The event to handle.
        """
        if isinstance(event, events.Compose):
            default_screen = Screen(id="_default")
            self._register(self, default_screen)
            self._screen_stack.append(default_screen)
            await super().on_event(event)
            return

        # A forwarded input event may have bubbled back up to the App, so only
        # input events that have not been forwarded yet are routed here.
        if isinstance(event, events.InputEvent) and not event.is_forwarded:
            if isinstance(event, events.MouseEvent):
                # Record current mouse position on App
                self.mouse_position = Offset(event.x, event.y)
                await self.screen._forward_event(event)
            elif isinstance(event, events.Key):
                handled = await self.check_bindings(event.key, universal=True)
                if not handled:
                    forward_target = self.focused or self.screen
                    await forward_target._forward_event(event)
            else:
                await self.screen._forward_event(event)
            return

        if isinstance(event, events.Paste):
            if self.focused is not None:
                await self.focused._forward_event(event)
            return

        await super().on_event(event)

    async def action(
        self,
        action: str | tuple[str, tuple[str, ...]],
        default_namespace: object | None = None,
    ) -> bool:
        """Perform an action.

        Args:
            action (str | tuple[str, tuple[str, ...]]): Action encoded in a string, or an
                already parsed ``(target, params)`` tuple.
            default_namespace (object | None): Namespace to use if not provided in the action,
                or None to use app. Defaults to None.

        Raises:
            ActionError: If the action names a namespace that is not in `_action_targets`.

        Returns:
            bool: True if the action was handled.
        """
        # No print() here: writing to stdout would corrupt the terminal display.
        # The dispatch step already logs the action.
        if isinstance(action, str):
            target, params = actions.parse(action)
        else:
            target, params = action

        if "." in target:
            destination, action_name = target.split(".", 1)
            if destination not in self._action_targets:
                raise ActionError(f"Action namespace {destination} is not known")
            action_target = getattr(self, destination)
        else:
            action_target = default_namespace or self
            action_name = target

        handled = await self._dispatch_action(action_target, action_name, params)
        # NOTE(review): the former `implicit_destination` flag was True on both
        # branches, so the app-level fallback also applies to explicitly
        # namespaced actions. That behaviour is kept; confirm it is intended.
        if not handled and not isinstance(action_target, App):
            handled = await self.app._dispatch_action(self.app, action_name, params)
        return handled

    async def _dispatch_action(
        self, namespace: object, action_name: str, params: Any
    ) -> bool:
        """Invoke the action method for `action_name` on a namespace object.

        Looks for `_action_<name>` first and `action_<name>` second, invoking
        the first one that is callable with `params` unpacked as arguments.

        Args:
            namespace (object): Object to look the action method up on.
            action_name (str): Name of the action, without the "action_" prefix.
            params (Any): Positional parameters, unpacked into the call.

        Returns:
            bool: True if an action method was invoked, otherwise False.
        """
        log(
            "<action>",
            namespace=namespace,
            action_name=action_name,
            params=params,
        )
        # Marker local, presumably read by Rich's traceback rendering - keep it.
        _rich_traceback_guard = True

        public_method_name = f"action_{action_name}"
        private_method_name = f"_{public_method_name}"

        private_method = getattr(namespace, private_method_name, None)
        public_method = getattr(namespace, public_method_name, None)

        if private_method is None and public_method is None:
            log(
                f"<action> {action_name!r} has no target. Couldn't find methods {public_method_name!r} or {private_method_name!r}"
            )

        # The private variant takes precedence over the public one.
        for candidate in (private_method, public_method):
            if callable(candidate):
                await invoke(candidate, *params)
                return True

        return False

    async def _broker_event(
        self, event_name: str, event: events.Event, default_namespace: object | None
    ) -> bool:
        """Allow the app an opportunity to dispatch events to the action system.

        The handler is extracted from the meta of the event's `style`. Events
        without a `style` attribute, or without a handler for `event_name`, are
        not processed. When a handler is found the event is stopped.

        Args:
            event_name (str): Event name passed to `extract_handler_actions`.
            event (events.Event): An event object.
            default_namespace (object | None): Default namespace passed to `action`
                when the handler is a string or tuple.

        Returns:
            bool: True if an action was processed.
        """
        try:
            style = event.style
        except AttributeError:
            return False

        try:
            _modifiers, action = extract_handler_actions(event_name, style.meta)
        except NoHandler:
            return False

        event.stop()
        if isinstance(action, (str, tuple)):
            await self.action(action, default_namespace=default_namespace)
            return True
        if callable(action):
            await action()
            return True
        return False

    async def _on_update(self, message: messages.Update) -> None:
        """Stop an Update message that has reached the app."""
        message.stop()

    async def _on_layout(self, message: messages.Layout) -> None:
        """Stop a Layout message that has reached the app."""
        message.stop()

    async def _on_key(self, event: events.Key) -> None:
        """Handle a key event at the app level.

        "tab" and "shift+tab" move focus on the current screen. Any other key is
        checked against (non-universal) bindings and, if unhandled, passed to
        `dispatch_key`.

        Args:
            event (events.Key): The key event.
        """
        key = event.key
        if key == "tab":
            self.screen.focus_next()
        elif key == "shift+tab":
            self.screen.focus_previous()
        elif not await self.check_bindings(key):
            await self.dispatch_key(event)

    async def _on_shutdown_request(self, event: events.ShutdownRequest) -> None:
        """Log the shutdown request and close the app's message processing.

        Args:
            event (events.ShutdownRequest): The shutdown request event (not inspected).
        """
        log("shutdown request")
        await self._close_messages()

    async def _on_resize(self, event: events.Resize) -> None:
        """Stop a resize event at the app and pass it on to the current screen.

        Args:
            event (events.Resize): The resize event.
        """
        event.stop()
        current_screen = self.screen
        await current_screen.post_message(event)

    async def _on_remove(self, event: events.Remove) -> None:
        """Remove a widget and the widgets beneath it from the app.

        If the screen's focused widget is among those being removed, focus is
        reset first. Each widget then has its messages closed and is
        unregistered, and finally the removed widget's parent (if any) is asked
        to refresh its layout.

        Args:
            event (events.Remove): Event carrying the widget to remove.
        """
        target = event.widget
        parent = target.parent

        doomed = target.walk_children(
            Widget, with_self=True, method="depth", reverse=True
        )

        if self.screen.focused in doomed:
            focusable = [node for node in doomed if node.can_focus]
            self.screen._reset_focus(self.screen.focused, focusable)

        for node in doomed:
            await node._close_messages()
            self._unregister(node)

        if parent is not None:
            parent.refresh(layout=True)

    async def action_check_bindings(self, key: str) -> None:
        """Action that checks bindings for a key; the result is discarded.

        Args:
            key (str): A key.
        """
        await self.check_bindings(key)

    async def action_quit(self) -> None:
        """Quit the app as soon as possible (awaits `shutdown`)."""
        await self.shutdown()

    async def action_bang(self) -> None:
        """Raise ZeroDivisionError by evaluating ``1 / 0``.

        NOTE(review): looks like a debugging aid for exercising error handling - confirm.
        """
        1 / 0

    async def action_bell(self) -> None:
        """Action that plays the terminal 'bell' via `bell`."""
        self.bell()

    async def action_focus(self, widget_id: str) -> None:
        """Focus the widget with the given ID.

        Does nothing when no node matches the ID, or when the first match is
        not a Widget.

        Args:
            widget_id (str): ID of widget to focus.
        """
        try:
            node = self.query(f"#{widget_id}").first()
        except NoMatches:
            return
        if isinstance(node, Widget):
            self.set_focus(node)

    async def action_switch_screen(self, screen: str) -> None:
        """Action that switches to another screen via `switch_screen`.

        Args:
            screen (str): Name of the screen.
        """
        self.switch_screen(screen)

    async def action_push_screen(self, screen: str) -> None:
        """Action that pushes a screen on to the screen stack via `push_screen`.

        Args:
            screen (str): Name of the screen.
        """
        self.push_screen(screen)

    async def action_pop_screen(self) -> None:
        """Action that removes the topmost screen via `pop_screen`."""
        self.pop_screen()

    async def action_back(self) -> None:
        """Pop the topmost screen, ignoring a ScreenStackError from `pop_screen`."""
        try:
            self.pop_screen()
        except ScreenStackError:
            # Deliberate best-effort: a failed pop is ignored.
            return

    async def action_add_class_(self, selector: str, class_name: str) -> None:
        """Add a CSS class to every node on the current screen matching a selector.

        Args:
            selector (str): Selector passed to the screen's `query`.
            class_name (str): Class name to add.
        """
        matches = self.screen.query(selector)
        matches.add_class(class_name)

    async def action_remove_class_(self, selector: str, class_name: str) -> None:
        """Remove a CSS class from every node on the current screen matching a selector.

        Args:
            selector (str): Selector passed to the screen's `query`.
            class_name (str): Class name to remove.
        """
        matches = self.screen.query(selector)
        matches.remove_class(class_name)

    async def action_toggle_class(self, selector: str, class_name: str) -> None:
        """Toggle a CSS class on every node on the current screen matching a selector.

        Args:
            selector (str): Selector passed to the screen's `query`.
            class_name (str): Class name to toggle.
        """
        matches = self.screen.query(selector)
        matches.toggle_class(class_name)

    def _on_terminal_supports_synchronized_output(
        self, message: messages.TerminalSupportsSynchronizedOutput
    ) -> None:
        """Record that synchronized output is available.

        Sets `_sync_available`, which `_begin_update` / `_end_update` check
        before writing the sync sequences.

        Args:
            message (messages.TerminalSupportsSynchronizedOutput): The message (not inspected).
        """
        log.system("[b green]SynchronizedOutput mode is supported")
        self._sync_available = True

    def _begin_update(self) -> None:
        """Write SYNC_START to the console file when sync output is available."""
        if not self._sync_available:
            return
        self.console.file.write(SYNC_START)

    def _end_update(self) -> None:
        """Write SYNC_END to the console file when sync output is available."""
        if not self._sync_available:
            return
        self.console.file.write(SYNC_END)

Ancestors

Subclasses

Class variables

var CSS
var CSS_PATH : CSSPathType
var DEFAULT_CSS
var SCREENS : dict[str, Screen]
var SUB_TITLE : str | None
var TITLE : str | None

Instance variables

var animator : textual._animator.Animator
Expand source code
@property
def animator(self) -> Animator:
    """Get the app's animator.

    Returns:
        Animator: The object stored in `_animator`.
    """
    app_animator = self._animator
    return app_animator
var dark : ReactiveType

Reactive descriptor.

Args

default (ReactiveType | Callable[[], ReactiveType]): A default value or callable that returns a default.
layout : bool, optional
Perform a layout on change. Defaults to False.
repaint : bool, optional
Perform a repaint on change. Defaults to True.
init : bool, optional
Call watchers on initialize (post mount). Defaults to False.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType:
    """Get the reactive value, setting it to its default on first access.

    Args:
        obj (Reactable): The object the descriptor is read from.
        obj_type (type[object]): The owner type (unused).

    Returns:
        ReactiveType: The stored value, or the freshly stored default.
    """
    current: _NotSet | ReactiveType = getattr(obj, self.internal_name, _NOT_SET)
    if not isinstance(current, _NotSet):
        return current

    # No value present yet: resolve the default (calling it if callable).
    default = getattr(obj, f"_default_{self.name}")
    initial = default() if callable(default) else default
    setattr(obj, self.internal_name, initial)
    if self._init:
        self._check_watchers(obj, self.name, initial, first_set=True)
    return initial
var debug : bool

Check if debug mode is enabled.

Returns

bool
True if debug mode is enabled.
Expand source code
@property
def debug(self) -> bool:
    """Check if debug mode is enabled.

    Returns:
        bool: True if "debug" is among the app's features.
    """
    enabled = "debug" in self.features
    return enabled
var focused : Widget | None

Get the widget that is focused on the currently active screen.

Expand source code
@property
def focused(self) -> Widget | None:
    """Get the widget that is focused on the currently active screen.

    Returns:
        Widget | None: The current screen's `focused` value.
    """
    current_screen = self.screen
    return current_screen.focused
var is_headless : bool

Check if the app is running in 'headless' mode.

Returns

bool
True if the app is in headless mode.
Expand source code
@property
def is_headless(self) -> bool:
    """Check if the app is running in 'headless' mode.

    Returns:
        bool: True if "headless" is among the app's features.
    """
    headless = "headless" in self.features
    return headless
var is_transparent : bool
Expand source code
@property
def is_transparent(self) -> bool:
    """Report the app as transparent.

    Returns:
        bool: Always True.
    """
    return True
var namespace_bindings : dict[str, tuple[DOMNode, Binding]]

Get current bindings. If no widget is focused, then the app-level bindings are returned. If a widget is focused, then any bindings present in the active screen and app are merged and returned.

Expand source code
@property
def namespace_bindings(self) -> dict[str, tuple[DOMNode, Binding]]:
    """Get the current bindings as a map of key to (namespace, binding).

    The binding chain is walked in reverse, so an entry earlier in the chain
    overrides a later entry that binds the same key.

    Returns:
        dict[str, tuple[DOMNode, Binding]]: Key to (namespace, binding) mapping.
    """
    return {
        key: (namespace, binding)
        for namespace, bindings in reversed(self._binding_chain)
        for key, binding in bindings.keys.items()
    }
var screen : Screen

Get the current screen.

Raises

ScreenStackError
If there are no screens on the stack.

Returns

Screen
The currently active screen.
Expand source code
@property
def screen(self) -> Screen:
    """Get the current screen (the last entry of the screen stack).

    Raises:
        ScreenStackError: If there are no screens on the stack.

    Returns:
        Screen: The currently active screen.
    """
    try:
        top = self._screen_stack[-1]
    except IndexError:
        # `from None` hides the IndexError from the reported traceback.
        raise ScreenStackError("No screens on stack") from None
    return top
var screen_stack : list[Screen]

Get a copy of the screen stack.

Returns

list[Screen]
List of screens.
Expand source code
@property
def screen_stack(self) -> list[Screen]:
    """Get a *copy* of the screen stack.

    Returns:
        list[Screen]: A copy of the screens on the stack; mutating it does
            not affect the app's own stack.
    """
    snapshot = self._screen_stack.copy()
    return snapshot
var size : Size

Get the size of the terminal.

Returns

Size
Size of the terminal
Expand source code
@property
def size(self) -> Size:
    """Get the size of the terminal.

    Returns:
        Size: A Size built from the console's `size` value.
    """
    dimensions = self.console.size
    return Size(*dimensions)
var sub_title : ReactiveType

Reactive descriptor.

Args

default (ReactiveType | Callable[[], ReactiveType]): A default value or callable that returns a default.
layout : bool, optional
Perform a layout on change. Defaults to False.
repaint : bool, optional
Perform a repaint on change. Defaults to True.
init : bool, optional
Call watchers on initialize (post mount). Defaults to False.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType:
    """Get the reactive value, setting it to its default on first access.

    Args:
        obj (Reactable): The object the descriptor is read from.
        obj_type (type[object]): The owner type (unused).

    Returns:
        ReactiveType: The stored value, or the freshly stored default.
    """
    current: _NotSet | ReactiveType = getattr(obj, self.internal_name, _NOT_SET)
    if not isinstance(current, _NotSet):
        return current

    # No value present yet: resolve the default (calling it if callable).
    default = getattr(obj, f"_default_{self.name}")
    initial = default() if callable(default) else default
    setattr(obj, self.internal_name, initial)
    if self._init:
        self._check_watchers(obj, self.name, initial, first_set=True)
    return initial
var title : ReactiveType

Reactive descriptor.

Args

default (ReactiveType | Callable[[], ReactiveType]): A default value or callable that returns a default.
layout : bool, optional
Perform a layout on change. Defaults to False.
repaint : bool, optional
Perform a repaint on change. Defaults to True.
init : bool, optional
Call watchers on initialize (post mount). Defaults to False.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType:
    """Get the reactive value, setting it to its default on first access.

    Args:
        obj (Reactable): The object the descriptor is read from.
        obj_type (type[object]): The owner type (unused).

    Returns:
        ReactiveType: The stored value, or the freshly stored default.
    """
    current: _NotSet | ReactiveType = getattr(obj, self.internal_name, _NOT_SET)
    if not isinstance(current, _NotSet):
        return current

    # No value present yet: resolve the default (calling it if callable).
    default = getattr(obj, f"_default_{self.name}")
    initial = default() if callable(default) else default
    setattr(obj, self.internal_name, initial)
    if self._init:
        self._check_watchers(obj, self.name, initial, first_set=True)
    return initial

Methods

async def action(self, action: str | tuple[str, tuple[str, ...]], default_namespace: object | None = None) ‑> bool

Perform an action.

Args

action : str
Action encoded in a string.

default_namespace (object | None): Namespace to use if not provided in the action, or None to use app. Defaults to None.

Returns

bool
True if the action was handled.
Expand source code
async def action(
    self,
    action: str | tuple[str, tuple[str, ...]],
    default_namespace: object | None = None,
) -> bool:
    """Perform an action.

    Args:
        action (str | tuple[str, tuple[str, ...]]): Action encoded in a string, or an
            already parsed ``(target, params)`` tuple.
        default_namespace (object | None): Namespace to use if not provided in the action,
            or None to use app. Defaults to None.

    Raises:
        ActionError: If the action names a namespace that is not in `_action_targets`.

    Returns:
        bool: True if the action was handled.
    """
    # No print() here: writing to stdout would corrupt the terminal display.
    if isinstance(action, str):
        target, params = actions.parse(action)
    else:
        target, params = action

    if "." in target:
        destination, action_name = target.split(".", 1)
        if destination not in self._action_targets:
            raise ActionError(f"Action namespace {destination} is not known")
        action_target = getattr(self, destination)
    else:
        action_target = default_namespace or self
        action_name = target

    handled = await self._dispatch_action(action_target, action_name, params)
    # NOTE(review): the former `implicit_destination` flag was True on both
    # branches, so the app-level fallback also applies to explicitly
    # namespaced actions. That behaviour is kept; confirm it is intended.
    if not handled and not isinstance(action_target, App):
        handled = await self.app._dispatch_action(self.app, action_name, params)
    return handled
async def action_add_class_(self, selector: str, class_name: str) ‑> None
Expand source code
async def action_add_class_(self, selector: str, class_name: str) -> None:
    """Add a CSS class to every node on the current screen matching a selector.

    Args:
        selector (str): Selector passed to the screen's `query`.
        class_name (str): Class name to add.
    """
    matches = self.screen.query(selector)
    matches.add_class(class_name)
async def action_back(self) ‑> None
Expand source code
async def action_back(self) -> None:
    """Pop the topmost screen, ignoring a ScreenStackError from `pop_screen`."""
    try:
        self.pop_screen()
    except ScreenStackError:
        # Deliberate best-effort: a failed pop is ignored.
        return
async def action_bang(self) ‑> None
Expand source code
async def action_bang(self) -> None:
    """Raise ZeroDivisionError by evaluating ``1 / 0``.

    NOTE(review): looks like a debugging aid for exercising error handling - confirm.
    """
    1 / 0
async def action_bell(self) ‑> None

Play the terminal 'bell'.

Expand source code
async def action_bell(self) -> None:
    """Action that plays the terminal 'bell' via `bell`."""
    self.bell()
async def action_check_bindings(self, key: str) ‑> None
Expand source code
async def action_check_bindings(self, key: str) -> None:
    """Action that checks bindings for a key; the result is discarded.

    Args:
        key (str): A key.
    """
    await self.check_bindings(key)
async def action_focus(self, widget_id: str) ‑> None

Focus the given widget.

Args

widget_id : str
ID of widget to focus.
Expand source code
async def action_focus(self, widget_id: str) -> None:
    """Focus the widget with the given ID.

    Does nothing when no node matches the ID, or when the first match is
    not a Widget.

    Args:
        widget_id (str): ID of widget to focus.
    """
    try:
        node = self.query(f"#{widget_id}").first()
    except NoMatches:
        return
    if isinstance(node, Widget):
        self.set_focus(node)
async def action_pop_screen(self) ‑> None

Removes the topmost screen and makes the new topmost screen active.

Expand source code
async def action_pop_screen(self) -> None:
    """Action that removes the topmost screen via `pop_screen`."""
    self.pop_screen()
async def action_push_screen(self, screen: str) ‑> None

Pushes a screen on to the screen stack and makes it active.

Args

screen : str
Name of the screen.
Expand source code
async def action_push_screen(self, screen: str) -> None:
    """Action that pushes a screen on to the screen stack via `push_screen`.

    Args:
        screen (str): Name of the screen.
    """
    self.push_screen(screen)
async def action_quit(self) ‑> None

Quit the app as soon as possible.

Expand source code
async def action_quit(self) -> None:
    """Quit the app as soon as possible (awaits `shutdown`)."""
    await self.shutdown()
async def action_remove_class_(self, selector: str, class_name: str) ‑> None
Expand source code
async def action_remove_class_(self, selector: str, class_name: str) -> None:
    """Remove a CSS class from every node on the current screen matching a selector.

    Args:
        selector (str): Selector passed to the screen's `query`.
        class_name (str): Class name to remove.
    """
    matches = self.screen.query(selector)
    matches.remove_class(class_name)
def action_screenshot(self, filename: str | None = None, path: str = './') ‑> None

Save an SVG "screenshot". This action will save an SVG file containing the current contents of the screen.

Args

filename (str | None, optional): Filename of screenshot, or None to auto-generate. Defaults to None.
path : str, optional
Path to directory. Defaults to "./".
Expand source code
def action_screenshot(self, filename: str | None = None, path: str = "./") -> None:
    """Save an SVG "screenshot" of the current screen via `save_screenshot`.

    Args:
        filename (str | None, optional): Filename of screenshot, or None to auto-generate. Defaults to None.
        path (str, optional): Path to directory. Defaults to "./".
    """
    destination = path
    self.save_screenshot(filename, destination)
async def action_switch_screen(self, screen: str) ‑> None

Switches to another screen.

Args

screen : str
Name of the screen.
Expand source code
async def action_switch_screen(self, screen: str) -> None:
    """Action that switches to another screen via `switch_screen`.

    Args:
        screen (str): Name of the screen.
    """
    self.switch_screen(screen)
async def action_toggle_class(self, selector: str, class_name: str) ‑> None
Expand source code
async def action_toggle_class(self, selector: str, class_name: str) -> None:
    """Toggle a CSS class on every node on the current screen matching a selector.

    Args:
        selector (str): Selector passed to the screen's `query`.
        class_name (str): Class name to toggle.
    """
    matches = self.screen.query(selector)
    matches.toggle_class(class_name)
def action_toggle_dark(self) ‑> None

Action to toggle dark mode.

Expand source code
def action_toggle_dark(self) -> None:
    """Action to toggle dark mode by inverting the `dark` attribute."""
    currently_dark = self.dark
    self.dark = not currently_dark
def animate(self, attribute: str, value: float | Animatable, *, final_value: object = Ellipsis, duration: float | None = None, speed: float | None = None, delay: float = 0.0, easing: EasingFunction | str = 'in_out_cubic', on_complete: CallbackType | None = None) ‑> None

Animate an attribute.

Args

attribute : str
Name of the attribute to animate.
value (float | Animatable): The value to animate to.
final_value : object, optional
The final value of the animation. Defaults to value if not set.
duration (float | None, optional): The duration of the animate. Defaults to None.
speed (float | None, optional): The speed of the animation. Defaults to None.
delay : float, optional
A delay (in seconds) before the animation starts. Defaults to 0.0.

easing (EasingFunction | str, optional): An easing method. Defaults to "in_out_cubic".

on_complete (CallbackType | None, optional): A callable to invoke when the animation is finished. Defaults to None.

Expand source code
def animate(
    self,
    attribute: str,
    value: float | Animatable,
    *,
    final_value: object = ...,
    duration: float | None = None,
    speed: float | None = None,
    delay: float = 0.0,
    easing: EasingFunction | str = DEFAULT_EASING,
    on_complete: CallbackType | None = None,
) -> None:
    """Animate an attribute.

    All arguments are passed straight through to `_animate`.

    Args:
        attribute (str): Name of the attribute to animate.
        value (float | Animatable): The value to animate to.
        final_value (object, optional): The final value of the animation. Defaults to `value` if not set.
        duration (float | None, optional): The duration of the animate. Defaults to None.
        speed (float | None, optional): The speed of the animation. Defaults to None.
        delay (float, optional): A delay (in seconds) before the animation starts. Defaults to 0.0.
        easing (EasingFunction | str, optional): An easing method. Defaults to DEFAULT_EASING.
        on_complete (CallbackType | None, optional): A callable to invoke when the animation is finished. Defaults to None.

    """
    options = dict(
        final_value=final_value,
        duration=duration,
        speed=speed,
        delay=delay,
        easing=easing,
        on_complete=on_complete,
    )
    self._animate(attribute, value, **options)
def bell(self) ‑> None

Play the console 'bell'.

Expand source code
def bell(self) -> None:
    """Play the console 'bell' (skipped when the app is headless)."""
    if self.is_headless:
        return
    self.console.bell()
def bind(self, keys: str, action: str, *, description: str = '', show: bool = True, key_display: str | None = None) ‑> None

Bind a key to an action.

Args

keys : str
A comma-separated list of keys.
action : str
Action to bind to.
description : str, optional
Short description of action. Defaults to "".
show : bool, optional
Show key in UI. Defaults to True.
key_display : str, optional
Replacement text for key, or None to use default. Defaults to None.
Expand source code
def bind(
    self,
    keys: str,
    action: str,
    *,
    description: str = "",
    show: bool = True,
    key_display: str | None = None,
) -> None:
    """Bind a key to an action on the app's bindings.

    Args:
        keys (str): A comma separated list of keys.
        action (str): Action to bind to.
        description (str, optional): Short description of action. Defaults to "".
        show (bool, optional): Show key in UI. Defaults to True.
        key_display (str, optional): Replacement text for key, or None to use default. Defaults to None.
    """
    app_bindings = self._bindings
    # `description` is passed positionally, matching the original call.
    app_bindings.bind(keys, action, description, show=show, key_display=key_display)
def capture_mouse(self, widget: Widget | None) ‑> None

Send all mouse events to the given widget, or disable mouse capture.

Args

widget (Widget | None): If a widget, capture mouse event, or None to end mouse capture.

Expand source code
def capture_mouse(self, widget: Widget | None) -> None:
    """Send all mouse events to the given widget, or end mouse capture.

    Does nothing when `widget` equals the widget already capturing the mouse.
    Otherwise the previous capturing widget (if any) is posted a MouseRelease,
    and the new widget (if any) is posted a MouseCapture.

    Args:
        widget (Widget | None): If a widget, capture mouse event, or None to end mouse capture.
    """
    if widget == self.mouse_captured:
        return

    previous = self.mouse_captured
    if previous is not None:
        release = events.MouseRelease(self, self.mouse_position)
        previous.post_message_no_wait(release)

    self.mouse_captured = widget
    if widget is None:
        return
    capture = events.MouseCapture(self, self.mouse_position)
    widget.post_message_no_wait(capture)
async def check_bindings(self, key: str, universal: bool = False) ‑> bool

Handle a key press.

Args

key : str
A key
universal : bool
Check universal keys if True, otherwise non-universal keys.

Returns

bool
True if the key was handled by a binding, otherwise False
Expand source code
async def check_bindings(self, key: str, universal: bool = False) -> bool:
    """Run the action bound to a key, if any binding in the chain matches.

    The binding chain is searched in order; the first binding for `key`
    whose `universal` flag equals the requested one is run.

    Args:
        key (str): A key.
        universal (bool): Check universal keys if True, otherwise non-universal keys.

    Returns:
        bool: True if the key was handled by a binding, otherwise False.
    """
    for namespace, bindings in self._binding_chain:
        binding = bindings.keys.get(key)
        if binding is None or binding.universal != universal:
            continue
        await self.action(binding.action, default_namespace=namespace)
        return True
    return False
def compose(self) ‑> Iterable[Widget]

Yield child widgets for a container.

Expand source code
def compose(self) -> ComposeResult:
    """Yield child widgets for a container.

    This base implementation is a generator that yields nothing.
    """
    yield from ()
def exit(self, result: ReturnType | None = None) ‑> None

Exit the app, and return the supplied result.

Args

result (ReturnType | None, optional): Return value. Defaults to None.

Expand source code
def exit(self, result: ReturnType | None = None) -> None:
    """Exit the app, and return the supplied result.

    The result is stored in `_return_value` before message processing is
    asked to close without waiting.

    Args:
        result (ReturnType | None, optional): Return value. Defaults to None.
    """
    self._return_value = result
    close = self._close_messages_no_wait
    close()
def export_screenshot(self, *, title: str | None = None) ‑> str

Export an SVG screenshot of the current screen.

Args

title (str | None, optional): The title of the exported screenshot or None to use app title. Defaults to None.

Expand source code
def export_screenshot(self, *, title: str | None = None) -> str:
    """Export an SVG screenshot of the current screen.

    A full render of the screen's compositor is printed to a recording
    console with the same width and height as the app's console, and the
    recording is exported as SVG.

    Args:
        title (str | None, optional): The title of the exported screenshot or None
            to use app title. Defaults to None.

    Returns:
        str: The SVG markup.
    """
    source = self.console
    recorder = Console(
        width=source.width,
        height=source.height,
        file=io.StringIO(),
        force_terminal=True,
        color_system="truecolor",
        record=True,
        legacy_windows=False,
    )
    recorder.print(self.screen._compositor.render(full=True))
    svg_title = title or self.title
    return recorder.export_svg(title=svg_title)
def fatal_error(self) ‑> None

Exits the app after an unhandled exception.

Expand source code
def fatal_error(self) -> None:
    """Exits the app after an unhandled exception.

    Plays the bell, renders a Rich traceback into the exit renderables, and
    asks message processing to close without waiting.
    """
    self.bell()
    traceback = Traceback(
        show_locals=True, width=None, locals_max_length=5, suppress=[rich]
    )
    rendered = self.console.render(traceback, self.console.options)
    self._exit_renderables.append(Segments(rendered))
    self._close_messages_no_wait()
def get_child(self, id: str) ‑> DOMNode

Shorthand for self.screen.get_child(id). Returns the first child (immediate descendant) of this DOMNode with the given ID.

Args

id : str
The ID of the node to search for.

Returns

DOMNode
The first child of this node with the specified ID.

Raises

NoMatches
if no children could be found for this ID
Expand source code
def get_child(self, id: str) -> DOMNode:
    """Look up a child of the current screen by ID.

    This simply delegates to `self.screen.get_child(id)`.

    Args:
        id (str): The ID of the node to search for.

    Returns:
        DOMNode: Whatever the current screen's `get_child` returns for this ID.

    Raises:
        NoMatches: if no children could be found for this ID
    """
    current_screen = self.screen
    return current_screen.get_child(id)
def get_css_variables(self) ‑> dict[str, str]

Get a mapping of variables used to pre-populate CSS.

Returns

dict[str, str]
A mapping of variable name to value.
Expand source code
def get_css_variables(self) -> dict[str, str]:
    """Generate the CSS variables for the active (dark or light) design.

    Returns:
        dict[str, str]: A mapping of variable name to value.
    """
    designs = self.design
    # `self.dark` selects which of the two design entries is used.
    mode = "dark" if self.dark else "light"
    return designs[mode].generate()
def get_driver_class(self) ‑> Type[Driver]

Get a driver class for this platform.

Called by the constructor.

Returns

Driver
A Driver class which manages input and display.
Expand source code
def get_driver_class(self) -> Type[Driver]:
    """Pick the driver class matching the current platform.

    Called by the constructor.

    Returns:
        Driver: `WindowsDriver` when running on Windows, otherwise `LinuxDriver`.
    """
    # The driver modules are imported lazily so that only the one matching
    # the current platform is ever loaded.
    if WINDOWS:
        from .drivers.windows_driver import WindowsDriver

        return WindowsDriver

    from .drivers.linux_driver import LinuxDriver

    return LinuxDriver
def get_screen(self, screen: Screen | str) ‑> Screen

Get an installed screen.

If the screen isn't running, it will be registered before it is run.

Args

screen (Screen | str): Either a Screen object or screen name (the name argument when installed).

Raises

KeyError
If the named screen doesn't exist.

Returns

Screen
A screen instance.
Expand source code
def get_screen(self, screen: Screen | str) -> Screen:
    """Resolve a screen object or installed screen name to a screen instance.

    A screen that is not yet running is registered with the app before it
    is returned.

    Args:
        screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

    Raises:
        KeyError: If the named screen doesn't exist.

    Returns:
        Screen: A screen instance.
    """
    if not isinstance(screen, str):
        target = screen
    else:
        try:
            target = self._installed_screens[screen]
        except KeyError:
            # Replace the bare lookup error with a friendlier message.
            raise KeyError(f"No screen called {screen!r} installed") from None

    if target.is_running:
        return target
    self._register(self, target)
    return target
def get_widget_at(self, x: int, y: int) ‑> tuple[Widget, Region]

Get the widget under the given coordinates.

Args

x : int
X Coord.
y : int
Y Coord.

Returns

tuple[Widget, Region]
The widget and the widget's screen region.
Expand source code
def get_widget_at(self, x: int, y: int) -> tuple[Widget, Region]:
    """Find the widget at a coordinate by asking the current screen.

    Args:
        x (int): X Coord.
        y (int): Y Coord.

    Returns:
        tuple[Widget, Region]: The widget and the widget's screen region.
    """
    current_screen = self.screen
    return current_screen.get_widget_at(x, y)
def install_screen(self, screen: Screen, name: str | None = None) ‑> str

Install a screen.

Args

screen : Screen
Screen to install.

name (str | None, optional): Unique name of screen or None to auto-generate. Defaults to None.

Raises

ScreenError
If the screen can't be installed.

Returns

str
The name of the screen
Expand source code
def install_screen(self, screen: Screen, name: str | None = None) -> str:
    """Install a screen under a unique name.

    Args:
        screen (Screen): Screen to install.
        name (str | None, optional): Unique name of screen or None to auto-generate.
            Defaults to None.

    Raises:
        ScreenError: If the name is already in use, or the screen object has
            already been installed (under any name).

    Returns:
        str: The name of the screen
    """
    if name is None:
        name = nanoid.generate()
    if name in self._installed_screens:
        raise ScreenError(f"Can't install screen; {name!r} is already installed")
    if screen in self._installed_screens.values():
        # This message must be an f-string, otherwise the literal text
        # "{screen!r}" is shown instead of the offending screen.
        raise ScreenError(
            f"Can't install screen; {screen!r} has already been installed"
        )
    self._installed_screens[name] = screen
    # get_screen registers the screen if it isn't running yet.
    self.get_screen(name)
    self.log.system(f"{screen} INSTALLED name={name!r}")
    return name
def is_mounted(self, widget: Widget) ‑> bool

Check if a widget is mounted.

Args

widget : Widget
A widget.

Returns

bool
True if the widget is mounted.
Expand source code
def is_mounted(self, widget: Widget) -> bool:
    """Report whether a widget is present in the app's registry.

    Args:
        widget (Widget): A widget.

    Returns:
        bool: True if the widget is mounted.
    """
    registry = self._registry
    return widget in registry
def is_screen_installed(self, screen: Screen | str) ‑> bool

Check if a given screen has been installed.

Args

screen (Screen | str): Either a Screen object or screen name (the name argument when installed).

Returns

bool
True if the screen is currently installed.
Expand source code
def is_screen_installed(self, screen: Screen | str) -> bool:
    """Tell whether a screen (or screen name) is among the installed screens.

    Args:
        screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

    Returns:
        bool: True if the screen is currently installed.
    """
    installed = self._installed_screens
    # Names are matched against the keys, screen objects against the values.
    candidates = installed if isinstance(screen, str) else installed.values()
    return screen in candidates
def mount(self, *anon_widgets: Widget, **widgets: Widget) ‑> AwaitMount

Mount widgets. Widgets specified as positional args, or keywords args. If supplied as keyword args they will be assigned an id of the key.

Returns

AwaitMount
An awaitable object that waits for widgets to be mounted.
Expand source code
def mount(self, *anon_widgets: Widget, **widgets: Widget) -> AwaitMount:
    """Mount widgets on the current screen.

    Widgets may be given positionally or as keyword arguments; a widget given
    by keyword is assigned the keyword as its id.

    Returns:
        AwaitMount: An awaitable object that waits for widgets to be mounted.

    """
    # Only the widgets returned by `_register` are handed to the awaitable.
    return AwaitMount(self._register(self.screen, *anon_widgets, **widgets))
def mount_all(self, widgets: Iterable[Widget]) ‑> AwaitMount

Mount widgets from an iterable.

Args

widgets : Iterable[Widget]
An iterable of widgets.
Expand source code
def mount_all(self, widgets: Iterable[Widget]) -> AwaitMount:
    """Mount every widget from an iterable on the current screen.

    Args:
        widgets (Iterable[Widget]): An iterable of widgets.

    Returns:
        AwaitMount: An awaitable wrapping all of the supplied widgets.
    """
    # Materialise the iterable once so that the same widgets are both
    # registered and handed to the awaitable (it may be a one-shot iterator).
    to_mount = list(widgets)
    for child in to_mount:
        self._register(self.screen, child)
    return AwaitMount(to_mount)
def panic(self, *renderables: RenderableType) ‑> None

Exits the app then displays a message.

Args

*renderables : RenderableType, optional
Rich renderables to display on exit.
Expand source code
def panic(self, *renderables: RenderableType) -> None:
    """Close the app and queue renderables to be displayed on exit.

    Args:
        *renderables (RenderableType, optional): Rich renderables to display on exit.
    """

    assert all(
        is_renderable(item) for item in renderables
    ), "Can only call panic with strings or Rich renderables"

    # Render everything to segments up front; nothing is queued unless every
    # renderable renders successfully.
    exit_output = []
    for item in renderables:
        segments = list(self.console.render(item, self.console.options))
        exit_output.append(Segments(segments))

    self._exit_renderables.extend(exit_output)
    self._close_messages_no_wait()
def pop_screen(self) ‑> Screen

Pop the current screen from the stack, and switch to the previous screen.

Returns

Screen
The screen that was replaced.
Expand source code
def pop_screen(self) -> Screen:
    """Remove the top screen from the stack and resume the one beneath it.

    Raises:
        ScreenStackError: If popping would leave the stack empty.

    Returns:
        Screen: The screen that was replaced.
    """
    stack = self._screen_stack
    if len(stack) < 2:
        raise ScreenStackError(
            "Can't pop screen; there must be at least one screen on the stack"
        )
    replaced = self._replace_screen(stack.pop())
    # `self.screen` now refers to the screen that was below the popped one.
    now_active = self.screen
    now_active._screen_resized(self.size)
    now_active.post_message_no_wait(events.ScreenResume(self))
    self.log.system(f"{now_active} is active")
    return replaced
def push_screen(self, screen: Screen | str) ‑> None

Push a new screen on the screen stack.

Args

screen (Screen | str): A Screen instance or the name of an installed screen.

Expand source code
def push_screen(self, screen: Screen | str) -> None:
    """Put a screen on top of the screen stack and send it a resume event.

    Args:
        screen (Screen | str): A Screen instance or the name of an installed screen.

    """
    pushed = self.get_screen(screen)
    self._screen_stack.append(pushed)
    # After the append, `self.screen` is the screen that was just pushed.
    active = self.screen
    active.post_message_no_wait(events.ScreenResume(self))
    self.log.system(f"{active} is current (PUSHED)")
def refresh(self, *, repaint: bool = True, layout: bool = False) ‑> None
Expand source code
def refresh(self, *, repaint: bool = True, layout: bool = False) -> None:
    """Refresh the current screen, if there is one, then call `check_idle`.

    Args:
        repaint (bool, optional): Forwarded to the screen's `refresh`. Defaults to True.
        layout (bool, optional): Forwarded to the screen's `refresh`. Defaults to False.
    """
    has_screen = bool(self._screen_stack)
    if has_screen:
        self.screen.refresh(repaint=repaint, layout=layout)
    self.check_idle()
def refresh_css(self, animate: bool = True) ‑> None

Refresh CSS.

Args

animate : bool, optional
Also execute CSS animations. Defaults to True.
Expand source code
def refresh_css(self, animate: bool = True) -> None:
    """Re-parse the stylesheet with fresh variables and re-apply it.

    Args:
        animate (bool, optional): Also execute CSS animations. Defaults to True.
    """
    sheet = self.app.stylesheet
    css_variables = self.get_css_variables()
    sheet.set_variables(css_variables)
    sheet.reparse()
    sheet.update(self.app, animate=animate)
    # Finish with a full layout refresh of the current screen.
    self.screen._refresh_layout(self.size, full=True)
def render(self) ‑> Union[rich.console.ConsoleRenderable, rich.console.RichCast, str]
Expand source code
def render(self) -> RenderableType:
    """Return a `Blank` renderable built from the app's background style."""
    background = self.styles.background
    return Blank(background)
def run(self, *, quit_after: float | None = None, headless: bool = False, press: Iterable[str] | None = None, screenshot: bool = False, screenshot_title: str | None = None) ‑> ReturnType | None

The main entry point for apps.

Args

quit_after (float | None, optional): Quit after a given number of seconds, or None
to run forever. Defaults to None.
headless : bool, optional
Run in "headless" mode (don't write to stdout).
press : Iterable[str] | None, optional
An iterable of keys to simulate being pressed.
screenshot : bool, optional
Take a screenshot after pressing keys (svg data stored in self._screenshot). Defaults to False.

screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None.

Returns

ReturnType | None: The return value specified in App.exit() or None if exit wasn't called.

Expand source code
def run(
    self,
    *,
    quit_after: float | None = None,
    headless: bool = False,
    press: Iterable[str] | None = None,
    screenshot: bool = False,
    screenshot_title: str | None = None,
) -> ReturnType | None:
    """The main entry point for apps.

    Runs the app's message loop on an asyncio event loop and blocks until it
    finishes.

    Args:
        quit_after (float | None, optional): Quit after a given number of seconds, or None
            to run forever. Defaults to None.
        headless (bool, optional): Run in "headless" mode (don't write to stdout).
        press (Iterable[str] | None, optional): An iterable of keys to simulate being
            pressed. The key "_" pauses for 50ms and "wait:<ms>" pauses for the given
            number of milliseconds. The app shuts down once all keys have been sent.
            Defaults to None.
        screenshot (bool, optional): Take a screenshot after pressing keys (svg data stored in self._screenshot).
            Only has an effect when `press` is given. Defaults to False.
        screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None.

    Returns:
        ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called.
    """

    if headless:
        # Add the "headless" flag to the existing feature set.
        self.features = cast(
            "frozenset[FeatureFlag]", self.features.union({"headless"})
        )

    async def run_app() -> None:
        """Run the message loop, optionally driving it with simulated key presses."""
        if quit_after is not None:
            self.set_timer(quit_after, self.shutdown)
        if press is not None:
            app = self

            async def press_keys() -> None:
                """A task to send key events."""
                # NOTE(review): an empty (but not None) `press` fails this assert.
                assert press
                driver = app._driver
                assert driver is not None
                await asyncio.sleep(0.01)
                for key in press:
                    if key == "_":
                        # "_" is a short pause rather than a key press.
                        print("(pause 50ms)")
                        await asyncio.sleep(0.05)
                    elif key.startswith("wait:"):
                        # "wait:<ms>" pauses for the given number of milliseconds.
                        _, wait_ms = key.split(":")
                        print(f"(pause {wait_ms}ms)")
                        await asyncio.sleep(float(wait_ms) / 1000)
                    else:
                        if len(key) == 1 and not key.isalnum():
                            # A single non-alphanumeric character is replaced by its
                            # Unicode name, lower-cased with "-" and " " turned into "_".
                            key = (
                                unicodedata.name(key)
                                .lower()
                                .replace("-", "_")
                                .replace(" ", "_")
                            )
                        # Presumably maps a replaced key name back to its original
                        # name so the Unicode lookup below can succeed — TODO confirm.
                        original_key = REPLACED_KEYS.get(key, key)
                        try:
                            char = unicodedata.lookup(
                                original_key.upper().replace("_", " ")
                            )
                        except KeyError:
                            # Not a Unicode character name: a single character
                            # stands for itself, anything else has no character.
                            char = key if len(key) == 1 else None
                        print(f"press {key!r} (char={char!r})")
                        key_event = events.Key(self, key, char)
                        driver.send_event(key_event)
                        await asyncio.sleep(0.01)

                # Wait for the animator to go idle before the screenshot / shutdown.
                await app._animator.wait_for_idle()

                if screenshot:
                    self._screenshot = self.export_screenshot(
                        title=screenshot_title
                    )
                await self.shutdown()

            async def press_keys_task():
                """Press some keys in the background."""
                # NOTE(review): the task object is not kept; the asyncio docs
                # recommend holding a reference to tasks created this way.
                asyncio.create_task(press_keys())

            await self._process_messages(ready_callback=press_keys_task)
        else:
            await self._process_messages()

    if _ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED:
        # N.B. This doesn't work with Python<3.10, as we end up with 2 event loops:
        asyncio.run(run_app())
    else:
        # However, this works with Python<3.10:
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(run_app())

    return self._return_value
def save_screenshot(self, filename: str | None = None, path: str = './', time_format: str = '%Y-%m-%d %X %f') ‑> str

Save an SVG screenshot of the current screen.

Args

filename (str | None, optional): Filename of SVG screenshot, or None to auto-generate
a filename with the date and time. Defaults to None.
path : str, optional
Path to directory for output. Defaults to current working directory.
time_format : str, optional
Time format to use if filename is None. Defaults to "%Y-%m-%d %X %f".

Returns

str
Filename of screenshot.
Expand source code
def save_screenshot(
    self,
    filename: str | None = None,
    path: str = "./",
    time_format: str = "%Y-%m-%d %X %f",
) -> str:
    """Save an SVG screenshot of the current screen.

    Args:
        filename (str | None, optional): Filename of SVG screenshot, or None to auto-generate
            a filename with the date and time. Defaults to None.
        path (str, optional): Path to directory for output. Defaults to current working directory.
        time_format (str, optional): Time format to use if filename is None. Defaults to "%Y-%m-%d %X %f".

    Returns:
        str: Path of the saved screenshot (`path` joined with the filename,
            with a leading "~" expanded).
    """
    if filename is None:
        timestamp = datetime.now().strftime(time_format)
        svg_filename = f"{self.title.lower()} {timestamp}.svg"
        # Path separators in the title or timestamp would otherwise be
        # interpreted as directories.
        svg_filename = svg_filename.replace("/", "_").replace("\\", "_")
    else:
        svg_filename = filename
    svg_path = os.path.expanduser(os.path.join(path, svg_filename))
    screenshot_svg = self.export_screenshot()
    # Write UTF-8 explicitly: the SVG may contain any Unicode character shown on
    # screen, and the locale's default encoding (e.g. cp1252 on Windows) may
    # not be able to represent it.
    with open(svg_path, "w", encoding="utf-8") as svg_file:
        svg_file.write(screenshot_svg)
    return svg_path
def set_focus(self, widget: Widget | None, scroll_visible: bool = True) ‑> None

Focus (or unfocus) a widget. A focused widget will receive key events first.

Args

widget : Widget
Widget to focus.
scroll_visible : bool, optional
Scroll widget in to view.
Expand source code
def set_focus(self, widget: Widget | None, scroll_visible: bool = True) -> None:
    """Focus (or unfocus) a widget by delegating to the current screen.

    A focused widget will receive key events first.

    Args:
        widget (Widget | None): Widget to focus; passed through unchanged to the screen.
        scroll_visible (bool, optional): Scroll widget in to view.
    """
    current_screen = self.screen
    current_screen.set_focus(widget, scroll_visible)
async def shutdown(self)
Expand source code
async def shutdown(self):
    """Disconnect devtools, disable driver input (if there is a driver), then await `_close_messages`."""
    await self._disconnect_devtools()
    active_driver = self._driver
    # There may be no driver, in which case there is no input to disable.
    if active_driver is not None:
        active_driver.disable_input()
    await self._close_messages()
def switch_screen(self, screen: Screen | str) ‑> None

Switch to another screen by replacing the top of the screen stack with a new screen.

Args

screen (Screen | str): Either a Screen object or screen name (the name argument when installed).

Expand source code
def switch_screen(self, screen: Screen | str) -> None:
    """Switch to another screen by replacing the top of the screen stack with a new screen.

    Nothing happens if `screen` is the very object that is currently on top
    of the stack.

    Args:
        screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).

    Raises:
        KeyError: If `screen` is a name and no screen with that name is installed.
            The screen stack is left untouched in that case.

    """
    if self.screen is screen:
        return
    # Validate the name *before* popping the current screen. Otherwise an
    # unknown name would raise only after the stack had already been modified.
    if isinstance(screen, str) and screen not in self._installed_screens:
        raise KeyError(f"No screen called {screen!r} installed")
    self._replace_screen(self._screen_stack.pop())
    next_screen = self.get_screen(screen)
    self._screen_stack.append(next_screen)
    self.screen.post_message_no_wait(events.ScreenResume(self))
    self.log.system(f"{self.screen} is current (SWITCHED)")
def uninstall_screen(self, screen: Screen | str) ‑> str | None

Uninstall a screen. If the screen was not previously installed then this method is a no-op.

Args

screen (Screen | str): The screen to uninstall or the name of an installed screen.

Returns

str | None: The name of the screen that was uninstalled, or None if no screen was uninstalled.

Expand source code
def uninstall_screen(self, screen: Screen | str) -> str | None:
    """Remove a screen from the installed screens.

    Does nothing (and returns None) if the screen was not installed.

    Args:
        screen (Screen | str): The screen to uninstall or the name of an installed screen.

    Raises:
        ScreenStackError: If the screen is currently in the screen stack.

    Returns:
        str | None: The name of the screen that was uninstalled, or None if no screen was uninstalled.
    """
    installed = self._installed_screens

    if isinstance(screen, str):
        if screen not in installed:
            return None
        target = installed[screen]
        if target in self._screen_stack:
            raise ScreenStackError("Can't uninstall screen in screen stack")
        del installed[screen]
        self.log.system(f"{target} UNINSTALLED name={screen!r}")
        return screen

    if screen in self._screen_stack:
        raise ScreenStackError("Can't uninstall screen in screen stack")
    # Find the first name under which this exact screen object is installed.
    found_name = next(
        (name for name, candidate in installed.items() if candidate is screen),
        None,
    )
    if found_name is None:
        return None
    installed.pop(found_name)
    self.log.system(f"{screen} UNINSTALLED name={found_name!r}")
    return found_name
def update_styles(self, node: DOMNode | None = None) ‑> None

Request update of styles.

Should be called whenever CSS classes / pseudo classes change.

Expand source code
def update_styles(self, node: DOMNode | None = None) -> None:
    """Queue a node for a stylesheet update.

    Should be called whenever CSS classes / pseudo classes change.

    Args:
        node (DOMNode | None, optional): The node to update, or None to use
            the current screen. Defaults to None.

    """
    pending = self._require_stylesheet_update
    if node is None:
        pending.add(self.screen)
    else:
        pending.add(node)
    self.check_idle()
def watch_dark(self, dark: bool) ‑> None

Watches the dark bool.

Expand source code
def watch_dark(self, dark: bool) -> None:
    """React to a change of `dark`: toggle the mode classes and refresh CSS."""
    # Exactly one of the two mode classes is switched on.
    mode_classes = ((dark, "-dark-mode"), (not dark, "-light-mode"))
    for enabled, class_name in mode_classes:
        self.set_class(enabled, class_name)
    self.refresh_css()

Inherited members

class AppError (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class AppError(Exception):
    """Exception type for app errors.

    NOTE(review): no raise site is visible in this part of the module;
    presumably a general error for app-level failures — confirm against callers.
    """

    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ScreenError (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScreenError(Exception):
    """Raised when a screen operation fails.

    For example, installing a screen under a name that is already in use, or
    installing a screen object that has already been installed.
    """

    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class ScreenStackError (*args, **kwargs)

Raised when attempting to pop the last screen from the stack.

Expand source code
class ScreenStackError(ScreenError):
    """Raised for an invalid screen stack operation.

    This covers attempting to pop the last screen from the stack, and
    attempting to uninstall a screen that is still in the stack.
    """

Ancestors

  • ScreenError
  • builtins.Exception
  • builtins.BaseException