Module textual.driver

Expand source code
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

from . import _clock, events
from ._types import MessageTarget

if TYPE_CHECKING:
    from rich.console import Console


class Driver(ABC):
    """Abstract base for drivers that hand events over to a message target.

    A driver keeps the console it was given, the target that receives
    messages, and the asyncio loop that was running when it was constructed.
    Concrete subclasses supply ``start_application_mode``, ``disable_input``
    and ``stop_application_mode``.
    """

    def __init__(
        self, console: "Console", target: "MessageTarget", debug: bool = False
    ) -> None:
        """Remember the collaborators and capture the running event loop.

        Args:
            console: Console instance, kept on ``self.console``.
            target: Object whose ``post_message`` receives every event.
            debug: Debug flag, stored as ``self._debug``.

        Raises:
            RuntimeError: If no asyncio event loop is running (raised by
                ``asyncio.get_running_loop``).
        """
        self.console = console
        self._target = target
        self._debug = debug
        # Events are later scheduled onto this loop by ``send_event``.
        self._loop = asyncio.get_running_loop()
        # Baseline for the click timing check in ``process_event``.
        self._mouse_down_time = _clock.get_time_no_wait()

    def send_event(self, event: events.Event) -> None:
        """Schedule ``event`` to be posted to the target on the captured loop.

        The coroutine is submitted with ``asyncio.run_coroutine_threadsafe``,
        so the call does not wait for delivery. The returned future is
        discarded: neither the result of ``post_message`` nor any exception
        it raises is observed here.
        """
        pending_post = self._target.post_message(event)
        asyncio.run_coroutine_threadsafe(pending_post, loop=self._loop)

    def process_event(self, event: events.Event) -> None:
        """Forward ``event`` and follow a quick mouse release with a click.

        A ``MouseDown`` updates the stored press time before it is forwarded.
        A ``MouseUp`` whose ``time`` is at most 0.5 later than that stored
        time is followed by a ``Click`` built from the same event.
        """
        if isinstance(event, events.MouseDown):
            self._mouse_down_time = event.time

        self.send_event(event)

        # Only a release can complete a click.
        if not isinstance(event, events.MouseUp):
            return

        held_for = event.time - self._mouse_down_time
        if held_for <= 0.5:
            self.send_event(events.Click.from_event(event))

    @abstractmethod
    def start_application_mode(self) -> None:
        """Start application mode; must be implemented by subclasses."""

    @abstractmethod
    def disable_input(self) -> None:
        """Disable input; must be implemented by subclasses."""

    @abstractmethod
    def stop_application_mode(self) -> None:
        """Stop application mode; must be implemented by subclasses."""

Classes

class Driver (console: Console, target: MessageTarget, debug: bool = False)

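Abstract base class for drivers. A driver forwards events to its message target on the asyncio event loop that was running when the driver was created, and sends a Click event after any MouseUp whose time is at most 0.5 later than the most recent MouseDown. Subclasses must implement start_application_mode, disable_input and stop_application_mode.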

Expand source code
class Driver(ABC):
    """Abstract base for drivers that hand events over to a message target.

    A driver keeps the console it was given, the target that receives
    messages, and the asyncio loop that was running when it was constructed.
    Concrete subclasses supply ``start_application_mode``, ``disable_input``
    and ``stop_application_mode``.
    """

    def __init__(
        self, console: "Console", target: "MessageTarget", debug: bool = False
    ) -> None:
        """Remember the collaborators and capture the running event loop.

        Args:
            console: Console instance, kept on ``self.console``.
            target: Object whose ``post_message`` receives every event.
            debug: Debug flag, stored as ``self._debug``.

        Raises:
            RuntimeError: If no asyncio event loop is running (raised by
                ``asyncio.get_running_loop``).
        """
        self.console = console
        self._target = target
        self._debug = debug
        # Events are later scheduled onto this loop by ``send_event``.
        self._loop = asyncio.get_running_loop()
        # Baseline for the click timing check in ``process_event``.
        self._mouse_down_time = _clock.get_time_no_wait()

    def send_event(self, event: events.Event) -> None:
        """Schedule ``event`` to be posted to the target on the captured loop.

        The coroutine is submitted with ``asyncio.run_coroutine_threadsafe``,
        so the call does not wait for delivery. The returned future is
        discarded: neither the result of ``post_message`` nor any exception
        it raises is observed here.
        """
        pending_post = self._target.post_message(event)
        asyncio.run_coroutine_threadsafe(pending_post, loop=self._loop)

    def process_event(self, event: events.Event) -> None:
        """Forward ``event`` and follow a quick mouse release with a click.

        A ``MouseDown`` updates the stored press time before it is forwarded.
        A ``MouseUp`` whose ``time`` is at most 0.5 later than that stored
        time is followed by a ``Click`` built from the same event.
        """
        if isinstance(event, events.MouseDown):
            self._mouse_down_time = event.time

        self.send_event(event)

        # Only a release can complete a click.
        if not isinstance(event, events.MouseUp):
            return

        held_for = event.time - self._mouse_down_time
        if held_for <= 0.5:
            self.send_event(events.Click.from_event(event))

    @abstractmethod
    def start_application_mode(self) -> None:
        """Start application mode; must be implemented by subclasses."""

    @abstractmethod
    def disable_input(self) -> None:
        """Disable input; must be implemented by subclasses."""

    @abstractmethod
    def stop_application_mode(self) -> None:
        """Stop application mode; must be implemented by subclasses."""

Ancestors

  • abc.ABC

Subclasses

Methods

def disable_input(self) ‑> None
Expand source code
@abstractmethod
def disable_input(self) -> None:
    """Disable input; must be implemented by subclasses."""
def process_event(self, event: events.Event) ‑> None

Performs some additional processing of events.

Expand source code
def process_event(self, event: events.Event) -> None:
    """Forward ``event`` and follow a quick mouse release with a click.

    A ``MouseDown`` updates the stored press time before it is forwarded.
    A ``MouseUp`` whose ``time`` is at most 0.5 later than that stored
    time is followed by a ``Click`` built from the same event.
    """
    if isinstance(event, events.MouseDown):
        self._mouse_down_time = event.time

    self.send_event(event)

    # Only a release can complete a click.
    if not isinstance(event, events.MouseUp):
        return

    held_for = event.time - self._mouse_down_time
    if held_for <= 0.5:
        self.send_event(events.Click.from_event(event))
def send_event(self, event: events.Event) ‑> None
Expand source code
def send_event(self, event: events.Event) -> None:
    """Schedule ``event`` to be posted to the target on the captured loop.

    The coroutine is submitted with ``asyncio.run_coroutine_threadsafe``,
    so the call does not wait for delivery. The returned future is
    discarded: neither the result of ``post_message`` nor any exception
    it raises is observed here.
    """
    pending_post = self._target.post_message(event)
    asyncio.run_coroutine_threadsafe(pending_post, loop=self._loop)
def start_application_mode(self) ‑> None
Expand source code
@abstractmethod
def start_application_mode(self) -> None:
    """Start application mode; must be implemented by subclasses."""
def stop_application_mode(self) ‑> None
Expand source code
@abstractmethod
def stop_application_mode(self) -> None:
    """Stop application mode; must be implemented by subclasses."""