Source code for i3pystatus.core.io
import json
import signal
import sys
from contextlib import contextmanager
from threading import Condition
from threading import Thread
[docs]class IOHandler:
def __init__(self, inp=sys.stdin, out=sys.stdout):
self.inp = inp
self.out = out
[docs] def write_line(self, message):
"""Unbuffered printing to stdout."""
self.out.write(message + "\n")
self.out.flush()
[docs] def read(self):
"""Iterate over all input lines (Generator)"""
while True:
try:
yield self.read_line()
except EOFError:
return
[docs] def read_line(self):
"""
Interrupted respecting reader for stdin.
Raises EOFError if the end of stream has been reached
"""
try:
line = self.inp.readline().strip()
except KeyboardInterrupt:
raise EOFError()
# i3status sends EOF, or an empty line
if not line:
raise EOFError()
return line
[docs]class StandaloneIO(IOHandler):
"""
I/O handler for standalone usage of i3pystatus (w/o i3status)
Writing works as usual, but reading will always return a empty JSON array,
and the i3bar protocol header
"""
n = -1
proto = [
{"version": 1, "click_events": True}, "[", "[]", ",[]",
]
def __init__(self, click_events, modules, interval=1):
"""
StandaloneIO instance must be created in main thread to be able to set
the SIGUSR1 signal handler.
"""
super().__init__()
self.interval = interval
self.modules = modules
self.proto[0]['click_events'] = click_events
self.proto[0] = json.dumps(self.proto[0])
self.refresh_cond = Condition()
self.treshold_interval = 20.0
signal.signal(signal.SIGUSR1, self.refresh_signal_handler)
[docs] def read(self):
self.compute_treshold_interval()
self.refresh_cond.acquire()
while True:
try:
self.refresh_cond.wait(timeout=self.interval)
except KeyboardInterrupt:
self.refresh_cond.release()
return
yield self.read_line()
[docs] def read_line(self):
self.n += 1
return self.proto[min(self.n, len(self.proto) - 1)]
[docs] def compute_treshold_interval(self):
"""
Current method is to compute average from all intervals.
"""
intervals = [m.interval for m in self.modules if hasattr(m, "interval")]
if len(intervals) > 0:
self.treshold_interval = round(sum(intervals) / len(intervals))
[docs] def async_refresh(self):
"""
Calling this method will send the status line to i3bar immediately
without waiting for timeout (1s by default).
"""
self.refresh_cond.acquire()
self.refresh_cond.notify()
self.refresh_cond.release()
[docs] def refresh_signal_handler(self, signo, frame):
"""
This callback is called when SIGUSR1 signal is received.
It updates outputs of all modules by calling their `run` method.
Interval modules are updated in separate threads if their interval is
above a certain treshold value.
This treshold is computed by :func:`compute_treshold_interval` class
method.
The reasoning is that modules with larger intervals also usually take
longer to refresh their output and that their output is not required in
'real time'.
This also prevents possible lag when updating all modules in a row.
"""
if signo != signal.SIGUSR1:
return
for module in self.modules:
if hasattr(module, "interval"):
if module.interval > self.treshold_interval:
thread = Thread(target=module.run)
thread.start()
else:
module.run()
else:
module.run()
self.async_refresh()
[docs]class JSONIO:
def __init__(self, io, skiplines=2):
self.io = io
for i in range(skiplines):
self.io.write_line(self.io.read_line())
[docs] def read(self):
"""Iterate over all JSON input (Generator)"""
for line in self.io.read():
with self.parse_line(line) as j:
yield j
@contextmanager
[docs] def parse_line(self, line):
"""Parse a single line of JSON and write modified JSON back."""
prefix = ""
# ignore comma at start of lines
if line.startswith(","):
line, prefix = line[1:], ","
j = json.loads(line)
yield j
self.io.write_line(prefix + json.dumps(j))