Module textual.app
Expand source code
from __future__ import annotations
import asyncio
import inspect
import io
import os
import platform
import sys
import unicodedata
import warnings
from contextlib import redirect_stderr, redirect_stdout
from datetime import datetime
from pathlib import Path, PurePath
from time import perf_counter
from typing import Any, Generic, Iterable, Type, TYPE_CHECKING, TypeVar, cast, Union
from weakref import WeakSet, WeakValueDictionary
from ._ansi_sequences import SYNC_END, SYNC_START
from ._path import _make_path_object_relative
import nanoid
import rich
import rich.repr
from rich.console import Console, RenderableType
from rich.protocol import is_renderable
from rich.segment import Segment, Segments
from rich.traceback import Traceback
from . import Logger, LogGroup, LogVerbosity, actions, events, log, messages
from ._animator import Animator, DEFAULT_EASING, Animatable, EasingFunction
from ._callback import invoke
from ._context import active_app
from ._event_broker import NoHandler, extract_handler_actions
from ._filter import LineFilter, Monochrome
from .binding import Binding, Bindings
from .css.query import NoMatches
from .css.stylesheet import Stylesheet
from .design import ColorSystem
from .dom import DOMNode
from .driver import Driver
from .drivers.headless_driver import HeadlessDriver
from .features import FeatureFlag, parse_features
from .file_monitor import FileMonitor
from .geometry import Offset, Region, Size
from .keys import REPLACED_KEYS
from .messages import CallbackType
from .reactive import Reactive
from .renderables.blank import Blank
from .screen import Screen
from .widget import AwaitMount, Widget
if TYPE_CHECKING:
from .devtools.client import DevtoolsClient
PLATFORM = platform.system()
WINDOWS = PLATFORM == "Windows"
# asyncio will warn against resources not being cleared
warnings.simplefilter("always", ResourceWarning)
# `asyncio.get_event_loop()` is deprecated since Python 3.10:
_ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED = sys.version_info >= (3, 10, 0)
LayoutDefinition = "dict[str, Any]"
DEFAULT_COLORS = {
"dark": ColorSystem(
primary="#004578",
secondary="#ffa62b",
warning="#ffa62b",
error="#ba3c5b",
success="#4EBF71",
accent="#0178D4",
dark=True,
),
"light": ColorSystem(
primary="#004578",
secondary="#ffa62b",
warning="#ffa62b",
error="#ba3c5b",
success="#4EBF71",
accent="#0178D4",
dark=False,
),
}
ComposeResult = Iterable[Widget]
RenderResult = RenderableType
class AppError(Exception):
pass
class ActionError(Exception):
pass
class ScreenError(Exception):
pass
class ScreenStackError(ScreenError):
"""Raised when attempting to pop the last screen from the stack."""
ReturnType = TypeVar("ReturnType")
class _NullFile:
def write(self, text: str) -> None:
pass
def flush(self) -> None:
pass
CSSPathType = Union[str, PurePath, None]
@rich.repr.auto
class App(Generic[ReturnType], DOMNode):
"""The base class for Textual Applications.
Args:
driver_class (Type[Driver] | None, optional): Driver class or ``None`` to auto-detect. Defaults to None.
title (str | None, optional): Title of the application. If ``None``, the title is set to the name of the ``App`` subclass. Defaults to ``None``.
css_path (str | PurePath | None, optional): Path to CSS or ``None`` for no CSS file. Defaults to None.
watch_css (bool, optional): Watch CSS for changes. Defaults to False.
"""
# Inline CSS for quick scripts (generally css_path should be preferred.)
CSS = ""
# Default (lowest priority) CSS
DEFAULT_CSS = """
App {
background: $background;
color: $text;
}
"""
SCREENS: dict[str, Screen] = {}
_BASE_PATH: str | None = None
CSS_PATH: CSSPathType = None
TITLE: str | None = None
SUB_TITLE: str | None = None
title: Reactive[str] = Reactive("")
sub_title: Reactive[str] = Reactive("")
dark: Reactive[bool] = Reactive(True)
def __init__(
self,
driver_class: Type[Driver] | None = None,
css_path: CSSPathType = None,
watch_css: bool = False,
):
# N.B. This must be done *before* we call the parent constructor, because MessagePump's
# constructor instantiates a `asyncio.PriorityQueue` and in Python versions older than 3.10
# this will create some first references to an asyncio loop.
_init_uvloop()
super().__init__()
self.features: frozenset[FeatureFlag] = parse_features(os.getenv("TEXTUAL", ""))
self._filter: LineFilter | None = None
environ = dict(os.environ)
no_color = environ.pop("NO_COLOR", None)
if no_color is not None:
self._filter = Monochrome()
self.console = Console(
file=(_NullFile() if self.is_headless else sys.__stdout__),
markup=False,
highlight=False,
emoji=False,
legacy_windows=False,
_environ=environ,
)
self.error_console = Console(markup=False, stderr=True)
self.driver_class = driver_class or self.get_driver_class()
self._screen_stack: list[Screen] = []
self._sync_available = False
self.mouse_over: Widget | None = None
self.mouse_captured: Widget | None = None
self._driver: Driver | None = None
self._exit_renderables: list[RenderableType] = []
self._action_targets = {"app", "screen"}
self._animator = Animator(self)
self._animate = self._animator.bind(self)
self.mouse_position = Offset(0, 0)
self.title = (
self.TITLE if self.TITLE is not None else f"{self.__class__.__name__}"
)
self.sub_title = self.SUB_TITLE if self.SUB_TITLE is not None else ""
self._logger = Logger(self._log)
self._bindings.bind("ctrl+c", "quit", show=False, universal=True)
self._refresh_required = False
self.design = DEFAULT_COLORS
self.stylesheet = Stylesheet(variables=self.get_css_variables())
self._require_stylesheet_update: set[DOMNode] = set()
# We want the CSS path to be resolved from the location of the App subclass
css_path = css_path or self.CSS_PATH
if css_path is not None:
if isinstance(css_path, str):
css_path = Path(css_path)
css_path = _make_path_object_relative(css_path, self) if css_path else None
self.css_path = css_path
self._registry: WeakSet[DOMNode] = WeakSet()
self._installed_screens: WeakValueDictionary[
str, Screen
] = WeakValueDictionary()
self._installed_screens.update(**self.SCREENS)
self.devtools: DevtoolsClient | None = None
if "devtools" in self.features:
try:
from .devtools.client import DevtoolsClient
except ImportError:
# Dev dependencies not installed
pass
else:
self.devtools = DevtoolsClient()
self._return_value: ReturnType | None = None
self.css_monitor = (
FileMonitor(self.css_path, self._on_css_change)
if ((watch_css or self.debug) and self.css_path)
else None
)
self._screenshot: str | None = None
def animate(
self,
attribute: str,
value: float | Animatable,
*,
final_value: object = ...,
duration: float | None = None,
speed: float | None = None,
delay: float = 0.0,
easing: EasingFunction | str = DEFAULT_EASING,
on_complete: CallbackType | None = None,
) -> None:
"""Animate an attribute.
Args:
attribute (str): Name of the attribute to animate.
value (float | Animatable): The value to animate to.
final_value (object, optional): The final value of the animation. Defaults to `value` if not set.
duration (float | None, optional): The duration of the animate. Defaults to None.
speed (float | None, optional): The speed of the animation. Defaults to None.
delay (float, optional): A delay (in seconds) before the animation starts. Defaults to 0.0.
easing (EasingFunction | str, optional): An easing method. Defaults to "in_out_cubic".
on_complete (CallbackType | None, optional): A callable to invoke when the animation is finished. Defaults to None.
"""
self._animate(
attribute,
value,
final_value=final_value,
duration=duration,
speed=speed,
delay=delay,
easing=easing,
on_complete=on_complete,
)
@property
def debug(self) -> bool:
"""Check if debug mode is enabled.
Returns:
bool: True if debug mode is enabled.
"""
return "debug" in self.features
@property
def is_headless(self) -> bool:
"""Check if the app is running in 'headless' mode.
Returns:
bool: True if the app is in headless mode.
"""
return "headless" in self.features
@property
def screen_stack(self) -> list[Screen]:
"""Get a *copy* of the screen stack.
Returns:
list[Screen]: List of screens.
"""
return self._screen_stack.copy()
def exit(self, result: ReturnType | None = None) -> None:
"""Exit the app, and return the supplied result.
Args:
result (ReturnType | None, optional): Return value. Defaults to None.
"""
self._return_value = result
self._close_messages_no_wait()
@property
def focused(self) -> Widget | None:
"""Get the widget that is focused on the currently active screen."""
return self.screen.focused
@property
def namespace_bindings(self) -> dict[str, tuple[DOMNode, Binding]]:
"""Get current bindings. If no widget is focused, then the app-level bindings
are returned. If a widget is focused, then any bindings present in the active
screen and app are merged and returned."""
namespace_binding_map: dict[str, tuple[DOMNode, Binding]] = {}
for namespace, bindings in reversed(self._binding_chain):
for key, binding in bindings.keys.items():
namespace_binding_map[key] = (namespace, binding)
return namespace_binding_map
def _set_active(self) -> None:
"""Set this app to be the currently active app."""
active_app.set(self)
def compose(self) -> ComposeResult:
"""Yield child widgets for a container."""
return
yield
def get_css_variables(self) -> dict[str, str]:
"""Get a mapping of variables used to pre-populate CSS.
Returns:
dict[str, str]: A mapping of variable name to value.
"""
variables = self.design["dark" if self.dark else "light"].generate()
return variables
def watch_dark(self, dark: bool) -> None:
"""Watches the dark bool."""
self.set_class(dark, "-dark-mode")
self.set_class(not dark, "-light-mode")
self.refresh_css()
def get_driver_class(self) -> Type[Driver]:
"""Get a driver class for this platform.
Called by the constructor.
Returns:
Driver: A Driver class which manages input and display.
"""
driver_class: Type[Driver]
if WINDOWS:
from .drivers.windows_driver import WindowsDriver
driver_class = WindowsDriver
else:
from .drivers.linux_driver import LinuxDriver
driver_class = LinuxDriver
return driver_class
def __rich_repr__(self) -> rich.repr.Result:
yield "title", self.title
yield "id", self.id, None
if self.name:
yield "name", self.name
if self.classes:
yield "classes", set(self.classes)
pseudo_classes = self.pseudo_classes
if pseudo_classes:
yield "pseudo_classes", set(pseudo_classes)
@property
def is_transparent(self) -> bool:
return True
@property
def animator(self) -> Animator:
return self._animator
@property
def screen(self) -> Screen:
"""Get the current screen.
Raises:
ScreenStackError: If there are no screens on the stack.
Returns:
Screen: The currently active screen.
"""
try:
return self._screen_stack[-1]
except IndexError:
raise ScreenStackError("No screens on stack") from None
@property
def size(self) -> Size:
"""Get the size of the terminal.
Returns:
Size: Size of the terminal
"""
return Size(*self.console.size)
@property
def log(self) -> Logger:
return self._logger
def _log(
self,
group: LogGroup,
verbosity: LogVerbosity,
_textual_calling_frame: inspect.FrameInfo,
*objects: Any,
**kwargs,
) -> None:
"""Write to logs or devtools.
Positional args will logged. Keyword args will be prefixed with the key.
Example:
```python
data = [1,2,3]
self.log("Hello, World", state=data)
self.log(self.tree)
self.log(locals())
```
Args:
verbosity (int, optional): Verbosity level 0-3. Defaults to 1.
"""
devtools = self.devtools
if devtools is None or not devtools.is_connected:
return
if verbosity.value > LogVerbosity.NORMAL.value and not devtools.verbose:
return
try:
from .devtools.client import DevtoolsLog
if len(objects) == 1 and not kwargs:
devtools.log(
DevtoolsLog(objects, caller=_textual_calling_frame),
group,
verbosity,
)
else:
output = " ".join(str(arg) for arg in objects)
if kwargs:
key_values = " ".join(
f"{key}={value!r}" for key, value in kwargs.items()
)
output = f"{output} {key_values}" if output else key_values
devtools.log(
DevtoolsLog(output, caller=_textual_calling_frame),
group,
verbosity,
)
except Exception as error:
self._handle_exception(error)
def action_toggle_dark(self) -> None:
"""Action to toggle dark mode."""
self.dark = not self.dark
def action_screenshot(self, filename: str | None = None, path: str = "./") -> None:
"""Save an SVG "screenshot". This action will save an SVG file containing the current contents of the screen.
Args:
filename (str | None, optional): Filename of screenshot, or None to auto-generate. Defaults to None.
path (str, optional): Path to directory. Defaults to "~/".
"""
self.save_screenshot(filename, path)
def export_screenshot(self, *, title: str | None = None) -> str:
"""Export an SVG screenshot of the current screen.
Args:
title (str | None, optional): The title of the exported screenshot or None
to use app title. Defaults to None.
"""
console = Console(
width=self.console.width,
height=self.console.height,
file=io.StringIO(),
force_terminal=True,
color_system="truecolor",
record=True,
legacy_windows=False,
)
screen_render = self.screen._compositor.render(full=True)
console.print(screen_render)
return console.export_svg(title=title or self.title)
def save_screenshot(
self,
filename: str | None = None,
path: str = "./",
time_format: str = "%Y-%m-%d %X %f",
) -> str:
"""Save an SVG screenshot of the current screen.
Args:
filename (str | None, optional): Filename of SVG screenshot, or None to auto-generate
a filename with the date and time. Defaults to None.
path (str, optional): Path to directory for output. Defaults to current working directory.
time_format (str, optional): Time format to use if filename is None. Defaults to "%Y-%m-%d %X %f".
Returns:
str: Filename of screenshot.
"""
if filename is None:
svg_filename = (
f"{self.title.lower()} {datetime.now().strftime(time_format)}.svg"
)
svg_filename = svg_filename.replace("/", "_").replace("\\", "_")
else:
svg_filename = filename
svg_path = os.path.expanduser(os.path.join(path, svg_filename))
screenshot_svg = self.export_screenshot()
with open(svg_path, "w") as svg_file:
svg_file.write(screenshot_svg)
return svg_path
def bind(
self,
keys: str,
action: str,
*,
description: str = "",
show: bool = True,
key_display: str | None = None,
) -> None:
"""Bind a key to an action.
Args:
keys (str): A comma separated list of keys, i.e.
action (str): Action to bind to.
description (str, optional): Short description of action. Defaults to "".
show (bool, optional): Show key in UI. Defaults to True.
key_display (str, optional): Replacement text for key, or None to use default. Defaults to None.
"""
self._bindings.bind(
keys, action, description, show=show, key_display=key_display
)
def run(
self,
*,
quit_after: float | None = None,
headless: bool = False,
press: Iterable[str] | None = None,
screenshot: bool = False,
screenshot_title: str | None = None,
) -> ReturnType | None:
"""The main entry point for apps.
Args:
quit_after (float | None, optional): Quit after a given number of seconds, or None
to run forever. Defaults to None.
headless (bool, optional): Run in "headless" mode (don't write to stdout).
press (str, optional): An iterable of keys to simulate being pressed.
screenshot (bool, optional): Take a screenshot after pressing keys (svg data stored in self._screenshot). Defaults to False.
screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None.
Returns:
ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called.
"""
if headless:
self.features = cast(
"frozenset[FeatureFlag]", self.features.union({"headless"})
)
async def run_app() -> None:
if quit_after is not None:
self.set_timer(quit_after, self.shutdown)
if press is not None:
app = self
async def press_keys() -> None:
"""A task to send key events."""
assert press
driver = app._driver
assert driver is not None
await asyncio.sleep(0.01)
for key in press:
if key == "_":
print("(pause 50ms)")
await asyncio.sleep(0.05)
elif key.startswith("wait:"):
_, wait_ms = key.split(":")
print(f"(pause {wait_ms}ms)")
await asyncio.sleep(float(wait_ms) / 1000)
else:
if len(key) == 1 and not key.isalnum():
key = (
unicodedata.name(key)
.lower()
.replace("-", "_")
.replace(" ", "_")
)
original_key = REPLACED_KEYS.get(key, key)
try:
char = unicodedata.lookup(
original_key.upper().replace("_", " ")
)
except KeyError:
char = key if len(key) == 1 else None
print(f"press {key!r} (char={char!r})")
key_event = events.Key(self, key, char)
driver.send_event(key_event)
await asyncio.sleep(0.01)
await app._animator.wait_for_idle()
if screenshot:
self._screenshot = self.export_screenshot(
title=screenshot_title
)
await self.shutdown()
async def press_keys_task():
"""Press some keys in the background."""
asyncio.create_task(press_keys())
await self._process_messages(ready_callback=press_keys_task)
else:
await self._process_messages()
if _ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED:
# N.B. This doesn't work with Python<3.10, as we end up with 2 event loops:
asyncio.run(run_app())
else:
# However, this works with Python<3.10:
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(run_app())
return self._return_value
async def _on_css_change(self) -> None:
"""Called when the CSS changes (if watch_css is True)."""
if self.css_path is not None:
try:
time = perf_counter()
stylesheet = self.stylesheet.copy()
stylesheet.read(self.css_path)
stylesheet.parse()
elapsed = (perf_counter() - time) * 1000
self.log.system(
f"<stylesheet> loaded {self.css_path!r} in {elapsed:.0f} ms"
)
except Exception as error:
# TODO: Catch specific exceptions
self.log.error(error)
self.bell()
else:
self.stylesheet = stylesheet
self.reset_styles()
self.stylesheet.update(self)
self.screen.refresh(layout=True)
def render(self) -> RenderableType:
return Blank(self.styles.background)
def get_child(self, id: str) -> DOMNode:
"""Shorthand for self.screen.get_child(id: str)
Returns the first child (immediate descendent) of this DOMNode
with the given ID.
Args:
id (str): The ID of the node to search for.
Returns:
DOMNode: The first child of this node with the specified ID.
Raises:
NoMatches: if no children could be found for this ID
"""
return self.screen.get_child(id)
def update_styles(self, node: DOMNode | None = None) -> None:
"""Request update of styles.
Should be called whenever CSS classes / pseudo classes change.
"""
self._require_stylesheet_update.add(self.screen if node is None else node)
self.check_idle()
def mount(self, *anon_widgets: Widget, **widgets: Widget) -> AwaitMount:
"""Mount widgets. Widgets specified as positional args, or keywords args. If supplied
as keyword args they will be assigned an id of the key.
Returns:
AwaitMount: An awaitable object that waits for widgets to be mounted.
"""
mounted_widgets = self._register(self.screen, *anon_widgets, **widgets)
return AwaitMount(mounted_widgets)
def mount_all(self, widgets: Iterable[Widget]) -> AwaitMount:
"""Mount widgets from an iterable.
Args:
widgets (Iterable[Widget]): An iterable of widgets.
"""
mounted_widgets = list(widgets)
for widget in mounted_widgets:
self._register(self.screen, widget)
return AwaitMount(mounted_widgets)
def is_screen_installed(self, screen: Screen | str) -> bool:
"""Check if a given screen has been installed.
Args:
screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).
Returns:
bool: True if the screen is currently installed,
"""
if isinstance(screen, str):
return screen in self._installed_screens
else:
return screen in self._installed_screens.values()
def get_screen(self, screen: Screen | str) -> Screen:
"""Get an installed screen.
If the screen isn't running, it will be registered before it is run.
Args:
screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).
Raises:
KeyError: If the named screen doesn't exist.
Returns:
Screen: A screen instance.
"""
if isinstance(screen, str):
try:
next_screen = self._installed_screens[screen]
except KeyError:
raise KeyError(f"No screen called {screen!r} installed") from None
else:
next_screen = screen
if not next_screen.is_running:
self._register(self, next_screen)
return next_screen
def _replace_screen(self, screen: Screen) -> Screen:
"""Handle the replaced screen.
Args:
screen (Screen): A screen object.
Returns:
Screen: The screen that was replaced.
"""
screen.post_message_no_wait(events.ScreenSuspend(self))
self.log.system(f"{screen} SUSPENDED")
if not self.is_screen_installed(screen) and screen not in self._screen_stack:
screen.remove()
self.log.system(f"{screen} REMOVED")
return screen
def push_screen(self, screen: Screen | str) -> None:
"""Push a new screen on the screen stack.
Args:
screen (Screen | str): A Screen instance or the name of an installed screen.
"""
next_screen = self.get_screen(screen)
self._screen_stack.append(next_screen)
self.screen.post_message_no_wait(events.ScreenResume(self))
self.log.system(f"{self.screen} is current (PUSHED)")
def switch_screen(self, screen: Screen | str) -> None:
"""Switch to another screen by replacing the top of the screen stack with a new screen.
Args:
screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed).
"""
if self.screen is not screen:
self._replace_screen(self._screen_stack.pop())
next_screen = self.get_screen(screen)
self._screen_stack.append(next_screen)
self.screen.post_message_no_wait(events.ScreenResume(self))
self.log.system(f"{self.screen} is current (SWITCHED)")
def install_screen(self, screen: Screen, name: str | None = None) -> str:
"""Install a screen.
Args:
screen (Screen): Screen to install.
name (str | None, optional): Unique name of screen or None to auto-generate.
Defaults to None.
Raises:
ScreenError: If the screen can't be installed.
Returns:
str: The name of the screen
"""
if name is None:
name = nanoid.generate()
if name in self._installed_screens:
raise ScreenError(f"Can't install screen; {name!r} is already installed")
if screen in self._installed_screens.values():
raise ScreenError(
"Can't install screen; {screen!r} has already been installed"
)
self._installed_screens[name] = screen
self.get_screen(name) # Ensures screen is running
self.log.system(f"{screen} INSTALLED name={name!r}")
return name
def uninstall_screen(self, screen: Screen | str) -> str | None:
"""Uninstall a screen. If the screen was not previously installed then this
method is a null-op.
Args:
screen (Screen | str): The screen to uninstall or the name of a installed screen.
Returns:
str | None: The name of the screen that was uninstalled, or None if no screen was uninstalled.
"""
if isinstance(screen, str):
if screen not in self._installed_screens:
return None
uninstall_screen = self._installed_screens[screen]
if uninstall_screen in self._screen_stack:
raise ScreenStackError("Can't uninstall screen in screen stack")
del self._installed_screens[screen]
self.log.system(f"{uninstall_screen} UNINSTALLED name={screen!r}")
return screen
else:
if screen in self._screen_stack:
raise ScreenStackError("Can't uninstall screen in screen stack")
for name, installed_screen in self._installed_screens.items():
if installed_screen is screen:
self._installed_screens.pop(name)
self.log.system(f"{screen} UNINSTALLED name={name!r}")
return name
return None
def pop_screen(self) -> Screen:
"""Pop the current screen from the stack, and switch to the previous screen.
Returns:
Screen: The screen that was replaced.
"""
screen_stack = self._screen_stack
if len(screen_stack) <= 1:
raise ScreenStackError(
"Can't pop screen; there must be at least one screen on the stack"
)
previous_screen = self._replace_screen(screen_stack.pop())
self.screen._screen_resized(self.size)
self.screen.post_message_no_wait(events.ScreenResume(self))
self.log.system(f"{self.screen} is active")
return previous_screen
def set_focus(self, widget: Widget | None, scroll_visible: bool = True) -> None:
"""Focus (or unfocus) a widget. A focused widget will receive key events first.
Args:
widget (Widget): Widget to focus.
scroll_visible (bool, optional): Scroll widget in to view.
"""
self.screen.set_focus(widget, scroll_visible)
async def _set_mouse_over(self, widget: Widget | None) -> None:
"""Called when the mouse is over another widget.
Args:
widget (Widget | None): Widget under mouse, or None for no widgets.
"""
if widget is None:
if self.mouse_over is not None:
try:
await self.mouse_over.post_message(events.Leave(self))
finally:
self.mouse_over = None
else:
if self.mouse_over is not widget:
try:
if self.mouse_over is not None:
await self.mouse_over._forward_event(events.Leave(self))
if widget is not None:
await widget._forward_event(events.Enter(self))
finally:
self.mouse_over = widget
def capture_mouse(self, widget: Widget | None) -> None:
"""Send all mouse events to the given widget, disable mouse capture.
Args:
widget (Widget | None): If a widget, capture mouse event, or None to end mouse capture.
"""
if widget == self.mouse_captured:
return
if self.mouse_captured is not None:
self.mouse_captured.post_message_no_wait(
events.MouseRelease(self, self.mouse_position)
)
self.mouse_captured = widget
if widget is not None:
widget.post_message_no_wait(events.MouseCapture(self, self.mouse_position))
def panic(self, *renderables: RenderableType) -> None:
"""Exits the app then displays a message.
Args:
*renderables (RenderableType, optional): Rich renderables to display on exit.
"""
assert all(
is_renderable(renderable) for renderable in renderables
), "Can only call panic with strings or Rich renderables"
def render(renderable: RenderableType) -> list[Segment]:
"""Render a panic renderables."""
segments = list(self.console.render(renderable, self.console.options))
return segments
pre_rendered = [Segments(render(renderable)) for renderable in renderables]
self._exit_renderables.extend(pre_rendered)
self._close_messages_no_wait()
def _handle_exception(self, error: Exception) -> None:
"""Called with an unhandled exception.
Args:
error (Exception): An exception instance.
"""
if hasattr(error, "__rich__"):
# Exception has a rich method, so we can defer to that for the rendering
self.panic(error)
else:
# Use default exception rendering
self.fatal_error()
def fatal_error(self) -> None:
"""Exits the app after an unhandled exception."""
self.bell()
traceback = Traceback(
show_locals=True, width=None, locals_max_length=5, suppress=[rich]
)
self._exit_renderables.append(
Segments(self.console.render(traceback, self.console.options))
)
self._close_messages_no_wait()
def _print_error_renderables(self) -> None:
for renderable in self._exit_renderables:
self.error_console.print(renderable)
self._exit_renderables.clear()
async def _process_messages(
self, ready_callback: CallbackType | None = None
) -> None:
self._set_active()
if self.devtools is not None:
from .devtools.client import DevtoolsConnectionError
try:
await self.devtools.connect()
self.log.system(f"Connected to devtools ( {self.devtools.url} )")
except DevtoolsConnectionError:
self.log.system(f"Couldn't connect to devtools ( {self.devtools.url} )")
self.log.system("---")
self.log.system(driver=self.driver_class)
self.log.system(loop=asyncio.get_running_loop())
self.log.system(features=self.features)
try:
if self.css_path is not None:
self.stylesheet.read(self.css_path)
for path, css, tie_breaker in self.get_default_css():
self.stylesheet.add_source(
css, path=path, is_default_css=True, tie_breaker=tie_breaker
)
if self.CSS:
try:
app_css_path = (
f"{inspect.getfile(self.__class__)}:{self.__class__.__name__}"
)
except TypeError:
app_css_path = f"{self.__class__.__name__}"
self.stylesheet.add_source(
self.CSS, path=app_css_path, is_default_css=False
)
except Exception as error:
self._handle_exception(error)
self._print_error_renderables()
return
if self.css_monitor:
self.set_interval(0.25, self.css_monitor, name="css monitor")
self.log.system("[b green]STARTED[/]", self.css_monitor)
async def run_process_messages():
try:
await self._dispatch_message(events.Compose(sender=self))
await self._dispatch_message(events.Mount(sender=self))
finally:
self._mounted_event.set()
Reactive._initialize_object(self)
self.stylesheet.update(self)
self.refresh()
await self.animator.start()
await self._ready()
if ready_callback is not None:
await ready_callback()
self._running = True
try:
await self._process_messages_loop()
except asyncio.CancelledError:
pass
finally:
self._running = False
for timer in list(self._timers):
await timer.stop()
await self.animator.stop()
await self._close_all()
self._running = True
try:
load_event = events.Load(sender=self)
await self._dispatch_message(load_event)
driver: Driver
driver_class = cast(
"type[Driver]",
HeadlessDriver if self.is_headless else self.driver_class,
)
driver = self._driver = driver_class(self.console, self)
driver.start_application_mode()
try:
if self.is_headless:
await run_process_messages()
else:
if self.devtools is not None:
devtools = self.devtools
assert devtools is not None
from .devtools.redirect_output import StdoutRedirector
redirector = StdoutRedirector(devtools)
with redirect_stderr(redirector):
with redirect_stdout(redirector): # type: ignore
await run_process_messages()
else:
null_file = _NullFile()
with redirect_stderr(null_file):
with redirect_stdout(null_file):
await run_process_messages()
finally:
driver.stop_application_mode()
except Exception as error:
self._handle_exception(error)
finally:
self._running = False
self._print_error_renderables()
if self.devtools is not None and self.devtools.is_connected:
await self._disconnect_devtools()
async def _pre_process(self) -> None:
pass
async def _ready(self) -> None:
"""Called immediately prior to processing messages.
May be used as a hook for any operations that should run first.
"""
try:
screenshot_timer = float(os.environ.get("TEXTUAL_SCREENSHOT", "0"))
except ValueError:
return
screenshot_title = os.environ.get("TEXTUAL_SCREENSHOT_TITLE")
if not screenshot_timer:
return
async def on_screenshot():
"""Used by docs plugin."""
svg = self.export_screenshot(title=screenshot_title)
self._screenshot = svg # type: ignore
await self.shutdown()
self.set_timer(screenshot_timer, on_screenshot, name="screenshot timer")
async def _on_compose(self) -> None:
widgets = list(self.compose())
await self.mount_all(widgets)
def _on_idle(self) -> None:
"""Perform actions when there are no messages in the queue."""
if self._require_stylesheet_update:
nodes: set[DOMNode] = {
child
for node in self._require_stylesheet_update
for child in node.walk_children()
}
self._require_stylesheet_update.clear()
self.stylesheet.update_nodes(nodes, animate=True)
def _register_child(self, parent: DOMNode, child: Widget) -> bool:
if child not in self._registry:
parent.children._append(child)
self._registry.add(child)
child._attach(parent)
child._post_register(self)
child._start_messages()
return True
return False
def _register(
self, parent: DOMNode, *anon_widgets: Widget, **widgets: Widget
) -> list[Widget]:
"""Register widget(s) so they may receive events.
Args:
parent (Widget): Parent Widget.
Returns:
list[Widget]: List of modified widgets.
"""
if not anon_widgets and not widgets:
return []
name_widgets: list[tuple[str | None, Widget]]
name_widgets = [*((None, widget) for widget in anon_widgets), *widgets.items()]
apply_stylesheet = self.stylesheet.apply
for widget_id, widget in name_widgets:
if not isinstance(widget, Widget):
raise AppError(f"Can't register {widget!r}; expected a Widget instance")
if widget not in self._registry:
if widget_id is not None:
widget.id = widget_id
self._register_child(parent, widget)
if widget.children:
self._register(widget, *widget.children)
apply_stylesheet(widget)
registered_widgets = [widget for _, widget in name_widgets]
return registered_widgets
def _unregister(self, widget: Widget) -> None:
"""Unregister a widget.
Args:
widget (Widget): A Widget to unregister
"""
widget.reset_focus()
if isinstance(widget._parent, Widget):
widget._parent.children._remove(widget)
widget._detach()
self._registry.discard(widget)
async def _disconnect_devtools(self):
if self.devtools is not None:
await self.devtools.disconnect()
def _start_widget(self, parent: Widget, widget: Widget) -> None:
"""Start a widget (run it's task) so that it can receive messages.
Args:
parent (Widget): The parent of the Widget.
widget (Widget): The Widget to start.
"""
widget._attach(parent)
widget._start_messages()
def is_mounted(self, widget: Widget) -> bool:
"""Check if a widget is mounted.
Args:
widget (Widget): A widget.
Returns:
bool: True of the widget is mounted.
"""
return widget in self._registry
async def _close_all(self) -> None:
while self._registry:
child = self._registry.pop()
await child._close_messages()
async def shutdown(self):
await self._disconnect_devtools()
driver = self._driver
if driver is not None:
driver.disable_input()
await self._close_messages()
def refresh(self, *, repaint: bool = True, layout: bool = False) -> None:
if self._screen_stack:
self.screen.refresh(repaint=repaint, layout=layout)
self.check_idle()
def refresh_css(self, animate: bool = True) -> None:
"""Refresh CSS.
Args:
animate (bool, optional): Also execute CSS animations. Defaults to True.
"""
stylesheet = self.app.stylesheet
stylesheet.set_variables(self.get_css_variables())
stylesheet.reparse()
stylesheet.update(self.app, animate=animate)
self.screen._refresh_layout(self.size, full=True)
def _display(self, screen: Screen, renderable: RenderableType | None) -> None:
"""Display a renderable within a sync.
Args:
screen (Screen): Screen instance
renderable (RenderableType): A Rich renderable.
"""
if screen is not self.screen or renderable is None:
return
if self._running and not self._closed and not self.is_headless:
console = self.console
self._begin_update()
try:
try:
console.print(renderable)
except Exception as error:
self._handle_exception(error)
finally:
self._end_update()
console.file.flush()
def get_widget_at(self, x: int, y: int) -> tuple[Widget, Region]:
"""Get the widget under the given coordinates.
Args:
x (int): X Coord.
y (int): Y Coord.
Returns:
tuple[Widget, Region]: The widget and the widget's screen region.
"""
return self.screen.get_widget_at(x, y)
def bell(self) -> None:
"""Play the console 'bell'."""
if not self.is_headless:
self.console.bell()
@property
def _binding_chain(self) -> list[tuple[DOMNode, Bindings]]:
"""Get a chain of nodes and bindings to consider. If no widget is focused, returns the bindings from both the screen and the app level bindings. Otherwise, combines all the bindings from the currently focused node up the DOM to the root App.
Returns:
list[tuple[DOMNode, Bindings]]: List of DOM nodes and their bindings.
"""
focused = self.focused
namespace_bindings: list[tuple[DOMNode, Bindings]]
if focused is None:
namespace_bindings = [
(self.screen, self.screen._bindings),
(self, self._bindings),
]
else:
namespace_bindings = [(node, node._bindings) for node in focused.ancestors]
return namespace_bindings
async def check_bindings(self, key: str, universal: bool = False) -> bool:
"""Handle a key press.
Args:
key (str): A key
universal (bool): Check universal keys if True, otherwise non-universal keys.
Returns:
bool: True if the key was handled by a binding, otherwise False
"""
for namespace, bindings in self._binding_chain:
binding = bindings.keys.get(key)
if binding is not None and binding.universal == universal:
await self.action(binding.action, default_namespace=namespace)
return True
return False
async def on_event(self, event: events.Event) -> None:
# Handle input events that haven't been forwarded
# If the event has been forwarded it may have bubbled up back to the App
if isinstance(event, events.Compose):
screen = Screen(id="_default")
self._register(self, screen)
self._screen_stack.append(screen)
await super().on_event(event)
elif isinstance(event, events.InputEvent) and not event.is_forwarded:
if isinstance(event, events.MouseEvent):
# Record current mouse position on App
self.mouse_position = Offset(event.x, event.y)
await self.screen._forward_event(event)
elif isinstance(event, events.Key):
if not await self.check_bindings(event.key, universal=True):
forward_target = self.focused or self.screen
await forward_target._forward_event(event)
else:
await self.screen._forward_event(event)
elif isinstance(event, events.Paste):
if self.focused is not None:
await self.focused._forward_event(event)
else:
await super().on_event(event)
async def action(
self,
action: str | tuple[str, tuple[str, ...]],
default_namespace: object | None = None,
) -> bool:
"""Perform an action.
Args:
action (str): Action encoded in a string.
default_namespace (object | None): Namespace to use if not provided in the action,
or None to use app. Defaults to None.
Returns:
bool: True if the event has handled.
"""
print("ACTION", action, default_namespace)
if isinstance(action, str):
target, params = actions.parse(action)
else:
target, params = action
implicit_destination = True
if "." in target:
destination, action_name = target.split(".", 1)
if destination not in self._action_targets:
raise ActionError(f"Action namespace {destination} is not known")
action_target = getattr(self, destination)
implicit_destination = True
else:
action_target = default_namespace or self
action_name = target
handled = await self._dispatch_action(action_target, action_name, params)
if not handled and implicit_destination and not isinstance(action_target, App):
handled = await self.app._dispatch_action(self.app, action_name, params)
return handled
async def _dispatch_action(
self, namespace: object, action_name: str, params: Any
) -> bool:
log(
"<action>",
namespace=namespace,
action_name=action_name,
params=params,
)
_rich_traceback_guard = True
public_method_name = f"action_{action_name}"
private_method_name = f"_{public_method_name}"
private_method = getattr(namespace, private_method_name, None)
public_method = getattr(namespace, public_method_name, None)
if private_method is None and public_method is None:
log(
f"<action> {action_name!r} has no target. Couldn't find methods {public_method_name!r} or {private_method_name!r}"
)
if callable(private_method):
await invoke(private_method, *params)
return True
elif callable(public_method):
await invoke(public_method, *params)
return True
return False
async def _broker_event(
self, event_name: str, event: events.Event, default_namespace: object | None
) -> bool:
"""Allow the app an opportunity to dispatch events to action system.
Args:
event_name (str): _description_
event (events.Event): An event object.
default_namespace (object | None): TODO: _description_
Returns:
bool: True if an action was processed.
"""
try:
style = getattr(event, "style")
except AttributeError:
return False
try:
_modifiers, action = extract_handler_actions(event_name, style.meta)
except NoHandler:
return False
else:
event.stop()
if isinstance(action, (str, tuple)):
await self.action(action, default_namespace=default_namespace)
elif callable(action):
await action()
else:
return False
return True
async def _on_update(self, message: messages.Update) -> None:
message.stop()
async def _on_layout(self, message: messages.Layout) -> None:
message.stop()
async def _on_key(self, event: events.Key) -> None:
if event.key == "tab":
self.screen.focus_next()
elif event.key == "shift+tab":
self.screen.focus_previous()
else:
if not (await self.check_bindings(event.key)):
await self.dispatch_key(event)
async def _on_shutdown_request(self, event: events.ShutdownRequest) -> None:
log("shutdown request")
await self._close_messages()
async def _on_resize(self, event: events.Resize) -> None:
event.stop()
await self.screen.post_message(event)
async def _on_remove(self, event: events.Remove) -> None:
widget = event.widget
parent = widget.parent
remove_widgets = widget.walk_children(
Widget, with_self=True, method="depth", reverse=True
)
if self.screen.focused in remove_widgets:
self.screen._reset_focus(
self.screen.focused,
[to_remove for to_remove in remove_widgets if to_remove.can_focus],
)
for child in remove_widgets:
await child._close_messages()
self._unregister(child)
if parent is not None:
parent.refresh(layout=True)
async def action_check_bindings(self, key: str) -> None:
await self.check_bindings(key)
async def action_quit(self) -> None:
"""Quit the app as soon as possible."""
await self.shutdown()
async def action_bang(self) -> None:
1 / 0
async def action_bell(self) -> None:
"""Play the terminal 'bell'."""
self.bell()
async def action_focus(self, widget_id: str) -> None:
"""Focus the given widget.
Args:
widget_id (str): ID of widget to focus.
"""
try:
node = self.query(f"#{widget_id}").first()
except NoMatches:
pass
else:
if isinstance(node, Widget):
self.set_focus(node)
async def action_switch_screen(self, screen: str) -> None:
"""Switches to another screen.
Args:
screen (str): Name of the screen.
"""
self.switch_screen(screen)
async def action_push_screen(self, screen: str) -> None:
"""Pushes a screen on to the screen stack and makes it active.
Args:
screen (str): Name of the screen.
"""
self.push_screen(screen)
async def action_pop_screen(self) -> None:
"""Removes the topmost screen and makes the new topmost screen active."""
self.pop_screen()
async def action_back(self) -> None:
try:
self.pop_screen()
except ScreenStackError:
pass
async def action_add_class_(self, selector: str, class_name: str) -> None:
self.screen.query(selector).add_class(class_name)
async def action_remove_class_(self, selector: str, class_name: str) -> None:
self.screen.query(selector).remove_class(class_name)
async def action_toggle_class(self, selector: str, class_name: str) -> None:
self.screen.query(selector).toggle_class(class_name)
def _on_terminal_supports_synchronized_output(
self, message: messages.TerminalSupportsSynchronizedOutput
) -> None:
log.system("[b green]SynchronizedOutput mode is supported")
self._sync_available = True
def _begin_update(self) -> None:
if self._sync_available:
self.console.file.write(SYNC_START)
def _end_update(self) -> None:
if self._sync_available:
self.console.file.write(SYNC_END)
_uvloop_init_done: bool = False
def _init_uvloop() -> None:
"""
Import and install the `uvloop` asyncio policy, if available.
This is done only once, even if the function is called multiple times.
"""
global _uvloop_init_done
if _uvloop_init_done:
return
try:
import uvloop
except ImportError:
pass
else:
uvloop.install()
_uvloop_init_done = True
Classes
class ActionError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ActionError(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class App (driver_class: Type[Driver] | None = None, css_path: CSSPathType = None, watch_css: bool = False)
-
The base class for Textual Applications.
Args
- driver_class (Type[Driver] | None, optional): Driver class or
None
to auto-detect. Defaults to None. - title (str | None, optional): Title of the application. If
None
, the title is set to the name of theApp
subclass. Defaults toNone
. - css_path (str | PurePath | None, optional): Path to CSS or
None
for no CSS file. Defaults to None. watch_css
:bool
, optional- Watch CSS for changes. Defaults to False.
Expand source code
class App(Generic[ReturnType], DOMNode): """The base class for Textual Applications. Args: driver_class (Type[Driver] | None, optional): Driver class or ``None`` to auto-detect. Defaults to None. title (str | None, optional): Title of the application. If ``None``, the title is set to the name of the ``App`` subclass. Defaults to ``None``. css_path (str | PurePath | None, optional): Path to CSS or ``None`` for no CSS file. Defaults to None. watch_css (bool, optional): Watch CSS for changes. Defaults to False. """ # Inline CSS for quick scripts (generally css_path should be preferred.) CSS = "" # Default (lowest priority) CSS DEFAULT_CSS = """ App { background: $background; color: $text; } """ SCREENS: dict[str, Screen] = {} _BASE_PATH: str | None = None CSS_PATH: CSSPathType = None TITLE: str | None = None SUB_TITLE: str | None = None title: Reactive[str] = Reactive("") sub_title: Reactive[str] = Reactive("") dark: Reactive[bool] = Reactive(True) def __init__( self, driver_class: Type[Driver] | None = None, css_path: CSSPathType = None, watch_css: bool = False, ): # N.B. This must be done *before* we call the parent constructor, because MessagePump's # constructor instantiates a `asyncio.PriorityQueue` and in Python versions older than 3.10 # this will create some first references to an asyncio loop. _init_uvloop() super().__init__() self.features: frozenset[FeatureFlag] = parse_features(os.getenv("TEXTUAL", "")) self._filter: LineFilter | None = None environ = dict(os.environ) no_color = environ.pop("NO_COLOR", None) if no_color is not None: self._filter = Monochrome() self.console = Console( file=(_NullFile() if self.is_headless else sys.__stdout__), markup=False, highlight=False, emoji=False, legacy_windows=False, _environ=environ, ) self.error_console = Console(markup=False, stderr=True) self.driver_class = driver_class or self.get_driver_class() self._screen_stack: list[Screen] = [] self._sync_available = False self.mouse_over: Widget | None = None self.mouse_captured: Widget | None = None self._driver: Driver | None = None self._exit_renderables: list[RenderableType] = [] self._action_targets = {"app", "screen"} self._animator = Animator(self) self._animate = self._animator.bind(self) self.mouse_position = Offset(0, 0) self.title = ( self.TITLE if self.TITLE is not None else f"{self.__class__.__name__}" ) self.sub_title = self.SUB_TITLE if self.SUB_TITLE is not None else "" self._logger = Logger(self._log) self._bindings.bind("ctrl+c", "quit", show=False, universal=True) self._refresh_required = False self.design = DEFAULT_COLORS self.stylesheet = Stylesheet(variables=self.get_css_variables()) self._require_stylesheet_update: set[DOMNode] = set() # We want the CSS path to be resolved from the location of the App subclass css_path = css_path or self.CSS_PATH if css_path is not None: if isinstance(css_path, str): css_path = Path(css_path) css_path = _make_path_object_relative(css_path, self) if css_path else None self.css_path = css_path self._registry: WeakSet[DOMNode] = WeakSet() self._installed_screens: WeakValueDictionary[ str, Screen ] = WeakValueDictionary() self._installed_screens.update(**self.SCREENS) self.devtools: DevtoolsClient | None = None if "devtools" in self.features: try: from .devtools.client import DevtoolsClient except ImportError: # Dev dependencies not installed pass else: self.devtools = DevtoolsClient() self._return_value: ReturnType | None = None self.css_monitor = ( FileMonitor(self.css_path, self._on_css_change) if ((watch_css or self.debug) and self.css_path) else None ) self._screenshot: str | None = None def animate( self, attribute: str, value: float | Animatable, *, final_value: object = ..., duration: float | None = None, speed: float | None = None, delay: float = 0.0, easing: EasingFunction | str = DEFAULT_EASING, on_complete: CallbackType | None = None, ) -> None: """Animate an attribute. Args: attribute (str): Name of the attribute to animate. value (float | Animatable): The value to animate to. final_value (object, optional): The final value of the animation. Defaults to `value` if not set. duration (float | None, optional): The duration of the animate. Defaults to None. speed (float | None, optional): The speed of the animation. Defaults to None. delay (float, optional): A delay (in seconds) before the animation starts. Defaults to 0.0. easing (EasingFunction | str, optional): An easing method. Defaults to "in_out_cubic". on_complete (CallbackType | None, optional): A callable to invoke when the animation is finished. Defaults to None. """ self._animate( attribute, value, final_value=final_value, duration=duration, speed=speed, delay=delay, easing=easing, on_complete=on_complete, ) @property def debug(self) -> bool: """Check if debug mode is enabled. Returns: bool: True if debug mode is enabled. """ return "debug" in self.features @property def is_headless(self) -> bool: """Check if the app is running in 'headless' mode. Returns: bool: True if the app is in headless mode. """ return "headless" in self.features @property def screen_stack(self) -> list[Screen]: """Get a *copy* of the screen stack. Returns: list[Screen]: List of screens. """ return self._screen_stack.copy() def exit(self, result: ReturnType | None = None) -> None: """Exit the app, and return the supplied result. Args: result (ReturnType | None, optional): Return value. Defaults to None. """ self._return_value = result self._close_messages_no_wait() @property def focused(self) -> Widget | None: """Get the widget that is focused on the currently active screen.""" return self.screen.focused @property def namespace_bindings(self) -> dict[str, tuple[DOMNode, Binding]]: """Get current bindings. If no widget is focused, then the app-level bindings are returned. If a widget is focused, then any bindings present in the active screen and app are merged and returned.""" namespace_binding_map: dict[str, tuple[DOMNode, Binding]] = {} for namespace, bindings in reversed(self._binding_chain): for key, binding in bindings.keys.items(): namespace_binding_map[key] = (namespace, binding) return namespace_binding_map def _set_active(self) -> None: """Set this app to be the currently active app.""" active_app.set(self) def compose(self) -> ComposeResult: """Yield child widgets for a container.""" return yield def get_css_variables(self) -> dict[str, str]: """Get a mapping of variables used to pre-populate CSS. Returns: dict[str, str]: A mapping of variable name to value. """ variables = self.design["dark" if self.dark else "light"].generate() return variables def watch_dark(self, dark: bool) -> None: """Watches the dark bool.""" self.set_class(dark, "-dark-mode") self.set_class(not dark, "-light-mode") self.refresh_css() def get_driver_class(self) -> Type[Driver]: """Get a driver class for this platform. Called by the constructor. Returns: Driver: A Driver class which manages input and display. """ driver_class: Type[Driver] if WINDOWS: from .drivers.windows_driver import WindowsDriver driver_class = WindowsDriver else: from .drivers.linux_driver import LinuxDriver driver_class = LinuxDriver return driver_class def __rich_repr__(self) -> rich.repr.Result: yield "title", self.title yield "id", self.id, None if self.name: yield "name", self.name if self.classes: yield "classes", set(self.classes) pseudo_classes = self.pseudo_classes if pseudo_classes: yield "pseudo_classes", set(pseudo_classes) @property def is_transparent(self) -> bool: return True @property def animator(self) -> Animator: return self._animator @property def screen(self) -> Screen: """Get the current screen. Raises: ScreenStackError: If there are no screens on the stack. Returns: Screen: The currently active screen. """ try: return self._screen_stack[-1] except IndexError: raise ScreenStackError("No screens on stack") from None @property def size(self) -> Size: """Get the size of the terminal. Returns: Size: Size of the terminal """ return Size(*self.console.size) @property def log(self) -> Logger: return self._logger def _log( self, group: LogGroup, verbosity: LogVerbosity, _textual_calling_frame: inspect.FrameInfo, *objects: Any, **kwargs, ) -> None: """Write to logs or devtools. Positional args will logged. Keyword args will be prefixed with the key. Example: ```python data = [1,2,3] self.log("Hello, World", state=data) self.log(self.tree) self.log(locals()) ``` Args: verbosity (int, optional): Verbosity level 0-3. Defaults to 1. """ devtools = self.devtools if devtools is None or not devtools.is_connected: return if verbosity.value > LogVerbosity.NORMAL.value and not devtools.verbose: return try: from .devtools.client import DevtoolsLog if len(objects) == 1 and not kwargs: devtools.log( DevtoolsLog(objects, caller=_textual_calling_frame), group, verbosity, ) else: output = " ".join(str(arg) for arg in objects) if kwargs: key_values = " ".join( f"{key}={value!r}" for key, value in kwargs.items() ) output = f"{output} {key_values}" if output else key_values devtools.log( DevtoolsLog(output, caller=_textual_calling_frame), group, verbosity, ) except Exception as error: self._handle_exception(error) def action_toggle_dark(self) -> None: """Action to toggle dark mode.""" self.dark = not self.dark def action_screenshot(self, filename: str | None = None, path: str = "./") -> None: """Save an SVG "screenshot". This action will save an SVG file containing the current contents of the screen. Args: filename (str | None, optional): Filename of screenshot, or None to auto-generate. Defaults to None. path (str, optional): Path to directory. Defaults to "~/". """ self.save_screenshot(filename, path) def export_screenshot(self, *, title: str | None = None) -> str: """Export an SVG screenshot of the current screen. Args: title (str | None, optional): The title of the exported screenshot or None to use app title. Defaults to None. """ console = Console( width=self.console.width, height=self.console.height, file=io.StringIO(), force_terminal=True, color_system="truecolor", record=True, legacy_windows=False, ) screen_render = self.screen._compositor.render(full=True) console.print(screen_render) return console.export_svg(title=title or self.title) def save_screenshot( self, filename: str | None = None, path: str = "./", time_format: str = "%Y-%m-%d %X %f", ) -> str: """Save an SVG screenshot of the current screen. Args: filename (str | None, optional): Filename of SVG screenshot, or None to auto-generate a filename with the date and time. Defaults to None. path (str, optional): Path to directory for output. Defaults to current working directory. time_format (str, optional): Time format to use if filename is None. Defaults to "%Y-%m-%d %X %f". Returns: str: Filename of screenshot. """ if filename is None: svg_filename = ( f"{self.title.lower()} {datetime.now().strftime(time_format)}.svg" ) svg_filename = svg_filename.replace("/", "_").replace("\\", "_") else: svg_filename = filename svg_path = os.path.expanduser(os.path.join(path, svg_filename)) screenshot_svg = self.export_screenshot() with open(svg_path, "w") as svg_file: svg_file.write(screenshot_svg) return svg_path def bind( self, keys: str, action: str, *, description: str = "", show: bool = True, key_display: str | None = None, ) -> None: """Bind a key to an action. Args: keys (str): A comma separated list of keys, i.e. action (str): Action to bind to. description (str, optional): Short description of action. Defaults to "". show (bool, optional): Show key in UI. Defaults to True. key_display (str, optional): Replacement text for key, or None to use default. Defaults to None. """ self._bindings.bind( keys, action, description, show=show, key_display=key_display ) def run( self, *, quit_after: float | None = None, headless: bool = False, press: Iterable[str] | None = None, screenshot: bool = False, screenshot_title: str | None = None, ) -> ReturnType | None: """The main entry point for apps. Args: quit_after (float | None, optional): Quit after a given number of seconds, or None to run forever. Defaults to None. headless (bool, optional): Run in "headless" mode (don't write to stdout). press (str, optional): An iterable of keys to simulate being pressed. screenshot (bool, optional): Take a screenshot after pressing keys (svg data stored in self._screenshot). Defaults to False. screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None. Returns: ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called. """ if headless: self.features = cast( "frozenset[FeatureFlag]", self.features.union({"headless"}) ) async def run_app() -> None: if quit_after is not None: self.set_timer(quit_after, self.shutdown) if press is not None: app = self async def press_keys() -> None: """A task to send key events.""" assert press driver = app._driver assert driver is not None await asyncio.sleep(0.01) for key in press: if key == "_": print("(pause 50ms)") await asyncio.sleep(0.05) elif key.startswith("wait:"): _, wait_ms = key.split(":") print(f"(pause {wait_ms}ms)") await asyncio.sleep(float(wait_ms) / 1000) else: if len(key) == 1 and not key.isalnum(): key = ( unicodedata.name(key) .lower() .replace("-", "_") .replace(" ", "_") ) original_key = REPLACED_KEYS.get(key, key) try: char = unicodedata.lookup( original_key.upper().replace("_", " ") ) except KeyError: char = key if len(key) == 1 else None print(f"press {key!r} (char={char!r})") key_event = events.Key(self, key, char) driver.send_event(key_event) await asyncio.sleep(0.01) await app._animator.wait_for_idle() if screenshot: self._screenshot = self.export_screenshot( title=screenshot_title ) await self.shutdown() async def press_keys_task(): """Press some keys in the background.""" asyncio.create_task(press_keys()) await self._process_messages(ready_callback=press_keys_task) else: await self._process_messages() if _ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED: # N.B. This doesn't work with Python<3.10, as we end up with 2 event loops: asyncio.run(run_app()) else: # However, this works with Python<3.10: event_loop = asyncio.get_event_loop() event_loop.run_until_complete(run_app()) return self._return_value async def _on_css_change(self) -> None: """Called when the CSS changes (if watch_css is True).""" if self.css_path is not None: try: time = perf_counter() stylesheet = self.stylesheet.copy() stylesheet.read(self.css_path) stylesheet.parse() elapsed = (perf_counter() - time) * 1000 self.log.system( f"<stylesheet> loaded {self.css_path!r} in {elapsed:.0f} ms" ) except Exception as error: # TODO: Catch specific exceptions self.log.error(error) self.bell() else: self.stylesheet = stylesheet self.reset_styles() self.stylesheet.update(self) self.screen.refresh(layout=True) def render(self) -> RenderableType: return Blank(self.styles.background) def get_child(self, id: str) -> DOMNode: """Shorthand for self.screen.get_child(id: str) Returns the first child (immediate descendent) of this DOMNode with the given ID. Args: id (str): The ID of the node to search for. Returns: DOMNode: The first child of this node with the specified ID. Raises: NoMatches: if no children could be found for this ID """ return self.screen.get_child(id) def update_styles(self, node: DOMNode | None = None) -> None: """Request update of styles. Should be called whenever CSS classes / pseudo classes change. """ self._require_stylesheet_update.add(self.screen if node is None else node) self.check_idle() def mount(self, *anon_widgets: Widget, **widgets: Widget) -> AwaitMount: """Mount widgets. Widgets specified as positional args, or keywords args. If supplied as keyword args they will be assigned an id of the key. Returns: AwaitMount: An awaitable object that waits for widgets to be mounted. """ mounted_widgets = self._register(self.screen, *anon_widgets, **widgets) return AwaitMount(mounted_widgets) def mount_all(self, widgets: Iterable[Widget]) -> AwaitMount: """Mount widgets from an iterable. Args: widgets (Iterable[Widget]): An iterable of widgets. """ mounted_widgets = list(widgets) for widget in mounted_widgets: self._register(self.screen, widget) return AwaitMount(mounted_widgets) def is_screen_installed(self, screen: Screen | str) -> bool: """Check if a given screen has been installed. Args: screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed). Returns: bool: True if the screen is currently installed, """ if isinstance(screen, str): return screen in self._installed_screens else: return screen in self._installed_screens.values() def get_screen(self, screen: Screen | str) -> Screen: """Get an installed screen. If the screen isn't running, it will be registered before it is run. Args: screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed). Raises: KeyError: If the named screen doesn't exist. Returns: Screen: A screen instance. """ if isinstance(screen, str): try: next_screen = self._installed_screens[screen] except KeyError: raise KeyError(f"No screen called {screen!r} installed") from None else: next_screen = screen if not next_screen.is_running: self._register(self, next_screen) return next_screen def _replace_screen(self, screen: Screen) -> Screen: """Handle the replaced screen. Args: screen (Screen): A screen object. Returns: Screen: The screen that was replaced. """ screen.post_message_no_wait(events.ScreenSuspend(self)) self.log.system(f"{screen} SUSPENDED") if not self.is_screen_installed(screen) and screen not in self._screen_stack: screen.remove() self.log.system(f"{screen} REMOVED") return screen def push_screen(self, screen: Screen | str) -> None: """Push a new screen on the screen stack. Args: screen (Screen | str): A Screen instance or the name of an installed screen. """ next_screen = self.get_screen(screen) self._screen_stack.append(next_screen) self.screen.post_message_no_wait(events.ScreenResume(self)) self.log.system(f"{self.screen} is current (PUSHED)") def switch_screen(self, screen: Screen | str) -> None: """Switch to another screen by replacing the top of the screen stack with a new screen. Args: screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed). """ if self.screen is not screen: self._replace_screen(self._screen_stack.pop()) next_screen = self.get_screen(screen) self._screen_stack.append(next_screen) self.screen.post_message_no_wait(events.ScreenResume(self)) self.log.system(f"{self.screen} is current (SWITCHED)") def install_screen(self, screen: Screen, name: str | None = None) -> str: """Install a screen. Args: screen (Screen): Screen to install. name (str | None, optional): Unique name of screen or None to auto-generate. Defaults to None. Raises: ScreenError: If the screen can't be installed. Returns: str: The name of the screen """ if name is None: name = nanoid.generate() if name in self._installed_screens: raise ScreenError(f"Can't install screen; {name!r} is already installed") if screen in self._installed_screens.values(): raise ScreenError( "Can't install screen; {screen!r} has already been installed" ) self._installed_screens[name] = screen self.get_screen(name) # Ensures screen is running self.log.system(f"{screen} INSTALLED name={name!r}") return name def uninstall_screen(self, screen: Screen | str) -> str | None: """Uninstall a screen. If the screen was not previously installed then this method is a null-op. Args: screen (Screen | str): The screen to uninstall or the name of a installed screen. Returns: str | None: The name of the screen that was uninstalled, or None if no screen was uninstalled. """ if isinstance(screen, str): if screen not in self._installed_screens: return None uninstall_screen = self._installed_screens[screen] if uninstall_screen in self._screen_stack: raise ScreenStackError("Can't uninstall screen in screen stack") del self._installed_screens[screen] self.log.system(f"{uninstall_screen} UNINSTALLED name={screen!r}") return screen else: if screen in self._screen_stack: raise ScreenStackError("Can't uninstall screen in screen stack") for name, installed_screen in self._installed_screens.items(): if installed_screen is screen: self._installed_screens.pop(name) self.log.system(f"{screen} UNINSTALLED name={name!r}") return name return None def pop_screen(self) -> Screen: """Pop the current screen from the stack, and switch to the previous screen. Returns: Screen: The screen that was replaced. """ screen_stack = self._screen_stack if len(screen_stack) <= 1: raise ScreenStackError( "Can't pop screen; there must be at least one screen on the stack" ) previous_screen = self._replace_screen(screen_stack.pop()) self.screen._screen_resized(self.size) self.screen.post_message_no_wait(events.ScreenResume(self)) self.log.system(f"{self.screen} is active") return previous_screen def set_focus(self, widget: Widget | None, scroll_visible: bool = True) -> None: """Focus (or unfocus) a widget. A focused widget will receive key events first. Args: widget (Widget): Widget to focus. scroll_visible (bool, optional): Scroll widget in to view. """ self.screen.set_focus(widget, scroll_visible) async def _set_mouse_over(self, widget: Widget | None) -> None: """Called when the mouse is over another widget. Args: widget (Widget | None): Widget under mouse, or None for no widgets. """ if widget is None: if self.mouse_over is not None: try: await self.mouse_over.post_message(events.Leave(self)) finally: self.mouse_over = None else: if self.mouse_over is not widget: try: if self.mouse_over is not None: await self.mouse_over._forward_event(events.Leave(self)) if widget is not None: await widget._forward_event(events.Enter(self)) finally: self.mouse_over = widget def capture_mouse(self, widget: Widget | None) -> None: """Send all mouse events to the given widget, disable mouse capture. Args: widget (Widget | None): If a widget, capture mouse event, or None to end mouse capture. """ if widget == self.mouse_captured: return if self.mouse_captured is not None: self.mouse_captured.post_message_no_wait( events.MouseRelease(self, self.mouse_position) ) self.mouse_captured = widget if widget is not None: widget.post_message_no_wait(events.MouseCapture(self, self.mouse_position)) def panic(self, *renderables: RenderableType) -> None: """Exits the app then displays a message. Args: *renderables (RenderableType, optional): Rich renderables to display on exit. """ assert all( is_renderable(renderable) for renderable in renderables ), "Can only call panic with strings or Rich renderables" def render(renderable: RenderableType) -> list[Segment]: """Render a panic renderables.""" segments = list(self.console.render(renderable, self.console.options)) return segments pre_rendered = [Segments(render(renderable)) for renderable in renderables] self._exit_renderables.extend(pre_rendered) self._close_messages_no_wait() def _handle_exception(self, error: Exception) -> None: """Called with an unhandled exception. Args: error (Exception): An exception instance. """ if hasattr(error, "__rich__"): # Exception has a rich method, so we can defer to that for the rendering self.panic(error) else: # Use default exception rendering self.fatal_error() def fatal_error(self) -> None: """Exits the app after an unhandled exception.""" self.bell() traceback = Traceback( show_locals=True, width=None, locals_max_length=5, suppress=[rich] ) self._exit_renderables.append( Segments(self.console.render(traceback, self.console.options)) ) self._close_messages_no_wait() def _print_error_renderables(self) -> None: for renderable in self._exit_renderables: self.error_console.print(renderable) self._exit_renderables.clear() async def _process_messages( self, ready_callback: CallbackType | None = None ) -> None: self._set_active() if self.devtools is not None: from .devtools.client import DevtoolsConnectionError try: await self.devtools.connect() self.log.system(f"Connected to devtools ( {self.devtools.url} )") except DevtoolsConnectionError: self.log.system(f"Couldn't connect to devtools ( {self.devtools.url} )") self.log.system("---") self.log.system(driver=self.driver_class) self.log.system(loop=asyncio.get_running_loop()) self.log.system(features=self.features) try: if self.css_path is not None: self.stylesheet.read(self.css_path) for path, css, tie_breaker in self.get_default_css(): self.stylesheet.add_source( css, path=path, is_default_css=True, tie_breaker=tie_breaker ) if self.CSS: try: app_css_path = ( f"{inspect.getfile(self.__class__)}:{self.__class__.__name__}" ) except TypeError: app_css_path = f"{self.__class__.__name__}" self.stylesheet.add_source( self.CSS, path=app_css_path, is_default_css=False ) except Exception as error: self._handle_exception(error) self._print_error_renderables() return if self.css_monitor: self.set_interval(0.25, self.css_monitor, name="css monitor") self.log.system("[b green]STARTED[/]", self.css_monitor) async def run_process_messages(): try: await self._dispatch_message(events.Compose(sender=self)) await self._dispatch_message(events.Mount(sender=self)) finally: self._mounted_event.set() Reactive._initialize_object(self) self.stylesheet.update(self) self.refresh() await self.animator.start() await self._ready() if ready_callback is not None: await ready_callback() self._running = True try: await self._process_messages_loop() except asyncio.CancelledError: pass finally: self._running = False for timer in list(self._timers): await timer.stop() await self.animator.stop() await self._close_all() self._running = True try: load_event = events.Load(sender=self) await self._dispatch_message(load_event) driver: Driver driver_class = cast( "type[Driver]", HeadlessDriver if self.is_headless else self.driver_class, ) driver = self._driver = driver_class(self.console, self) driver.start_application_mode() try: if self.is_headless: await run_process_messages() else: if self.devtools is not None: devtools = self.devtools assert devtools is not None from .devtools.redirect_output import StdoutRedirector redirector = StdoutRedirector(devtools) with redirect_stderr(redirector): with redirect_stdout(redirector): # type: ignore await run_process_messages() else: null_file = _NullFile() with redirect_stderr(null_file): with redirect_stdout(null_file): await run_process_messages() finally: driver.stop_application_mode() except Exception as error: self._handle_exception(error) finally: self._running = False self._print_error_renderables() if self.devtools is not None and self.devtools.is_connected: await self._disconnect_devtools() async def _pre_process(self) -> None: pass async def _ready(self) -> None: """Called immediately prior to processing messages. May be used as a hook for any operations that should run first. """ try: screenshot_timer = float(os.environ.get("TEXTUAL_SCREENSHOT", "0")) except ValueError: return screenshot_title = os.environ.get("TEXTUAL_SCREENSHOT_TITLE") if not screenshot_timer: return async def on_screenshot(): """Used by docs plugin.""" svg = self.export_screenshot(title=screenshot_title) self._screenshot = svg # type: ignore await self.shutdown() self.set_timer(screenshot_timer, on_screenshot, name="screenshot timer") async def _on_compose(self) -> None: widgets = list(self.compose()) await self.mount_all(widgets) def _on_idle(self) -> None: """Perform actions when there are no messages in the queue.""" if self._require_stylesheet_update: nodes: set[DOMNode] = { child for node in self._require_stylesheet_update for child in node.walk_children() } self._require_stylesheet_update.clear() self.stylesheet.update_nodes(nodes, animate=True) def _register_child(self, parent: DOMNode, child: Widget) -> bool: if child not in self._registry: parent.children._append(child) self._registry.add(child) child._attach(parent) child._post_register(self) child._start_messages() return True return False def _register( self, parent: DOMNode, *anon_widgets: Widget, **widgets: Widget ) -> list[Widget]: """Register widget(s) so they may receive events. Args: parent (Widget): Parent Widget. Returns: list[Widget]: List of modified widgets. """ if not anon_widgets and not widgets: return [] name_widgets: list[tuple[str | None, Widget]] name_widgets = [*((None, widget) for widget in anon_widgets), *widgets.items()] apply_stylesheet = self.stylesheet.apply for widget_id, widget in name_widgets: if not isinstance(widget, Widget): raise AppError(f"Can't register {widget!r}; expected a Widget instance") if widget not in self._registry: if widget_id is not None: widget.id = widget_id self._register_child(parent, widget) if widget.children: self._register(widget, *widget.children) apply_stylesheet(widget) registered_widgets = [widget for _, widget in name_widgets] return registered_widgets def _unregister(self, widget: Widget) -> None: """Unregister a widget. Args: widget (Widget): A Widget to unregister """ widget.reset_focus() if isinstance(widget._parent, Widget): widget._parent.children._remove(widget) widget._detach() self._registry.discard(widget) async def _disconnect_devtools(self): if self.devtools is not None: await self.devtools.disconnect() def _start_widget(self, parent: Widget, widget: Widget) -> None: """Start a widget (run it's task) so that it can receive messages. Args: parent (Widget): The parent of the Widget. widget (Widget): The Widget to start. """ widget._attach(parent) widget._start_messages() def is_mounted(self, widget: Widget) -> bool: """Check if a widget is mounted. Args: widget (Widget): A widget. Returns: bool: True of the widget is mounted. """ return widget in self._registry async def _close_all(self) -> None: while self._registry: child = self._registry.pop() await child._close_messages() async def shutdown(self): await self._disconnect_devtools() driver = self._driver if driver is not None: driver.disable_input() await self._close_messages() def refresh(self, *, repaint: bool = True, layout: bool = False) -> None: if self._screen_stack: self.screen.refresh(repaint=repaint, layout=layout) self.check_idle() def refresh_css(self, animate: bool = True) -> None: """Refresh CSS. Args: animate (bool, optional): Also execute CSS animations. Defaults to True. """ stylesheet = self.app.stylesheet stylesheet.set_variables(self.get_css_variables()) stylesheet.reparse() stylesheet.update(self.app, animate=animate) self.screen._refresh_layout(self.size, full=True) def _display(self, screen: Screen, renderable: RenderableType | None) -> None: """Display a renderable within a sync. Args: screen (Screen): Screen instance renderable (RenderableType): A Rich renderable. """ if screen is not self.screen or renderable is None: return if self._running and not self._closed and not self.is_headless: console = self.console self._begin_update() try: try: console.print(renderable) except Exception as error: self._handle_exception(error) finally: self._end_update() console.file.flush() def get_widget_at(self, x: int, y: int) -> tuple[Widget, Region]: """Get the widget under the given coordinates. Args: x (int): X Coord. y (int): Y Coord. Returns: tuple[Widget, Region]: The widget and the widget's screen region. """ return self.screen.get_widget_at(x, y) def bell(self) -> None: """Play the console 'bell'.""" if not self.is_headless: self.console.bell() @property def _binding_chain(self) -> list[tuple[DOMNode, Bindings]]: """Get a chain of nodes and bindings to consider. If no widget is focused, returns the bindings from both the screen and the app level bindings. Otherwise, combines all the bindings from the currently focused node up the DOM to the root App. Returns: list[tuple[DOMNode, Bindings]]: List of DOM nodes and their bindings. """ focused = self.focused namespace_bindings: list[tuple[DOMNode, Bindings]] if focused is None: namespace_bindings = [ (self.screen, self.screen._bindings), (self, self._bindings), ] else: namespace_bindings = [(node, node._bindings) for node in focused.ancestors] return namespace_bindings async def check_bindings(self, key: str, universal: bool = False) -> bool: """Handle a key press. Args: key (str): A key universal (bool): Check universal keys if True, otherwise non-universal keys. Returns: bool: True if the key was handled by a binding, otherwise False """ for namespace, bindings in self._binding_chain: binding = bindings.keys.get(key) if binding is not None and binding.universal == universal: await self.action(binding.action, default_namespace=namespace) return True return False async def on_event(self, event: events.Event) -> None: # Handle input events that haven't been forwarded # If the event has been forwarded it may have bubbled up back to the App if isinstance(event, events.Compose): screen = Screen(id="_default") self._register(self, screen) self._screen_stack.append(screen) await super().on_event(event) elif isinstance(event, events.InputEvent) and not event.is_forwarded: if isinstance(event, events.MouseEvent): # Record current mouse position on App self.mouse_position = Offset(event.x, event.y) await self.screen._forward_event(event) elif isinstance(event, events.Key): if not await self.check_bindings(event.key, universal=True): forward_target = self.focused or self.screen await forward_target._forward_event(event) else: await self.screen._forward_event(event) elif isinstance(event, events.Paste): if self.focused is not None: await self.focused._forward_event(event) else: await super().on_event(event) async def action( self, action: str | tuple[str, tuple[str, ...]], default_namespace: object | None = None, ) -> bool: """Perform an action. Args: action (str): Action encoded in a string. default_namespace (object | None): Namespace to use if not provided in the action, or None to use app. Defaults to None. Returns: bool: True if the event has handled. """ print("ACTION", action, default_namespace) if isinstance(action, str): target, params = actions.parse(action) else: target, params = action implicit_destination = True if "." in target: destination, action_name = target.split(".", 1) if destination not in self._action_targets: raise ActionError(f"Action namespace {destination} is not known") action_target = getattr(self, destination) implicit_destination = True else: action_target = default_namespace or self action_name = target handled = await self._dispatch_action(action_target, action_name, params) if not handled and implicit_destination and not isinstance(action_target, App): handled = await self.app._dispatch_action(self.app, action_name, params) return handled async def _dispatch_action( self, namespace: object, action_name: str, params: Any ) -> bool: log( "<action>", namespace=namespace, action_name=action_name, params=params, ) _rich_traceback_guard = True public_method_name = f"action_{action_name}" private_method_name = f"_{public_method_name}" private_method = getattr(namespace, private_method_name, None) public_method = getattr(namespace, public_method_name, None) if private_method is None and public_method is None: log( f"<action> {action_name!r} has no target. Couldn't find methods {public_method_name!r} or {private_method_name!r}" ) if callable(private_method): await invoke(private_method, *params) return True elif callable(public_method): await invoke(public_method, *params) return True return False async def _broker_event( self, event_name: str, event: events.Event, default_namespace: object | None ) -> bool: """Allow the app an opportunity to dispatch events to action system. Args: event_name (str): _description_ event (events.Event): An event object. default_namespace (object | None): TODO: _description_ Returns: bool: True if an action was processed. """ try: style = getattr(event, "style") except AttributeError: return False try: _modifiers, action = extract_handler_actions(event_name, style.meta) except NoHandler: return False else: event.stop() if isinstance(action, (str, tuple)): await self.action(action, default_namespace=default_namespace) elif callable(action): await action() else: return False return True async def _on_update(self, message: messages.Update) -> None: message.stop() async def _on_layout(self, message: messages.Layout) -> None: message.stop() async def _on_key(self, event: events.Key) -> None: if event.key == "tab": self.screen.focus_next() elif event.key == "shift+tab": self.screen.focus_previous() else: if not (await self.check_bindings(event.key)): await self.dispatch_key(event) async def _on_shutdown_request(self, event: events.ShutdownRequest) -> None: log("shutdown request") await self._close_messages() async def _on_resize(self, event: events.Resize) -> None: event.stop() await self.screen.post_message(event) async def _on_remove(self, event: events.Remove) -> None: widget = event.widget parent = widget.parent remove_widgets = widget.walk_children( Widget, with_self=True, method="depth", reverse=True ) if self.screen.focused in remove_widgets: self.screen._reset_focus( self.screen.focused, [to_remove for to_remove in remove_widgets if to_remove.can_focus], ) for child in remove_widgets: await child._close_messages() self._unregister(child) if parent is not None: parent.refresh(layout=True) async def action_check_bindings(self, key: str) -> None: await self.check_bindings(key) async def action_quit(self) -> None: """Quit the app as soon as possible.""" await self.shutdown() async def action_bang(self) -> None: 1 / 0 async def action_bell(self) -> None: """Play the terminal 'bell'.""" self.bell() async def action_focus(self, widget_id: str) -> None: """Focus the given widget. Args: widget_id (str): ID of widget to focus. """ try: node = self.query(f"#{widget_id}").first() except NoMatches: pass else: if isinstance(node, Widget): self.set_focus(node) async def action_switch_screen(self, screen: str) -> None: """Switches to another screen. Args: screen (str): Name of the screen. """ self.switch_screen(screen) async def action_push_screen(self, screen: str) -> None: """Pushes a screen on to the screen stack and makes it active. Args: screen (str): Name of the screen. """ self.push_screen(screen) async def action_pop_screen(self) -> None: """Removes the topmost screen and makes the new topmost screen active.""" self.pop_screen() async def action_back(self) -> None: try: self.pop_screen() except ScreenStackError: pass async def action_add_class_(self, selector: str, class_name: str) -> None: self.screen.query(selector).add_class(class_name) async def action_remove_class_(self, selector: str, class_name: str) -> None: self.screen.query(selector).remove_class(class_name) async def action_toggle_class(self, selector: str, class_name: str) -> None: self.screen.query(selector).toggle_class(class_name) def _on_terminal_supports_synchronized_output( self, message: messages.TerminalSupportsSynchronizedOutput ) -> None: log.system("[b green]SynchronizedOutput mode is supported") self._sync_available = True def _begin_update(self) -> None: if self._sync_available: self.console.file.write(SYNC_START) def _end_update(self) -> None: if self._sync_available: self.console.file.write(SYNC_END)
Ancestors
- typing.Generic
- DOMNode
- MessagePump
Subclasses
Class variables
var CSS
var CSS_PATH : CSSPathType
var DEFAULT_CSS
var SCREENS : dict[str, Screen]
var SUB_TITLE : str | None
var TITLE : str | None
Instance variables
var animator : textual._animator.Animator
-
Expand source code
@property def animator(self) -> Animator: return self._animator
var dark : ReactiveType
-
Reactive descriptor.
Args
- default (ReactiveType | Callable[[], ReactiveType]): A default value or callable that returns a default.
layout
:bool
, optional- Perform a layout on change. Defaults to False.
repaint
:bool
, optional- Perform a repaint on change. Defaults to True.
init
:bool
, optional- Call watchers on initialize (post mount). Defaults to False.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType: value: _NotSet | ReactiveType = getattr(obj, self.internal_name, _NOT_SET) if isinstance(value, _NotSet): # No value present, we need to set the default init_name = f"_default_{self.name}" default = getattr(obj, init_name) default_value = default() if callable(default) else default # Set and return the value setattr(obj, self.internal_name, default_value) if self._init: self._check_watchers(obj, self.name, default_value, first_set=True) return default_value return value
var debug : bool
-
Check if debug mode is enabled.
Returns
bool
- True if debug mode is enabled.
Expand source code
@property def debug(self) -> bool: """Check if debug mode is enabled. Returns: bool: True if debug mode is enabled. """ return "debug" in self.features
var focused : Widget | None
-
Get the widget that is focused on the currently active screen.
Expand source code
@property def focused(self) -> Widget | None: """Get the widget that is focused on the currently active screen.""" return self.screen.focused
var is_headless : bool
-
Check if the app is running in 'headless' mode.
Returns
bool
- True if the app is in headless mode.
Expand source code
@property def is_headless(self) -> bool: """Check if the app is running in 'headless' mode. Returns: bool: True if the app is in headless mode. """ return "headless" in self.features
var is_transparent : bool
-
Expand source code
@property def is_transparent(self) -> bool: return True
var namespace_bindings : dict[str, tuple[DOMNode, Binding]]
-
Get current bindings. If no widget is focused, then the app-level bindings are returned. If a widget is focused, then any bindings present in the active screen and app are merged and returned.
Expand source code
@property def namespace_bindings(self) -> dict[str, tuple[DOMNode, Binding]]: """Get current bindings. If no widget is focused, then the app-level bindings are returned. If a widget is focused, then any bindings present in the active screen and app are merged and returned.""" namespace_binding_map: dict[str, tuple[DOMNode, Binding]] = {} for namespace, bindings in reversed(self._binding_chain): for key, binding in bindings.keys.items(): namespace_binding_map[key] = (namespace, binding) return namespace_binding_map
var screen : Screen
-
Get the current screen.
Raises
ScreenStackError
- If there are no screens on the stack.
Returns
Screen
- The currently active screen.
Expand source code
@property def screen(self) -> Screen: """Get the current screen. Raises: ScreenStackError: If there are no screens on the stack. Returns: Screen: The currently active screen. """ try: return self._screen_stack[-1] except IndexError: raise ScreenStackError("No screens on stack") from None
var screen_stack : list[Screen]
-
Get a copy of the screen stack.
Returns
list[Screen]
- List of screens.
Expand source code
@property def screen_stack(self) -> list[Screen]: """Get a *copy* of the screen stack. Returns: list[Screen]: List of screens. """ return self._screen_stack.copy()
var size : Size
-
Get the size of the terminal.
Returns
Size
- Size of the terminal
Expand source code
@property def size(self) -> Size: """Get the size of the terminal. Returns: Size: Size of the terminal """ return Size(*self.console.size)
var sub_title : ReactiveType
-
Reactive descriptor.
Args
- default (ReactiveType | Callable[[], ReactiveType]): A default value or callable that returns a default.
layout
:bool
, optional- Perform a layout on change. Defaults to False.
repaint
:bool
, optional- Perform a repaint on change. Defaults to True.
init
:bool
, optional- Call watchers on initialize (post mount). Defaults to False.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType: value: _NotSet | ReactiveType = getattr(obj, self.internal_name, _NOT_SET) if isinstance(value, _NotSet): # No value present, we need to set the default init_name = f"_default_{self.name}" default = getattr(obj, init_name) default_value = default() if callable(default) else default # Set and return the value setattr(obj, self.internal_name, default_value) if self._init: self._check_watchers(obj, self.name, default_value, first_set=True) return default_value return value
var title : ReactiveType
-
Reactive descriptor.
Args
- default (ReactiveType | Callable[[], ReactiveType]): A default value or callable that returns a default.
layout
:bool
, optional- Perform a layout on change. Defaults to False.
repaint
:bool
, optional- Perform a repaint on change. Defaults to True.
init
:bool
, optional- Call watchers on initialize (post mount). Defaults to False.
Expand source code
def __get__(self, obj: Reactable, obj_type: type[object]) -> ReactiveType: value: _NotSet | ReactiveType = getattr(obj, self.internal_name, _NOT_SET) if isinstance(value, _NotSet): # No value present, we need to set the default init_name = f"_default_{self.name}" default = getattr(obj, init_name) default_value = default() if callable(default) else default # Set and return the value setattr(obj, self.internal_name, default_value) if self._init: self._check_watchers(obj, self.name, default_value, first_set=True) return default_value return value
Methods
async def action(self, action: str | tuple[str, tuple[str, ...]], default_namespace: object | None = None) ‑> bool
-
Perform an action.
Args
action
:str
- Action encoded in a string.
default_namespace (object | None): Namespace to use if not provided in the action, or None to use app. Defaults to None.
Returns
bool
- True if the event has handled.
Expand source code
async def action( self, action: str | tuple[str, tuple[str, ...]], default_namespace: object | None = None, ) -> bool: """Perform an action. Args: action (str): Action encoded in a string. default_namespace (object | None): Namespace to use if not provided in the action, or None to use app. Defaults to None. Returns: bool: True if the event has handled. """ print("ACTION", action, default_namespace) if isinstance(action, str): target, params = actions.parse(action) else: target, params = action implicit_destination = True if "." in target: destination, action_name = target.split(".", 1) if destination not in self._action_targets: raise ActionError(f"Action namespace {destination} is not known") action_target = getattr(self, destination) implicit_destination = True else: action_target = default_namespace or self action_name = target handled = await self._dispatch_action(action_target, action_name, params) if not handled and implicit_destination and not isinstance(action_target, App): handled = await self.app._dispatch_action(self.app, action_name, params) return handled
async def action_add_class_(self, selector: str, class_name: str) ‑> None
-
Expand source code
async def action_add_class_(self, selector: str, class_name: str) -> None: self.screen.query(selector).add_class(class_name)
async def action_back(self) ‑> None
-
Expand source code
async def action_back(self) -> None: try: self.pop_screen() except ScreenStackError: pass
async def action_bang(self) ‑> None
-
Expand source code
async def action_bang(self) -> None: 1 / 0
async def action_bell(self) ‑> None
-
Play the terminal 'bell'.
Expand source code
async def action_bell(self) -> None: """Play the terminal 'bell'.""" self.bell()
async def action_check_bindings(self, key: str) ‑> None
-
Expand source code
async def action_check_bindings(self, key: str) -> None: await self.check_bindings(key)
async def action_focus(self, widget_id: str) ‑> None
-
Focus the given widget.
Args
widget_id
:str
- ID of widget to focus.
Expand source code
async def action_focus(self, widget_id: str) -> None: """Focus the given widget. Args: widget_id (str): ID of widget to focus. """ try: node = self.query(f"#{widget_id}").first() except NoMatches: pass else: if isinstance(node, Widget): self.set_focus(node)
async def action_pop_screen(self) ‑> None
-
Removes the topmost screen and makes the new topmost screen active.
Expand source code
async def action_pop_screen(self) -> None: """Removes the topmost screen and makes the new topmost screen active.""" self.pop_screen()
async def action_push_screen(self, screen: str) ‑> None
-
Pushes a screen on to the screen stack and makes it active.
Args
screen
:str
- Name of the screen.
Expand source code
async def action_push_screen(self, screen: str) -> None: """Pushes a screen on to the screen stack and makes it active. Args: screen (str): Name of the screen. """ self.push_screen(screen)
async def action_quit(self) ‑> None
-
Quit the app as soon as possible.
Expand source code
async def action_quit(self) -> None: """Quit the app as soon as possible.""" await self.shutdown()
async def action_remove_class_(self, selector: str, class_name: str) ‑> None
-
Expand source code
async def action_remove_class_(self, selector: str, class_name: str) -> None: self.screen.query(selector).remove_class(class_name)
def action_screenshot(self, filename: str | None = None, path: str = './') ‑> None
-
Save an SVG "screenshot". This action will save an SVG file containing the current contents of the screen.
Args
- filename (str | None, optional): Filename of screenshot, or None to auto-generate. Defaults to None.
path
:str
, optional- Path to directory. Defaults to "~/".
Expand source code
def action_screenshot(self, filename: str | None = None, path: str = "./") -> None: """Save an SVG "screenshot". This action will save an SVG file containing the current contents of the screen. Args: filename (str | None, optional): Filename of screenshot, or None to auto-generate. Defaults to None. path (str, optional): Path to directory. Defaults to "~/". """ self.save_screenshot(filename, path)
async def action_switch_screen(self, screen: str) ‑> None
-
Switches to another screen.
Args
screen
:str
- Name of the screen.
Expand source code
async def action_switch_screen(self, screen: str) -> None: """Switches to another screen. Args: screen (str): Name of the screen. """ self.switch_screen(screen)
async def action_toggle_class(self, selector: str, class_name: str) ‑> None
-
Expand source code
async def action_toggle_class(self, selector: str, class_name: str) -> None: self.screen.query(selector).toggle_class(class_name)
def action_toggle_dark(self) ‑> None
-
Action to toggle dark mode.
Expand source code
def action_toggle_dark(self) -> None: """Action to toggle dark mode.""" self.dark = not self.dark
def animate(self, attribute: str, value: float | Animatable, *, final_value: object = Ellipsis, duration: float | None = None, speed: float | None = None, delay: float = 0.0, easing: EasingFunction | str = 'in_out_cubic', on_complete: CallbackType | None = None) ‑> None
-
Animate an attribute.
Args
attribute
:str
- Name of the attribute to animate.
- value (float | Animatable): The value to animate to.
final_value
:object
, optional- The final value of the animation. Defaults to
value
if not set. - duration (float | None, optional): The duration of the animate. Defaults to None.
- speed (float | None, optional): The speed of the animation. Defaults to None.
delay
:float
, optional- A delay (in seconds) before the animation starts. Defaults to 0.0.
easing (EasingFunction | str, optional): An easing method. Defaults to "in_out_cubic". on_complete (CallbackType | None, optional): A callable to invoke when the animation is finished. Defaults to None.
Expand source code
def animate( self, attribute: str, value: float | Animatable, *, final_value: object = ..., duration: float | None = None, speed: float | None = None, delay: float = 0.0, easing: EasingFunction | str = DEFAULT_EASING, on_complete: CallbackType | None = None, ) -> None: """Animate an attribute. Args: attribute (str): Name of the attribute to animate. value (float | Animatable): The value to animate to. final_value (object, optional): The final value of the animation. Defaults to `value` if not set. duration (float | None, optional): The duration of the animate. Defaults to None. speed (float | None, optional): The speed of the animation. Defaults to None. delay (float, optional): A delay (in seconds) before the animation starts. Defaults to 0.0. easing (EasingFunction | str, optional): An easing method. Defaults to "in_out_cubic". on_complete (CallbackType | None, optional): A callable to invoke when the animation is finished. Defaults to None. """ self._animate( attribute, value, final_value=final_value, duration=duration, speed=speed, delay=delay, easing=easing, on_complete=on_complete, )
def bell(self) ‑> None
-
Play the console 'bell'.
Expand source code
def bell(self) -> None: """Play the console 'bell'.""" if not self.is_headless: self.console.bell()
def bind(self, keys: str, action: str, *, description: str = '', show: bool = True, key_display: str | None = None) ‑> None
-
Bind a key to an action.
Args
keys
:str
- A comma separated list of keys, i.e.
action
:str
- Action to bind to.
description
:str
, optional- Short description of action. Defaults to "".
show
:bool
, optional- Show key in UI. Defaults to True.
key_display
:str
, optional- Replacement text for key, or None to use default. Defaults to None.
Expand source code
def bind( self, keys: str, action: str, *, description: str = "", show: bool = True, key_display: str | None = None, ) -> None: """Bind a key to an action. Args: keys (str): A comma separated list of keys, i.e. action (str): Action to bind to. description (str, optional): Short description of action. Defaults to "". show (bool, optional): Show key in UI. Defaults to True. key_display (str, optional): Replacement text for key, or None to use default. Defaults to None. """ self._bindings.bind( keys, action, description, show=show, key_display=key_display )
def capture_mouse(self, widget: Widget | None) ‑> None
-
Send all mouse events to the given widget, disable mouse capture.
Args
widget (Widget | None): If a widget, capture mouse event, or None to end mouse capture.
Expand source code
def capture_mouse(self, widget: Widget | None) -> None: """Send all mouse events to the given widget, disable mouse capture. Args: widget (Widget | None): If a widget, capture mouse event, or None to end mouse capture. """ if widget == self.mouse_captured: return if self.mouse_captured is not None: self.mouse_captured.post_message_no_wait( events.MouseRelease(self, self.mouse_position) ) self.mouse_captured = widget if widget is not None: widget.post_message_no_wait(events.MouseCapture(self, self.mouse_position))
async def check_bindings(self, key: str, universal: bool = False) ‑> bool
-
Handle a key press.
Args
key
:str
- A key
universal
:bool
- Check universal keys if True, otherwise non-universal keys.
Returns
bool
- True if the key was handled by a binding, otherwise False
Expand source code
async def check_bindings(self, key: str, universal: bool = False) -> bool: """Handle a key press. Args: key (str): A key universal (bool): Check universal keys if True, otherwise non-universal keys. Returns: bool: True if the key was handled by a binding, otherwise False """ for namespace, bindings in self._binding_chain: binding = bindings.keys.get(key) if binding is not None and binding.universal == universal: await self.action(binding.action, default_namespace=namespace) return True return False
def compose(self) ‑> Iterable[Widget]
-
Yield child widgets for a container.
Expand source code
def compose(self) -> ComposeResult: """Yield child widgets for a container.""" return yield
def exit(self, result: ReturnType | None = None) ‑> None
-
Exit the app, and return the supplied result.
Args
result (ReturnType | None, optional): Return value. Defaults to None.
Expand source code
def exit(self, result: ReturnType | None = None) -> None: """Exit the app, and return the supplied result. Args: result (ReturnType | None, optional): Return value. Defaults to None. """ self._return_value = result self._close_messages_no_wait()
def export_screenshot(self, *, title: str | None = None) ‑> str
-
Export an SVG screenshot of the current screen.
Args
title (str | None, optional): The title of the exported screenshot or None to use app title. Defaults to None.
Expand source code
def export_screenshot(self, *, title: str | None = None) -> str: """Export an SVG screenshot of the current screen. Args: title (str | None, optional): The title of the exported screenshot or None to use app title. Defaults to None. """ console = Console( width=self.console.width, height=self.console.height, file=io.StringIO(), force_terminal=True, color_system="truecolor", record=True, legacy_windows=False, ) screen_render = self.screen._compositor.render(full=True) console.print(screen_render) return console.export_svg(title=title or self.title)
def fatal_error(self) ‑> None
-
Exits the app after an unhandled exception.
Expand source code
def fatal_error(self) -> None: """Exits the app after an unhandled exception.""" self.bell() traceback = Traceback( show_locals=True, width=None, locals_max_length=5, suppress=[rich] ) self._exit_renderables.append( Segments(self.console.render(traceback, self.console.options)) ) self._close_messages_no_wait()
def get_child(self, id: str) ‑> DOMNode
-
Shorthand for self.screen.get_child(id: str) Returns the first child (immediate descendent) of this DOMNode with the given ID.
Args
id
:str
- The ID of the node to search for.
Returns
DOMNode
- The first child of this node with the specified ID.
Raises
NoMatches
- if no children could be found for this ID
Expand source code
def get_child(self, id: str) -> DOMNode: """Shorthand for self.screen.get_child(id: str) Returns the first child (immediate descendent) of this DOMNode with the given ID. Args: id (str): The ID of the node to search for. Returns: DOMNode: The first child of this node with the specified ID. Raises: NoMatches: if no children could be found for this ID """ return self.screen.get_child(id)
def get_css_variables(self) ‑> dict[str, str]
-
Get a mapping of variables used to pre-populate CSS.
Returns
dict[str, str]
- A mapping of variable name to value.
Expand source code
def get_css_variables(self) -> dict[str, str]: """Get a mapping of variables used to pre-populate CSS. Returns: dict[str, str]: A mapping of variable name to value. """ variables = self.design["dark" if self.dark else "light"].generate() return variables
def get_driver_class(self) ‑> Type[Driver]
-
Get a driver class for this platform.
Called by the constructor.
Returns
Driver
- A Driver class which manages input and display.
Expand source code
def get_driver_class(self) -> Type[Driver]: """Get a driver class for this platform. Called by the constructor. Returns: Driver: A Driver class which manages input and display. """ driver_class: Type[Driver] if WINDOWS: from .drivers.windows_driver import WindowsDriver driver_class = WindowsDriver else: from .drivers.linux_driver import LinuxDriver driver_class = LinuxDriver return driver_class
def get_screen(self, screen: Screen | str) ‑> Screen
-
Get an installed screen.
If the screen isn't running, it will be registered before it is run.
Args
screen (Screen | str): Either a Screen object or screen name (the
name
argument when installed).Raises
KeyError
- If the named screen doesn't exist.
Returns
Screen
- A screen instance.
Expand source code
def get_screen(self, screen: Screen | str) -> Screen: """Get an installed screen. If the screen isn't running, it will be registered before it is run. Args: screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed). Raises: KeyError: If the named screen doesn't exist. Returns: Screen: A screen instance. """ if isinstance(screen, str): try: next_screen = self._installed_screens[screen] except KeyError: raise KeyError(f"No screen called {screen!r} installed") from None else: next_screen = screen if not next_screen.is_running: self._register(self, next_screen) return next_screen
def get_widget_at(self, x: int, y: int) ‑> tuple[Widget, Region]
-
Get the widget under the given coordinates.
Args
x
:int
- X Coord.
y
:int
- Y Coord.
Returns
tuple[Widget, Region]
- The widget and the widget's screen region.
Expand source code
def get_widget_at(self, x: int, y: int) -> tuple[Widget, Region]: """Get the widget under the given coordinates. Args: x (int): X Coord. y (int): Y Coord. Returns: tuple[Widget, Region]: The widget and the widget's screen region. """ return self.screen.get_widget_at(x, y)
def install_screen(self, screen: Screen, name: str | None = None) ‑> str
-
Install a screen.
Args
screen
:Screen
- Screen to install.
name (str | None, optional): Unique name of screen or None to auto-generate. Defaults to None.
Raises
ScreenError
- If the screen can't be installed.
Returns
str
- The name of the screen
Expand source code
def install_screen(self, screen: Screen, name: str | None = None) -> str: """Install a screen. Args: screen (Screen): Screen to install. name (str | None, optional): Unique name of screen or None to auto-generate. Defaults to None. Raises: ScreenError: If the screen can't be installed. Returns: str: The name of the screen """ if name is None: name = nanoid.generate() if name in self._installed_screens: raise ScreenError(f"Can't install screen; {name!r} is already installed") if screen in self._installed_screens.values(): raise ScreenError( "Can't install screen; {screen!r} has already been installed" ) self._installed_screens[name] = screen self.get_screen(name) # Ensures screen is running self.log.system(f"{screen} INSTALLED name={name!r}") return name
def is_mounted(self, widget: Widget) ‑> bool
-
Check if a widget is mounted.
Args
widget
:Widget
- A widget.
Returns
bool
- True of the widget is mounted.
Expand source code
def is_mounted(self, widget: Widget) -> bool: """Check if a widget is mounted. Args: widget (Widget): A widget. Returns: bool: True of the widget is mounted. """ return widget in self._registry
def is_screen_installed(self, screen: Screen | str) ‑> bool
-
Check if a given screen has been installed.
Args
screen (Screen | str): Either a Screen object or screen name (the
name
argument when installed).Returns
bool
- True if the screen is currently installed,
Expand source code
def is_screen_installed(self, screen: Screen | str) -> bool: """Check if a given screen has been installed. Args: screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed). Returns: bool: True if the screen is currently installed, """ if isinstance(screen, str): return screen in self._installed_screens else: return screen in self._installed_screens.values()
def mount(self, *anon_widgets: Widget, **widgets: Widget) ‑> AwaitMount
-
Mount widgets. Widgets specified as positional args, or keywords args. If supplied as keyword args they will be assigned an id of the key.
Returns
AwaitMount
- An awaitable object that waits for widgets to be mounted.
Expand source code
def mount(self, *anon_widgets: Widget, **widgets: Widget) -> AwaitMount: """Mount widgets. Widgets specified as positional args, or keywords args. If supplied as keyword args they will be assigned an id of the key. Returns: AwaitMount: An awaitable object that waits for widgets to be mounted. """ mounted_widgets = self._register(self.screen, *anon_widgets, **widgets) return AwaitMount(mounted_widgets)
def mount_all(self, widgets: Iterable[Widget]) ‑> AwaitMount
-
Mount widgets from an iterable.
Args
widgets
:Iterable[Widget]
- An iterable of widgets.
Expand source code
def mount_all(self, widgets: Iterable[Widget]) -> AwaitMount: """Mount widgets from an iterable. Args: widgets (Iterable[Widget]): An iterable of widgets. """ mounted_widgets = list(widgets) for widget in mounted_widgets: self._register(self.screen, widget) return AwaitMount(mounted_widgets)
def panic(self, *renderables: RenderableType) ‑> None
-
Exits the app then displays a message.
Args
*renderables
:RenderableType
, optional- Rich renderables to display on exit.
Expand source code
def panic(self, *renderables: RenderableType) -> None: """Exits the app then displays a message. Args: *renderables (RenderableType, optional): Rich renderables to display on exit. """ assert all( is_renderable(renderable) for renderable in renderables ), "Can only call panic with strings or Rich renderables" def render(renderable: RenderableType) -> list[Segment]: """Render a panic renderables.""" segments = list(self.console.render(renderable, self.console.options)) return segments pre_rendered = [Segments(render(renderable)) for renderable in renderables] self._exit_renderables.extend(pre_rendered) self._close_messages_no_wait()
def pop_screen(self) ‑> Screen
-
Pop the current screen from the stack, and switch to the previous screen.
Returns
Screen
- The screen that was replaced.
Expand source code
def pop_screen(self) -> Screen: """Pop the current screen from the stack, and switch to the previous screen. Returns: Screen: The screen that was replaced. """ screen_stack = self._screen_stack if len(screen_stack) <= 1: raise ScreenStackError( "Can't pop screen; there must be at least one screen on the stack" ) previous_screen = self._replace_screen(screen_stack.pop()) self.screen._screen_resized(self.size) self.screen.post_message_no_wait(events.ScreenResume(self)) self.log.system(f"{self.screen} is active") return previous_screen
def push_screen(self, screen: Screen | str) ‑> None
-
Push a new screen on the screen stack.
Args
screen (Screen | str): A Screen instance or the name of an installed screen.
Expand source code
def push_screen(self, screen: Screen | str) -> None: """Push a new screen on the screen stack. Args: screen (Screen | str): A Screen instance or the name of an installed screen. """ next_screen = self.get_screen(screen) self._screen_stack.append(next_screen) self.screen.post_message_no_wait(events.ScreenResume(self)) self.log.system(f"{self.screen} is current (PUSHED)")
def refresh(self, *, repaint: bool = True, layout: bool = False) ‑> None
-
Expand source code
def refresh(self, *, repaint: bool = True, layout: bool = False) -> None: if self._screen_stack: self.screen.refresh(repaint=repaint, layout=layout) self.check_idle()
def refresh_css(self, animate: bool = True) ‑> None
-
Refresh CSS.
Args
animate
:bool
, optional- Also execute CSS animations. Defaults to True.
Expand source code
def refresh_css(self, animate: bool = True) -> None: """Refresh CSS. Args: animate (bool, optional): Also execute CSS animations. Defaults to True. """ stylesheet = self.app.stylesheet stylesheet.set_variables(self.get_css_variables()) stylesheet.reparse() stylesheet.update(self.app, animate=animate) self.screen._refresh_layout(self.size, full=True)
def render(self) ‑> Union[rich.console.ConsoleRenderable, rich.console.RichCast, str]
-
Expand source code
def render(self) -> RenderableType: return Blank(self.styles.background)
def run(self, *, quit_after: float | None = None, headless: bool = False, press: Iterable[str] | None = None, screenshot: bool = False, screenshot_title: str | None = None) ‑> ReturnType | None
-
The main entry point for apps.
Args
- quit_after (float | None, optional): Quit after a given number of seconds, or None
- to run forever. Defaults to None.
headless
:bool
, optional- Run in "headless" mode (don't write to stdout).
press
:str
, optional- An iterable of keys to simulate being pressed.
screenshot
:bool
, optional- Take a screenshot after pressing keys (svg data stored in self._screenshot). Defaults to False.
screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None.
Returns
ReturnType | None: The return value specified in
App.exit()
or None if exit wasn't called.Expand source code
def run( self, *, quit_after: float | None = None, headless: bool = False, press: Iterable[str] | None = None, screenshot: bool = False, screenshot_title: str | None = None, ) -> ReturnType | None: """The main entry point for apps. Args: quit_after (float | None, optional): Quit after a given number of seconds, or None to run forever. Defaults to None. headless (bool, optional): Run in "headless" mode (don't write to stdout). press (str, optional): An iterable of keys to simulate being pressed. screenshot (bool, optional): Take a screenshot after pressing keys (svg data stored in self._screenshot). Defaults to False. screenshot_title (str | None, optional): Title of screenshot, or None to use App title. Defaults to None. Returns: ReturnType | None: The return value specified in `App.exit` or None if exit wasn't called. """ if headless: self.features = cast( "frozenset[FeatureFlag]", self.features.union({"headless"}) ) async def run_app() -> None: if quit_after is not None: self.set_timer(quit_after, self.shutdown) if press is not None: app = self async def press_keys() -> None: """A task to send key events.""" assert press driver = app._driver assert driver is not None await asyncio.sleep(0.01) for key in press: if key == "_": print("(pause 50ms)") await asyncio.sleep(0.05) elif key.startswith("wait:"): _, wait_ms = key.split(":") print(f"(pause {wait_ms}ms)") await asyncio.sleep(float(wait_ms) / 1000) else: if len(key) == 1 and not key.isalnum(): key = ( unicodedata.name(key) .lower() .replace("-", "_") .replace(" ", "_") ) original_key = REPLACED_KEYS.get(key, key) try: char = unicodedata.lookup( original_key.upper().replace("_", " ") ) except KeyError: char = key if len(key) == 1 else None print(f"press {key!r} (char={char!r})") key_event = events.Key(self, key, char) driver.send_event(key_event) await asyncio.sleep(0.01) await app._animator.wait_for_idle() if screenshot: self._screenshot = self.export_screenshot( title=screenshot_title ) await self.shutdown() async def press_keys_task(): """Press some keys in the background.""" asyncio.create_task(press_keys()) await self._process_messages(ready_callback=press_keys_task) else: await self._process_messages() if _ASYNCIO_GET_EVENT_LOOP_IS_DEPRECATED: # N.B. This doesn't work with Python<3.10, as we end up with 2 event loops: asyncio.run(run_app()) else: # However, this works with Python<3.10: event_loop = asyncio.get_event_loop() event_loop.run_until_complete(run_app()) return self._return_value
def save_screenshot(self, filename: str | None = None, path: str = './', time_format: str = '%Y-%m-%d %X %f') ‑> str
-
Save an SVG screenshot of the current screen.
Args
- filename (str | None, optional): Filename of SVG screenshot, or None to auto-generate
- a filename with the date and time. Defaults to None.
path
:str
, optional- Path to directory for output. Defaults to current working directory.
time_format
:str
, optional- Time format to use if filename is None. Defaults to "%Y-%m-%d %X %f".
Returns
str
- Filename of screenshot.
Expand source code
def save_screenshot( self, filename: str | None = None, path: str = "./", time_format: str = "%Y-%m-%d %X %f", ) -> str: """Save an SVG screenshot of the current screen. Args: filename (str | None, optional): Filename of SVG screenshot, or None to auto-generate a filename with the date and time. Defaults to None. path (str, optional): Path to directory for output. Defaults to current working directory. time_format (str, optional): Time format to use if filename is None. Defaults to "%Y-%m-%d %X %f". Returns: str: Filename of screenshot. """ if filename is None: svg_filename = ( f"{self.title.lower()} {datetime.now().strftime(time_format)}.svg" ) svg_filename = svg_filename.replace("/", "_").replace("\\", "_") else: svg_filename = filename svg_path = os.path.expanduser(os.path.join(path, svg_filename)) screenshot_svg = self.export_screenshot() with open(svg_path, "w") as svg_file: svg_file.write(screenshot_svg) return svg_path
def set_focus(self, widget: Widget | None, scroll_visible: bool = True) ‑> None
-
Focus (or unfocus) a widget. A focused widget will receive key events first.
Args
widget
:Widget
- Widget to focus.
scroll_visible
:bool
, optional- Scroll widget in to view.
Expand source code
def set_focus(self, widget: Widget | None, scroll_visible: bool = True) -> None: """Focus (or unfocus) a widget. A focused widget will receive key events first. Args: widget (Widget): Widget to focus. scroll_visible (bool, optional): Scroll widget in to view. """ self.screen.set_focus(widget, scroll_visible)
async def shutdown(self)
-
Expand source code
async def shutdown(self): await self._disconnect_devtools() driver = self._driver if driver is not None: driver.disable_input() await self._close_messages()
def switch_screen(self, screen: Screen | str) ‑> None
-
Switch to another screen by replacing the top of the screen stack with a new screen.
Args
screen (Screen | str): Either a Screen object or screen name (the
name
argument when installed).Expand source code
def switch_screen(self, screen: Screen | str) -> None: """Switch to another screen by replacing the top of the screen stack with a new screen. Args: screen (Screen | str): Either a Screen object or screen name (the `name` argument when installed). """ if self.screen is not screen: self._replace_screen(self._screen_stack.pop()) next_screen = self.get_screen(screen) self._screen_stack.append(next_screen) self.screen.post_message_no_wait(events.ScreenResume(self)) self.log.system(f"{self.screen} is current (SWITCHED)")
def uninstall_screen(self, screen: Screen | str) ‑> str | None
-
Uninstall a screen. If the screen was not previously installed then this method is a null-op.
Args
screen (Screen | str): The screen to uninstall or the name of a installed screen.
Returns
str | None: The name of the screen that was uninstalled, or None if no screen was uninstalled.
Expand source code
def uninstall_screen(self, screen: Screen | str) -> str | None: """Uninstall a screen. If the screen was not previously installed then this method is a null-op. Args: screen (Screen | str): The screen to uninstall or the name of a installed screen. Returns: str | None: The name of the screen that was uninstalled, or None if no screen was uninstalled. """ if isinstance(screen, str): if screen not in self._installed_screens: return None uninstall_screen = self._installed_screens[screen] if uninstall_screen in self._screen_stack: raise ScreenStackError("Can't uninstall screen in screen stack") del self._installed_screens[screen] self.log.system(f"{uninstall_screen} UNINSTALLED name={screen!r}") return screen else: if screen in self._screen_stack: raise ScreenStackError("Can't uninstall screen in screen stack") for name, installed_screen in self._installed_screens.items(): if installed_screen is screen: self._installed_screens.pop(name) self.log.system(f"{screen} UNINSTALLED name={name!r}") return name return None
def update_styles(self, node: DOMNode | None = None) ‑> None
-
Request update of styles.
Should be called whenever CSS classes / pseudo classes change.
Expand source code
def update_styles(self, node: DOMNode | None = None) -> None: """Request update of styles. Should be called whenever CSS classes / pseudo classes change. """ self._require_stylesheet_update.add(self.screen if node is None else node) self.check_idle()
def watch_dark(self, dark: bool) ‑> None
-
Watches the dark bool.
Expand source code
def watch_dark(self, dark: bool) -> None: """Watches the dark bool.""" self.set_class(dark, "-dark-mode") self.set_class(not dark, "-light-mode") self.refresh_css()
Inherited members
DOMNode
:add_class
ancestors
app
background_colors
call_later
check_idle
classes
colors
css_identifier
css_identifier_styled
css_path_nodes
disable_messages
dispatch_key
display
displayed_children
emit
emit_no_wait
enable_messages
get_component_styles
get_default_css
get_pseudo_classes
has_class
has_pseudo_class
id
log
on_event
parent
post_message
post_message_no_wait
pseudo_classes
query
query_one
remove_class
reset_styles
rich_style
set_class
set_interval
set_styles
set_timer
text_style
toggle_class
tree
visible
walk_children
- driver_class (Type[Driver] | None, optional): Driver class or
class AppError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class AppError(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class ScreenError (*args, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScreenError(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
class ScreenStackError (*args, **kwargs)
-
Raised when attempting to pop the last screen from the stack.
Expand source code
class ScreenStackError(ScreenError): """Raised when attempting to pop the last screen from the stack."""
Ancestors
- ScreenError
- builtins.Exception
- builtins.BaseException