134 lines
5.0 KiB
Python
134 lines
5.0 KiB
Python
import argparse
|
|
import logging
|
|
import os
|
|
from os.path import relpath
|
|
from pathlib import Path
|
|
import sys
|
|
import warnings
|
|
|
|
from file_type import FileType
|
|
from search_file import search_file
|
|
|
|
class Link(FileType):
|
|
|
|
@classmethod
|
|
def fix(cls, ln_str, tgt_dir_str):
|
|
logging.captureWarnings(True)
|
|
print()
|
|
|
|
link = Path(ln_str)
|
|
tgt_dir = Path(tgt_dir_str)
|
|
|
|
try:
|
|
to_be_fixed = cls.get_file_type(link)
|
|
except Exception as e:
|
|
#print(f"Error: could not access link {ln_str}.")
|
|
sys.exit(f"Error: could not access link {ln_str}.")
|
|
|
|
logging.debug(f"fix called on {ln_str}, determined type: {to_be_fixed}")
|
|
if not tgt_dir.is_dir():
|
|
sys.exit(f"Error: target dir {tgt_dir} does not seem to exist or be a directory. Abort.")
|
|
|
|
match to_be_fixed:
|
|
case "file":
|
|
sys.exit(f"Error: link {ln_str} is not a link at all. Abort.")
|
|
case "symlink":
|
|
sys.exit(f"Error: link {ln_str} is not broken. Abort.")
|
|
case "broken-link":
|
|
pointed = link.readlink().name
|
|
try:
|
|
tgt = search_file(pointed, tgt_dir)
|
|
logging.debug(f"Search for matching target {pointed} in {tgt_dir} returned {tgt}")
|
|
except Exception as e:
|
|
warnings.warn(f"No match for link target in {tgt_dir_str}. Link {link} not modified.", Warning)
|
|
return
|
|
if tgt != "":
|
|
cls._swap_link(link, tgt)
|
|
else:
|
|
warnings.warn(f"Reaching point at which link is to be fixed, but Variable {tgt} seems to be unset!", Warning)
|
|
case "directory":
|
|
logging.debug(f"Now walking through directory {tgt_dir} searching for broken links")
|
|
for (root, dirs, files) in os.walk(link):
|
|
for name in files:
|
|
filetype = cls.get_file_type(root+"/"+name)
|
|
logging.debug(f"Checking file {root}/{name}... type: {filetype}")
|
|
if filetype == "broken-link":
|
|
logging.info(f"Calling link fixer on {root+"/"+name} targeting {tgt_dir_str}")
|
|
cls.fix(root+"/"+name, tgt_dir_str)
|
|
|
|
@classmethod
|
|
def _swap_link(cls, lnk, tgt):
|
|
# relnk lnk to tgt as symlink, relative path
|
|
# assumes type verifications already made!
|
|
#lnk: symbolic link to swap, Path
|
|
#tgt: target for new link, str
|
|
|
|
tpath = Path(tgt)
|
|
|
|
logging.debug(f"Swapping {lnk.name} pointee from {lnk.resolve()} to {tgt}")
|
|
|
|
if tpath.is_symlink():
|
|
if not tpath.exists():
|
|
raise Exception("Target is also a broken link!")
|
|
elif tpath.readlink().resolve() == lnpath.resolve() :
|
|
raise Exception("Target is a link to link to be fixed...")
|
|
|
|
tmp_suffix = 0
|
|
tmp_prefix = 'temp'
|
|
tmp_name = tmp_prefix + str(tmp_suffix)
|
|
tmp_path = Path(lnk.parent / tmp_name)
|
|
while tmp_path.exists() or tmp_path.is_symlink():
|
|
tmp_suffix += 1
|
|
tmp_name = tmp_prefix + str(tmp_suffix)
|
|
tmp_path = Path(lnk.parent / tmp_name)
|
|
|
|
#sym_path = relpath(tgt, lnk.resolve())[3:]
|
|
sym_path = str(tpath.relative_to(lnk, walk_up=True))[3:]
|
|
logging.debug(f"symbolic path found: {sym_path}")
|
|
|
|
try:
|
|
tmp_path.symlink_to(sym_path)
|
|
except Exception as e:
|
|
logging.error(f"Attempted to create temporary link: {tmp_name} failed. Check permission. {lnk} not fixed.")
|
|
#raise Exception("Failed to create new symlink. Check permissions!")
|
|
return
|
|
logging.debug(f"Temporary link created")
|
|
|
|
try:
|
|
lnk.unlink()
|
|
except Exception:
|
|
logging.error(f"Failed to replace broken link. Check permissions.{lnk} not fixed.")
|
|
tmp_path.unlink()
|
|
return
|
|
logging.debug("Broken link removed")
|
|
|
|
tmp_path.rename(lnk)
|
|
logging.debug("Temporary link renamed to broken link's name")
|
|
|
|
|
|
|
|
def main():
    """Parse the command line, configure logging and run the link fixer.

    Positional arguments are the broken link (or a directory containing
    broken links) and the directory searched for replacement targets.
    ``-log/--loglevel`` selects the logging level by name, case-insensitively;
    an unknown name is reported as a usage error instead of a traceback.
    """
    parser = argparse.ArgumentParser(
        description='''Fix broken symbolic links.
        Search for file with same name as link in target dir.
        Replace link to point to found file if any.
        '''
    )
    parser.add_argument("link", type=str, help="Broken link, or directory with broken links")
    parser.add_argument("tgt_path", type=str, help="Directory in which to find target(s)")
    parser.add_argument("-log", "--loglevel", default="warning", help="Set log level (debug, info, warning, error)")
    args = parser.parse_args()

    # Validate before basicConfig, which would otherwise raise ValueError.
    level_name = args.loglevel.upper()
    if level_name not in logging.getLevelNamesMapping():
        parser.error(f"invalid log level: {args.loglevel!r}")

    logging.basicConfig(
        format='%(asctime)s %(module)s %(levelname)s: %(message)s',
        level=level_name,
    )
    logging.debug("Log configuration set")

    Link.fix(args.link, args.tgt_path)


if __name__ == "__main__":
    main()
|
|
|