Rewrite in class, cleaner
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
# link-fixer
|
||||
|
||||
Fix broken symbolic links. Search for file of same name as original target in provided directory and link to that target if unique and if exists.
|
||||
Fix broken symbolic links.
|
||||
Search for file of same name as original target in provided directory and link to that target if unique and if exists.
|
||||
If pointed to a directory, link_fixer scans the directory for broken links.
|
||||
23
file_type.py
Normal file
23
file_type.py
Normal file
@@ -0,0 +1,23 @@
|
||||
# file_type.py
|
||||
# Identify type of file: file, directory, link, broken link
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
class FileType():
|
||||
|
||||
@classmethod
|
||||
def get_file_type(cls, filename):
|
||||
# Return type of file ("file", "dir", "link", "broken-link", "other")
|
||||
p = Path(filename)
|
||||
if not p.exists():
|
||||
if p.is_symlink():
|
||||
return "broken-link"
|
||||
else:
|
||||
raise FileNotFound(errno.ENOENT, os.strerror(errno.ENOENT), filename)
|
||||
if p.is_symlink():
|
||||
return "symlink"
|
||||
if p.is_dir():
|
||||
return "directory"
|
||||
elif p.is_file():
|
||||
return "file"
|
||||
return "other"
|
||||
106
link_fixer.py
106
link_fixer.py
@@ -1,46 +1,86 @@
|
||||
import argparse
|
||||
import os
|
||||
from os.path import relpath
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
from file_type import FileType
|
||||
from search_file import search_file
|
||||
from swap_link import swap_link
|
||||
|
||||
def link_fixer(ln_path, tgt_dir_path):
|
||||
ln_is_dir = False
|
||||
class Link(FileType):
|
||||
|
||||
link = Path(ln_path)
|
||||
tgt_dir = Path(tgt_dir_path)
|
||||
@classmethod
|
||||
def fix(cls, ln_str, tgt_dir_str):
|
||||
|
||||
if link.is_dir():
|
||||
ln_is_dir = True
|
||||
elif not link.is_symlink():
|
||||
if not link.exists():
|
||||
sys.exit("Link argument matches no file or directory")
|
||||
else:
|
||||
sys.exit("Link argument is not a symbolic link")
|
||||
elif link.exists():
|
||||
sys.exit("Link is not broken!")
|
||||
link = Path(ln_str)
|
||||
tgt_dir = Path(tgt_dir_str)
|
||||
|
||||
try:
|
||||
to_be_fixed = cls.get_file_type(link)
|
||||
except Exception as e:
|
||||
#print(f"Error: could not access link {ln_str}.")
|
||||
sys.exit(f"Error: could not access link {ln_str}.")
|
||||
|
||||
if not tgt_dir.exists():
|
||||
sys.exit("Target directory not found.")
|
||||
if not tgt_dir.is_dir():
|
||||
sys.exit("Pointed target is not a directory")
|
||||
sys.exit(f"Error: target dir {tgt_dir} does not seem to exist or be a directory. Abort.")
|
||||
|
||||
print("Starting link fixer")
|
||||
if ln_is_dir:
|
||||
print("Links dir: \t", link)
|
||||
else:
|
||||
print("Link: \t\t", link)
|
||||
print("Targets dir: \t", tgt_dir)
|
||||
if ln_is_dir:
|
||||
sys.exit("But ln dir version not yet implemented. Sorry!")
|
||||
match to_be_fixed:
|
||||
case "file":
|
||||
sys.exit(f"Error: link {ln_str} is not a link at all. Abort.")
|
||||
case "symlink":
|
||||
#print(f"Error: link {ln_str} is not broken. Abort.")
|
||||
sys.exit(f"Error: link {ln_str} is not broken. Abort.")
|
||||
case "broken-link":
|
||||
try:
|
||||
tgt = search_file(link.resolve().name, tgt_dir)
|
||||
except Exception as e:
|
||||
#print("No match for link reference filename in target directory.")
|
||||
sys.exit("Error: no match for link target in {tgt_dir_str}")
|
||||
cls._swap_link(link, tgt)
|
||||
case "directory":
|
||||
for root, dirs, files in os.walk(to_be_fixed):
|
||||
for name in files:
|
||||
if get_file_type(name) == "broken-link":
|
||||
cls.link_fixer(name, tgt_dir_path)
|
||||
|
||||
tgt = search_file(link.resolve().name, tgt_dir_path)
|
||||
swap_link(ln_path, tgt)
|
||||
@classmethod
|
||||
def _swap_link(cls, lnk, tgt):
|
||||
# relnk lnk to tgt as symlink, relative path
|
||||
# assumes type verifications already made!
|
||||
#lnk: symbolic link to swap, Path
|
||||
#tgt: target for new link, Path
|
||||
|
||||
def fix_link(lnk, tgt_dir_path):
|
||||
return True
|
||||
tpath = Path(tgt)
|
||||
if tpath.is_symlink():
|
||||
if not tpath.exists():
|
||||
raise Exception("Target is also a broken link!")
|
||||
elif tpath.readlink().resolve() == lnpath.resolve() :
|
||||
raise Exception("Target is a link to link to be fixed...")
|
||||
|
||||
tmp_suffix = 0
|
||||
tmp_prefix = 'temp'
|
||||
tmp_name = tmp_prefix + str(tmp_suffix)
|
||||
tmp_path = Path(lnk.parent / tmp_name)
|
||||
while tmp_path.exists() or tmp_path.is_symlink():
|
||||
tmp_suffix += 1
|
||||
tmp_name = tmp_prefix + str(tmp_suffix)
|
||||
tmp_path = Path(lnk.parent / tmp_name)
|
||||
|
||||
sym_path = relpath(tgt, lnk.resolve())[3:]
|
||||
|
||||
try:
|
||||
tmp_path.symlink_to(sym_path)
|
||||
except Exception as e:
|
||||
print(f"Attempted to create tmp link: {tmp_name}")
|
||||
#raise Exception("Failed to create new symlink. Check permissions!")
|
||||
|
||||
try:
|
||||
lnk.unlink()
|
||||
except Exception:
|
||||
tmp_path.unlink()
|
||||
#raise Exception("Failed to remove broken link. Check permissions!")
|
||||
|
||||
tmp_path.rename(lnk)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
@@ -50,13 +90,13 @@ if __name__ == "__main__":
|
||||
Search for file with same name as link in target dir.
|
||||
Replace link to point to found file if any.
|
||||
'''
|
||||
|
||||
)
|
||||
parser.add_argument("link", type=str, help="Broken link, or directory with broken links")
|
||||
parser.add_argument("tgt_path", type=str, help="Directory in which to find target(s)")
|
||||
args = parser.parse_args()
|
||||
|
||||
link = Path(args.link)
|
||||
tgt_dir = Path(args.tgt_path)
|
||||
link = args.link
|
||||
tgt_dir = args.tgt_path
|
||||
|
||||
Link.fix(link, tgt_dir)
|
||||
|
||||
link_fixer(link, tgt_dir)
|
||||
|
||||
47
swap_link.py
47
swap_link.py
@@ -1,47 +0,0 @@
|
||||
# swap_link.py
|
||||
# Replace a link file with another one pointing to a new target
|
||||
#
|
||||
|
||||
from pathlib import Path
|
||||
from os.path import relpath # won't be necessary with Python 3.12 thanks to walk_up arg in relative_to()
|
||||
|
||||
def swap_link(lnk, tgt):
|
||||
#lnk: symbolic link to swap, str
|
||||
#tgt: target for new link, str
|
||||
lnpath = Path(lnk)
|
||||
if not lnpath.is_symlink():
|
||||
raise Exception("First argument is not a symbolic link")
|
||||
elif lnpath.exists():
|
||||
raise Exception("Link is not broken")
|
||||
|
||||
tpath = Path(tgt)
|
||||
if tpath.is_symlink():
|
||||
if not tpath.exists():
|
||||
raise Exception("Target is also a broken link!")
|
||||
elif tpath.readlink().resolve() == lnpath.resolve() :
|
||||
raise Exception("Target is a link to link to be fixed...")
|
||||
|
||||
tmp_suffix = 0
|
||||
tmp_prefix = 'temp'
|
||||
tmp_name = tmp_prefix + str(tmp_suffix)
|
||||
tmp_path = Path(lnpath.parent / tmp_name)
|
||||
while tmp_path.exists() or tmp_path.is_symlink():
|
||||
tmp_suffix += 1
|
||||
tmp_name = tmp_prefix + str(tmp_suffix)
|
||||
tmp_path = Path(lnk.parent / tmp_name)
|
||||
|
||||
sym_path = relpath(tgt, lnk)[3:]
|
||||
|
||||
try:
|
||||
tmp_path.symlink_to(sym_path)
|
||||
except Exception:
|
||||
print(f"Attempted to create tmp link: {tmp_name}")
|
||||
raise Exception("Failed to create new symlink. Check permissions!")
|
||||
|
||||
try:
|
||||
lnpath.unlink()
|
||||
except Exception:
|
||||
tmp_path.unlink()
|
||||
raise Exception("Failed to remove broken link. Check permissions!")
|
||||
|
||||
tmp_path.rename(lnk)
|
||||
BIN
test/test_data.tar
Normal file
BIN
test/test_data.tar
Normal file
Binary file not shown.
@@ -1,25 +1,34 @@
|
||||
from link_fixer import link_fixer
|
||||
from link_fixer import Link
|
||||
import os
|
||||
from pathlib import Path
|
||||
from shutil import rmtree
|
||||
import tarfile
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
class TestLinkFixer(unittest.TestCase):
|
||||
|
||||
tgt_dir = Path("test/data/tgt_dir")
|
||||
lnk_dir = Path("test/data/lnk_dir")
|
||||
tgt_file = Path("test/data/tgt_dir/tgt_file.txt")
|
||||
ln_non_exists = Path("test/data/link.txt")
|
||||
ln_valid = Path("test/data/lnk_dir/ln_valid")
|
||||
tgt_dir = "test/data/tgt_dir"
|
||||
lnk_dir = "test/data/lnk_dir"
|
||||
tgt_file = "test/data/tgt_dir/tgt_file.txt"
|
||||
ln_non_exists = "test/data/link.txt"
|
||||
ln_valid = "test/data/lnk_dir/ln_valid"
|
||||
ln_valid_path = "../tgt_dir/tgt_file.txt"
|
||||
ln_broken = Path("test/data/lnk_dir/tgt_file.txt")
|
||||
ln_broken = "test/data/lnk_dir/tgt_file.txt"
|
||||
ln_broken_path = "../tgt/tgt_.txt"
|
||||
inexistent = Path("test/data/tgt_dir/inexistent.txt")
|
||||
|
||||
ln_ne_error = "Link argument matches no file or directory"
|
||||
ln_gd_error = "Link is not broken!"
|
||||
ln_ns_error = "Link argument is not a symbolic link"
|
||||
ln_no_target = "test/data/lnk_dir/ln_no_target"
|
||||
tgt_invalid = "test/data/tgt"
|
||||
inexistent = "test/data/tgt_dir/inexistent.txt"
|
||||
|
||||
|
||||
ln_ne_error = "Error: could not access link test/data/link.txt"
|
||||
ln_gd_error = "Error: link test/data/lnk_dr/ln_valid is not broken. Abort."
|
||||
ln_ns_error = "Error: link test/data/tgt_dir/tgt_file.txt is not a link at all. Abort."
|
||||
tgt_invalid_error = "Error: target dir test/data/tgt does not seem to exist or be a directory. Abort."
|
||||
ln_no_target_error = "Error: no match for link target in test/data/tgt_dir"
|
||||
|
||||
|
||||
"""
|
||||
def reset_dir(self, dir_path):
|
||||
if not dir_path.is_dir():
|
||||
if dir_path.exists():
|
||||
@@ -49,29 +58,51 @@ class TestLinkFixer(unittest.TestCase):
|
||||
self.ln_broken.unlink()
|
||||
self.ln_broken.symlink_to(self.ln_broken_path)
|
||||
self.inexistent.unlink()
|
||||
"""
|
||||
def setup_test(self):
|
||||
try:
|
||||
rmtree("test/data")
|
||||
except Exception:
|
||||
sys.exit("Error: couldn't reset test data directory! Cancelling test!")
|
||||
with tarfile.open("test/test_data.tar", "r:*") as tar:
|
||||
tar.extraction_filter = getattr(tarfile, 'data_filter')
|
||||
tar.extractall("test")
|
||||
|
||||
|
||||
def test_link_fixer(self):
|
||||
@patch('builtins.print')
|
||||
def test_link_fixer(self, mock_print):
|
||||
self.setup_test()
|
||||
|
||||
print("Test for inexistent link")
|
||||
with self.assertRaises(SystemExit) as err:
|
||||
link_fixer(self.ln_non_exists, self.tgt_dir)
|
||||
self.assertEqual(err.exception.code, self.ln_ne_error)
|
||||
Link.fix(self.ln_non_exists, self.tgt_dir)
|
||||
self.assertEqual(err.exception_code, self.ln_ne_error)
|
||||
|
||||
print("Test for non-broken link")
|
||||
with self.assertRaises(SystemExit) as err:
|
||||
link_fixer(self.ln_valid, self.tgt_dir)
|
||||
Link.fix(self.ln_valid, self.tgt_dir)
|
||||
self.assertEqual(err.exception.code, self.ln_gd_error)
|
||||
|
||||
print("Test for non-link link")
|
||||
with self.assertRaises(SystemExit) as err:
|
||||
link_fixer(self.tgt_file, self.tgt_dir)
|
||||
Link.fix(self.tgt_file, self.tgt_dir)
|
||||
self.assertEqual(err.exception.code, self.ln_ns_error)
|
||||
|
||||
print("Test for inexistent ")
|
||||
print("Test for invalid target")
|
||||
with self.assertRaises(SystemExit) as err:
|
||||
link
|
||||
Link.fix(self.ln_valid, self.tgt_invalid)
|
||||
self.assertEqual(err.exception.code, self.tgt_invalid_error)
|
||||
|
||||
print("Test for unfound target")
|
||||
with self.assertRaises(SystemExit) as err:
|
||||
Link.fix(self.ln_no_target, self.tgt_dir)
|
||||
self.assertEqual(erro.exception.code, self.ln_no_target_error)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
runner = unittest.TextTestRunner(verbosity=2)
|
||||
unittest.main(verbosity=2)
|
||||
Reference in New Issue
Block a user