# Source code for i3pystatus.taskwarrior

from i3pystatus import IntervalModule
from json import loads
import subprocess


class Taskwarrior(IntervalModule):
    """
    Check Taskwarrior for pending tasks

    Requires `json`

    .. rubric:: Available formatters

    The format string is expanded with ``str.format``.

    * `{ready}` — contains number of tasks returned by `ready_filter`
    * `{urgent}` — contains number of tasks returned by `urgent_filter`
    * `{next}` — contains the description of next task
    * `{project}` — contains the projects the next task belongs to

    `{next}` and `{project}` are empty strings when there is no task to show.

    .. rubric:: Available callbacks

    * ``get_next_task`` — Display the next most urgent task.
    * ``get_prev_task`` — Display the previous most urgent task.
    * ``reset_next_task`` — Display the most urgent task, resetting any
      switching by other callbacks.
    * ``mark_task_as_done`` — Mark the displayed task as done (only when
      ``enable_mark_done`` is set).
    """

    format = 'Task: {next}'
    ready_filter = '+READY'
    urgent_filter = '+TODAY'
    enable_mark_done = False
    color_urgent = '#FF0000'
    color_ready = '#78EAF2'

    # Task lists are always rebound (never mutated in place) by run(), so the
    # shared class-level lists only serve as "nothing fetched yet" defaults.
    ready_tasks = []
    urgent_tasks = []
    current_tasks = []
    next_id = 0
    next_task = None

    on_upscroll = "get_prev_task"
    on_downscroll = "get_next_task"
    on_rightclick = 'mark_task_as_done'
    on_leftclick = "reset_next_task"

    settings = (
        ('format', 'format string'),
        ('ready_filter', 'Filters to get ready tasks example: `+READY`'),
        ('urgent_filter', 'Filters to get urgent tasks example: `+TODAY`'),
        ('enable_mark_done', 'Enable right click mark task as done'),
        ('color_urgent', '#FF0000'),
        ('color_ready', '#78EAF2')
    )

    def _show_task(self, index):
        """Select ``current_tasks[index]`` (wrapping around) as the shown task.

        With no current tasks the selection is cleared instead of raising
        ``ZeroDivisionError`` / ``IndexError``.
        """
        if not self.current_tasks:
            self.next_id = 0
            self.next_task = None
            return
        self.next_id = index % len(self.current_tasks)
        self.next_task = self.current_tasks[self.next_id]

    def _export_tasks(self, task_filter):
        """Return the tasks matching ``task_filter``, most urgent first.

        Runs ``task <filter...> export`` and decodes its JSON output.
        Raises ``ValueError`` if the output cannot be decoded and
        ``subprocess.CalledProcessError`` if ``task`` exits non-zero.
        """
        # split() without an argument drops the empty strings that repeated
        # or leading/trailing spaces would otherwise pass to `task`.
        command = ['task'] + task_filter.split() + ['export']
        raw_output = subprocess.check_output(command)
        tasks = loads(raw_output.decode("utf-8"))
        return sorted(tasks, key=lambda task: task['urgency'], reverse=True)

    def reset_next_task(self):
        """Show the most urgent task again."""
        self._show_task(0)

    def get_next_task(self):
        """Show the task after the current one, wrapping to the first."""
        self._show_task(self.next_id + 1)

    def get_prev_task(self):
        """Show the task before the current one, wrapping to the last."""
        self._show_task(self.next_id - 1)

    def mark_task_as_done(self):
        """Mark the shown task as done, then advance to the next task.

        Does nothing unless ``enable_mark_done`` is true and a task is shown.
        """
        if self.enable_mark_done and self.next_task is not None:
            subprocess.check_output(['task', str(self.next_task['id']), 'done'])
            self.get_next_task()

    def run(self):
        """Refresh the task lists from Taskwarrior and rebuild ``self.output``."""
        try:
            self.urgent_tasks = self._export_tasks(self.urgent_filter)
            self.ready_tasks = self._export_tasks(self.ready_filter)
        except ValueError:
            self.logger.exception('Decoding JSON has failed')
            raise

        # Urgent tasks take precedence; ready tasks are the fallback.
        self.current_tasks = self.urgent_tasks or self.ready_tasks

        # If the list shrank below the selected position, fall back to the
        # first task. Always refresh next_task so a task that no longer
        # matches (or an empty list) never leaves a stale entry displayed.
        if self.next_id >= len(self.current_tasks):
            self.next_id = 0
        self.next_task = (
            self.current_tasks[self.next_id] if self.current_tasks else None
        )

        # 'project' is always present so a format using {project} does not
        # raise KeyError when there is no task to show.
        format_values = dict(
            urgent=len(self.urgent_tasks),
            ready=len(self.ready_tasks),
            next='',
            project='',
        )

        if self.next_task is not None:
            format_values['next'] = self.next_task['description']
            format_values['project'] = self.next_task.get('project', '')

        self.output = {
            'full_text': self.format.format(**format_values),
            'color': self.color_urgent if self.urgent_tasks else self.color_ready
        }