Source code for i3pystatus.core.util

import collections
import functools
import re
import socket
import string
import inspect
from threading import Timer, RLock


[docs]def lchop(string, prefix): """Removes a prefix from string :param string: String, possibly prefixed with prefix :param prefix: Prefix to remove from string :returns: string without the prefix """ if string.startswith(prefix): return string[len(prefix):] return string
[docs]def popwhile(predicate, iterable): """Generator function yielding items of iterable while predicate holds for each item :param predicate: function taking an item returning bool :param iterable: iterable :returns: iterable (generator function) """ while iterable: item = iterable.pop() if predicate(item): yield item else: break
[docs]def partition(iterable, limit, key=lambda x: x): def pop_partition(): sum = 0.0 while sum < limit and iterable: sum += key(iterable[-1]) yield iterable.pop() partitions = [] iterable.sort(reverse=True) while iterable: partitions.append(list(pop_partition())) return partitions
[docs]def round_dict(dic, places): """ Rounds all values in a dict containing only numeric types to `places` decimal places. If places is None, round to INT. """ if places is None: for key, value in dic.items(): dic[key] = round(value) else: for key, value in dic.items(): dic[key] = round(value, places)
[docs]class ModuleList(collections.UserList): def __init__(self, status_handler, class_finder): self.status_handler = status_handler self.finder = class_finder super().__init__()
[docs] def append(self, module, *args, **kwargs): module = self.finder.instanciate_class_from_module( module, *args, **kwargs) module.registered(self.status_handler) super().append(module) return module
[docs] def get(self, find_id): find_id = int(find_id) for module in self: if id(module) == find_id: return module
[docs]class KeyConstraintDict(collections.UserDict): """ A dict implementation with sets of valid and required keys :param valid_keys: Set of valid keys :param required_keys: Set of required keys, must be a subset of valid_keys """
[docs] class MissingKeys(Exception): def __init__(self, keys): self.keys = keys
def __init__(self, valid_keys, required_keys): super().__init__() self.valid_keys = valid_keys self.required_keys = set(required_keys) self.seen_keys = set() def __setitem__(self, key, value): """Trying to add an invalid key will raise KeyError """ if key in self.valid_keys: self.seen_keys.add(key) self.data[key] = value else: raise KeyError(key) def __delitem__(self, key): self.seen_keys.remove(key) del self.data[key] def __iter__(self): """Iteration will raise a MissingKeys exception unless all required keys are set """ if self.missing(): raise self.MissingKeys(self.missing()) return self.data.__iter__()
[docs] def missing(self): """Returns a set of keys that are required but not set """ return self.required_keys - (self.seen_keys & self.required_keys)
[docs]def convert_position(pos, json): if pos < 0: pos = len(json) + (pos + 1) return pos
[docs]def flatten(l): """ Flattens a hierarchy of nested lists into a single list containing all elements in order :param l: list of arbitrary types and lists :returns: list of arbitrary types """ l = list(l) i = 0 while i < len(l): while isinstance(l[i], list): if not l[i]: l.pop(i) i -= 1 break else: l[i:i + 1] = l[i] i += 1 return l
[docs]def formatp(string, **kwargs): """ Function for advanced format strings with partial formatting This function consumes format strings with groups enclosed in brackets. A group enclosed in brackets will only become part of the result if all fields inside the group evaluate True in boolean contexts. Groups can be nested. The fields in a nested group do not count as fields in the enclosing group, i.e. the enclosing group will evaluate to an empty string even if a nested group would be eligible for formatting. Nesting is thus equivalent to a logical or of all enclosing groups with the enclosed group. Escaped brackets, i.e. \\\\[ and \\\\] are copied verbatim to output. :param string: Format string :param kwargs: keyword arguments providing data for the format string :returns: Formatted string """ def build_stack(string): """ Builds a stack with OpeningBracket, ClosingBracket and String tokens. Tokens have a level property denoting their nesting level. They also have a string property containing associated text (empty for all tokens but String tokens). """ class Token: string = "" class OpeningBracket(Token): pass class ClosingBracket(Token): pass class String(Token): def __init__(self, str): self.string = str TOKENS = { "[": OpeningBracket, "]": ClosingBracket, } stack = [] # Index of next unconsumed char next = 0 # Last consumed char prev = "" # Current char char = "" # Current level level = 0 while next < len(string): prev = char char = string[next] next += 1 if prev != "\\" and char in TOKENS: token = TOKENS[char]() token.index = next if char == "]": level -= 1 token.level = level if char == "[": level += 1 stack.append(token) else: if stack and isinstance(stack[-1], String): stack[-1].string += char else: token = String(char) token.level = level stack.append(token) return stack def build_tree(items, level=0): """ Builds a list-of-lists tree (in forward order) from a stack (reversed order), and formats the elements on the fly, discarding everything not eligible for inclusion. """ subtree = [] while items: nested = [] while items[0].level > level: nested.append(items.pop(0)) if nested: subtree.append(build_tree(nested, level + 1)) item = items.pop(0) if item.string: string = item.string if level == 0: subtree.append(string.format(**kwargs)) else: fields = re.findall(r"({(\w+)[^}]*})", string) successful_fields = 0 for fieldspec, fieldname in fields: if kwargs.get(fieldname, False): successful_fields += 1 if successful_fields == len(fields): subtree.append(string.format(**kwargs)) else: return [] return subtree def merge_tree(items): return "".join(flatten(items)).replace("\]", "]").replace("\[", "[") stack = build_stack(string) tree = build_tree(stack, 0) return merge_tree(tree)
[docs]class TimeWrapper: """ A wrapper that implements __format__ and __bool__ for time differences and time spans. :param seconds: seconds (numeric) :param default_format: the default format to be used if no explicit format_spec is passed to __format__ Format string syntax: * %h, %m and %s are the hours, minutes and seconds without leading zeros (i.e. 0 to 59 for minutes and seconds) * %H, %M and %S are padded with a leading zero to two digits, i.e. 00 to 59 * %l and %L produce hours non-padded and padded but only if hours is not zero. If the hours are zero it produces an empty string. * %% produces a literal % * %E (only valid on beginning of the string) if the time is null, don't format anything but rather produce an empty string. If the time is non-null it is removed from the string. The formatted string is stripped, i.e. spaces on both ends of the result are removed """
[docs] class TimeTemplate(string.Template): delimiter = "%" idpattern = r"[a-zA-Z]"
def __init__(self, seconds, default_format="%m:%S"): self.seconds = int(seconds) self.default_format = default_format def __bool__(self): """:returns: `bool(seconds)`, i.e. False if seconds == 0 and True otherwise """ return bool(self.seconds) def __format__(self, format_spec): """Formats the time span given the format_spec (or the default_format). """ format_spec = format_spec or self.default_format h = self.seconds // 3600 m, s = divmod(self.seconds % 3600, 60) l = h if h else "" L = "%02d" % h if h else "" if format_spec.startswith("%E"): format_spec = format_spec[2:] if not self.seconds: return "" return self.TimeTemplate(format_spec).substitute( h=h, m=m, s=s, H="%02d" % h, M="%02d" % m, S="%02d" % s, l=l, L=L, ).strip()
[docs]def require(predicate): """Decorator factory for methods requiring a predicate. If the predicate is not fulfilled during a method call, the method call is skipped and None is returned. :param predicate: A callable returning a truth value :returns: Method decorator .. seealso:: :py:class:`internet` """ def decorator(method): @functools.wraps(method) def wrapper(*args, **kwargs): if predicate(): return method(*args, **kwargs) return None return wrapper return decorator
[docs]class internet: """ Checks for internet connection by connecting to a server. Used server is determined by the `address` class variable which consists of server host name and port number. :rtype: bool .. seealso:: :py:func:`require` """ address = ("google-public-dns-a.google.com", 53) def __new__(cls): try: socket.create_connection(cls.address, 1).close() return True except OSError: return False
[docs]def make_graph(values, lower_limit=0.0, upper_limit=100.0, style="blocks"): """ Draws a graph made of unicode characters. :param values: An array of values to graph. :param lower_limit: Minimum value for the y axis (or None for dynamic). :param upper_limit: Maximum value for the y axis (or None for dynamic). :param style: Drawing style ('blocks', 'braille-fill', 'braille-peak', or 'braille-snake'). :returns: Bar as a string """ values = [float(n) for n in values] mn, mx = min(values), max(values) mn = mn if lower_limit is None else min(mn, float(lower_limit)) mx = mx if upper_limit is None else max(mx, float(upper_limit)) extent = mx - mn if style == 'blocks': bar = '_▁▂▃▄▅▆▇█' bar_count = len(bar) - 1 if extent == 0: graph = '_' * len(values) else: graph = ''.join(bar[int((n - mn) / extent * bar_count)] for n in values) elif style in ['braille-fill', 'braille-peak', 'braille-snake']: # idea from https://github.com/asciimoo/drawille # unicode values from http://en.wikipedia.org/wiki/Braille vpad = values if len(values) % 2 == 0 else values + [mn] vscale = [round(4 * (vp - mn) / extent) for vp in vpad] l = len(vscale) // 2 # do the 2-character collapse separately for clarity if 'fill' in style: vbits = [[0, 0x40, 0x44, 0x46, 0x47][vs] for vs in vscale] elif 'peak' in style: vbits = [[0, 0x40, 0x04, 0x02, 0x01][vs] for vs in vscale] else: assert('snake' in style) # there are a few choices for what to put last in vb2. # arguable vscale[-1] from the _previous_ call is best. vb2 = [vscale[0]] + vscale + [0] vbits = [] for i in range(1, l + 1): c = 0 for j in range(min(vb2[i - 1], vb2[i], vb2[i + 1]), vb2[i] + 1): c |= [0, 0x40, 0x04, 0x02, 0x01][j] vbits.append(c) # 2-character collapse graph = '' for i in range(0, l, 2): b1 = vbits[i] b2 = vbits[i + 1] if b2 & 0x40: b2 = b2 - 0x30 b2 = b2 << 3 graph += chr(0x2800 + b1 + b2) else: raise NotImplementedError("Graph drawing style '%s' unimplemented." % style) return graph
[docs]def make_vertical_bar(percentage, width=1): """ Draws a vertical bar made of unicode characters. :param value: A value between 0 and 100 :param width: How many characters wide the bar should be. :returns: Bar as a String """ bar = ' _▁▂▃▄▅▆▇█' percentage //= 10 if percentage < 0: output = bar[0] elif percentage >= len(bar): output = bar[-1] else: output = bar[percentage] return output * width
[docs]def make_bar(percentage): """ Draws a bar made of unicode box characters. :param percentage: A value between 0 and 100 :returns: Bar as a string """ bars = [' ', '▏', '▎', '▍', '▌', '▋', '▋', '▊', '▊', '█'] tens = int(percentage / 10) ones = int(percentage) - tens * 10 result = tens * '█' if(ones >= 1): result = result + bars[ones] result = result + (10 - len(result)) * ' ' return result
[docs]def user_open(url_or_command): """Open the specified paramater in the web browser if a URL is detected, othewrise pass the paramater to the shell as a subprocess. This function is inteded to bu used in on_leftclick/on_rightclick callbacks. :param url_or_command: String containing URL or command """ from urllib.parse import urlparse scheme = urlparse(url_or_command).scheme if scheme == 'http' or scheme == 'https': import webbrowser import os # webbrowser.open() sometimes prints a message for some reason and confuses i3 # Redirect stdout briefly to prevent this from happening. savout = os.dup(1) os.close(1) os.open(os.devnull, os.O_RDWR) try: webbrowser.open(url_or_command) finally: os.dup2(savout, 1) else: import subprocess subprocess.Popen(url_or_command, shell=True)
[docs]class MultiClickHandler(object): def __init__(self, callback_handler, timeout): self.callback_handler = callback_handler self.timeout = timeout self.lock = RLock() self._timer_id = 0 self.timer = None self.button = None self.cb = None
[docs] def set_timer(self, button, cb): with self.lock: self.clear_timer() self.timer = Timer(self.timeout, self._timer_function, args=[self._timer_id]) self.button = button self.cb = cb self.timer.start()
[docs] def clear_timer(self): with self.lock: if self.timer is None: return self._timer_id += 1 # Invalidate existent timer self.timer.cancel() # Cancel the existent timer self.timer = None self.button = None self.cb = None
def _timer_function(self, timer_id): with self.lock: if self._timer_id != timer_id: return self.callback_handler(self.button, self.cb) self.clear_timer()
[docs] def check_double(self, button): if self.timer is None: return False ret = True if button != self.button: self.callback_handler(self.button, self.cb) ret = False self.clear_timer() return ret
[docs]def get_module(function): """Function decorator for retrieving the ``self`` argument from the stack. Intended for use with callbacks that need access to a modules variables, for example: .. code:: python from i3pystatus import Status, get_module from i3pystatus.core.command import execute status = Status(...) # other modules etc. @get_module def display_ip_verbose(module): execute('sh -c "ip addr show dev {dev} | xmessage -file -"'.format(dev=module.interface)) status.register("network", interface="wlan1", on_leftclick=display_ip_verbose) """ @functools.wraps(function) def call_wrapper(*args, **kwargs): stack = inspect.stack() caller_frame_info = stack[1] self = caller_frame_info[0].f_locals["self"] # not completly sure whether this is necessary # see note in Python docs about stack frames del stack function(self, *args, **kwargs) return call_wrapper