Module textual.drivers.linux_driver

Expand source code
from __future__ import annotations

import asyncio
import os
from codecs import getincrementaldecoder
import selectors
import signal
import sys
import termios
import tty
from typing import Any, TYPE_CHECKING
from threading import Event, Thread

if TYPE_CHECKING:
    from rich.console import Console

import rich.repr

from .. import log
from ..driver import Driver
from ..geometry import Size
from .._types import MessageTarget
from .._xterm_parser import XTermParser
from .._profile import timer
from .. import events


@rich.repr.auto
class LinuxDriver(Driver):
    """Powers display and input for Linux / MacOS"""

    def __init__(
        self, console: "Console", target: "MessageTarget", debug: bool = False
    ) -> None:
        super().__init__(console, target, debug)
        self.fileno = sys.stdin.fileno()
        self.attrs_before: list[Any] | None = None
        self.exit_event = Event()
        self._key_thread: Thread | None = None

    def __rich_repr__(self) -> rich.repr.Result:
        yield "debug", self._debug

    def _get_terminal_size(self) -> tuple[int, int]:
        width: int | None = 80
        height: int | None = 25
        import shutil

        try:
            width, height = shutil.get_terminal_size()
        except (AttributeError, ValueError, OSError):
            try:
                width, height = shutil.get_terminal_size()
            except (AttributeError, ValueError, OSError):
                pass
        width = width or 80
        height = height or 25
        return width, height

    def _enable_mouse_support(self) -> None:
        write = self.console.file.write
        write("\x1b[?1000h")  # SET_VT200_MOUSE
        write("\x1b[?1003h")  # SET_ANY_EVENT_MOUSE
        write("\x1b[?1015h")  # SET_VT200_HIGHLIGHT_MOUSE
        write("\x1b[?1006h")  # SET_SGR_EXT_MODE_MOUSE

        # write("\x1b[?1007h")
        self.console.file.flush()

        # Note: E.g. lxterminal understands 1000h, but not the urxvt or sgr
        #       extensions.

    def _enable_bracketed_paste(self) -> None:
        """Enable bracketed paste mode."""
        self.console.file.write("\x1b[?2004h")

    def _disable_bracketed_paste(self) -> None:
        """Disable bracketed paste mode."""
        self.console.file.write("\x1b[?2004l")

    def _disable_mouse_support(self) -> None:
        write = self.console.file.write
        write("\x1b[?1000l")  #
        write("\x1b[?1003l")  #
        write("\x1b[?1015l")
        write("\x1b[?1006l")
        self.console.file.flush()

    def start_application_mode(self):

        loop = asyncio.get_running_loop()

        def send_size_event():
            terminal_size = self._get_terminal_size()
            width, height = terminal_size
            textual_size = Size(width, height)
            event = events.Resize(self._target, textual_size, textual_size)
            asyncio.run_coroutine_threadsafe(
                self._target.post_message(event),
                loop=loop,
            )

        def on_terminal_resize(signum, stack) -> None:
            send_size_event()

        signal.signal(signal.SIGWINCH, on_terminal_resize)

        self.console.set_alt_screen(True)
        self._enable_mouse_support()
        try:
            self.attrs_before = termios.tcgetattr(self.fileno)
        except termios.error:
            # Ignore attribute errors.
            self.attrs_before = None

        try:
            newattr = termios.tcgetattr(self.fileno)
        except termios.error:
            pass
        else:

            newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
            newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])

            # VMIN defines the number of characters read at a time in
            # non-canonical mode. It seems to default to 1 on Linux, but on
            # Solaris and derived operating systems it defaults to 4. (This is
            # because the VMIN slot is the same as the VEOF slot, which
            # defaults to ASCII EOT = Ctrl-D = 4.)
            newattr[tty.CC][termios.VMIN] = 1

            termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)

        self.console.show_cursor(False)
        self.console.file.write("\033[?1003h\n")
        self.console.file.flush()
        self._key_thread = Thread(target=self.run_input_thread, args=(loop,))
        send_size_event()
        self._key_thread.start()
        self._request_terminal_sync_mode_support()
        self._enable_bracketed_paste()

    def _request_terminal_sync_mode_support(self):
        self.console.file.write("\033[?2026$p")
        self.console.file.flush()

    @classmethod
    def _patch_lflag(cls, attrs: int) -> int:
        return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)

    @classmethod
    def _patch_iflag(cls, attrs: int) -> int:
        return attrs & ~(
            # Disable XON/XOFF flow control on output and input.
            # (Don't capture Ctrl-S and Ctrl-Q.)
            # Like executing: "stty -ixon."
            termios.IXON
            | termios.IXOFF
            |
            # Don't translate carriage return into newline on input.
            termios.ICRNL
            | termios.INLCR
            | termios.IGNCR
        )

    def disable_input(self) -> None:
        try:
            if not self.exit_event.is_set():
                signal.signal(signal.SIGWINCH, signal.SIG_DFL)
                self._disable_mouse_support()
                self.exit_event.set()
                if self._key_thread is not None:
                    self._key_thread.join()
                termios.tcflush(self.fileno, termios.TCIFLUSH)
        except Exception as error:
            # TODO: log this
            pass

    def stop_application_mode(self) -> None:
        self._disable_bracketed_paste()
        self.disable_input()

        if self.attrs_before is not None:
            try:
                termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
            except termios.error:
                pass

        with self.console:
            self.console.set_alt_screen(False)
            self.console.show_cursor(True)

    def run_input_thread(self, loop) -> None:
        try:
            self._run_input_thread(loop)
        except Exception:
            pass  # TODO: log

    def _run_input_thread(self, loop) -> None:

        selector = selectors.DefaultSelector()
        selector.register(self.fileno, selectors.EVENT_READ)

        fileno = self.fileno

        def more_data() -> bool:
            """Check if there is more data to parse."""
            for key, events in selector.select(0.01):
                if events:
                    return True
            return False

        parser = XTermParser(self._target, more_data, self._debug)
        feed = parser.feed

        utf8_decoder = getincrementaldecoder("utf-8")().decode
        decode = utf8_decoder
        read = os.read
        EVENT_READ = selectors.EVENT_READ

        try:
            while not self.exit_event.is_set():
                selector_events = selector.select(0.1)
                for _selector_key, mask in selector_events:
                    if mask | EVENT_READ:
                        unicode_data = decode(read(fileno, 1024))
                        for event in feed(unicode_data):
                            self.process_event(event)
        except Exception as error:
            log(error)
        finally:
            with timer("selector.close"):
                selector.close()


if __name__ == "__main__":
    from rich.console import Console

    console = Console()

    from ..app import App

    class MyApp(App):
        async def on_mount(self, event: events.Mount) -> None:
            self.set_timer(5, callback=self._close_messages)

    MyApp.run()

Classes

class LinuxDriver (console: "'Console'", target: "'MessageTarget'", debug: bool = False)

Powers display and input for Linux / MacOS

Expand source code
class LinuxDriver(Driver):
    """Powers display and input for Linux / MacOS"""

    def __init__(
        self, console: "Console", target: "MessageTarget", debug: bool = False
    ) -> None:
        super().__init__(console, target, debug)
        self.fileno = sys.stdin.fileno()
        self.attrs_before: list[Any] | None = None
        self.exit_event = Event()
        self._key_thread: Thread | None = None

    def __rich_repr__(self) -> rich.repr.Result:
        yield "debug", self._debug

    def _get_terminal_size(self) -> tuple[int, int]:
        width: int | None = 80
        height: int | None = 25
        import shutil

        try:
            width, height = shutil.get_terminal_size()
        except (AttributeError, ValueError, OSError):
            try:
                width, height = shutil.get_terminal_size()
            except (AttributeError, ValueError, OSError):
                pass
        width = width or 80
        height = height or 25
        return width, height

    def _enable_mouse_support(self) -> None:
        write = self.console.file.write
        write("\x1b[?1000h")  # SET_VT200_MOUSE
        write("\x1b[?1003h")  # SET_ANY_EVENT_MOUSE
        write("\x1b[?1015h")  # SET_VT200_HIGHLIGHT_MOUSE
        write("\x1b[?1006h")  # SET_SGR_EXT_MODE_MOUSE

        # write("\x1b[?1007h")
        self.console.file.flush()

        # Note: E.g. lxterminal understands 1000h, but not the urxvt or sgr
        #       extensions.

    def _enable_bracketed_paste(self) -> None:
        """Enable bracketed paste mode."""
        self.console.file.write("\x1b[?2004h")

    def _disable_bracketed_paste(self) -> None:
        """Disable bracketed paste mode."""
        self.console.file.write("\x1b[?2004l")

    def _disable_mouse_support(self) -> None:
        write = self.console.file.write
        write("\x1b[?1000l")  #
        write("\x1b[?1003l")  #
        write("\x1b[?1015l")
        write("\x1b[?1006l")
        self.console.file.flush()

    def start_application_mode(self):

        loop = asyncio.get_running_loop()

        def send_size_event():
            terminal_size = self._get_terminal_size()
            width, height = terminal_size
            textual_size = Size(width, height)
            event = events.Resize(self._target, textual_size, textual_size)
            asyncio.run_coroutine_threadsafe(
                self._target.post_message(event),
                loop=loop,
            )

        def on_terminal_resize(signum, stack) -> None:
            send_size_event()

        signal.signal(signal.SIGWINCH, on_terminal_resize)

        self.console.set_alt_screen(True)
        self._enable_mouse_support()
        try:
            self.attrs_before = termios.tcgetattr(self.fileno)
        except termios.error:
            # Ignore attribute errors.
            self.attrs_before = None

        try:
            newattr = termios.tcgetattr(self.fileno)
        except termios.error:
            pass
        else:

            newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
            newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])

            # VMIN defines the number of characters read at a time in
            # non-canonical mode. It seems to default to 1 on Linux, but on
            # Solaris and derived operating systems it defaults to 4. (This is
            # because the VMIN slot is the same as the VEOF slot, which
            # defaults to ASCII EOT = Ctrl-D = 4.)
            newattr[tty.CC][termios.VMIN] = 1

            termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)

        self.console.show_cursor(False)
        self.console.file.write("\033[?1003h\n")
        self.console.file.flush()
        self._key_thread = Thread(target=self.run_input_thread, args=(loop,))
        send_size_event()
        self._key_thread.start()
        self._request_terminal_sync_mode_support()
        self._enable_bracketed_paste()

    def _request_terminal_sync_mode_support(self):
        self.console.file.write("\033[?2026$p")
        self.console.file.flush()

    @classmethod
    def _patch_lflag(cls, attrs: int) -> int:
        return attrs & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)

    @classmethod
    def _patch_iflag(cls, attrs: int) -> int:
        return attrs & ~(
            # Disable XON/XOFF flow control on output and input.
            # (Don't capture Ctrl-S and Ctrl-Q.)
            # Like executing: "stty -ixon."
            termios.IXON
            | termios.IXOFF
            |
            # Don't translate carriage return into newline on input.
            termios.ICRNL
            | termios.INLCR
            | termios.IGNCR
        )

    def disable_input(self) -> None:
        try:
            if not self.exit_event.is_set():
                signal.signal(signal.SIGWINCH, signal.SIG_DFL)
                self._disable_mouse_support()
                self.exit_event.set()
                if self._key_thread is not None:
                    self._key_thread.join()
                termios.tcflush(self.fileno, termios.TCIFLUSH)
        except Exception as error:
            # TODO: log this
            pass

    def stop_application_mode(self) -> None:
        self._disable_bracketed_paste()
        self.disable_input()

        if self.attrs_before is not None:
            try:
                termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
            except termios.error:
                pass

        with self.console:
            self.console.set_alt_screen(False)
            self.console.show_cursor(True)

    def run_input_thread(self, loop) -> None:
        try:
            self._run_input_thread(loop)
        except Exception:
            pass  # TODO: log

    def _run_input_thread(self, loop) -> None:

        selector = selectors.DefaultSelector()
        selector.register(self.fileno, selectors.EVENT_READ)

        fileno = self.fileno

        def more_data() -> bool:
            """Check if there is more data to parse."""
            for key, events in selector.select(0.01):
                if events:
                    return True
            return False

        parser = XTermParser(self._target, more_data, self._debug)
        feed = parser.feed

        utf8_decoder = getincrementaldecoder("utf-8")().decode
        decode = utf8_decoder
        read = os.read
        EVENT_READ = selectors.EVENT_READ

        try:
            while not self.exit_event.is_set():
                selector_events = selector.select(0.1)
                for _selector_key, mask in selector_events:
                    if mask | EVENT_READ:
                        unicode_data = decode(read(fileno, 1024))
                        for event in feed(unicode_data):
                            self.process_event(event)
        except Exception as error:
            log(error)
        finally:
            with timer("selector.close"):
                selector.close()

Ancestors

Methods

def disable_input(self) ‑> None
Expand source code
def disable_input(self) -> None:
    try:
        if not self.exit_event.is_set():
            signal.signal(signal.SIGWINCH, signal.SIG_DFL)
            self._disable_mouse_support()
            self.exit_event.set()
            if self._key_thread is not None:
                self._key_thread.join()
            termios.tcflush(self.fileno, termios.TCIFLUSH)
    except Exception as error:
        # TODO: log this
        pass
def run_input_thread(self, loop) ‑> None
Expand source code
def run_input_thread(self, loop) -> None:
    try:
        self._run_input_thread(loop)
    except Exception:
        pass  # TODO: log
def start_application_mode(self)
Expand source code
def start_application_mode(self):

    loop = asyncio.get_running_loop()

    def send_size_event():
        terminal_size = self._get_terminal_size()
        width, height = terminal_size
        textual_size = Size(width, height)
        event = events.Resize(self._target, textual_size, textual_size)
        asyncio.run_coroutine_threadsafe(
            self._target.post_message(event),
            loop=loop,
        )

    def on_terminal_resize(signum, stack) -> None:
        send_size_event()

    signal.signal(signal.SIGWINCH, on_terminal_resize)

    self.console.set_alt_screen(True)
    self._enable_mouse_support()
    try:
        self.attrs_before = termios.tcgetattr(self.fileno)
    except termios.error:
        # Ignore attribute errors.
        self.attrs_before = None

    try:
        newattr = termios.tcgetattr(self.fileno)
    except termios.error:
        pass
    else:

        newattr[tty.LFLAG] = self._patch_lflag(newattr[tty.LFLAG])
        newattr[tty.IFLAG] = self._patch_iflag(newattr[tty.IFLAG])

        # VMIN defines the number of characters read at a time in
        # non-canonical mode. It seems to default to 1 on Linux, but on
        # Solaris and derived operating systems it defaults to 4. (This is
        # because the VMIN slot is the same as the VEOF slot, which
        # defaults to ASCII EOT = Ctrl-D = 4.)
        newattr[tty.CC][termios.VMIN] = 1

        termios.tcsetattr(self.fileno, termios.TCSANOW, newattr)

    self.console.show_cursor(False)
    self.console.file.write("\033[?1003h\n")
    self.console.file.flush()
    self._key_thread = Thread(target=self.run_input_thread, args=(loop,))
    send_size_event()
    self._key_thread.start()
    self._request_terminal_sync_mode_support()
    self._enable_bracketed_paste()
def stop_application_mode(self) ‑> None
Expand source code
def stop_application_mode(self) -> None:
    self._disable_bracketed_paste()
    self.disable_input()

    if self.attrs_before is not None:
        try:
            termios.tcsetattr(self.fileno, termios.TCSANOW, self.attrs_before)
        except termios.error:
            pass

    with self.console:
        self.console.set_alt_screen(False)
        self.console.show_cursor(True)

Inherited members