All tests pass!
This commit is contained in:
@@ -1,6 +1,8 @@
|
|||||||
# file_type.py
|
# file_type.py
|
||||||
# Identify type of file: file, directory, link, broken link
|
# Identify type of file: file, directory, link, broken link
|
||||||
|
|
||||||
|
import errno
|
||||||
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
class FileType():
|
class FileType():
|
||||||
@@ -13,7 +15,7 @@ class FileType():
|
|||||||
if p.is_symlink():
|
if p.is_symlink():
|
||||||
return "broken-link"
|
return "broken-link"
|
||||||
else:
|
else:
|
||||||
raise FileNotFound(errno.ENOENT, os.strerror(errno.ENOENT), filename)
|
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
|
||||||
if p.is_symlink():
|
if p.is_symlink():
|
||||||
return "symlink"
|
return "symlink"
|
||||||
if p.is_dir():
|
if p.is_dir():
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
from os.path import relpath
|
from os.path import relpath
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import sys
|
import sys
|
||||||
|
import warnings
|
||||||
|
|
||||||
from file_type import FileType
|
from file_type import FileType
|
||||||
from search_file import search_file
|
from search_file import search_file
|
||||||
@@ -11,6 +13,8 @@ class Link(FileType):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def fix(cls, ln_str, tgt_dir_str):
|
def fix(cls, ln_str, tgt_dir_str):
|
||||||
|
logging.captureWarnings(True)
|
||||||
|
print()
|
||||||
|
|
||||||
link = Path(ln_str)
|
link = Path(ln_str)
|
||||||
tgt_dir = Path(tgt_dir_str)
|
tgt_dir = Path(tgt_dir_str)
|
||||||
@@ -21,6 +25,7 @@ class Link(FileType):
|
|||||||
#print(f"Error: could not access link {ln_str}.")
|
#print(f"Error: could not access link {ln_str}.")
|
||||||
sys.exit(f"Error: could not access link {ln_str}.")
|
sys.exit(f"Error: could not access link {ln_str}.")
|
||||||
|
|
||||||
|
logging.debug(f"fix called on {ln_str}, determined type: {to_be_fixed}")
|
||||||
if not tgt_dir.is_dir():
|
if not tgt_dir.is_dir():
|
||||||
sys.exit(f"Error: target dir {tgt_dir} does not seem to exist or be a directory. Abort.")
|
sys.exit(f"Error: target dir {tgt_dir} does not seem to exist or be a directory. Abort.")
|
||||||
|
|
||||||
@@ -28,29 +33,40 @@ class Link(FileType):
|
|||||||
case "file":
|
case "file":
|
||||||
sys.exit(f"Error: link {ln_str} is not a link at all. Abort.")
|
sys.exit(f"Error: link {ln_str} is not a link at all. Abort.")
|
||||||
case "symlink":
|
case "symlink":
|
||||||
#print(f"Error: link {ln_str} is not broken. Abort.")
|
|
||||||
sys.exit(f"Error: link {ln_str} is not broken. Abort.")
|
sys.exit(f"Error: link {ln_str} is not broken. Abort.")
|
||||||
case "broken-link":
|
case "broken-link":
|
||||||
|
pointed = link.readlink().name
|
||||||
try:
|
try:
|
||||||
tgt = search_file(link.resolve().name, tgt_dir)
|
tgt = search_file(pointed, tgt_dir)
|
||||||
|
logging.debug(f"Search for matching target {pointed} in {tgt_dir} returned {tgt}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
#print("No match for link reference filename in target directory.")
|
warnings.warn(f"No match for link target in {tgt_dir_str}. Link {link} not modified.", Warning)
|
||||||
sys.exit("Error: no match for link target in {tgt_dir_str}")
|
return
|
||||||
|
if tgt != "":
|
||||||
cls._swap_link(link, tgt)
|
cls._swap_link(link, tgt)
|
||||||
|
else:
|
||||||
|
warnings.warn(f"Reaching point at which link is to be fixed, but Variable {tgt} seems to be unset!", Warning)
|
||||||
case "directory":
|
case "directory":
|
||||||
for root, dirs, files in os.walk(to_be_fixed):
|
logging.debug(f"Now walking through directory {tgt_dir} searching for broken links")
|
||||||
|
for (root, dirs, files) in os.walk(link):
|
||||||
for name in files:
|
for name in files:
|
||||||
if get_file_type(name) == "broken-link":
|
filetype = cls.get_file_type(root+"/"+name)
|
||||||
cls.link_fixer(name, tgt_dir_path)
|
logging.debug(f"Checking file {root}/{name}... type: {filetype}")
|
||||||
|
if filetype == "broken-link":
|
||||||
|
logging.info(f"Calling link fixer on {root+"/"+name} targeting {tgt_dir_str}")
|
||||||
|
cls.fix(root+"/"+name, tgt_dir_str)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _swap_link(cls, lnk, tgt):
|
def _swap_link(cls, lnk, tgt):
|
||||||
# relnk lnk to tgt as symlink, relative path
|
# relnk lnk to tgt as symlink, relative path
|
||||||
# assumes type verifications already made!
|
# assumes type verifications already made!
|
||||||
#lnk: symbolic link to swap, Path
|
#lnk: symbolic link to swap, Path
|
||||||
#tgt: target for new link, Path
|
#tgt: target for new link, str
|
||||||
|
|
||||||
tpath = Path(tgt)
|
tpath = Path(tgt)
|
||||||
|
|
||||||
|
logging.debug(f"Swapping {lnk.name} pointee from {lnk.resolve()} to {tgt}")
|
||||||
|
|
||||||
if tpath.is_symlink():
|
if tpath.is_symlink():
|
||||||
if not tpath.exists():
|
if not tpath.exists():
|
||||||
raise Exception("Target is also a broken link!")
|
raise Exception("Target is also a broken link!")
|
||||||
@@ -66,21 +82,29 @@ class Link(FileType):
|
|||||||
tmp_name = tmp_prefix + str(tmp_suffix)
|
tmp_name = tmp_prefix + str(tmp_suffix)
|
||||||
tmp_path = Path(lnk.parent / tmp_name)
|
tmp_path = Path(lnk.parent / tmp_name)
|
||||||
|
|
||||||
sym_path = relpath(tgt, lnk.resolve())[3:]
|
#sym_path = relpath(tgt, lnk.resolve())[3:]
|
||||||
|
sym_path = str(tpath.relative_to(lnk, walk_up=True))[3:]
|
||||||
|
logging.debug(f"symbolic path found: {sym_path}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tmp_path.symlink_to(sym_path)
|
tmp_path.symlink_to(sym_path)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Attempted to create tmp link: {tmp_name}")
|
logging.error(f"Attempted to create temporary link: {tmp_name} failed. Check permission. {lnk} not fixed.")
|
||||||
#raise Exception("Failed to create new symlink. Check permissions!")
|
#raise Exception("Failed to create new symlink. Check permissions!")
|
||||||
|
return
|
||||||
|
logging.debug(f"Temporary link created")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
lnk.unlink()
|
lnk.unlink()
|
||||||
except Exception:
|
except Exception:
|
||||||
|
logging.error(f"Failed to replace broken link. Check permissions.{lnk} not fixed.")
|
||||||
tmp_path.unlink()
|
tmp_path.unlink()
|
||||||
#raise Exception("Failed to remove broken link. Check permissions!")
|
return
|
||||||
|
logging.debug("Broken link removed")
|
||||||
|
|
||||||
tmp_path.rename(lnk)
|
tmp_path.rename(lnk)
|
||||||
|
logging.debug("Temporary link renamed to broken link's name")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
@@ -93,10 +117,17 @@ if __name__ == "__main__":
|
|||||||
)
|
)
|
||||||
parser.add_argument("link", type=str, help="Broken link, or directory with broken links")
|
parser.add_argument("link", type=str, help="Broken link, or directory with broken links")
|
||||||
parser.add_argument("tgt_path", type=str, help="Directory in which to find target(s)")
|
parser.add_argument("tgt_path", type=str, help="Directory in which to find target(s)")
|
||||||
|
parser.add_argument("-log", "--loglevel", default="warning", help="Set log level (debug, info, warning, error)")
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
link = args.link
|
link = args.link
|
||||||
tgt_dir = args.tgt_path
|
tgt_dir = args.tgt_path
|
||||||
|
logger = logging.getLogger('linkfixer')
|
||||||
|
logging.basicConfig(
|
||||||
|
format = '%(asctime)s %(module)s %(levelname)s: %(message)s',
|
||||||
|
level=args.loglevel.upper()
|
||||||
|
)
|
||||||
|
logging.debug(f"Log configuration set")
|
||||||
|
|
||||||
Link.fix(link, tgt_dir)
|
Link.fix(link, tgt_dir)
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,7 @@
|
|||||||
import os
|
import os
|
||||||
import errno
|
import errno
|
||||||
from fnmatch import fnmatch
|
from fnmatch import fnmatch
|
||||||
|
import warnings
|
||||||
|
|
||||||
def search_all_match(filename, path):
|
def search_all_match(filename, path):
|
||||||
# search part shamelessly copied from stack overflow...
|
# search part shamelessly copied from stack overflow...
|
||||||
@@ -42,7 +43,6 @@ def select_path(paths):
|
|||||||
def search_file(filename, path):
|
def search_file(filename, path):
|
||||||
paths = search_all_match(filename, path)
|
paths = search_all_match(filename, path)
|
||||||
if len(paths) == 0 :
|
if len(paths) == 0 :
|
||||||
print(f"No match for \"{filename}\". Link not modified.")
|
raise Exception(f"No match found for {filename}")
|
||||||
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filename)
|
|
||||||
p = select_path(paths)
|
p = select_path(paths)
|
||||||
return p
|
return p
|
||||||
|
|||||||
@@ -1,10 +1,21 @@
|
|||||||
from link_fixer import Link
|
from link_fixer import Link
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from shutil import rmtree
|
from shutil import rmtree
|
||||||
|
import sys
|
||||||
import tarfile
|
import tarfile
|
||||||
import unittest
|
import unittest
|
||||||
from unittest.mock import patch
|
import warnings
|
||||||
|
|
||||||
|
logger = logging.getLogger('linkfixer')
|
||||||
|
logging.basicConfig(
|
||||||
|
format = '%(asctime)s %(module)s %(levelname)s: %(message)s',
|
||||||
|
level=logging.DEBUG,
|
||||||
|
stream = sys.stdout
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class TestLinkFixer(unittest.TestCase):
|
class TestLinkFixer(unittest.TestCase):
|
||||||
|
|
||||||
@@ -25,40 +36,8 @@ class TestLinkFixer(unittest.TestCase):
|
|||||||
ln_gd_error = "Error: link test/data/lnk_dr/ln_valid is not broken. Abort."
|
ln_gd_error = "Error: link test/data/lnk_dr/ln_valid is not broken. Abort."
|
||||||
ln_ns_error = "Error: link test/data/tgt_dir/tgt_file.txt is not a link at all. Abort."
|
ln_ns_error = "Error: link test/data/tgt_dir/tgt_file.txt is not a link at all. Abort."
|
||||||
tgt_invalid_error = "Error: target dir test/data/tgt does not seem to exist or be a directory. Abort."
|
tgt_invalid_error = "Error: target dir test/data/tgt does not seem to exist or be a directory. Abort."
|
||||||
ln_no_target_error = "Error: no match for link target in test/data/tgt_dir"
|
ln_no_target_warning = "No match for link target in test/data/tgt_dir. Link test/data/lnk_dir/ln_no_target not modified."
|
||||||
|
|
||||||
|
|
||||||
"""
|
|
||||||
def reset_dir(self, dir_path):
|
|
||||||
if not dir_path.is_dir():
|
|
||||||
if dir_path.exists():
|
|
||||||
dir_path.unlink()
|
|
||||||
os.mkdir(dir_path)
|
|
||||||
return
|
|
||||||
|
|
||||||
def reset_file(self, file_path):
|
|
||||||
if not file_path.is_file():
|
|
||||||
if file_path.exists():
|
|
||||||
filepath.unlink()
|
|
||||||
open(file_path, 'a').close()
|
|
||||||
|
|
||||||
def setup_test(self):
|
|
||||||
|
|
||||||
self.reset_dir(self.tgt_dir)
|
|
||||||
self.reset_dir(self.lnk_dir)
|
|
||||||
self.reset_file(self.tgt_file)
|
|
||||||
self.reset_file(self.inexistent)
|
|
||||||
|
|
||||||
|
|
||||||
if self.ln_valid.exists() or self.ln_valid.is_symlink():
|
|
||||||
self.ln_valid.unlink()
|
|
||||||
self.ln_valid.symlink_to(self.ln_valid_path)
|
|
||||||
|
|
||||||
if self.ln_broken.exists() or self.ln_broken.is_symlink():
|
|
||||||
self.ln_broken.unlink()
|
|
||||||
self.ln_broken.symlink_to(self.ln_broken_path)
|
|
||||||
self.inexistent.unlink()
|
|
||||||
"""
|
|
||||||
def setup_test(self):
|
def setup_test(self):
|
||||||
try:
|
try:
|
||||||
rmtree("test/data")
|
rmtree("test/data")
|
||||||
@@ -69,40 +48,56 @@ class TestLinkFixer(unittest.TestCase):
|
|||||||
tar.extractall("test")
|
tar.extractall("test")
|
||||||
|
|
||||||
|
|
||||||
@patch('builtins.print')
|
def test_non_existing_link(self):
|
||||||
def test_link_fixer(self, mock_print):
|
|
||||||
self.setup_test()
|
self.setup_test()
|
||||||
|
#print("Test for inexistent link")
|
||||||
print("Test for inexistent link")
|
|
||||||
with self.assertRaises(SystemExit) as err:
|
with self.assertRaises(SystemExit) as err:
|
||||||
Link.fix(self.ln_non_exists, self.tgt_dir)
|
Link.fix(self.ln_non_exists, self.tgt_dir)
|
||||||
self.assertEqual(err.exception_code, self.ln_ne_error)
|
self.assertEqual(err.exception_code, self.ln_ne_error)
|
||||||
|
|
||||||
print("Test for non-broken link")
|
|
||||||
|
def test_non_broken_link(self):
|
||||||
|
self.setup_test()
|
||||||
|
#print("Test for non-broken link")
|
||||||
with self.assertRaises(SystemExit) as err:
|
with self.assertRaises(SystemExit) as err:
|
||||||
Link.fix(self.ln_valid, self.tgt_dir)
|
Link.fix(self.ln_valid, self.tgt_dir)
|
||||||
self.assertEqual(err.exception.code, self.ln_gd_error)
|
self.assertEqual(err.exception.code, self.ln_gd_error)
|
||||||
|
|
||||||
print("Test for non-link link")
|
def test_not_a_link(self):
|
||||||
|
self.setup_test()
|
||||||
|
#print("Test for non-link link")
|
||||||
with self.assertRaises(SystemExit) as err:
|
with self.assertRaises(SystemExit) as err:
|
||||||
Link.fix(self.tgt_file, self.tgt_dir)
|
Link.fix(self.tgt_file, self.tgt_dir)
|
||||||
self.assertEqual(err.exception.code, self.ln_ns_error)
|
self.assertEqual(err.exception.code, self.ln_ns_error)
|
||||||
|
|
||||||
print("Test for invalid target")
|
def test_invalid_target(self):
|
||||||
|
self.setup_test()
|
||||||
|
#print("Test for invalid target")
|
||||||
with self.assertRaises(SystemExit) as err:
|
with self.assertRaises(SystemExit) as err:
|
||||||
Link.fix(self.ln_valid, self.tgt_invalid)
|
Link.fix(self.ln_valid, self.tgt_invalid)
|
||||||
self.assertEqual(err.exception.code, self.tgt_invalid_error)
|
self.assertEqual(err.exception.code, self.tgt_invalid_error)
|
||||||
|
|
||||||
print("Test for unfound target")
|
def test_unfound_target(self):
|
||||||
with self.assertRaises(SystemExit) as err:
|
self.setup_test()
|
||||||
|
#print("Test for unfound target")
|
||||||
|
with warnings.catch_warnings(record=True) as w:
|
||||||
Link.fix(self.ln_no_target, self.tgt_dir)
|
Link.fix(self.ln_no_target, self.tgt_dir)
|
||||||
self.assertEqual(erro.exception.code, self.ln_no_target_error)
|
assert len(w) == 1
|
||||||
|
self.assertEqual(str(w[-1].message), self.ln_no_target_warning)
|
||||||
|
|
||||||
|
def test_fix_link(self):
|
||||||
|
self.setup_test()
|
||||||
|
Link.fix(self.ln_broken, self.tgt_dir)
|
||||||
|
self.assertEqual(Path(self.ln_broken).resolve(), Path(self.tgt_file).resolve())
|
||||||
|
|
||||||
|
|
||||||
|
def test_fix_linkdir(self):
|
||||||
|
self.setup_test()
|
||||||
|
Link.fix(self.lnk_dir, self.tgt_dir)
|
||||||
|
self.assertEqual(Path(self.ln_broken).resolve(), Path(self.tgt_file).resolve())
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
#runner = unittest.TestLinkFixer(verbosity=2)
|
||||||
runner = unittest.TextTestRunner(verbosity=2)
|
unittest.main(verbosity=0)
|
||||||
unittest.main(verbosity=2)
|
|
||||||
@@ -47,7 +47,7 @@ class TestSearchFile(unittest.TestCase):
|
|||||||
@unittest.mock.patch("search_file.input", create=True)
|
@unittest.mock.patch("search_file.input", create=True)
|
||||||
def test_search_file(self, mock_input):
|
def test_search_file(self, mock_input):
|
||||||
search_file.input.side_effect=["1"]
|
search_file.input.side_effect=["1"]
|
||||||
with self.assertRaises(FileNotFoundError):
|
with self.assertRaises(Exception):
|
||||||
search_file.search_file("inexistent.txt", self.data_dir)
|
search_file.search_file("inexistent.txt", self.data_dir)
|
||||||
search_result = search_all_match(self.target, self.data_dir)
|
search_result = search_all_match(self.target, self.data_dir)
|
||||||
self.assertTrue(search_result, self.data_dir+"/match1.txt")
|
self.assertTrue(search_result, self.data_dir+"/match1.txt")
|
||||||
|
|||||||
Reference in New Issue
Block a user