Source code for i3pystatus.mail.imap

try:
    from imaplib2 import IMAP4, IMAP4_SSL
    use_idle = True
except:
    from imaplib import IMAP4, IMAP4_SSL
    use_idle = False
import socket
from threading import *

from i3pystatus.mail import Backend


# This is the threading object that does all the waiting on
# the event
class Idler(object):
    def __init__(self, conn, callback, callback_reconnect):
        self.thread = Thread(target=self.idle)
        self.M = conn
        self.callback_reconnect = callback_reconnect
        self.event = Event()
        self.callback = callback

    def start(self):
        self.thread.start()

    def stop(self):
        self.event.set()

    def join(self):
        self.thread.join()

    def idle(self):

        while True:
            if self.event.isSet():
                return

            self.needsync = False

            def callback(args):
                if not self.event.isSet():
                    self.needsync = True
                    self.event.set()

            try:
                self.M.idle(callback=callback)
                self.event.wait()

                if self.needsync:
                    self.event.clear()
                    self.callback()
            except:
                break

        self.M = self.callback_reconnect()
        self.stop()
        self.start()


[docs]class IMAP(Backend): """ Checks for mail on a IMAP server """ settings = ( "host", "port", "username", "password", ('keyring_backend', 'alternative keyring backend for retrieving credentials'), "ssl", "mailbox", ) required = ("host", "username", "password") keyring_backend = None port = 993 ssl = True mailbox = "INBOX" imap_class = IMAP4 connection = None last = 0 def init(self): if self.ssl: self.imap_class = IMAP4_SSL self.conn = self.get_connection() if use_idle: idler = Idler(self.conn, self.count_new_mail, self.get_connection) idler.start() def get_connection(self): if self.connection: try: self.connection.select(self.mailbox) except socket.error: # NOTE(sileht): retry just once if the connection have been # broken to ensure this is not a sporadic connection lost. # Like wifi reconnect, sleep wake up try: self.connection.logout() except socket.error: pass self.connection = None if not self.connection: self.connection = self.imap_class(self.host, self.port) self.connection.login(self.username, self.password) self.connection.select(self.mailbox) return self.connection def count_new_mail(self): self.last = len(self.conn.search(None, "UnSeen")[1][0].split()) @property def unread(self): try: conn = self.get_connection() except socket.gaierror: pass else: self.last = len(conn.search(None, "UnSeen")[1][0].split()) finally: if not use_idle: self.count_new_mail() return self.last
Backend = IMAP