import html
import inspect
import traceback
from i3pystatus.core.settings import SettingsBase
from i3pystatus.core.threading import Manager
from i3pystatus.core.util import (convert_position,
MultiClickHandler)
from i3pystatus.core.command import execute
[docs]def is_method_of(method, object):
"""Decide whether ``method`` is contained within the MRO of ``object``."""
if not callable(method) or not hasattr(method, "__name__"):
return False
if inspect.ismethod(method):
return method.__self__ is object
for cls in inspect.getmro(object.__class__):
if cls.__dict__.get(method.__name__, None) is method:
return True
return False
[docs]class Module(SettingsBase):
position = 0
settings = (
('on_leftclick', "Callback called on left click (see :ref:`callbacks`)"),
('on_middleclick', "Callback called on middle click (see :ref:`callbacks`)"),
('on_rightclick', "Callback called on right click (see :ref:`callbacks`)"),
('on_upscroll', "Callback called on scrolling up (see :ref:`callbacks`)"),
('on_downscroll', "Callback called on scrolling down (see :ref:`callbacks`)"),
('on_doubleleftclick', "Callback called on double left click (see :ref:`callbacks`)"),
('on_doubleleftclick', "Callback called on double left click (see :ref:`callbacks`)"),
('on_doublemiddleclick', "Callback called on double middle click (see :ref:`callbacks`)"),
('on_doublerightclick', "Callback called on double right click (see :ref:`callbacks`)"),
('on_doubleupscroll', "Callback called on double scroll up (see :ref:`callbacks`)"),
('on_doubledownscroll', "Callback called on double scroll down (see :ref:`callbacks`)"),
('on_otherclick', "Callback called on other click (see :ref:`callbacks`)"),
('on_doubleotherclick', "Callback called on double other click (see :ref:`callbacks`)"),
('on_change', "Callback called when output is changed (see :ref:`callbacks`)"),
('multi_click_timeout', "Time (in seconds) before a single click is executed."),
('hints', "Additional output blocks for module output (see :ref:`hints`)"),
)
on_leftclick = None
on_middleclick = None
on_rightclick = None
on_upscroll = None
on_downscroll = None
on_doubleleftclick = None
on_doublemiddleclick = None
on_doublerightclick = None
on_doubleupscroll = None
on_doubledownscroll = None
on_otherclick = None
on_change = None
on_doubleotherclick = None
multi_click_timeout = 0.25
hints = {"markup": "none"}
def __init__(self, *args, **kwargs):
self._output = None
super(Module, self).__init__(*args, **kwargs)
self.__multi_click = MultiClickHandler(self.__button_callback_handler,
self.multi_click_timeout)
@property
def output(self):
return self._output
@output.setter
def output(self, value):
self._output = value
if self.on_change:
self.on_change()
[docs] def registered(self, status_handler):
"""Called when this module is registered with a status handler"""
self.__status_handler = status_handler
[docs] def inject(self, json):
if self.output:
if "name" not in self.output:
self.output["name"] = self.__name__
self.output["instance"] = str(id(self))
if (self.output.get("color", "") or "").lower() == "#ffffff":
del self.output["color"]
if self.hints:
for key, val in self.hints.items():
if key not in self.output:
self.output.update({key: val})
if self.output.get("markup") == "pango":
self.text_to_pango()
json.insert(convert_position(self.position, json), self.output)
[docs] def send_output(self):
"""Send a status update with the current module output"""
self.__status_handler.io.async_refresh()
def __log_button_event(self, button, cb, args, action, **kwargs):
msg = "{}: button={}, cb='{}', args={}, kwargs={}, type='{}'".format(
self.__name__, button, cb, args, kwargs, action)
self.logger.debug(msg)
def __button_callback_handler(self, button, cb, **kwargs):
def call_callback(cb, *args, **kwargs):
# Recover the function if wrapped (with get_module for example)
wrapped_cb = getattr(cb, "__wrapped__", None)
if wrapped_cb:
locals()["self"] = self # Add self to the local stack frame
tmp_cb = wrapped_cb
else:
tmp_cb = cb
try:
args_spec = inspect.getfullargspec(tmp_cb)
except Exception:
args_spec = inspect.FullArgSpec(
[], None, None, None, None, None, {})
# Remove all variables present in kwargs that are not used in the
# callback, except if there is a keyword argument.
if not args_spec.varkw:
kwargs = {k: v for k, v in kwargs.items()
if k in args_spec.args}
cb(*args, **kwargs)
if not cb:
self.__log_button_event(button, None, None,
"No callback attached", **kwargs)
return False
if isinstance(cb, list):
cb, args = (cb[0], cb[1:])
else:
args = []
try:
our_method = is_method_of(cb, self)
if callable(cb) and not our_method:
self.__log_button_event(button, cb, args,
"Python callback", **kwargs)
call_callback(cb, *args, **kwargs)
elif our_method:
self.__log_button_event(button, cb, args,
"Method callback", **kwargs)
call_callback(cb, self, *args, **kwargs)
elif hasattr(self, cb):
if cb != "run":
# CommandEndpoint already calls run() after every
# callback to instantly update any changed state due
# to the callback's actions.
self.__log_button_event(button, cb, args,
"Member callback", **kwargs)
call_callback(getattr(self, cb), *args, **kwargs)
else:
self.__log_button_event(button, cb, args,
"External command", **kwargs)
if hasattr(self, "data"):
kwargs.update(self.data)
args = [str(arg).format(**kwargs) for arg in args]
cb = cb.format(**kwargs)
execute(cb + " " + " ".join(args), detach=True)
except Exception as e:
self.logger.critical("Exception while processing button "
"callback: {!r}".format(e))
self.logger.critical(traceback.format_exc())
# Notify status handler
try:
self.__status_handler.io.async_refresh()
except:
pass
[docs] def on_click(self, button, **kwargs):
"""
Maps a click event with its associated callback.
Currently implemented events are:
============ ================ =========
Event Callback setting Button ID
============ ================ =========
Left click on_leftclick 1
Middle click on_middleclick 2
Right click on_rightclick 3
Scroll up on_upscroll 4
Scroll down on_downscroll 5
Others on_otherclick > 5
============ ================ =========
The action is determined by the nature (type and value) of the callback
setting in the following order:
1. If null callback (``None``), no action is taken.
2. If it's a `python function`, call it and pass any additional
arguments.
3. If it's name of a `member method` of current module (string), call
it and pass any additional arguments.
4. If the name does not match with `member method` name execute program
with such name.
.. seealso:: :ref:`callbacks` for more information about
callback settings and examples.
:param button: The ID of button event received from i3bar.
:param kwargs: Further information received from i3bar like the
positions of the mouse where the click occurred.
:return: Returns ``True`` if a valid callback action was executed.
``False`` otherwise.
"""
actions = ['leftclick', 'middleclick', 'rightclick',
'upscroll', 'downscroll']
try:
action = actions[button - 1]
except (TypeError, IndexError):
self.__log_button_event(button, None, None, "Other button")
action = "otherclick"
m_click = self.__multi_click
with m_click.lock:
double = m_click.check_double(button)
double_action = 'double%s' % action
if double:
action = double_action
# Get callback function
cb = getattr(self, 'on_%s' % action, None)
double_handler = getattr(self, 'on_%s' % double_action, None)
delay_execution = (not double and double_handler)
if delay_execution:
m_click.set_timer(button, cb, **kwargs)
else:
self.__button_callback_handler(button, cb, **kwargs)
[docs] def move(self, position):
self.position = position
return self
[docs] def text_to_pango(self):
"""
Replaces all ampersands in `full_text` and `short_text` attributes of
`self.output` with `&`.
It is called internally when pango markup is used.
Can be called multiple times (`&` won't change to `&`).
"""
def replace(text):
components = text.split("&")
out = components[0]
for item in components[1:]:
if item.startswith("amp;") \
or (not item.startswith("amp;")
and html.unescape(f'&{item}') != f'&{item}'):
out += "&" + item
else:
out += "&" + item
return out
if "full_text" in self.output.keys():
self.output["full_text"] = replace(self.output["full_text"])
if "short_text" in self.output.keys():
self.output["short_text"] = replace(self.output["short_text"])
[docs]class IntervalModule(Module):
settings = (
("interval", "interval in seconds between module updates"),
)
interval = 5 # seconds
managers = {}
[docs] def registered(self, status_handler):
super(IntervalModule, self).registered(status_handler)
if self.interval in IntervalModule.managers:
IntervalModule.managers[self.interval].append(self)
else:
am = Manager(self.interval)
am.append(self)
IntervalModule.managers[self.interval] = am
am.start()
def __call__(self):
self.run()
[docs] def run(self):
"""Called approximately every self.interval seconds
Do not rely on this being called from the same thread at all times.
If you need to always have the same thread context, subclass AsyncModule."""