Files
link-fixer/link_fixer.py
2024-08-16 00:27:16 -04:00

138 lines
5.3 KiB
Python

import argparse
import logging
import os
from os.path import relpath
from pathlib import Path
import sys
import warnings
from file_type import FileType
from search_file import search_file
class Link(FileType):
@classmethod
def fix(cls, ln_str, tgt_dir_str):
logging.captureWarnings(True)
print()
print(f"Link fixer called for link {ln_str} targeting {tgt_dir_str}")
logging.info(f"Link fixer called for link {ln_str} targeting {tgt_dir_str}")
link = Path(ln_str)
tgt_dir = Path(tgt_dir_str)
try:
to_be_fixed = cls.get_file_type(link)
except Exception as e:
#print(f"Error: could not access link {ln_str}.")
sys.exit(f"Error: could not access link {ln_str}.")
logging.debug(f"Determined input type: {to_be_fixed}")
if not tgt_dir.is_dir():
sys.exit(f"Error: target dir {tgt_dir} does not seem to exist or be a directory. Abort.")
match to_be_fixed:
case "file":
sys.exit(f"Error: link {ln_str} is not a link at all. Abort.")
case "symlink":
sys.exit(f"Error: link {ln_str} is not broken. Abort.")
case "broken-link":
pointed = link.readlink().name
try:
tgt = search_file(pointed, tgt_dir)
logging.debug(f"Search for matching target {pointed} in {tgt_dir} returned {tgt}")
except Exception as e:
logging.warning(f"No match for {pointed} in {tgt_dir_str} . {link} not modified")
warnings.warn("link_fix aborted.", Warning)
return
if tgt != "":
cls._swap_link(link, tgt)
logging.info(f"Link now pointing to {tgt} . Swap of link successfully completed.")
else:
warnings.warn(f"Reaching point at which link is to be fixed, but Variable {tgt} seems to be unset!", Warning)
case "directory":
logging.info(f"Now walking through directory {tgt_dir} searching for broken links")
for (root, dirs, files) in os.walk(link):
for name in files:
filetype = cls.get_file_type(root+"/"+name)
logging.debug(f"Checking file {root}/{name}... type: {filetype}")
if filetype == "broken-link":
logging.info(f"Calling link fixer on {root+"/"+name} targeting {tgt_dir_str}")
cls.fix(root+"/"+name, tgt_dir_str)
logging.info(f"All repairable broken links in {ln_str} should be.")
@classmethod
def _swap_link(cls, lnk, tgt):
# relnk lnk to tgt as symlink, relative path
# assumes type verifications already made!
#lnk: symbolic link to swap, Path
#tgt: target for new link, str
tpath = Path(tgt)
logging.debug(f"Swapping {lnk.name} pointee from {lnk.resolve()} to {tgt}")
if tpath.is_symlink():
if not tpath.exists():
raise Exception("Target is also a broken link!")
elif tpath.readlink().resolve() == lnpath.resolve() :
raise Exception("Target is a link to link to be fixed...")
tmp_suffix = 0
tmp_prefix = 'temp'
tmp_name = tmp_prefix + str(tmp_suffix)
tmp_path = Path(lnk.parent / tmp_name)
while tmp_path.exists() or tmp_path.is_symlink():
tmp_suffix += 1
tmp_name = tmp_prefix + str(tmp_suffix)
tmp_path = Path(lnk.parent / tmp_name)
#sym_path = relpath(tgt, lnk.resolve())[3:]
sym_path = str(tpath.relative_to(lnk, walk_up=True))[3:]
logging.debug(f"symbolic path found: {sym_path}")
try:
tmp_path.symlink_to(sym_path)
except Exception as e:
logging.error(f"Attempted to create temporary link: {tmp_name} failed. Check permission. {lnk} not fixed.")
#raise Exception("Failed to create new symlink. Check permissions!")
return
logging.debug(f"Temporary link created")
try:
lnk.unlink()
except Exception:
logging.error(f"Failed to replace broken link. Check permissions.{lnk} not fixed.")
tmp_path.unlink()
return
logging.debug("Broken link removed")
tmp_path.rename(lnk)
logging.debug("Temporary link renamed to broken link's name")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='''Fix broken symbolic links.
Search for file with same name as link in target dir.
Replace link to point to found file if any.
'''
)
parser.add_argument("link", type=str, help="Broken link, or directory with broken links")
parser.add_argument("tgt_path", type=str, help="Directory in which to find target(s)")
parser.add_argument("-log", "--loglevel", default="warning", help="Set log level (debug, info, warning, error)")
args = parser.parse_args()
link = args.link
tgt_dir = args.tgt_path
logger = logging.getLogger('linkfixer')
logging.basicConfig(
format = '%(asctime)s %(module)s %(levelname)s: %(message)s\n',
level=args.loglevel.upper()
)
logging.debug(f"Log configuration set")
Link.fix(link, tgt_dir)