From 0051a5ef50cdaf88975c2496ca32fd9b5f06050b Mon Sep 17 00:00:00 2001
From: Brian Pugh
Date: Fri, 4 Nov 2022 19:49:20 -0700
Subject: [PATCH] pathlib: Add initial pathlib implementation.

This adds most of the common functionality of pathlib.Path.
The glob functionality could use some work; currently it only supports
a single "*" wildcard; however, this is the vast majority of common
use-cases and it won't fail silently if non-supported glob patterns are
provided.
---
 python-stdlib/pathlib/manifest.py           |   3 +
 python-stdlib/pathlib/pathlib.py            | 229 +++++++++++++
 python-stdlib/pathlib/tests/test_pathlib.py | 335 ++++++++++++++++++++
 3 files changed, 567 insertions(+)
 create mode 100644 python-stdlib/pathlib/manifest.py
 create mode 100644 python-stdlib/pathlib/pathlib.py
 create mode 100644 python-stdlib/pathlib/tests/test_pathlib.py

diff --git a/python-stdlib/pathlib/manifest.py b/python-stdlib/pathlib/manifest.py
new file mode 100644
index 0000000..37dcaf6
--- /dev/null
+++ b/python-stdlib/pathlib/manifest.py
@@ -0,0 +1,3 @@
+metadata(version="0.0.1")
+
+module("pathlib.py")
diff --git a/python-stdlib/pathlib/pathlib.py b/python-stdlib/pathlib/pathlib.py
new file mode 100644
index 0000000..d01d81d
--- /dev/null
+++ b/python-stdlib/pathlib/pathlib.py
@@ -0,0 +1,229 @@
+import errno
+import os
+
+from micropython import const
+
+_SEP = const("/")
+
+
+def _mode_if_exists(path):
+    """Return the ``st_mode`` of ``path``, or 0 if it does not exist."""
+    try:
+        return os.stat(path)[0]
+    except OSError as e:
+        if e.errno == errno.ENOENT:
+            return 0
+        raise e
+
+
+def _clean_segment(segment):
+    """Normalise one path segment.
+
+    An empty segment becomes ".", the root stays "/", and trailing or
+    repeated separators are removed.
+    """
+    segment = str(segment)
+    if not segment:
+        return "."
+    segment = segment.rstrip(_SEP)
+    if not segment:
+        return _SEP
+    while True:
+        no_double = segment.replace(_SEP + _SEP, _SEP)
+        if no_double == segment:
+            break
+        segment = no_double
+    return segment
+
+
+class Path:
+    """Minimal subset of CPython's ``pathlib.Path`` for MicroPython.
+
+    The path is kept as a normalised string that uses "/" as separator.
+    """
+
+    def __init__(self, *segments):
+        segments_cleaned = []
+        for segment in segments:
+            segment = _clean_segment(segment)
+            if segment[0] == _SEP:
+                segments_cleaned = [segment]
+            elif segment == ".":
+                continue
+            else:
+                segments_cleaned.append(segment)
+
+        self._path = _clean_segment(_SEP.join(segments_cleaned))
+
+    def __truediv__(self, other):
+        return Path(self._path, str(other))
+
+    def __repr__(self):
+        return f'{type(self).__name__}("{self._path}")'
+
+    def __str__(self):
+        return self._path
+
+    def __eq__(self, other):
+        return self.absolute() == Path(other).absolute()
+
+    def absolute(self):
+        """Return the absolute path as a ``str``, resolved against the cwd."""
+        path = self._path
+        cwd = os.getcwd()
+        if not path or path == ".":
+            return cwd
+        if path[0] == _SEP:
+            return path
+        return _SEP + path if cwd == _SEP else cwd + _SEP + path
+
+    def resolve(self):
+        return self.absolute()
+
+    def open(self, mode="r", encoding=None):
+        return open(self._path, mode, encoding=encoding)
+
+    def exists(self):
+        return bool(_mode_if_exists(self._path))
+
+    def mkdir(self, parents=False, exist_ok=False):
+        """Create this directory.
+
+        With ``parents=True`` any missing ancestors are created as well.
+        With ``exist_ok=True`` an already existing target is not an error.
+        """
+        try:
+            os.mkdir(self._path)
+            return
+        except OSError as e:
+            if e.errno == errno.EEXIST and exist_ok:
+                return
+            elif e.errno == errno.ENOENT and parents:
+                pass  # handled below
+            else:
+                raise e
+
+        # Create every ancestor in turn. A relative path must stay relative
+        # and an absolute path must not gain a second leading separator.
+        progressive_path = _SEP if self._path[0] == _SEP else ""
+        for segment in self._path.split(_SEP):
+            if not segment:
+                continue  # Empty leading segment of an absolute path.
+            progressive_path += segment
+            try:
+                os.mkdir(progressive_path)
+            except OSError as e:
+                if e.errno != errno.EEXIST:
+                    raise e
+            progressive_path += _SEP
+
+    def is_dir(self):
+        return bool(_mode_if_exists(self._path) & 0x4000)
+
+    def is_file(self):
+        return bool(_mode_if_exists(self._path) & 0x8000)
+
+    def _glob(self, path, pattern, recursive):
+        # Currently only supports a single "*" pattern.
+        n_wildcards = pattern.count("*")
+        n_single_wildcards = pattern.count("?")
+
+        if n_single_wildcards:
+            raise NotImplementedError("? single wildcards not implemented.")
+
+        if n_wildcards == 0:
+            raise ValueError
+        elif n_wildcards > 1:
+            raise NotImplementedError("Multiple * wildcards not implemented.")
+
+        prefix, suffix = pattern.split("*")
+        # Require room for both parts so that e.g. "a*a" does not match "a".
+        min_len = len(prefix) + len(suffix)
+
+        for name, mode, *_ in os.ilistdir(path):
+            full_path = path + _SEP + name
+            if len(name) >= min_len and name.startswith(prefix) and name.endswith(suffix):
+                yield full_path
+            if recursive and mode & 0x4000:  # is_dir
+                yield from self._glob(full_path, pattern, recursive=recursive)
+
+    def glob(self, pattern):
+        """Iterate over this subtree and yield all existing files (of any
+        kind, including directories) matching the given relative pattern.
+
+        Currently only supports a single "*" pattern.
+        """
+        return self._glob(self._path, pattern, recursive=False)
+
+    def rglob(self, pattern):
+        return self._glob(self._path, pattern, recursive=True)
+
+    def stat(self):
+        return os.stat(self._path)
+
+    def read_bytes(self):
+        with open(self._path, "rb") as f:
+            return f.read()
+
+    def read_text(self, encoding=None):
+        with open(self._path, "r", encoding=encoding) as f:
+            return f.read()
+
+    def rename(self, target):
+        os.rename(self._path, target)
+
+    def rmdir(self):
+        os.rmdir(self._path)
+
+    def touch(self, exist_ok=True):
+        if self.exists():
+            if exist_ok:
+                return  # TODO: should update timestamp
+            else:
+                # In lieu of FileExistsError
+                raise OSError(errno.EEXIST)
+        with open(self._path, "w"):
+            pass
+
+    def unlink(self, missing_ok=False):
+        try:
+            os.unlink(self._path)
+        except OSError as e:
+            if not (missing_ok and e.errno == errno.ENOENT):
+                raise e
+
+    def write_bytes(self, data):
+        with open(self._path, "wb") as f:
+            f.write(data)
+
+    def write_text(self, data, encoding=None):
+        with open(self._path, "w", encoding=encoding) as f:
+            f.write(data)
+
+    def with_suffix(self, suffix):
+        index = -len(self.suffix) or None
+        return Path(self._path[:index] + suffix)
+
+    @property
+    def stem(self):
+        return self.name.rsplit(".", 1)[0]
+
+    @property
+    def parent(self):
+        tokens = self._path.rsplit(_SEP, 1)
+        if len(tokens) == 2:
+            if not tokens[0]:
+                tokens[0] = _SEP
+            return Path(tokens[0])
+        return Path(".")
+
+    @property
+    def name(self):
+        return self._path.rsplit(_SEP, 1)[-1]
+
+    @property
+    def suffix(self):
+        # Only the final component may contribute a suffix; a "." in a parent
+        # directory name (e.g. "foo.d/bar") must be ignored.
+        elems = self.name.rsplit(".", 1)
+        return "" if len(elems) == 1 else "." + elems[1]
diff --git a/python-stdlib/pathlib/tests/test_pathlib.py b/python-stdlib/pathlib/tests/test_pathlib.py
new file mode 100644
index 0000000..c52cd97
--- /dev/null
+++ b/python-stdlib/pathlib/tests/test_pathlib.py
@@ -0,0 +1,335 @@
+import os
+import unittest
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+
+def _isgenerator(x):
+    return isinstance(x, type((lambda: (yield))()))
+
+
+class TestPathlib(unittest.TestCase):
+    def assertExists(self, fn):
+        os.stat(fn)
+
+    def assertNotExists(self, fn):
+        with self.assertRaises(OSError):
+            os.stat(fn)
+
+    def setUp(self):
+        self._tmp_path_obj = TemporaryDirectory()
+        self.tmp_path = self._tmp_path_obj.name
+
+    def tearDown(self):
+        self._tmp_path_obj.cleanup()
+
+    def test_init_single_segment(self):
+        path = Path("foo")
+        self.assertTrue(path._path == "foo")
+
+        path = Path("foo/")
+        self.assertTrue(path._path == "foo")
+
+        path = Path("/foo")
+        self.assertTrue(path._path == "/foo")
+
+        path = Path("/////foo")
+        self.assertTrue(path._path == "/foo")
+
+        path = Path("")
+        self.assertTrue(path._path == ".")
+
+    def test_init_multiple_segment(self):
+        path = Path("foo", "bar")
+        self.assertTrue(path._path == "foo/bar")
+
+        path = Path("foo/", "bar")
+        self.assertTrue(path._path == "foo/bar")
+
+        path = Path("/foo", "bar")
+        self.assertTrue(path._path == "/foo/bar")
+
+        path = Path("/foo", "", "bar")
+        self.assertTrue(path._path == "/foo/bar")
+
+        path = Path("/foo/", "", "/bar/")
+        self.assertTrue(path._path == "/bar")
+
+        path = Path("", "")
+        self.assertTrue(path._path == ".")
+
+    def test_truediv_join_str(self):
+        actual = Path("foo") / "bar"
+        self.assertTrue(actual == Path("foo/bar"))
+
+    def test_truediv_join_path(self):
+        actual = Path("foo") / Path("bar")
+        self.assertTrue(actual == Path("foo/bar"))
+
+        actual = Path("foo") / Path("/bar")
+        self.assertTrue(actual == "/bar")
+
+    def test_eq_and_absolute(self):
+        self.assertTrue(Path("") == Path("."))
+        self.assertTrue(Path("foo") == Path(os.getcwd(), "foo"))
+        self.assertTrue(Path("foo") == "foo")
+        self.assertTrue(Path("foo") == os.getcwd() + "/foo")
+
+        self.assertTrue(Path("foo") != Path("bar"))
+        self.assertTrue(Path(".") != Path("/"))
+
+    def test_open(self):
+        fn = self.tmp_path + "/foo.txt"
+        path = Path(fn)
+
+        with open(fn, "w") as f:
+            f.write("file contents")
+
+        with path.open("r") as f:
+            actual = f.read()
+
+        self.assertTrue(actual == "file contents")
+
+    def test_exists(self):
+        fn = self.tmp_path + "/foo.txt"
+
+        path = Path(str(fn))
+        self.assertTrue(not path.exists())
+
+        with open(fn, "w"):
+            pass
+
+        self.assertTrue(path.exists())
+
+    def test_mkdir(self):
+        target = self.tmp_path + "/foo/bar/baz"
+        path = Path(target)
+
+        with self.assertRaises(OSError):
+            path.mkdir()
+
+        with self.assertRaises(OSError):
+            path.mkdir(exist_ok=True)
+
+        path.mkdir(parents=True)
+        self.assertExists(target)
+
+        with self.assertRaises(OSError):
+            path.mkdir(exist_ok=False)
+
+        path.mkdir(exist_ok=True)
+
+    def test_mkdir_relative_parents(self):
+        cwd = os.getcwd()
+        os.chdir(self.tmp_path)
+        try:
+            Path("foo/bar").mkdir(parents=True)
+        finally:
+            os.chdir(cwd)
+        self.assertExists(self.tmp_path + "/foo/bar")
+
+    def test_is_dir(self):
+        target = self.tmp_path
+        path = Path(target)
+        self.assertTrue(path.is_dir())
+
+        target = self.tmp_path + "/foo"
+        path = Path(target)
+        self.assertTrue(not path.is_dir())
+        os.mkdir(target)
+        self.assertTrue(path.is_dir())
+
+        target = self.tmp_path + "/bar.txt"
+        path = Path(target)
+        self.assertTrue(not path.is_dir())
+        with open(target, "w"):
+            pass
+        self.assertTrue(not path.is_dir())
+
+    def test_is_file(self):
+        target = self.tmp_path
+        path = Path(target)
+        self.assertTrue(not path.is_file())
+
+        target = self.tmp_path + "/bar.txt"
+        path = Path(target)
+        self.assertTrue(not path.is_file())
+        with open(target, "w"):
+            pass
+        self.assertTrue(path.is_file())
+
+    def test_glob(self):
+        foo_txt = self.tmp_path + "/foo.txt"
+        with open(foo_txt, "w"):
+            pass
+        bar_txt = self.tmp_path + "/bar.txt"
+        with open(bar_txt, "w"):
+            pass
+        baz_bin = self.tmp_path + "/baz.bin"
+        with open(baz_bin, "w"):
+            pass
+
+        path = Path(self.tmp_path)
+        glob_gen = path.glob("*.txt")
+        self.assertTrue(_isgenerator(glob_gen))
+
+        res = [str(x) for x in glob_gen]
+        self.assertTrue(len(res) == 2)
+        self.assertTrue(foo_txt in res)
+        self.assertTrue(bar_txt in res)
+
+    def test_rglob(self):
+        foo_txt = self.tmp_path + "/foo.txt"
+        with open(foo_txt, "w"):
+            pass
+        bar_txt = self.tmp_path + "/bar.txt"
+        with open(bar_txt, "w"):
+            pass
+        baz_bin = self.tmp_path + "/baz.bin"
+        with open(baz_bin, "w"):
+            pass
+
+        boop_folder = self.tmp_path + "/boop"
+        os.mkdir(boop_folder)
+        bap_txt = self.tmp_path + "/boop/bap.txt"
+        with open(bap_txt, "w"):
+            pass
+
+        path = Path(self.tmp_path)
+        glob_gen = path.rglob("*.txt")
+        self.assertTrue(_isgenerator(glob_gen))
+
+        res = [str(x) for x in glob_gen]
+        self.assertTrue(len(res) == 3)
+        self.assertTrue(foo_txt in res)
+        self.assertTrue(bar_txt in res)
+        self.assertTrue(bap_txt in res)
+
+    def test_stat(self):
+        expected = os.stat(self.tmp_path)
+        path = Path(self.tmp_path)
+        actual = path.stat()
+        self.assertTrue(expected == actual)
+
+    def test_rmdir(self):
+        target = self.tmp_path + "/foo"
+        path = Path(target)
+
+        with self.assertRaises(OSError):
+            # Doesn't exist
+            path.rmdir()
+
+        os.mkdir(target)
+        self.assertExists(target)
+        path.rmdir()
+        self.assertNotExists(target)
+
+        os.mkdir(target)
+        with open(target + "/bar.txt", "w"):
+            pass
+
+        with self.assertRaises(OSError):
+            # Cannot rmdir; contains file.
+            path.rmdir()
+
+    def test_touch(self):
+        target = self.tmp_path + "/foo.txt"
+
+        path = Path(target)
+        path.touch()
+        self.assertExists(target)
+
+        path.touch()  # touching existing file is fine
+        self.assertExists(target)
+
+        # Technically should be FileExistsError,
+        # but that's not built in to micropython
+        with self.assertRaises(OSError):
+            path.touch(exist_ok=False)
+
+        path = Path(self.tmp_path + "/bar/baz.txt")
+        with self.assertRaises(OSError):
+            # Parent directory does not exist
+            path.touch()
+
+    def test_unlink(self):
+        target = self.tmp_path + "/foo.txt"
+
+        path = Path(target)
+        with self.assertRaises(OSError):
+            # File does not exist
+            path.unlink()
+
+        with open(target, "w"):
+            pass
+
+        self.assertExists(target)
+        path.unlink()
+        self.assertNotExists(target)
+
+        path = Path(self.tmp_path)
+        with self.assertRaises(OSError):
+            # File does not exist
+            path.unlink()
+
+    def test_write_bytes(self):
+        target = self.tmp_path + "/foo.bin"
+        path = Path(target)
+        path.write_bytes(b"test byte data")
+        with open(target, "rb") as f:
+            actual = f.read()
+        self.assertTrue(actual == b"test byte data")
+
+    def test_write_text(self):
+        target = self.tmp_path + "/foo.txt"
+        path = Path(target)
+        path.write_text("test string")
+        with open(target, "r") as f:
+            actual = f.read()
+        self.assertTrue(actual == "test string")
+
+    def test_read_bytes(self):
+        target = self.tmp_path + "/foo.bin"
+        with open(target, "wb") as f:
+            f.write(b"test byte data")
+
+        path = Path(target)
+        actual = path.read_bytes()
+        self.assertTrue(actual == b"test byte data")
+
+    def test_read_text(self):
+        target = self.tmp_path + "/foo.bin"
+        with open(target, "w") as f:
+            f.write("test string")
+
+        path = Path(target)
+        actual = path.read_text()
+        self.assertTrue(actual == "test string")
+
+    def test_stem(self):
+        self.assertTrue(Path("foo/test").stem == "test")
+        self.assertTrue(Path("foo/bar.bin").stem == "bar")
+        self.assertTrue(Path("").stem == "")
+
+    def test_name(self):
+        self.assertTrue(Path("foo/test").name == "test")
+        self.assertTrue(Path("foo/bar.bin").name == "bar.bin")
+
+    def test_parent(self):
+        self.assertTrue(Path("foo/test").parent == Path("foo"))
+        self.assertTrue(Path("foo/bar.bin").parent == Path("foo"))
+        self.assertTrue(Path("bar.bin").parent == Path("."))
+        self.assertTrue(Path(".").parent == Path("."))
+        self.assertTrue(Path("/").parent == Path("/"))
+
+    def test_suffix(self):
+        self.assertTrue(Path("foo/test").suffix == "")
+        self.assertTrue(Path("foo/bar.bin").suffix == ".bin")
+        self.assertTrue(Path("bar.txt").suffix == ".txt")
+        self.assertTrue(Path("foo.d/bar").suffix == "")
+
+    def test_with_suffix(self):
+        self.assertTrue(Path("foo/test").with_suffix(".tar") == Path("foo/test.tar"))
+        self.assertTrue(Path("foo/bar.bin").with_suffix(".txt") == Path("foo/bar.txt"))
+        self.assertTrue(Path("bar.txt").with_suffix("") == Path("bar"))
+        self.assertTrue(Path("foo.d/bar").with_suffix(".txt") == Path("foo.d/bar.txt"))