Source code for i3pystatus.core.threading
import threading
import time
import sys
from i3pystatus.core.util import partition
timer = time.perf_counter if hasattr(time, "perf_counter") else time.clock
[docs]def unwrap_workload(workload):
""" Obtain the module from it's wrapper. """
while isinstance(workload, Wrapper):
workload = workload.workload
return workload
[docs]class Thread(threading.Thread):
def __init__(self, target_interval, workloads=None, start_barrier=1):
super().__init__()
self.workloads = workloads or []
self.target_interval = target_interval
self.start_barrier = start_barrier
self._suspended = threading.Event()
self.daemon = True
def __iter__(self):
return iter(self.workloads)
def __len__(self):
return len(self.workloads)
[docs] def pop(self):
return self.workloads.pop()
[docs] def append(self, workload):
self.workloads.append(workload)
@property
def time(self):
return sum(map(lambda workload: workload.time, self))
[docs] def wait_for_start_barrier(self):
while len(self) <= self.start_barrier:
time.sleep(0.4)
[docs] def execute_workloads(self):
for workload in self:
if self.should_execute(workload):
workload()
self.workloads.sort(key=lambda workload: workload.time)
[docs] def should_execute(self, workload):
"""
If we have been suspended by i3bar, only execute those modules that set the keep_alive flag to a truthy
value. See the docs on the suspend_signal_handler method of the io module for more information.
"""
if not self._suspended.is_set():
return True
workload = unwrap_workload(workload)
return hasattr(workload, 'keep_alive') and getattr(workload, 'keep_alive')
[docs] def run(self):
while self:
self.execute_workloads()
filltime = self.target_interval - self.time
if filltime > 0:
time.sleep(filltime)
[docs] def branch(self, vtime, bound):
if len(self) > 1 and vtime > bound:
remove = self.pop()
return [remove] + self.branch(vtime - remove.time, bound)
return []
[docs] def suspend(self):
self._suspended.set()
[docs] def resume(self):
self._suspended.clear()
[docs]class Wrapper:
def __init__(self, workload):
self.workload = workload
def __repr__(self):
return repr(self.workload)
[docs]class ExceptionWrapper(Wrapper):
def __call__(self):
try:
self.workload()
except:
message = "Exception in {thread} at {time}, module {name}".format(
thread=threading.current_thread().name,
time=time.strftime("%c"),
name=self.workload.__class__.__name__
)
if hasattr(self.workload, "logger"):
self.workload.logger.error(message, exc_info=True)
self.workload.output = {
"full_text": self.format_exception(),
"color": "#FF0000",
}
[docs] def truncate_error(self, exception_message):
if hasattr(self.workload, 'max_error_len'):
error_len = self.workload.max_error_len
if len(exception_message) > error_len:
return exception_message[:error_len] + '…'
else:
return exception_message
else:
return exception_message
[docs]class WorkloadWrapper(Wrapper):
time = 0.0
def __call__(self):
tp1 = timer()
self.workload()
self.time = timer() - tp1
[docs]class Manager:
def __init__(self, target_interval):
self.target_interval = target_interval
self.upper_bound = target_interval * 1.1
self.lower_bound = target_interval * 0.7
initial_thread = Thread(target_interval, [self.wrap(self)])
self.threads = [initial_thread]
def __call__(self):
separate = []
for thread in self.threads:
separate.extend(thread.branch(thread.time, self.upper_bound))
self.create_threads(self.partition_workloads(separate))
def __repr__(self):
return "Manager"
[docs] def wrap(self, workload):
return WorkloadWrapper(ExceptionWrapper(workload))
[docs] def partition_workloads(self, workloads):
return partition(workloads, self.lower_bound, lambda workload: workload.time)
[docs] def create_threads(self, threads):
for workloads in threads:
self.create_thread(workloads)
[docs] def create_thread(self, workloads):
thread = Thread(self.target_interval, workloads, start_barrier=0)
thread.start()
self.threads.append(thread)
[docs] def append(self, workload):
self.threads[0].append(self.wrap(workload))
[docs] def start(self):
for thread in self.threads:
thread.start()
[docs] def suspend(self):
for thread in self.threads:
thread.suspend()
[docs] def resume(self):
for thread in self.threads:
thread.resume()