#!/bin/python3 from os import path, chdir, listdir, remove from typing import List import requests as req import logging as log import configparser import argparse import tarfile import hashlib class BadResponse(Exception): def __init__(self, msg: str) -> None: super().__init__(f"Bad response: {msg}") class Pkg: def __init__(self, name: str, version: str, sum: str) -> None: self.archive = f"{name}_{version}.mpf" self.sig = f"{name}_{version}.mpf.sig" self.version = version self.name = name self.sum = sum def remove_old(self) -> None: files = listdir(".") for f in files: if f.startswith(f"{self.name}_"): remove(f) class Repo: def __init__(self, uri: str, out: str) -> None: self.pkgs: List[Pkg] = [] self.author: str self.name: str self.pub: str self.uri = uri self.out = out def join_url(self, pth: str) -> str: if self.uri.endswith("/") and not pth.startswith("/"): return self.uri+pth elif self.uri.endswith("/") and pth.startswith("/"): return self.uri[:-1]+pth elif not self.uri.endswith("/") and pth.startswith("/"): return self.uri+pth return self.uri+"/"+pth def get_repo(self) -> None: repourl = self.join_url("repo") res = req.get(repourl) if res.status_code != 200: raise BadResponse(f"{res.status_code} - {repourl}") cfg = configparser.ConfigParser() cfg.read_string(res.content.decode("utf-8")) for k in cfg.keys(): if k == "DEFAULT": continue self.name = k self.pub = cfg[self.name]["pub"] self.author = cfg[self.name]["author"] f = open("repo", "wb") f.write(res.content) f.close() def process_pkgs(self, pkgs: str) -> None: cfg = configparser.ConfigParser() cfg.read_string(pkgs) for k in cfg.keys(): try: ver = cfg[k]["version"] sum = cfg[k]["sum"] except: continue self.pkgs.append(Pkg(k, ver, sum)) def check_pkg(self, pkg: Pkg) -> bool: # true -> package is in the outdir and its up-to-date # false -> its not ^ if not path.exists(pkg.archive) or not path.exists(pkg.sig): return False fhash = hashlib.sha256() f = open(pkg.archive, "rb") while chunk := f.read(8192): fhash.update(chunk) f.close() if pkg.sum != fhash.hexdigest(): return False return True def check_pkgs(self) -> None: pkgcl = [] for p in self.pkgs: pkgcl.append(p) for p in self.pkgs: if self.check_pkg(p): pkgcl.remove(p) self.pkgs = pkgcl def get_pkglist(self) -> None: arcname = f"{self.name}.tar.gz" pkgsurl = self.join_url(arcname) res = req.get(pkgsurl) if res.status_code != 200: raise BadResponse(f"{res.status_code} - {pkgsurl}") f = open(arcname, "wb") f.write(res.content) f.close() t = tarfile.open(arcname) for m in t.getmembers(): if m.name != "pkgs": continue f = t.extractfile(m) if f == None: continue self.process_pkgs(f.read().decode("utf-8")) f.close() t.close() def download_pkg(self, p: Pkg) -> bool: p.remove_old() arcurl = self.join_url(p.archive) sigurl = self.join_url(p.sig) arcres = req.get(arcurl) sigres = req.get(sigurl) if arcres.status_code != 200: raise BadResponse(f"{arcres.status_code} - {arcurl}") if sigres.status_code != 200: raise BadResponse(f"{sigres.status_code} - {sigurl}") arcf = open(p.archive, "wb") arcf.write(arcres.content) arcf.close() sigf = open(p.sig, "wb") sigf.write(sigres.content) sigf.close() return True if __name__ == "__main__": log.basicConfig( format="[%(levelname)s] [%(asctime)s]: %(message)s", datefmt="%H:%M:%S", level=log.INFO ) parser = argparse.ArgumentParser( prog="matter-mirror", description="Create and manage MatterLinux mirrors", epilog="Part of matter-tools | https://git.matterlinux.xyz/matter/matter-tools") parser.add_argument("-u", help="Repo URI", required=True, dest="uri") parser.add_argument("-o", help="Download directory", required=True, dest="out") args = parser.parse_args() if not args.uri.startswith("http://") and not args.uri.startswith("https://"): log.error(f"Bad URI: {args.uri}") exit(1) if not path.exists(args.out): log.error(f"Out directory not found: {args.out}") exit(1) if not path.isdir(args.out): log.error(f"Out directory is not a directory: {args.out}") exit(1) try: chdir(args.out) except Exception as e: log.error(f"Cannot change dir: {args.out}") exit(1) try: repo = Repo(args.uri, args.out) repo.get_repo() except Exception as e: log.error(e) exit(1) log.info(f"Got repo file => {repo.name}:{repo.author}:{repo.pub}") log.info("Downloading package list") try: repo.get_pkglist() except Exception as e: log.error(e) exit(1) all = len(repo.pkgs) if all == 0: log.error("Got no valid packages!") exit(1) log.info(f"Got total of {all} packages") try: repo.check_pkgs() except Exception as e: log.error(e) exit(1) old = len(repo.pkgs) if old == 0: log.info("All packages are up-to-date!") exit() print(f" Up-to-date packages: {all-old} ({int(100*(all-old)/all)}%)") print(f" New packages: {old} ({int(100*old/all)}%)") resc = 0 for p in repo.pkgs: try: log.info(f"({repo.pkgs.index(p)+1}/{len(repo.pkgs)}) Downloading {p.name}") try: repo.download_pkg(p) except KeyboardInterrupt: log.error("Stopping downloads") exit(1) resc += 1 except Exception as e: log.error(f"Download failed: {e}") continue log.info(f"Downloaded {resc} out of {old} packages ({int(100*resc/old)}%)")