Source code for i3pystatus.core.io

import json
import signal
import sys

from contextlib import contextmanager
from threading import Condition
from threading import Thread
from i3pystatus.core.modules import IntervalModule


class IOHandler:
    """Line-oriented reader/writer over a pair of text streams."""

    def __init__(self, inp=sys.stdin, out=sys.stdout):
        """Remember the stream to read from (*inp*) and to write to (*out*)."""
        self.inp = inp
        self.out = out

    def write_line(self, message):
        """Write *message* followed by a newline and flush immediately."""
        self.out.write(message + "\n")
        self.out.flush()

    def read(self):
        """Yield input lines one by one until :meth:`read_line` signals EOF."""
        try:
            while True:
                yield self.read_line()
        except EOFError:
            return

    def read_line(self):
        """
        Return the next input line with surrounding whitespace stripped.

        Raises EOFError when the stream is exhausted, when the line is empty
        after stripping, or when the read is interrupted by KeyboardInterrupt.
        """
        try:
            stripped = self.inp.readline().strip()
        except KeyboardInterrupt:
            raise EOFError()

        # i3status sends EOF, or an empty line
        if stripped:
            return stripped
        raise EOFError()
class StandaloneIO(IOHandler):
    """
    I/O handler for standalone usage of i3pystatus (w/o i3status)

    Writing works as usual, but reading will always return an empty JSON
    array, and the i3bar protocol header
    """

    # Index of the last protocol line handed out by read_line().
    n = -1
    # Template of the lines emitted by read_line(): header, array opener,
    # first (empty) status line, and the repeated follow-up status line.
    # Treated as read-only; every instance builds its own copy in __init__.
    proto = [
        {
            "version": 1,
            "click_events": True,
        },
        "[",
        "[]",
        ",[]",
    ]

    def __init__(self, click_events, modules, keep_alive, interval=1):
        """
        StandaloneIO instance must be created in main thread to be able to set
        the SIGUSR1 signal handler.

        :param click_events: value of the ``click_events`` key in the header
        :param modules: iterable of modules refreshed on SIGUSR1
        :param keep_alive: if truthy, announce SIGUSR2 as both stop and
            continue signal in the header and install the suspend handler
        :param interval: timeout (seconds) between two emitted status lines
        """
        super().__init__()
        self.interval = interval
        self.modules = modules

        # Work on a copy of the header. The class-level ``proto`` is shared by
        # all instances; mutating it in place would replace the header dict
        # with a string and make every later instantiation fail.
        template = self.proto
        header = dict(template[0])
        header["click_events"] = click_events
        if keep_alive:
            header.update(stop_signal=signal.SIGUSR2,
                          cont_signal=signal.SIGUSR2)
            signal.signal(signal.SIGUSR2, self.suspend_signal_handler)
        self.proto = [json.dumps(header)] + list(template[1:])

        self.refresh_cond = Condition()
        self.treshold_interval = 20.0
        self.stopped = False
        signal.signal(signal.SIGUSR1, self.refresh_signal_handler)

    def read(self):
        """
        Yield one protocol line per wake-up (Generator).

        Each iteration waits on the refresh condition for at most
        ``self.interval`` seconds; :meth:`async_refresh` cuts the wait short.
        A KeyboardInterrupt during the wait ends the generator.
        """
        self.compute_treshold_interval()
        # The condition stays held between waits, as before; the ``with``
        # block additionally guarantees it is released however the generator
        # terminates.
        with self.refresh_cond:
            while True:
                try:
                    self.refresh_cond.wait(timeout=self.interval)
                except KeyboardInterrupt:
                    return
                yield self.read_line()

    def read_line(self):
        """
        Return the next protocol line; once the sequence is exhausted the
        last entry of ``proto`` is returned on every further call.
        """
        self.n += 1
        return self.proto[min(self.n, len(self.proto) - 1)]

    def compute_treshold_interval(self):
        """
        Current method is to compute average from all intervals.

        Modules without an ``interval`` attribute are ignored; if no module
        has one, ``treshold_interval`` is left unchanged.
        """
        intervals = [m.interval for m in self.modules
                     if hasattr(m, "interval")]
        if intervals:
            self.treshold_interval = round(sum(intervals) / len(intervals))

    def async_refresh(self):
        """
        Calling this method will send the status line to i3bar immediately
        without waiting for timeout (1s by default).
        """
        with self.refresh_cond:
            self.refresh_cond.notify()

    def refresh_signal_handler(self, signo, frame):
        """
        This callback is called when SIGUSR1 signal is received.

        It updates outputs of all modules by calling their `run` method.

        Interval modules are updated in separate threads if their interval is
        above a certain threshold value.
        This threshold is computed by :func:`compute_treshold_interval` class
        method.
        The reasoning is that modules with larger intervals also usually take
        longer to refresh their output and that their output is not required
        in 'real time'.
        This also prevents possible lag when updating all modules in a row.
        """
        if signo != signal.SIGUSR1:
            return

        for module in self.modules:
            is_slow = (hasattr(module, "interval")
                       and module.interval > self.treshold_interval)
            if is_slow:
                Thread(target=module.run).start()
            else:
                module.run()

        self.async_refresh()

    def suspend_signal_handler(self, signo, frame):
        """
        By default, i3bar sends SIGSTOP to all children when it is not visible
        (for example, the screen sleeps or you enter full screen mode). This
        stops the i3pystatus process and all threads within it. For some
        modules, this is not desirable. Thankfully, the i3bar protocol
        supports setting the "stop_signal" and "cont_signal" key/value pairs
        in the header to allow sending a custom signal when these events
        occur.

        Here we use SIGUSR2 for both "stop_signal" and "cont_signal" and
        maintain a toggle to determine whether we have just been stopped or
        continued. When we have been stopped, notify the IntervalModule
        managers that they should suspend any module that does not set the
        keep_alive flag to a truthy value, and when we have been continued,
        notify the IntervalModule managers that they can resume execution of
        all modules.
        """
        if signo != signal.SIGUSR2:
            return

        self.stopped = not self.stopped
        managers = IntervalModule.managers.values()
        if self.stopped:
            for manager in managers:
                manager.suspend()
        else:
            for manager in managers:
                manager.resume()
class JSONIO:
    """JSON layer on top of a line-based I/O handler."""

    def __init__(self, io, skiplines=2):
        """
        Wrap *io* and pass its first *skiplines* lines through unchanged
        (read from *io*, written straight back to it).
        """
        self.io = io
        for _ in range(skiplines):
            passthrough = self.io.read_line()
            self.io.write_line(passthrough)

    def read(self):
        """Iterate over all JSON input (Generator)"""
        for raw_line in self.io.read():
            with self.parse_line(raw_line) as document:
                yield document

    @contextmanager
    def parse_line(self, line):
        """Parse a single line of JSON and write modified JSON back."""
        # ignore comma at start of lines
        leading_comma = line.startswith(",")
        payload = line[1:] if leading_comma else line

        document = json.loads(payload)
        yield document

        # Re-serialise whatever the caller did to the parsed object and
        # restore the comma that was stripped before parsing.
        marker = "," if leading_comma else ""
        self.io.write_line(marker + json.dumps(document))