# Source code for i3pystatus.pyload
import urllib.request
import urllib.parse
import urllib.error
import http.cookiejar
import webbrowser
import json
from i3pystatus import IntervalModule
class pyLoad(IntervalModule):
    """
    Shows pyLoad status

    .. rubric:: Available formatters

    * `{captcha}` — see captcha_true and captcha_false, which are the values filled in for this formatter
    * `{progress}` — average over all running downloads
    * `{progress_all}` — percentage of completed files/links in queue
    * `{speed}` — kilobytes/s
    * `{download}` — downloads enabled, also see download_true and download_false
    * `{total}` — number of downloads
    * `{free_space}` — free space in download directory in gigabytes
    """

    # Seconds between two refreshes of the module.
    interval = 5

    settings = (
        ("address", "Address of pyLoad webinterface"),
        "format",
        "captcha_true", "captcha_false",
        "download_true", "download_false",
        "username", "password",
        ('keyring_backend', 'alternative keyring backend for retrieving credentials'),
    )
    required = ("username", "password")

    # Defaults for the optional settings declared above.
    keyring_backend = None
    address = "http://127.0.0.1:8000"
    format = "{captcha} {progress_all:.1f}% {speed:.1f} kb/s"
    captcha_true = "Captcha waiting"
    captcha_false = ""
    download_true = "Downloads enabled"
    download_false = "Downloads disabled"

    # Name of the method invoked on left click (see open_webbrowser below).
    on_leftclick = "open_webbrowser"

    def _rpc_call(self, method, data=None):
        """
        POST to ``{address}/api/{method}/`` and return the decoded JSON reply.

        :param method: name of the pyLoad API method, used as a URL segment
        :param data: optional mapping of form fields sent url-encoded in the
            request body; a falsy value sends an empty body
        :returns: whatever ``json.loads`` yields for the UTF-8 response body
        """
        if not data:
            data = {}
        urlencoded = urllib.parse.urlencode(data).encode("ascii")
        url = "{address}/api/{method}/".format(address=self.address, method=method)
        # Close the HTTP response deterministically instead of leaving it to
        # the garbage collector; this method is called several times per run.
        with self.opener.open(url, urlencoded) as response:
            return json.loads(response.read().decode("utf-8"))

    def init(self):
        """Create the cookie jar and the cookie-aware URL opener used by ``_rpc_call``."""
        self.cj = http.cookiejar.CookieJar()
        self.opener = urllib.request.build_opener(
            urllib.request.HTTPCookieProcessor(self.cj))

    def login(self):
        """
        Call the ``login`` API method with the configured credentials.

        Cookies set by the response are kept in ``self.cj`` because the
        opener was built with an ``HTTPCookieProcessor``.

        :returns: the decoded JSON reply of the login call
        """
        return self._rpc_call("login", {
            "username": self.username,
            "password": self.password,
        })

    def run(self):
        """
        Query pyLoad and publish the formatter values.

        Stores the formatter dictionary in ``self.data`` and the rendered
        text (``full_text``) together with the address (``instance``) in
        ``self.output``.
        """
        self.login()
        server_status = self._rpc_call("statusServer")
        downloads_status = self._rpc_call("statusDownloads")

        if downloads_status:
            # NOTE(review): the "* 100" assumes each download's "percent" is a
            # 0..1 fraction - confirm against the pyLoad API.
            progress = sum(dl["percent"]
                           for dl in downloads_status) / len(downloads_status) * 100
        else:
            progress = 100.0

        total = server_status["total"]
        if total:
            links_done = sum(pkg["linksdone"] for pkg in self._rpc_call("getQueue"))
            progress_all = links_done / total * 100
        else:
            # An empty queue used to raise ZeroDivisionError here; report it
            # as complete, mirroring the "no running downloads" case above.
            progress_all = 100.0

        fdict = {
            "download": self.download_true if server_status["download"] else self.download_false,
            "speed": server_status["speed"] / 1024,
            "progress": progress,
            "progress_all": progress_all,
            # Documented as an available formatter but previously never
            # supplied, so "{total}" in a format string raised KeyError.
            "total": total,
            "captcha": self.captcha_true if self._rpc_call("isCaptchaWaiting") else self.captcha_false,
            "free_space": self._rpc_call("freeSpace") / (1024 ** 3),
        }
        self.data = fdict
        self.output = {
            "full_text": self.format.format(**fdict).strip(),
            "instance": self.address,
        }

    def open_webbrowser(self):
        """Open the configured pyLoad address in a new browser tab."""
        webbrowser.open_new_tab(self.address)