import collections
import functools
import re
import socket
import string
import inspect
from threading import Timer, RLock
[docs]def lchop(string, prefix):
"""Removes a prefix from string
:param string: String, possibly prefixed with prefix
:param prefix: Prefix to remove from string
:returns: string without the prefix
"""
if string.startswith(prefix):
return string[len(prefix):]
return string
[docs]def popwhile(predicate, iterable):
"""Generator function yielding items of iterable while predicate holds for each item
:param predicate: function taking an item returning bool
:param iterable: iterable
:returns: iterable (generator function)
"""
while iterable:
item = iterable.pop()
if predicate(item):
yield item
else:
break
[docs]def partition(iterable, limit, key=lambda x: x):
def pop_partition():
sum = 0.0
while sum < limit and iterable:
sum += key(iterable[-1])
yield iterable.pop()
partitions = []
iterable.sort(reverse=True)
while iterable:
partitions.append(list(pop_partition()))
return partitions
[docs]def round_dict(dic, places):
"""
Rounds all values in a dict containing only numeric types to `places` decimal places.
If places is None, round to INT.
"""
if places is None:
for key, value in dic.items():
dic[key] = round(value)
else:
for key, value in dic.items():
dic[key] = round(value, places)
[docs]class ModuleList(collections.UserList):
def __init__(self, status_handler, class_finder):
self.status_handler = status_handler
self.finder = class_finder
super().__init__()
[docs] def append(self, module, *args, **kwargs):
module = self.finder.instanciate_class_from_module(
module, *args, **kwargs)
module.registered(self.status_handler)
super().append(module)
return module
[docs] def get(self, find_id):
find_id = int(find_id)
for module in self:
if id(module) == find_id:
return module
[docs]class KeyConstraintDict(collections.UserDict):
"""
A dict implementation with sets of valid and required keys
:param valid_keys: Set of valid keys
:param required_keys: Set of required keys, must be a subset of valid_keys
"""
[docs] class MissingKeys(Exception):
def __init__(self, keys):
self.keys = keys
def __init__(self, valid_keys, required_keys):
super().__init__()
self.valid_keys = valid_keys
self.required_keys = set(required_keys)
self.seen_keys = set()
def __setitem__(self, key, value):
"""Trying to add an invalid key will raise KeyError
"""
if key in self.valid_keys:
self.seen_keys.add(key)
self.data[key] = value
else:
raise KeyError(key)
def __delitem__(self, key):
self.seen_keys.remove(key)
del self.data[key]
def __iter__(self):
"""Iteration will raise a MissingKeys exception unless all required keys are set
"""
if self.missing():
raise self.MissingKeys(self.missing())
return self.data.__iter__()
[docs] def missing(self):
"""Returns a set of keys that are required but not set
"""
return self.required_keys - (self.seen_keys & self.required_keys)
[docs]def convert_position(pos, json):
if pos < 0:
pos = len(json) + (pos + 1)
return pos
[docs]def flatten(l):
"""
Flattens a hierarchy of nested lists into a single list containing all elements in order
:param l: list of arbitrary types and lists
:returns: list of arbitrary types
"""
l = list(l)
i = 0
while i < len(l):
while isinstance(l[i], list):
if not l[i]:
l.pop(i)
i -= 1
break
else:
l[i:i + 1] = l[i]
i += 1
return l
[docs]class TimeWrapper:
"""
A wrapper that implements __format__ and __bool__ for time differences and time spans.
:param seconds: seconds (numeric)
:param default_format: the default format to be used if no explicit format_spec is passed to __format__
Format string syntax:
* %h, %m and %s are the hours, minutes and seconds without leading zeros (i.e. 0 to 59 for minutes and seconds)
* %H, %M and %S are padded with a leading zero to two digits, i.e. 00 to 59
* %l and %L produce hours non-padded and padded but only if hours is not zero. If the hours are zero it produces an empty string.
* %% produces a literal %
* %E (only valid on beginning of the string) if the time is null, don't format anything but rather produce an empty string. If the time is non-null it is removed from the string.
The formatted string is stripped, i.e. spaces on both ends of the result are removed
"""
[docs] class TimeTemplate(string.Template):
delimiter = "%"
idpattern = r"[a-zA-Z]"
def __init__(self, seconds, default_format="%m:%S"):
self.seconds = int(seconds)
self.default_format = default_format
def __bool__(self):
""":returns: `bool(seconds)`, i.e. False if seconds == 0 and True otherwise
"""
return bool(self.seconds)
def __format__(self, format_spec):
"""Formats the time span given the format_spec (or the default_format).
"""
format_spec = format_spec or self.default_format
h = self.seconds // 3600
m, s = divmod(self.seconds % 3600, 60)
l = h if h else ""
L = "%02d" % h if h else ""
if format_spec.startswith("%E"):
format_spec = format_spec[2:]
if not self.seconds:
return ""
return self.TimeTemplate(format_spec).substitute(
h=h, m=m, s=s,
H="%02d" % h, M="%02d" % m, S="%02d" % s,
l=l, L=L,
).strip()
[docs]def require(predicate):
"""Decorator factory for methods requiring a predicate. If the
predicate is not fulfilled during a method call, the method call
is skipped and None is returned.
:param predicate: A callable returning a truth value
:returns: Method decorator
.. seealso::
:py:class:`internet`
"""
def decorator(method):
@functools.wraps(method)
def wrapper(*args, **kwargs):
if predicate():
return method(*args, **kwargs)
return None
return wrapper
return decorator
[docs]class internet:
"""
Checks for internet connection by connecting to a server.
Used server is determined by the `address` class variable which consists of
server host name and port number.
:rtype: bool
.. seealso::
:py:func:`require`
"""
address = ("google-public-dns-a.google.com", 53)
def __new__(cls):
try:
socket.create_connection(cls.address, 1).close()
return True
except OSError:
return False
[docs]def make_graph(values, lower_limit=0.0, upper_limit=100.0, style="blocks"):
"""
Draws a graph made of unicode characters.
:param values: An array of values to graph.
:param lower_limit: Minimum value for the y axis (or None for dynamic).
:param upper_limit: Maximum value for the y axis (or None for dynamic).
:param style: Drawing style ('blocks', 'braille-fill', 'braille-peak', or 'braille-snake').
:returns: Bar as a string
"""
values = [float(n) for n in values]
mn, mx = min(values), max(values)
mn = mn if lower_limit is None else min(mn, float(lower_limit))
mx = mx if upper_limit is None else max(mx, float(upper_limit))
extent = mx - mn
if style == 'blocks':
bar = '_▁▂▃▄▅▆▇█'
bar_count = len(bar) - 1
if extent == 0:
graph = '_' * len(values)
else:
graph = ''.join(bar[int((n - mn) / extent * bar_count)] for n in values)
elif style in ['braille-fill', 'braille-peak', 'braille-snake']:
# idea from https://github.com/asciimoo/drawille
# unicode values from http://en.wikipedia.org/wiki/Braille
vpad = values if len(values) % 2 == 0 else values + [mn]
vscale = [round(4 * (vp - mn) / extent) for vp in vpad]
l = len(vscale) // 2
# do the 2-character collapse separately for clarity
if 'fill' in style:
vbits = [[0, 0x40, 0x44, 0x46, 0x47][vs] for vs in vscale]
elif 'peak' in style:
vbits = [[0, 0x40, 0x04, 0x02, 0x01][vs] for vs in vscale]
else:
assert('snake' in style)
# there are a few choices for what to put last in vb2.
# arguable vscale[-1] from the _previous_ call is best.
vb2 = [vscale[0]] + vscale + [0]
vbits = []
for i in range(1, l + 1):
c = 0
for j in range(min(vb2[i - 1], vb2[i], vb2[i + 1]), vb2[i] + 1):
c |= [0, 0x40, 0x04, 0x02, 0x01][j]
vbits.append(c)
# 2-character collapse
graph = ''
for i in range(0, l, 2):
b1 = vbits[i]
b2 = vbits[i + 1]
if b2 & 0x40:
b2 = b2 - 0x30
b2 = b2 << 3
graph += chr(0x2800 + b1 + b2)
else:
raise NotImplementedError("Graph drawing style '%s' unimplemented." % style)
return graph
[docs]def make_vertical_bar(percentage, width=1):
"""
Draws a vertical bar made of unicode characters.
:param value: A value between 0 and 100
:param width: How many characters wide the bar should be.
:returns: Bar as a String
"""
bar = ' _▁▂▃▄▅▆▇█'
percentage //= 10
if percentage < 0:
output = bar[0]
elif percentage >= len(bar):
output = bar[-1]
else:
output = bar[percentage]
return output * width
[docs]def make_bar(percentage):
"""
Draws a bar made of unicode box characters.
:param percentage: A value between 0 and 100
:returns: Bar as a string
"""
bars = [' ', '▏', '▎', '▍', '▌', '▋', '▋', '▊', '▊', '█']
tens = int(percentage / 10)
ones = int(percentage) - tens * 10
result = tens * '█'
if(ones >= 1):
result = result + bars[ones]
result = result + (10 - len(result)) * ' '
return result
[docs]def user_open(url_or_command):
"""Open the specified paramater in the web browser if a URL is detected,
othewrise pass the paramater to the shell as a subprocess. This function
is inteded to bu used in on_leftclick/on_rightclick callbacks.
:param url_or_command: String containing URL or command
"""
from urllib.parse import urlparse
scheme = urlparse(url_or_command).scheme
if scheme == 'http' or scheme == 'https':
import webbrowser
import os
# webbrowser.open() sometimes prints a message for some reason and confuses i3
# Redirect stdout briefly to prevent this from happening.
savout = os.dup(1)
os.close(1)
os.open(os.devnull, os.O_RDWR)
try:
webbrowser.open(url_or_command)
finally:
os.dup2(savout, 1)
else:
import subprocess
subprocess.Popen(url_or_command, shell=True)
[docs]class MultiClickHandler(object):
def __init__(self, callback_handler, timeout):
self.callback_handler = callback_handler
self.timeout = timeout
self.lock = RLock()
self._timer_id = 0
self.timer = None
self.button = None
self.cb = None
[docs] def set_timer(self, button, cb):
with self.lock:
self.clear_timer()
self.timer = Timer(self.timeout,
self._timer_function,
args=[self._timer_id])
self.button = button
self.cb = cb
self.timer.start()
[docs] def clear_timer(self):
with self.lock:
if self.timer is None:
return
self._timer_id += 1 # Invalidate existent timer
self.timer.cancel() # Cancel the existent timer
self.timer = None
self.button = None
self.cb = None
def _timer_function(self, timer_id):
with self.lock:
if self._timer_id != timer_id:
return
self.callback_handler(self.button, self.cb)
self.clear_timer()
[docs] def check_double(self, button):
if self.timer is None:
return False
ret = True
if button != self.button:
self.callback_handler(self.button, self.cb)
ret = False
self.clear_timer()
return ret
[docs]def get_module(function):
"""Function decorator for retrieving the ``self`` argument from the stack.
Intended for use with callbacks that need access to a modules variables, for example:
.. code:: python
from i3pystatus import Status, get_module
from i3pystatus.core.command import execute
status = Status(...)
# other modules etc.
@get_module
def display_ip_verbose(module):
execute('sh -c "ip addr show dev {dev} | xmessage -file -"'.format(dev=module.interface))
status.register("network", interface="wlan1", on_leftclick=display_ip_verbose)
"""
@functools.wraps(function)
def call_wrapper(*args, **kwargs):
stack = inspect.stack()
caller_frame_info = stack[1]
self = caller_frame_info[0].f_locals["self"]
# not completly sure whether this is necessary
# see note in Python docs about stack frames
del stack
function(self, *args, **kwargs)
return call_wrapper