# Source code for i3pystatus.abc_radio

import logging
import os
import shutil
import threading
import xml.etree.ElementTree as etree
from datetime import datetime

import requests
import vlc
from dateutil import parser
from dateutil.tz import tzutc
from i3pystatus import IntervalModule
from i3pystatus.core.desktop import DesktopNotification
from i3pystatus.core.util import internet, require

class State:
    """Playback states used by the status module and the VLC player thread."""

    # Plain integers: they are used as dictionary keys (icon table and the
    # player's state-dispatch table) and are formatted into log messages.
    PLAYING, PAUSED, STOPPED = 1, 2, 3

class ABCRadio(IntervalModule):
    """
    Streams ABC Australia radio - https://radio.abc.net.au/. Currently uses VLC to do the
    actual streaming.

    Requires the PyPI packages `python-vlc`, `python-dateutil` and `requests`. Also requires VLC -
    https://www.videolan.org/vlc/index.html

    .. rubric:: Available formatters

    * `{station}` — Current station
    * `{title}` — Title of current show
    * `{url}` — Show's URL
    * `{remaining}` — Time left for current show
    * `{player_state}` — Unicode icons representing play, pause and stop
    """

    # NOTE(review): the URLs in the docstring above and in the ``target_stations``
    # description below were missing from the extracted source and were restored
    # from the upstream i3pystatus module - confirm before merging.
    settings = (
        ("format", "format string for when the player is inactive"),
        ("format_playing", "format string for when the player is playing"),
        ("target_stations", "list of station ids to select from. Station ids can be obtained "
                            "from the following XML - http://www.abc.net.au/radio/data/stations_apps_v3.xml. "
                            "If the list is empty, all stations will be accessible."),
    )

    format = "{station} {title} {player_state}"
    format_playing = "{station} {title} {remaining} {player_state}"

    on_leftclick = 'toggle_play'
    on_upscroll = ['cycle_stations', 1]
    on_downscroll = ['cycle_stations', -1]
    on_doubleleftclick = 'display_notification'

    interval = 1

    # Destroy the player after this many seconds of inactivity
    PLAYER_LIFETIME = 5

    # Do not suspend the player when i3bar is hidden.
    keep_alive = True

    show_info = {}
    player = None
    station_info = None
    station_id = None
    stations = None
    prev_title = None
    prev_station = None
    target_stations = []
    end = None
    start = None
    destroy_timer = None
    cycle_lock = threading.Lock()

    player_icons = {
        State.PAUSED: "▷",
        State.PLAYING: "▶",
        State.STOPPED: "◾",
    }

    def init(self):
        """Create the helper used to query station and show metadata."""
        self.station_info = ABCStationInfo()

    @require(internet)
    def run(self):
        """Refresh the show information when needed and render the bar output."""
        if self.station_id is None:
            self.stations = self.station_info.get_stations()
            # Select the first station in the list
            self.cycle_stations(1)

        # The current show has finished: fetch whatever is on air now.
        if self.end and self.end <= datetime.now(tz=tzutc()):
            self.update_show_info()

        format_dict = self.show_info.copy()
        format_dict['player_state'] = self.get_player_state()
        format_dict['remaining'] = self.get_remaining()

        format_template = self.format_playing if self.player else self.format
        self.output = {
            "full_text": format_template.format(**format_dict)
        }

    def update_show_info(self):
        """Rebuild ``show_info`` for the selected station and record the show's start/end."""
        log.debug("Updating: show_info - %s", datetime.now(tz=tzutc()))
        # Every formatter key defaults to '' so the format strings never hit a KeyError.
        self.show_info = dict.fromkeys(
            ('title', 'url', 'start', 'end', 'duration', 'stream', 'remaining', 'station',
             'description', 'short_synopsis'), '')
        self.show_info.update(self.stations[self.station_id])
        self.show_info.update(self.station_info.currently_playing(self.station_id))

        # Show a notification when the show changes if the user is actively listening.
        should_show = (self.prev_station == self.show_info['station']
                       and self.prev_title != self.show_info['title']
                       and self.player)
        if should_show:
            self.display_notification()

        self.prev_title = self.show_info['title']
        self.prev_station = self.show_info['station']
        # Empty placeholders are normalised to None so the truthiness checks elsewhere work.
        self.end = self.show_info['end'] or None
        self.start = self.show_info['start'] or None

    def get_player_state(self):
        """Return the icon for the player's current state (stopped when there is no player)."""
        if self.player:
            return self.player_icons[self.player.player_state]
        return self.player_icons[State.STOPPED]

    def get_remaining(self):
        """Return the time left in the current show as ``H:MM:SS``, or '' if unknown/over."""
        now = datetime.now(tz=tzutc())
        if self.end and self.end > now:
            # Drop the fractional seconds from the timedelta's string form.
            return str(self.end - now).split(".")[0]
        return ''

    def cycle_stations(self, increment=1):
        """
        Select another station, keeping the player's state.

        :param increment: number of positions to move in the station list; negative
            values move backwards. The list wraps around.
        """
        with self.cycle_lock:
            target_array = self.target_stations if self.target_stations else list(self.stations)
            if self.station_id in target_array:
                next_index = (target_array.index(self.station_id) + increment) % len(target_array)
                self.station_id = target_array[next_index]
            else:
                self.station_id = target_array[0]
            log.debug("Cycle to: {}".format(self.station_id))

            # Remember what the player was doing so the new stream resumes in the same state.
            if self.player:
                current_state = self.player.player_state
                self.player.stop()
            else:
                current_state = State.STOPPED

            self.update_show_info()
            if self.player:
                self.player.load_stream(self.show_info['stream'])
                self.player.set_state(current_state)

    def _get_station_icon(self, station):
        """
        Return the path of the cached station icon, downloading it on first use.

        Best effort: returns None when the station has no image link or the
        download/write fails.
        """
        image_link = self.show_info.get('image_link', None)
        if not image_link:
            return None
        try:
            image_path = "/tmp/{}.icon".format(station)
            if not os.path.isfile(image_path):
                response = requests.get(image_link, stream=True)
                with open(image_path, 'wb') as out_file:
                    shutil.copyfileobj(response.raw, out_file)
            return image_path
        except Exception:
            # The icon is optional; never let a failed download block the notification.
            log.debug("Could not fetch icon for %s", station, exc_info=True)
            return None

    def display_notification(self):
        """Show a desktop notification describing the current station and show."""
        if self.show_info:
            station = self.show_info['station']
            synopsis = self.show_info['short_synopsis']
            title = "{} - {}".format(station, self.show_info['title'])

            DesktopNotification(title=title, body=synopsis,
                                icon=self._get_station_icon(station)).display()
            log.info("Displayed notification")

    def toggle_play(self):
        """Toggle between playing and paused, creating the player on first use."""
        if not self.player:
            self.init_player()
        if not self.player:
            # init_player() creates nothing while show_info is still empty.
            return

        if self.player.is_playing():
            self.player.pause()
            # Tear the player down if it stays paused for PLAYER_LIFETIME seconds.
            self.destroy_timer = threading.Timer(self.PLAYER_LIFETIME, self.destroy)
            self.destroy_timer.start()
        else:
            self.player.play()
            if self.destroy_timer:
                self.destroy_timer.cancel()
                self.destroy_timer = None

    def init_player(self):
        """Create the VLC player thread and load the current stream, if show info exists."""
        if self.show_info:
            self.player = VLCPlayer()
            log.info("Created player: {}".format(id(self.player)))
            if not self.player.stream_loaded():
                log.info("Loading stream: {}".format(self.show_info['stream']))
                self.player.load_stream(self.show_info['stream'])
            if not self.player.is_alive():
                self.player.start()

    def destroy(self):
        """Destroy the player, if any, and forget it."""
        log.debug("Destroying player: {}".format(id(self.player)))
        if self.player:
            self.player.destroy()
            self.player = None
class ABCStationInfo:
    """Fetches station and show metadata over HTTP."""

    # NOTE(review): both URLs were truncated in the extracted source (only
    # "{}/live.json?include=now" and an empty string survived). They were restored
    # from the upstream i3pystatus module - confirm they are still current.
    PLAYING_URL = "https://program.abcradio.net.au/api/v1/programitems/{}/live.json?include=now"
    STATIONS_URL = "http://www.abc.net.au/radio/data/stations_apps_v3.xml"

    def currently_playing(self, station_id):
        """
        Return details of the show currently on air for ``station_id``.

        :returns: dict with ``title``, ``url``, ``start``, ``end``, ``duration``,
            ``short_synopsis`` and ``stream``; an empty dict when the response lacks
            any of the expected fields.
        :raises requests.HTTPError: on a 4xx/5xx response.
        """
        station_info = self._get(self.PLAYING_URL.format(station_id)).json()
        try:
            now = station_info['now']
            live = now['live'][0]
            # Streams are ordered by their 'type' field and the first one is used.
            streams = sorted(live['outlets'][0]['audio_streams'], key=lambda x: x['type'])
            return dict(
                title=now['program']['title'],
                url=now['primary_webpage']['url'],
                start=parser.parse(live['start']),
                end=parser.parse(live['end']),
                duration=live['duration_seconds'],
                short_synopsis=now['short_synopsis'],
                stream=streams[0]['url'],
            )
        except (KeyError, IndexError):
            return {}

    def get_stations(self):
        """
        Return the available stations keyed by station id.

        Only elements whose ``showInAndroidApp`` attribute is ``'true'`` are included.

        :raises requests.HTTPError: on a 4xx/5xx response.
        """
        stations = dict()
        station_xml = etree.fromstring(self._get(self.STATIONS_URL).content)
        for element in station_xml:
            attrib = element.attrib
            # .get(): elements without the attribute are skipped instead of raising.
            if attrib.get("showInAndroidApp") == 'true':
                stations[attrib['id']] = dict(
                    id=attrib['id'],
                    station=attrib['name'],
                    description=attrib.get('description', None),
                    link=attrib.get('linkUrl', None),
                    image_link=attrib.get('WEBimageUrl', None),
                    stream=attrib.get('hlsStreamUrl', None),
                )
        return stations

    def _get(self, url):
        """GET ``url`` and return the response, raising on 4xx/5xx status codes."""
        result = requests.get(url=url)
        if result.status_code not in range(200, 300):
            result.raise_for_status()
        return result


log = logging.getLogger(__name__)


class VLCPlayer(threading.Thread):
    """
    Thread wrapping a VLC media player.

    Callers request a state with play()/pause()/stop(); the thread's run loop
    wakes up and invokes the matching VLC call.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self.idle = threading.Event()  # set whenever a state change is pending
        self.die = threading.Event()  # set to make the run loop exit
        self.instance = vlc.Instance()
        self.player_state = State.STOPPED
        self.player = self.instance.media_player_new()

    def run(self):
        """Apply each requested state to the VLC player until destroy() is called."""
        states = {
            State.STOPPED: self.player.stop,
            State.PLAYING: self.player.play,
            State.PAUSED: self.player.pause,
        }
        while not self.die.is_set():
            self.idle.wait()
            states[self.player_state]()
            self.idle.clear()

    def load_stream(self, url):
        """Set ``url`` as the media to play."""
        self.player.set_media(self.instance.media_new(url))

    def stream_loaded(self):
        """Return True if the player has media set."""
        return self.player.get_media() is not None

    def play(self):
        """Request the PLAYING state."""
        self.set_state(State.PLAYING)

    def pause(self):
        """Request the PAUSED state."""
        self.set_state(State.PAUSED)

    def stop(self):
        """Request the STOPPED state."""
        self.set_state(State.STOPPED)

    def destroy(self):
        """Signal the run loop to exit, then stop and release the VLC player."""
        self.die.set()
        # Wake the run loop so it can observe the die flag.
        self.idle.set()
        self.player.stop()
        self.player.release()

    def set_state(self, state):
        """Record the requested state and wake the run loop to apply it."""
        log.info("{} -> {}".format(self.player_state, state))
        self.player_state = state
        self.idle.set()

    def is_playing(self):
        """Return VLC's own is_playing() result."""
        return self.player.is_playing()