# Source code for i3pystatus.scores

import copy
import json
import operator
import pytz
import re
import threading
import time
from datetime import datetime, timedelta
from urllib.request import urlopen
from urllib.error import HTTPError, URLError

from i3pystatus import SettingsBase, Module, formatp
from i3pystatus.core.util import user_open, internet, require


class ScoresBackend(SettingsBase):
    '''
    Base class with the logic shared by the score backends: API requests,
    selection of the date to query, small formatting helpers and the
    bookkeeping that decides the order in which games are scrolled.

    Several attributes used below (``logger``, ``team_colors``,
    ``_default_colors``, ``display_order``, ``process_game``) are not defined
    in this class; they are presumably provided by ``SettingsBase`` and by the
    concrete backend subclasses.
    '''
    settings = ()
    # Abbreviations of the followed teams, in the order they should be shown
    favorite_teams = []
    # Whether games not involving a followed team are tracked as well
    all_games = True
    # Raw ``date`` setting at first, replaced by a datetime in get_api_date()
    date = None
    # Processed game data, keyed by game ID (rebuilt on every update)
    games = {}
    # Game IDs in the order they are scrolled through
    scroll_order = []
    # Timestamp of the last update (0 means "never updated")
    last_update = 0

    def init(self):
        '''
        Merge user-supplied team colors into the backend's default colors.
        '''
        # Merge the passed team colors with the global ones. A simple length
        # check is sufficient here because i3pystatus.scores.Scores instance
        # will already have checked to see if any invalid teams were specified
        # in team_colors.
        if len(self.team_colors) != len(self._default_colors):
            self.logger.debug(
                f'Overriding {self.name} team colors '
                f'with: {self.team_colors}'
            )
            new_colors = copy.copy(self._default_colors)
            new_colors.update(self.team_colors)
            self.team_colors = new_colors
        self.logger.debug(f'{self.name} team colors: {self.team_colors}')

    def api_request(self, url):
        '''
        Fetch ``url`` and return the parsed JSON document.

        An empty dict is returned (and the problem logged) when the request
        fails with one of the handled network errors, or when the response
        body is blank, cannot be decoded, or is not valid JSON.
        '''
        self.logger.debug(f'Making {self.name} API request to {url}')
        try:
            with urlopen(url) as content:
                if content.url != url:
                    self.logger.debug(
                        f'Request to {url} was redirected to {content.url}'
                    )
                # Let the header parser extract the charset. Unlike a regex on
                # the raw header value this copes with a missing Content-Type
                # header, quoted values and trailing parameters, and falls back
                # to UTF-8 when no charset is declared.
                charset = content.headers.get_content_charset('utf-8')
                raw_body = content.read()
        except HTTPError as exc:
            self.logger.critical(
                f'Error {exc.code} ({exc.reason}) making request to {exc.url}'
            )
            return {}
        except (ConnectionResetError, URLError) as exc:
            self.logger.critical(f'Error making request to {url}: {exc}')
            return {}

        try:
            response_json = raw_body.decode(charset).strip()
        except (LookupError, UnicodeDecodeError):
            # Unknown charset name or bytes that do not match the declared
            # charset; treat like any other unusable response.
            self.logger.exception(
                f'Unable to decode response from {url} as {charset}'
            )
            return {}

        if not response_json:
            self.logger.debug(f'JSON response from {url} was blank')
            return {}

        try:
            response = json.loads(response_json)
        except json.decoder.JSONDecodeError:
            self.logger.exception('Error encountered while loading JSON')
            self.logger.debug(f'Text that failed to load: {response_json}')
            return {}

        self.logger.log(5, f'API response: {response}')
        return response

    @property
    def name(self):
        '''
        Return the backend name
        '''
        return self.__class__.__name__

    def get_api_date(self):
        '''
        Figure out the date to use for API requests. Assumes yesterday's date
        if between midnight and 10am Eastern time. Override this function in a
        subclass to change how the API date is calculated.

        The result is stored in ``self.date`` as a datetime; nothing is
        returned.
        '''
        # NOTE: If you are writing your own function to get the date, make sure
        # to keep honoring the ``date`` parameter, which allows a date to be
        # hard-coded.
        if not isinstance(self.date, datetime):
            # ``self.date`` still holds the raw setting (or None). Remember it,
            # because ``self.date`` is overwritten with a datetime below and
            # the hard-coded date would otherwise be forgotten on the next
            # call.
            self._date_setting = self.date
        date_setting = getattr(self, '_date_setting', None)

        api_date = None
        if date_setting is not None:
            try:
                api_date = datetime.strptime(date_setting, '%Y-%m-%d')
            except (TypeError, ValueError):
                self.logger.warning(f"Invalid date '{date_setting}'")
                # Don't warn again on every update
                self._date_setting = None

        if api_date is None:
            eastern = pytz.timezone('US/Eastern')
            utc_time = datetime.now(pytz.utc)
            api_date = eastern.normalize(utc_time.astimezone(eastern))
            if api_date.hour < 10:
                # The scores on NHL.com change at 10am Eastern, if it's before
                # that time of day then we will use yesterday's date.
                api_date -= timedelta(days=1)
        self.date = api_date

    @staticmethod
    def add_ordinal(number):
        '''
        Return ``number`` with its English ordinal suffix (``1st``, ``2nd``,
        ``11th``, ``112th``, ...). Values that cannot be converted to an
        integer are returned unchanged.
        '''
        try:
            number = int(number)
        except (TypeError, ValueError):
            return number
        # 11, 12 and 13 take "th" in every hundred (111th, 212th, ...), not
        # just below 20.
        if 10 <= number % 100 <= 20:
            suffix = 'th'
        else:
            ord_map = {1: 'st', 2: 'nd', 3: 'rd'}
            suffix = ord_map.get(number % 10, 'th')
        return f'{number}{suffix}'

    @staticmethod
    def zero_fallback(value):
        '''
        Return ``value`` as a string if it can be converted to an integer,
        otherwise return ``'0'``.
        '''
        try:
            int(value)
        except (TypeError, ValueError):
            return '0'
        else:
            return str(value)

    def get_nested(self, data, expr, callback=None, default=''):
        '''
        Look up a value in nested dicts/lists.

        ``expr`` is a colon-separated path; a purely numeric path element is
        used as a list index when the current level is a list. If the path
        cannot be followed, ``default`` is returned (without applying
        ``callback``). Otherwise the value found is passed through
        ``callback`` when one is given.
        '''
        try:
            for key in expr.split(':'):
                if key.isdigit() and isinstance(data, list):
                    key = int(key)
                data = data[key]
        except (KeyError, IndexError, TypeError):
            self.logger.debug(
                f'No {self.name} data found at {expr}, '
                f'falling back to {repr(default)}'
            )
            return default
        return data if callback is None else callback(data)

    def interpret_api_return(self, data, team_game_map):
        '''
        Rebuild ``self.games``, ``self.scroll_order`` and
        ``self.scroll_order_revmap`` from freshly fetched data.

        ``data`` maps game IDs to the raw data handed to ``process_game``;
        ``team_game_map`` maps a team to the IDs of the games it plays in.
        '''
        favorite_games = []
        favorite_ids = set()
        # Cycle through the followed teams to ensure that games show up in the
        # order of teams being followed.
        for team in self.favorite_teams:
            for id_ in team_game_map.get(team, []):
                if id_ not in favorite_ids:
                    favorite_ids.add(id_)
                    favorite_games.append(id_)

        # If all games are being tracked, add any games not from
        # explicitly-followed teams.
        if self.all_games:
            additional_games = [x for x in data if x not in favorite_ids]
        else:
            additional_games = []

        # Process the API return data for each tracked game
        self.games = {}
        for game_id in favorite_games + additional_games:
            self.games[game_id] = self.process_game(data[game_id])

        # Favorite games come first
        self.scroll_order = [self.games[x]['id'] for x in favorite_games]

        # For any remaining games being tracked, sort each group by start time
        # and add them to the list
        for status in self.display_order:
            time_map = {
                game_id: game['start_time']
                for game_id, game in self.games.items()
                if game_id not in favorite_ids and game['status'] == status
            }
            sorted_games = sorted(time_map.items(), key=operator.itemgetter(1))
            self.scroll_order.extend(x[0] for x in sorted_games)

        # Reverse map so that we can know the scroll position for a given game
        # by just its ID. This will help us to place the game in its new order
        # when that order changes due to the game changing from one status to
        # another.
        self.scroll_order_revmap = {y: x for x, y in enumerate(self.scroll_order)}


class Scores(Module):
    '''
    This is a generic score checker, which must use at least one configured
    :ref:`score backend <scorebackends>`.

    Followed games can be scrolled through with the mouse/trackpad.
    Left-clicking on the module will refresh the scores, while right-clicking
    it will cycle through the configured backends. Double-clicking the module
    with the left button will launch the league-specific (MLB Gameday / NHL
    GameCenter / etc.) URL for the game. If there is not an active game,
    double-clicking will launch the league-specific scoreboard URL containing
    all games for the current day.

    Double-clicking with the right button will reset the current backend to
    the first game in the scroll list. This is useful for quickly switching
    back to a followed team's game after looking at other game scores.

    Scores for the previous day's games will be shown until 10am Eastern Time
    (US), after which time the current day's games will be shown.

    .. rubric:: Available formatters

    Formatters are set in the backend instances, see the :ref:`scorebackends`
    for more information.

    This module supports the :ref:`formatp <formatp>` extended string format
    syntax. This allows for values to be hidden when they evaluate as False
    (e.g. when a formatter is blank (an empty string)). The default values for
    the format strings set in the :ref:`score backends <scorebackends>`
    (``format_pregame``, ``format_in_progress``, etc.) make heavy use of
    formatp, hiding many formatters when they are blank.

    .. rubric:: Usage example

    .. code-block:: python

        from i3pystatus import Status
        from i3pystatus.scores import mlb, nhl, nba

        status = Status()

        status.register(
            'scores',
            hints={'markup': 'pango'},
            colorize_teams=True,
            favorite_icon='<span size="small" color="#F5FF00">★</span>',
            team_format='abbreviation',
            backends=[
                mlb.MLB(
                    teams=['CWS', 'SF'],
                    team_format='name',
                    format_no_games='No games today :(',
                    inning_top='⬆',
                    inning_bottom='⬇',
                ),
                nhl.NHL(teams=['CHI']),
                nba.NBA(
                    teams=['GSW'],
                    all_games=False,
                ),
            ],
        )

        status.run()

    To enable colorized team name/city/abbreviation, ``colorize_teams`` must
    be set to ``True``. This also requires that i3bar is configured to use
    Pango, and that the :ref:`hints <hints>` param is set for the module and
    includes a ``markup`` key, as in the example above. To ensure that i3bar is
    configured to use Pango, the `font param`__ in your i3 config file must
    start with ``pango:``.

    .. __: http://i3wm.org/docs/userguide.html#fonts

    .. _scores-game-order:

    If a ``teams`` param is not specified for the backend, then all games for
    the current day will be tracked, and will be ordered by the start time of
    the game. Otherwise, only games from explicitly-followed teams will be
    tracked, and will be in the same order as listed. If ``ALL`` is part of the
    list, then games from followed teams will be first in the scroll list,
    followed by all remaining games in order of start time.

    Therefore, in the above example, only White Sox and Giants games would be
    tracked, while in the below example all games would be tracked, with White
    Sox and Giants games appearing first in the scroll list and the remaining
    games appearing after them, in order of start time.

    .. code-block:: python

        from i3pystatus import Status
        from i3pystatus.scores import mlb

        status = Status()

        status.register(
            'scores',
            hints={'markup': 'pango'},
            colorize_teams=True,
            favorite_icon='<span size="small" color="#F5FF00">★</span>',
            backends=[
                mlb.MLB(
                    teams=['CWS', 'SF', 'ALL'],
                    team_colors={
                        'NYM': '#1D78CA',
                    },
                ),
            ],
        )

        status.run()

    .. rubric:: Troubleshooting

    If the module gets stuck during an update (i.e. the ``refresh_icon`` does
    not go away), then the update thread probably encountered a traceback. This
    traceback will (by default) be logged to ``~/.i3pystatus-<pid>`` where
    ``<pid>`` is the PID of the thread. However, it may be more convenient to
    manually set the logfile to make the location of the log data reliable and
    avoid clutter in your home directory. For example:

    .. code-block:: python

        import logging
        from i3pystatus import Status
        from i3pystatus.scores import mlb, nhl, nba

        status = Status(
            logfile='/home/username/var/i3pystatus.log',
        )

        status.register(
            'scores',
            log_level=logging.DEBUG,
            backends=[
                mlb.MLB(
                    teams=['CWS', 'SF'],
                    log_level=logging.DEBUG,
                ),
                nhl.NHL(
                    teams=['CHI'],
                    log_level=logging.DEBUG,
                ),
                nba.NBA(
                    teams=['CHI'],
                    log_level=logging.DEBUG,
                ),
            ],
        )

        status.run()

    .. note::
        The ``log_level`` must be set separately in both the module and the
        backend instances (as shown above), otherwise the backends will
        still use the default log level.
    '''
    interval = 300

    settings = (
        ('backends', 'List of backend instances'),
        ('interval', 'Update interval (in seconds)'),
        ('favorite_icon', 'Value for the ``{away_favorite}`` and '
                          '``{home_favorite}`` formatter when the displayed game '
                          'is being played by a followed team'),
        ('color', 'Color to be used for non-colorized text (defaults to the '
                  'i3bar color)'),
        ('color_no_games', 'Color to use when no games are scheduled for the '
                           'currently-displayed backend (defaults to the '
                           'i3bar color)'),
        ('colorize_teams', 'Dislay team city, name, and abbreviation in the '
                           'team\'s color (as defined in the '
                           ':ref:`backend <scorebackends>`\'s ``team_colors`` '
                           'attribute)'),
        ('scroll_arrow', 'Value used for the ``{scroll}`` formatter to '
                         'indicate that more than one game is being tracked '
                         'for the currently-displayed backend'),
        ('refresh_icon', 'Text to display (in addition to any text currently '
                         'shown by the module) when refreshing scores. '
                         '**NOTE:** Depending on how quickly the update is '
                         'performed, the icon may not be displayed.'),
        ('team_format', 'One of ``name``, ``abbreviation``, or ``city``'),
    )

    backends = []
    favorite_icon = '★'
    color = None
    color_no_games = None
    colorize_teams = False
    scroll_arrow = '⬍'
    refresh_icon = '⟳'
    team_format = 'name'

    # Class-level defaults only; init() gives every instance its own copies so
    # that state is not shared between module instances.
    output = {'full_text': ''}
    game_map = {}
    backend_id = 0

    on_upscroll = ['scroll_game', 1]
    on_downscroll = ['scroll_game', -1]
    on_leftclick = ['check_scores', 'click event']
    on_rightclick = ['cycle_backend', 1]
    on_doubleleftclick = ['launch_web']
    on_doublerightclick = ['reset_backend']

    def init(self):
        '''
        Validate and normalize the configured backends, then start the
        background thread that periodically updates the scores.

        Raises ValueError if no backend is configured, or if a backend lists
        an unknown team or display_order item.
        '''
        if not isinstance(self.backends, list):
            self.backends = [self.backends]

        if not self.backends:
            raise ValueError('At least one backend is required')

        # Per-instance state. Mutating the class-level dicts through ``self``
        # would leak scroll positions and output into every other instance.
        self.output = dict(self.output)
        # Initialize each backend's game index
        self.game_map = {index: None for index in range(len(self.backends))}

        for backend in self.backends:
            if hasattr(backend, '_valid_teams'):
                for index, team in enumerate(backend.favorite_teams):
                    # Force team abbreviation to uppercase
                    team_uc = str(team).upper()
                    # Check to make sure the team abbreviation is valid
                    if team_uc not in backend._valid_teams:
                        raise ValueError(
                            f'Invalid {backend.name} team '
                            f"'{team}'"
                        )
                    backend.favorite_teams[index] = team_uc

            for index, order in enumerate(backend.display_order):
                order_lc = str(order).lower()
                # Check to make sure the display order item is valid
                if order_lc not in backend._valid_display_order:
                    raise ValueError(
                        f'Invalid {backend.name} display_order '
                        f"'{order}'"
                    )
                backend.display_order[index] = order_lc

            # Backends without their own team_format inherit the module's
            if backend.team_format is None:
                backend.team_format = self.team_format

        self.condition = threading.Condition()
        self.thread = threading.Thread(target=self.update_thread, daemon=True)
        self.thread.start()

    def update_thread(self):
        '''
        Body of the background thread: check the scores once, then again
        every ``interval`` seconds (or earlier if the condition is notified).
        Any exception ends the loop and is logged with its traceback.
        '''
        try:
            self.check_scores(force='scheduled')
            while True:
                with self.condition:
                    self.condition.wait(self.interval)
                self.check_scores(force='scheduled')
        except Exception:
            thread = threading.current_thread().name
            timestamp = time.strftime('%c')
            # NOTE(review): no ``name`` attribute is defined in this class;
            # fall back to the class name so that a missing attribute cannot
            # mask the traceback being logged here.
            module_name = getattr(self, 'name', self.__class__.__name__)
            self.logger.exception(
                f'Exception in {thread} at {timestamp}, module {module_name}'
            )

    @property
    def current_backend(self):
        '''The backend instance currently being displayed.'''
        return self.backends[self.backend_id]

    @property
    def current_scroll_index(self):
        '''
        Scroll position within the current backend, or None when it has no
        tracked games.
        '''
        return self.game_map[self.backend_id]

    @property
    def current_game_id(self):
        '''ID of the currently displayed game, or None if there is none.'''
        try:
            return self.current_backend.scroll_order[self.current_scroll_index]
        except (AttributeError, TypeError):
            return None

    @property
    def current_game(self):
        '''Data of the currently displayed game, or None if there is none.'''
        try:
            return self.current_backend.games[self.current_game_id]
        except KeyError:
            return None

    def scroll_game(self, step=1):
        '''
        Move ``step`` positions through the current backend's scroll list
        (wrapping around at either end) and redraw if the position changed.
        '''
        cur_index = self.current_scroll_index
        if cur_index is None:
            self.logger.debug(
                f'Cannot scroll, no tracked {self.current_backend.name} games '
                f'for {self.current_backend.date:%Y-%m-%d}'
            )
            return

        new_index = (cur_index + step) % len(self.current_backend.scroll_order)
        if new_index == cur_index:
            self.logger.debug(
                f'Cannot scroll, only one tracked '
                f'{self.current_backend.name} game '
                f'(ID: {self.current_game_id}) for '
                f'{self.current_backend.date:%Y-%m-%d}'
            )
            return

        cur_id = self.current_game_id
        # Don't reference self.current_scroll_index here, we're setting
        # a new value for the data point for which
        # self.current_scroll_index serves as a shorthand.
        self.game_map[self.backend_id] = new_index
        self.logger.debug(
            f'Scrolled from {self.current_backend.name} '
            f'game {cur_index} (ID: {cur_id}) to {new_index} '
            f'(ID: {self.current_backend.scroll_order[new_index]})'
        )
        self.refresh_display()

    def cycle_backend(self, step=1):
        '''
        Switch to the backend ``step`` positions away (wrapping around),
        display it immediately and then update its scores if necessary.
        '''
        if len(self.backends) < 2:
            self.logger.debug(
                'Only one backend (%s) configured, backend cannot be changed',
                self.current_backend.__class__.__name__,
            )
            return

        old = self.backend_id
        # Set the new backend
        self.backend_id = (self.backend_id + step) % len(self.backends)
        self.logger.debug(
            f'Changed scores backend from {self.backends[old].name} to '
            f'{self.current_backend.name}'
        )
        # Display the score for the new backend. This gets rid of lag between
        # when the mouse is clicked and when the new backend is shown, caused
        # by any network latency encountered when updating scores.
        self.refresh_display()
        # Update scores (if necessary) and display them
        self.check_scores()

    def reset_backend(self):
        '''
        Jump back to the first game in the current backend's scroll list.
        '''
        if self.current_backend.games:
            self.game_map[self.backend_id] = 0
            self.logger.debug(
                f'Resetting to first game in {self.current_backend.name} '
                f'scroll list (ID: {self.current_game_id})'
            )
            self.refresh_display()
        else:
            self.logger.debug(
                f'No {self.current_backend.name} games, cannot reset to first '
                'game in scroll list',
            )

    def launch_web(self):
        '''
        Open the current game's live URL, or the backend's scoreboard URL
        when no game is being displayed.
        '''
        game = self.current_game
        if game is None:
            live_url = self.current_backend.scoreboard_url
        else:
            live_url = game['live_url']
        self.logger.debug(f'Launching {live_url} in browser')
        user_open(live_url)

    @require(internet)
    def check_scores(self, force=False):
        '''
        Update the current backend's scores if needed and redraw.

        An update is performed when the backend has never been updated, when
        ``force`` is truthy (its value is only used in the log message, e.g.
        ``'scheduled'`` from the update thread), or when at least
        ``interval`` seconds have passed since the last update.

        NOTE(review): the ``require(internet)`` decorator presumably makes
        this a no-op without connectivity -- confirm in i3pystatus.core.util.
        '''
        update_needed = False
        if not self.current_backend.last_update:
            update_needed = True
            self.logger.debug(
                f'Performing initial {self.current_backend.name} score check'
            )
        elif force:
            update_needed = True
            self.logger.debug(
                f'{self.current_backend.name} score check triggered ({force})'
            )
        else:
            update_diff = time.time() - self.current_backend.last_update
            msg = (
                f'Seconds since last {self.current_backend.name} update '
                f'({update_diff}) '
            )
            if update_diff >= self.interval:
                update_needed = True
                msg += (
                    f'meets or exceeds update interval ({self.interval}), '
                    'update triggered'
                )
            else:
                msg += (
                    f'does not exceed update interval ({self.interval}), '
                    'update skipped'
                )
            self.logger.debug(msg)

        if not update_needed:
            return

        self.show_refresh_icon()
        cur_id = self.current_game_id
        # Take a real snapshot of the tracked game IDs. A keys() view would
        # follow the dict if a backend updated ``games`` in place, making the
        # before/after comparison below always succeed.
        cur_games = set(self.current_backend.games)
        self.current_backend.check_scores()
        for game in self.current_backend.games.values():
            if game['status'] in ('pregame', 'postponed'):
                # Allow formatp to conditionally hide the score when game
                # hasn't started (or has been postponed)
                game['home_score'] = game['away_score'] = ''

        if cur_games == set(self.current_backend.games):
            # Set the index to the scroll position of the current game (it
            # may have changed due to this game or other games changing
            # status).
            if cur_id is None:
                self.logger.debug(
                    f'No tracked {self.current_backend.name} games for '
                    f'{self.current_backend.date:%Y-%m-%d}'
                )
            else:
                cur_pos = self.game_map[self.backend_id]
                new_pos = self.current_backend.scroll_order_revmap[cur_id]
                if cur_pos != new_pos:
                    self.game_map[self.backend_id] = new_pos
                    self.logger.debug(
                        f'Scroll position for current '
                        f'{self.current_backend.name} game '
                        f'({cur_id}) updated from {cur_pos} to {new_pos}'
                    )
                else:
                    self.logger.debug(
                        f'Scroll position ({cur_pos}) for current '
                        f'{self.current_backend.name} game (ID: {cur_id}) '
                        'unchanged'
                    )
        else:
            # Reset the index to 0 if there are any tracked games,
            # otherwise set it to None to signify no tracked games for the
            # backend.
            if self.current_backend.games:
                self.game_map[self.backend_id] = 0
                self.logger.debug(
                    f'Tracked {self.current_backend.name} games updated, '
                    f'setting scroll position to 0 '
                    f'(ID: {self.current_game_id})'
                )
            else:
                self.game_map[self.backend_id] = None
                self.logger.debug(
                    f'No tracked {self.current_backend.name} games for '
                    f'{self.current_backend.date:%Y-%m-%d}'
                )
        self.current_backend.last_update = time.time()
        self.refresh_display()

    def show_refresh_icon(self):
        '''Prepend ``refresh_icon`` to the text currently displayed.'''
        self.output['full_text'] = \
            self.refresh_icon + self.output.get('full_text', '')

    def refresh_display(self):
        '''
        Rebuild ``self.output`` for the current backend: either its
        ``format_no_games`` text (in ``color_no_games``) or the formatted
        current game (in ``color``).
        '''
        if self.current_scroll_index is None:
            output = self.current_backend.format_no_games
            color = self.color_no_games
        else:
            color = self.color
            # Work on a copy so the formatter-only keys added below don't end
            # up in the backend's game data
            game = copy.copy(self.current_game)

            # Set the game_status using the formatter
            game_status_opt = f'status_{game["status"]}'
            try:
                game['game_status'] = formatp(
                    str(getattr(self.current_backend, game_status_opt)),
                    **game
                )
            except AttributeError:
                self.logger.error(
                    f'Unable to find {self.current_backend.name} option '
                    f'{game_status_opt}'
                )
                game['game_status'] = 'Unknown Status'

            for team in ('home', 'away'):
                team_abbrev = game[f'{team}_abbreviation']
                # Set favorite icon, if applicable
                game[f'{team}_favorite'] = self.favorite_icon \
                    if team_abbrev in self.current_backend.favorite_teams \
                    else ''

                try:
                    game[f'{team}_team'] = \
                        game[f'{team}_{self.current_backend.team_format}']
                except KeyError:
                    self.logger.debug(
                        f'Unable to find {self.current_backend.team_format} '
                        f'value, falling back to {team_abbrev}'
                    )
                    game[f'{team}_team'] = team_abbrev

                if self.colorize_teams:
                    try:
                        team_color = \
                            self.current_backend.team_colors[team_abbrev]
                    except KeyError:
                        pass
                    else:
                        for val in ('team', 'name', 'city', 'abbreviation'):
                            # Wrap in Pango markup
                            game[f'{team}_{val}'] = ''.join((
                                f'<span color="{team_color}">',
                                game[f'{team}_{val}'],
                                '</span>',
                            ))

            game['scroll'] = self.scroll_arrow \
                if len(self.current_backend.games) > 1 \
                else ''

            output = formatp(self.current_backend.format, **game).strip()

        # Use the color chosen above; always emitting self.color would make
        # the color_no_games setting a no-op.
        self.output = {'full_text': output, 'color': color}

    def run(self):
        '''
        Intentionally empty: this class does all of its updating in
        ``update_thread`` and the click/scroll handlers.
        '''
        pass