mtsc/matter-mirror/main.py

263 lines
7.2 KiB
Python
Raw Normal View History

2024-01-09 19:45:45 +00:00
#!/bin/python3
2024-05-01 20:56:27 +00:00
# matter-mirror | MatterLinux Repo Mirror Tool
# MatterLinux 2023-2024 (https://matterlinux.xyz)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2024-01-09 19:45:45 +00:00
from os import path, chdir, listdir, remove
from typing import List
import requests as req
import logging as log
import configparser
import argparse
import tarfile
import hashlib
class BadResponse(Exception):
    """Error raised when a request to the repo cannot be used.

    The text passed in is stored with a fixed ``Bad response: `` prefix so
    that the failure kind is obvious when the exception is logged.
    """

    def __init__(self, msg: str) -> None:
        description = "Bad response: {}".format(msg)
        super().__init__(description)
class Pkg:
    """One package entry: its name, version, checksum and file names.

    The archive is named ``<name>_<version>.mpf`` and its signature file
    carries an extra ``.sig`` suffix.
    """

    def __init__(self, name: str, version: str, sum: str) -> None:
        stem = f"{name}_{version}"
        self.name = name
        self.version = version
        self.sum = sum
        self.archive = stem + ".mpf"
        self.sig = stem + ".mpf.sig"

    def remove_old(self) -> None:
        """Delete every entry of the current directory named ``<name>_*``.

        Matching is purely by prefix, so all versions of this package
        (archives and signatures alike) are removed.
        """
        # NOTE(review): a prefix match would also hit another package whose
        # name begins with "<name>_" - confirm package names cannot collide.
        prefix = f"{self.name}_"
        stale = [entry for entry in listdir(".") if entry.startswith(prefix)]
        for entry in stale:
            remove(entry)
class Repo:
def __init__(self, uri: str, out: str) -> None:
self.pkgs: List[Pkg] = []
self.author: str
self.name: str
self.pub: str
self.uri = uri
self.out = out
def join_url(self, pth: str) -> str:
if self.uri.endswith("/") and not pth.startswith("/"):
return self.uri+pth
elif self.uri.endswith("/") and pth.startswith("/"):
return self.uri[:-1]+pth
elif not self.uri.endswith("/") and pth.startswith("/"):
return self.uri+pth
return self.uri+"/"+pth
def get_repo(self) -> None:
repourl = self.join_url("repo")
res = req.get(repourl)
if res.status_code != 200:
raise BadResponse(f"{res.status_code} - {repourl}")
cfg = configparser.ConfigParser()
cfg.read_string(res.content.decode("utf-8"))
for k in cfg.keys():
if k == "DEFAULT":
continue
self.name = k
self.pub = cfg[self.name]["pub"]
self.author = cfg[self.name]["author"]
f = open("repo", "wb")
f.write(res.content)
f.close()
def process_pkgs(self, pkgs: str) -> None:
cfg = configparser.ConfigParser()
cfg.read_string(pkgs)
for k in cfg.keys():
try:
ver = cfg[k]["version"]
sum = cfg[k]["sum"]
except:
continue
self.pkgs.append(Pkg(k, ver, sum))
def check_pkg(self, pkg: Pkg) -> bool:
# true -> package is in the outdir and its up-to-date
# false -> its not ^
if not path.exists(pkg.archive) or not path.exists(pkg.sig):
return False
fhash = hashlib.sha256()
f = open(pkg.archive, "rb")
while chunk := f.read(8192):
fhash.update(chunk)
f.close()
if pkg.sum != fhash.hexdigest():
return False
return True
def check_pkgs(self) -> None:
pkgcl = []
for p in self.pkgs:
pkgcl.append(p)
for p in self.pkgs:
if self.check_pkg(p):
pkgcl.remove(p)
self.pkgs = pkgcl
def get_pkglist(self) -> None:
arcname = f"{self.name}.tar.gz"
pkgsurl = self.join_url(arcname)
res = req.get(pkgsurl)
if res.status_code != 200:
raise BadResponse(f"{res.status_code} - {pkgsurl}")
f = open(arcname, "wb")
f.write(res.content)
f.close()
t = tarfile.open(arcname)
for m in t.getmembers():
if m.name != "pkgs":
continue
f = t.extractfile(m)
if f == None: continue
self.process_pkgs(f.read().decode("utf-8"))
f.close()
t.close()
def download_pkg(self, p: Pkg) -> bool:
p.remove_old()
arcurl = self.join_url(p.archive)
sigurl = self.join_url(p.sig)
arcres = req.get(arcurl)
sigres = req.get(sigurl)
if arcres.status_code != 200:
raise BadResponse(f"{arcres.status_code} - {arcurl}")
if sigres.status_code != 200:
raise BadResponse(f"{sigres.status_code} - {sigurl}")
arcf = open(p.archive, "wb")
arcf.write(arcres.content)
arcf.close()
sigf = open(p.sig, "wb")
sigf.write(sigres.content)
sigf.close()
return True
def main() -> None:
    """Command-line entry point: mirror a repo into a local directory.

    Steps: parse ``-u``/``-o``, validate them, change into the output
    directory, fetch the repo file and the package list, drop packages that
    are already up-to-date and download the rest. Any fatal problem is
    logged and the process exits with status 1; a failed download of a
    single package is logged and the next package is tried.
    """
    # Local import so this block does not depend on the file's import list.
    import sys

    log.basicConfig(
        format="[%(levelname)s] [%(asctime)s]: %(message)s",
        datefmt="%H:%M:%S",
        level=log.INFO,
    )

    parser = argparse.ArgumentParser(
        prog="matter-mirror",
        description="Create and manage MatterLinux mirrors",
        epilog="Part of matter-tools | https://git.matterlinux.xyz/matter/matter-tools",
    )
    parser.add_argument("-u", help="Repo URI", required=True, dest="uri")
    parser.add_argument("-o", help="Download directory", required=True, dest="out")
    args = parser.parse_args()

    if not args.uri.startswith(("http://", "https://")):
        log.error(f"Bad URI: {args.uri}")
        sys.exit(1)

    if not path.exists(args.out):
        log.error(f"Out directory not found: {args.out}")
        sys.exit(1)

    if not path.isdir(args.out):
        log.error(f"Out directory is not a directory: {args.out}")
        sys.exit(1)

    # Every Repo/Pkg file operation is relative to the working directory.
    try:
        chdir(args.out)
    except Exception:
        log.error(f"Cannot change dir: {args.out}")
        sys.exit(1)

    try:
        repo = Repo(args.uri, args.out)
        repo.get_repo()
    except Exception as e:
        log.error(e)
        sys.exit(1)

    log.info(f"Got repo file => {repo.name}:{repo.author}:{repo.pub}")
    log.info("Downloading package list")

    try:
        repo.get_pkglist()
    except Exception as e:
        log.error(e)
        sys.exit(1)

    total = len(repo.pkgs)
    if total == 0:
        log.error("Got no valid packages!")
        sys.exit(1)
    log.info(f"Got total of {total} packages")

    try:
        repo.check_pkgs()
    except Exception as e:
        log.error(e)
        sys.exit(1)

    # After check_pkgs() only the packages that must be fetched remain.
    pending = len(repo.pkgs)
    if pending == 0:
        log.info("All packages are up-to-date!")
        sys.exit(0)

    current = total - pending
    print(f" Up-to-date packages: {current} ({int(100*current/total)}%)")
    print(f" New packages: {pending} ({int(100*pending/total)}%)")

    downloaded = 0
    for idx, pkg in enumerate(repo.pkgs, start=1):
        log.info(f"({idx}/{pending}) Downloading {pkg.name}")
        try:
            repo.download_pkg(pkg)
        except KeyboardInterrupt:
            log.error("Stopping downloads")
            sys.exit(1)
        except Exception as e:
            # One broken package must not stop the rest of the mirror run.
            log.error(f"Download failed: {e}")
            continue
        downloaded += 1

    log.info(f"Downloaded {downloaded} out of {pending} packages ({int(100*downloaded/pending)}%)")


if __name__ == "__main__":
    main()