diff --git a/README.md b/README.md index 97ffe56..2d100f7 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ options = dyn2py.Options( backup=True, python_folder="path/to/pythonfiles") -dynamo_file = dyn2py.DynamoFile("path/to/dynamofile") +dynamo_file = dyn2py.DynamoFile("path/to/dynamofile.dyn") python_files = dynamo_file.get_related_python_files(options) # Read python files and update the graph: diff --git a/TODO.md b/TODO.md index 23decbc..08c91b1 100644 --- a/TODO.md +++ b/TODO.md @@ -7,6 +7,7 @@ - [ ] DynamoFile - [ ] PythonFile - [x] PythonNode +- [ ] run() ## CI/CD diff --git a/dyn2py/__init__.py b/dyn2py/__init__.py index c1d4d65..39bad7e 100644 --- a/dyn2py/__init__.py +++ b/dyn2py/__init__.py @@ -100,7 +100,7 @@ def run(options: Options | None = None) -> None: logging.debug(f"Parsed arguments: {vars(options)}") # Set up sources: - source_files = [] + files = [] for source in options.source: if not source.exists(): @@ -111,14 +111,13 @@ def run(options: Options | None = None) -> None: logging.debug(f"Source is a folder") for f in source.iterdir(): - source_files.append(f) + files.append(File(f)) # It's a single file: else: - source_files.append(source) + files.append(File(source)) # Dynamo files come first, sort sources: - files = [File(f) for f in source_files] files.sort(key=lambda f: f.extension) # Filters: @@ -145,17 +144,15 @@ def run(options: Options | None = None) -> None: if f.is_dynamo_file(): logging.debug("Source is a Dynamo file") - dynamo_file = DynamoFile(f.filepath) try: - dynamo_file.extract_python(options) + f.extract_python(options) except DynamoFileException as e: logging.error(f"{e} Skipping") elif f.is_python_file(): logging.debug("Source is a Python file") - python_file = PythonFile(f.filepath) - python_file.update_dynamo(options) + f.update_dynamo(options) # Dynamo files are written only at the end, so they don't get updated too frequently for f in DynamoFile.open_files: diff --git a/dyn2py/files.py b/dyn2py/files.py index 8f2a5d9..b01e99b 100644 --- a/dyn2py/files.py +++ b/dyn2py/files.py @@ -20,11 +20,12 @@ HEADER_SEPARATOR = "*" * 60 class File(): """Base class for managing files""" - def __init__(self, filepath: pathlib.Path | str) -> None: - """Generate a file object. + def __init__(self, filepath: pathlib.Path | str, read_from_disk: bool = True) -> None: + """Generate a file object. If the path is correct it will become a DynamoFile or PythonFile object Args: filepath (pathlib.Path | str): Path to the python file or Dynamo graph + read_from_disk (bool, optional): Read the file from disk. Useful for new PythonFiles. Defaults to True. """ self.filepath: pathlib.Path @@ -52,11 +53,29 @@ class File(): self.modified: bool = False """If an existing file was modified""" - if self.exists: + # Change class if extension is correct: + if self.is_dynamo_file(): + self.__class__ = DynamoFile + + # Always read DynamoFiles, they should exist: + self.read_file() + + elif self.is_python_file(): + self.__class__ = PythonFile + + # Python files can be virtual: + if self.exists and read_from_disk: + self.read_file() + + if self.exists and read_from_disk: logging.debug(f"File exists: {self.filepath}") self.mtime = self.filepath.stat().st_mtime self.mtimeiso = datetime.fromtimestamp(self.mtime).isoformat() + def read_file(self): + """Should be implemented in subclasses""" + pass + def is_newer(self, other_file: "File") -> bool: """Check if this file is newer than the other file @@ -134,15 +153,15 @@ class DynamoFile(File): """A Dynamo file, subclass of File()""" full_dict: dict - """The contents of the Dynamo file, as dict. Initialized with read()""" + """The contents of the Dynamo file, as dict.""" uuid: str - """The uuid of the graph. Initialized with read()""" + """The uuid of the graph.""" name: str - """The name of the graph, read from the file. Initialized with read()""" - python_nodes: list["PythonNode"] - """Python node objects, read from this file. Initialized with get_python_nodes()""" + """The name of the graph, read from the file, not the filename""" + python_nodes: set[PythonNode] + """Python node objects, read from this file.""" - open_files: set["DynamoFile"] = set() + open_files: set[DynamoFile] = set() """A set of open Dynamo files, before saving. Self added by read()""" def extract_python(self, options: Options | None = None) -> None: @@ -157,19 +176,19 @@ class DynamoFile(File): logging.info(f"Extracting from file: {self.filepath}") - self.read() - # Go through nodes in the file: - for python_node in self.get_python_nodes(): + for python_node in self.python_nodes: if options.python_folder: python_file_path = options.python_folder.joinpath( python_node.filename) else: python_file_path = python_node.filepath - python_file = PythonFile(python_file_path) - python_file.generate_text( - dynamo_file=self, python_node=python_node) + python_file = PythonFile( + filepath=python_file_path, + dynamo_file=self, + python_node=python_node + ) if python_file.is_newer(self) and not options.force: logging.info( @@ -178,29 +197,31 @@ class DynamoFile(File): python_file.write(options) - def read(self) -> None: - """Read Dynamo graph to parameters + def read_file(self, reread: bool = False) -> None: + """Read Dynamo graph to parameters. Automatically called by __init__() + + Args: + reread (bool, optional): Reread the file, even if it was read already. Defaults to False. Raises: FileNotFoundError: The file does not exist DynamoFileException: If the file is a Dynamo 1 file json.JSONDecodeError: If there are any other problem with the file + PythonNodeNotFoundException: No python nodes in the file """ if not self.exists: raise FileNotFoundError - # Only read if it's not already open: - if not self in self.open_files: + if not self in self.open_files or reread: logging.debug(f"Reading file: {self.filepath}") + # Read the json: try: with open(self.filepath, "r", encoding="utf-8") as input_json: self.full_dict = json.load(input_json, use_decimal=True) - self.uuid = self.full_dict["Uuid"] - self.name = self.full_dict["Name"] - self.open_files.add(self) + except json.JSONDecodeError as e: with open(self.filepath, "r", encoding="utf-8") as input_json: if input_json.readline().startswith(" "PythonNode": """Get a PythonNode object from this Dynamo graph, by its id @@ -252,46 +261,49 @@ class DynamoFile(File): Returns: PythonNode: The PythonNode with the given id + + Raises: + PythonNodeNotFoundException: No python node with this id """ - if not self in self.open_files: - self.read() - # Find the node, if the nodes are not read yet: - if not self.python_nodes: - python_node_dict = next(( - n for n in self.full_dict["Nodes"] if n["Id"] == node_id - ), {}) - if not python_node_dict: - raise PythonNodeNotFoundException( - f"Node not found with id {node_id}") + python_node = next(( + p for p in self.python_nodes if p.id == node_id + ), None) - python_node = PythonNode( - node_dict_from_dyn=python_node_dict) - else: - python_node = next(( - p for p in self.python_nodes if p.id == node_id - ), None) - - if not python_node: - raise PythonNodeNotFoundException( - f"Node not found with id {node_id}") + if not python_node: + raise PythonNodeNotFoundException( + f"Node not found with id {node_id}") return python_node - def update_python_node(self, python_node: "PythonNode") -> None: + def update_python_node(self, python_node: PythonNode) -> None: """Update the code of a PythonNode in this file Args: - python_node(PythonNode): The new node data + python_node(PythonNode): The new node + + Raises: + PythonNodeNotFoundException: Existing node not found """ + + # Find the old node: + python_node_in_file = self.get_python_node_by_id(python_node.id) + node_dict = next(( n for n in self.full_dict["Nodes"] if n["Id"] == python_node.id ), {}) - if not node_dict: + + if not node_dict or not python_node_in_file: raise PythonNodeNotFoundException() - else: - node_dict["Code"] = python_node.code - self.modified = True + + # Remove the old and add the new: + self.python_nodes.remove(python_node_in_file) + self.python_nodes.add(python_node) + + # Update the dict: + node_dict["Code"] = python_node.code + + self.modified = True def write_file(self) -> None: """Write this file to the disk. Should be called only from File.write()""" @@ -301,14 +313,15 @@ class DynamoFile(File): def get_related_python_files(self, options: Options | None = None) -> list["PythonFile"]: """Get python files exported from this Dynamo file + Args: + options(Options | None, optional): Run options. Defaults to None. + Returns: list[PythonFile]: A list of PythonFile objects """ if not options: options = Options() - self.read() - # Find the folder of the python files if options.python_folder: python_folder = options.python_folder @@ -318,7 +331,6 @@ class DynamoFile(File): python_files_in_folder = [PythonFile(f) for f in python_folder.iterdir() if File(f).is_python_file()] - [p.read() for p in python_files_in_folder] related_python_files = [ p for p in python_files_in_folder if p.get_source_dynamo_file().uuid == self.uuid] @@ -343,60 +355,127 @@ class PythonFile(File): """A Python file, subclass of File()""" code: str - """The python code as a string. Initialized with read()""" + """The python code as a string.""" header_data: dict - """Parsed dict from the header of a python file. Initialized with read()""" + """Parsed dict from the header of a python file.""" text: str - """Full contents of the file before writing. Initialized with generate_text()""" + """Full contents of the file before writing.""" open_files: set["PythonFile"] = set() - """A set of open Python files. Self added by read()""" + """A set of open Python files.""" - def generate_text(self, dynamo_file: DynamoFile, python_node: "PythonNode") -> None: - """Generate full text to write with header + def __init__(self, + filepath: pathlib.Path | str, + dynamo_file: DynamoFile | None = None, + python_node: PythonNode | None = None + ) -> None: + """Generate a PythonFile. If both dynamo_file and python_node given, generate the text of the file, do not read from disk Args: - dynamo_file(DynamoFile): The source dynamo file - python_node(PythonNode): The python node to write + filepath (pathlib.Path | str): Path to the python file + dynamo_file (DynamoFile | None, optional): The source dynamo file. Defaults to None. + python_node (PythonNode | None, optional): The python node to write. Defaults to None. """ - header_notice = """\ + # Generate the text, if dynamo file and python node were given: + if python_node and dynamo_file: + # Do not read from disk: + super().__init__(filepath, read_from_disk=False) + + header_notice = """\ This file was generated with dyn2py from a Dynamo graph. Do not edit this section, if you want to update the Dynamo graph!\ """ - # Double escape path: - dyn_path_string = str(dynamo_file.realpath) - if "\\" in dyn_path_string: - dyn_path_string = dyn_path_string.replace("\\", "\\\\") + # Double escape path: + dyn_path_string = str(dynamo_file.realpath) + if "\\" in dyn_path_string: + dyn_path_string = dyn_path_string.replace("\\", "\\\\") - self.header_data = { - "dyn2py_version": METADATA["Version"], - "dyn2py_extracted": datetime.now().isoformat(), - "dyn_uuid": dynamo_file.uuid, - "dyn_name": dynamo_file.name, - "dyn_path": dyn_path_string, - "dyn_modified": dynamo_file.mtimeiso, - "py_id": python_node.id, - "py_engine": python_node.engine - } + self.header_data = { + "dyn2py_version": METADATA["Version"], + "dyn2py_extracted": datetime.now().isoformat(), + "dyn_uuid": dynamo_file.uuid, + "dyn_name": dynamo_file.name, + "dyn_path": dyn_path_string, + "dyn_modified": dynamo_file.mtimeiso, + "py_id": python_node.id, + "py_engine": python_node.engine + } - header_string = "\r\n".join( - [f"{k}:{self.header_data[k]}" for k in self.header_data]) - header_wrapper = '"""' + header_string = "\r\n".join( + [f"{k}:{self.header_data[k]}" for k in self.header_data]) + header_wrapper = '"""' - self.text = "\r\n".join([ - header_wrapper, - HEADER_SEPARATOR, - textwrap.dedent(header_notice), - HEADER_SEPARATOR, - header_string, - HEADER_SEPARATOR, - header_wrapper, - python_node.code - ]) + self.text = "\r\n".join([ + header_wrapper, + HEADER_SEPARATOR, + textwrap.dedent(header_notice), + HEADER_SEPARATOR, + header_string, + HEADER_SEPARATOR, + header_wrapper, + python_node.code + ]) - self.modified = True + self.modified = True + + else: + # Try to read from disk: + super().__init__(filepath, read_from_disk=True) + + self.open_files.add(self) + + def read_file(self, reread: bool = False) -> None: + """Read python script to parameters + + Args: + reread (bool, optional): Reread the file, even if it was read already. Defaults to False. + + Raises: + FileNotFoundError: The file does not exist + PythonFileException: Some error reading the file + """ + if not self.exists: + raise FileNotFoundError + + # Only read if it's not already open: + if not self in self.open_files or reread: + + logging.info(f"Reading file: {self.filepath}") + with open(self.filepath, mode="r", newline="", encoding="utf-8") as input_py: + python_lines = input_py.readlines() + + self.header_data = {} + header_separator_count = 0 + code_start_line = 0 + + for i, line in enumerate(python_lines): + line = line.strip() + logging.debug(f"Reading line: {line}") + + # Skip the first lines: + if header_separator_count < 2: + if line == HEADER_SEPARATOR: + header_separator_count += 1 + continue + # It's the last line of the header: + elif line == HEADER_SEPARATOR: + code_start_line = i+2 + break + + else: + # Find the location of the separator + sl = line.find(":") + if sl == -1: + raise PythonFileException("Error reading header!") + self.header_data[line[0:sl]] = line[sl+1:] + + self.code = "".join(python_lines[code_start_line:]) + self.open_files.add(self) + + logging.debug(f"Header data from python file: {self.header_data}") + # logging.debug(f"Code from python file: {self.code}") def update_dynamo(self, options: Options | None = None) -> None: """Update a the source Dynamo graph from this python script @@ -408,13 +487,11 @@ class PythonFile(File): if not options: options = Options() - self.read() - # Check if it was already opened: dynamo_file = DynamoFile.get_open_file_by_uuid( self.header_data["dyn_uuid"]) - # Open and read if it's the first time: + # Open if it's the first time: if not dynamo_file: dynamo_file = self.get_source_dynamo_file() @@ -457,55 +534,12 @@ class PythonFile(File): raise FileNotFoundError( f"Dynamo graph not found: {dynamo_file.filepath}") - dynamo_file.read() - # Check if uuid is ok: if not dynamo_file.uuid == self.header_data["dyn_uuid"]: raise DynamoFileException(f"Dynamo graph uuid changed!") return dynamo_file - def read(self) -> None: - """Read python script to parameters""" - - # Only read if it's not already open: - if not self in self.open_files: - - logging.info(f"Reading file: {self.filepath}") - with open(self.filepath, mode="r", newline="", encoding="utf-8") as input_py: - python_lines = input_py.readlines() - - self.header_data = {} - header_separator_count = 0 - code_start_line = 0 - - for i, line in enumerate(python_lines): - line = line.strip() - logging.debug(f"Reading line: {line}") - - # Skip the first lines: - if header_separator_count < 2: - if line == HEADER_SEPARATOR: - header_separator_count += 1 - continue - # It's the last line of the header: - elif line == HEADER_SEPARATOR: - code_start_line = i+2 - break - - else: - # Find the location of the separator - sl = line.find(":") - if sl == -1: - raise PythonFileException("Error reading header!") - self.header_data[line[0:sl]] = line[sl+1:] - - self.code = "".join(python_lines[code_start_line:]) - self.open_files.add(self) - - logging.debug(f"Header data from python file: {self.header_data}") - # logging.debug(f"Code from python file: {self.code}") - def write_file(self) -> None: """Write this file to the disk. Should be called only from File.write()""" with open(self.filepath, "w", encoding="utf-8", newline="") as output_file: diff --git a/pyproject.toml b/pyproject.toml index b810f2e..518b7ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "dyn2py" -version = "0.1.0" +version = "0.2.0" description = "Extract python code from Dynamo graphs" readme = "README.md" requires-python = ">=3.8" diff --git a/tests/test_DynamoFile.py b/tests/test_DynamoFile.py index 2b61522..c02e9b8 100644 --- a/tests/test_DynamoFile.py +++ b/tests/test_DynamoFile.py @@ -9,48 +9,42 @@ class TestDynamoFile(unittest.TestCase): # Missing methods: # get_related_python_files - # get_open_file_by_uuid # update_python_node # write - def test_read_and_variables(self): + def test_init(self): + dyn2py.DynamoFile.open_files.clear() + dyn = dyn2py.DynamoFile(f"{INPUT_DIR}/python_nodes.dyn") - dyn.read() self.assertEqual(dyn.uuid, "3c3b4c05-9716-4e93-9360-ca0637cb5486") self.assertEqual(dyn.name, "python_nodes") self.assertTrue(dyn in dyn.open_files) + # Dynamo 1 file: with self.assertRaises(dyn2py.DynamoFileException): dyn1 = dyn2py.DynamoFile(f"{INPUT_DIR}/dynamo1file.dyn") - dyn1.read() + # Not existing file: with self.assertRaises(FileNotFoundError): dyn2 = dyn2py.DynamoFile(f"{INPUT_DIR}/not_existing.dyn") - dyn2.read() - + + # No python nodes: + with self.assertRaises(dyn2py.PythonNodeNotFoundException): + dyn2 = dyn2py.DynamoFile(f"{INPUT_DIR}/no_python.dyn") def test_get_python_nodes(self): dyn = dyn2py.DynamoFile(f"{INPUT_DIR}/python_nodes.dyn") - py_nodes = dyn.get_python_nodes() py_node = dyn.get_python_node_by_id("d7704617c75e4bf1a5c387b7c3f001ea") - self.assertEqual(len(py_nodes), 6) + self.assertEqual(len(dyn.python_nodes), 6) self.assertTrue(py_node) - self.assertTrue(py_node in py_nodes) self.assertTrue(py_node in dyn.python_nodes) self.assertEqual(py_node.checksum, "1f3d9e6153804fe1ed37571a9cda8e26") with self.assertRaises(dyn2py.PythonNodeNotFoundException): dyn.get_python_node_by_id("wrongid") - dyn2 = dyn2py.DynamoFile(f"{INPUT_DIR}/no_python.dyn") - - # Raise error on file without python nodes: - with self.assertRaises(dyn2py.DynamoFileException): - dyn2.get_python_nodes() - - def test_extract_python(self): cleanup_output_dir() @@ -60,3 +54,15 @@ class TestDynamoFile(unittest.TestCase): output_dir = pathlib.Path(OUTPUT_DIR) self.assertEqual(len(list(output_dir.iterdir())), 6) + + def test_get_open_file_by_uuid(self): + dyn2py.DynamoFile.open_files.clear() + + dyn1 = dyn2py.DynamoFile(f"{INPUT_DIR}/python_nodes.dyn") + dyn2 = dyn2py.DynamoFile(f"{INPUT_DIR}/single_node.dyn") + + + self.assertEqual(dyn1, + dyn2py.DynamoFile.get_open_file_by_uuid("3c3b4c05-9716-4e93-9360-ca0637cb5486")) + self.assertEqual(dyn2, + dyn2py.DynamoFile.get_open_file_by_uuid("76de5c79-17c5-4c74-9f90-ad99a213d339")) diff --git a/tests/test_File.py b/tests/test_File.py index 5750fce..a972294 100644 --- a/tests/test_File.py +++ b/tests/test_File.py @@ -8,6 +8,8 @@ from tests.support import * class TestFile(unittest.TestCase): + # Write methods should be tested in subclasses! + def test_init(self): paths = [ f"{INPUT_DIR}/python_nodes.dyn", @@ -33,23 +35,25 @@ class TestFile(unittest.TestCase): self.assertEqual(the_file.extension, ".dyn") self.assertFalse(the_file.modified) + self.assertEqual(the_file.__class__, dyn2py.DynamoFile) + def test_init_newfile(self): paths = [ - f"{INPUT_DIR}/new_file.dyn", - pathlib.Path(f"{INPUT_DIR}/new_file.dyn") + f"{INPUT_DIR}/new_file.py", + pathlib.Path(f"{INPUT_DIR}/new_file.py") ] if platform.system() == "Windows": paths.extend([ - fr"{INPUT_DIR}\new_file.dyn", - pathlib.Path(fr"{INPUT_DIR}\new_file.dyn") + fr"{INPUT_DIR}\new_file.py", + pathlib.Path(fr"{INPUT_DIR}\new_file.py") ]) for path in paths: the_file = dyn2py.File(path) self.assertEqual(the_file.filepath, - pathlib.Path(f"{INPUT_DIR}/new_file.dyn")) + pathlib.Path(f"{INPUT_DIR}/new_file.py")) self.assertEqual(the_file.basename, "new_file") self.assertEqual(the_file.dirpath, pathlib.Path(INPUT_DIR)) self.assertEqual(the_file.realpath, pathlib.Path(path).resolve()) @@ -57,17 +61,21 @@ class TestFile(unittest.TestCase): self.assertEqual(the_file.mtime, 0.0) self.assertEqual(the_file.mtimeiso, "") self.assertFalse(the_file.exists) - self.assertEqual(the_file.extension, ".dyn") + self.assertEqual(the_file.extension, ".py") self.assertFalse(the_file.modified) - def test_newer(self): - # Touch a file, so it will be always newer than the others: - touched_file = pathlib.Path(f"{OUTPUT_DIR}/touched_file.py") - touched_file.touch() - newer_file = dyn2py.File(touched_file) + self.assertEqual(the_file.__class__, dyn2py.PythonFile) - older_file = dyn2py.File(f"{INPUT_DIR}/python_nodes.dyn") - nonexisting_file = dyn2py.File(f"{INPUT_DIR}/new_file.dyn") + def test_newer(self): + older_file = dyn2py.File(f"{INPUT_DIR}/single_node.dyn") + nonexisting_file = dyn2py.File(f"{INPUT_DIR}/new_file.py") + + # Extract a python file so it is always newer than the others: + cleanup_output_dir() + opt = dyn2py.Options(python_folder=OUTPUT_DIR) + older_file.extract_python(options=opt) # type: ignore + newer_file = dyn2py.File( + f"{OUTPUT_DIR}/single_node_1c5d99792882409e97e132b3e9f814b0.py") self.assertTrue(newer_file.is_newer(older_file)) self.assertTrue(newer_file.is_newer(nonexisting_file)) @@ -78,17 +86,6 @@ class TestFile(unittest.TestCase): self.assertFalse(nonexisting_file.is_newer(older_file)) self.assertFalse(nonexisting_file.is_newer(newer_file)) - def test_write(self): - existing_file = dyn2py.File(f"{INPUT_DIR}/python_nodes.dyn") - nonexisting_file = dyn2py.File(f"{INPUT_DIR}/new_file.dyn") - options = dyn2py.Options() - - with self.assertRaises(TypeError): - existing_file.write(options) - - with self.assertRaises(TypeError): - nonexisting_file.write(options) - def test_is_file(self): extract_single_node_dyn() @@ -101,8 +98,6 @@ class TestFile(unittest.TestCase): for path, f in paths: file = dyn2py.File(path) - - self.assertEqual(file.is_dynamo_file(), f=="dyn") - self.assertEqual(file.is_python_file(), f=="py") - + self.assertEqual(file.is_dynamo_file(), f == "dyn") + self.assertEqual(file.is_python_file(), f == "py") diff --git a/tests/test_PythonNode.py b/tests/test_PythonNode.py index e910547..e9274ba 100644 --- a/tests/test_PythonNode.py +++ b/tests/test_PythonNode.py @@ -8,7 +8,6 @@ class TestPythonNode(unittest.TestCase): def test_init_from_dyn(self): dyn = dyn2py.DynamoFile(f"{INPUT_DIR}/single_node.dyn") - dyn.read() node_dict = next((n for n in dyn.full_dict["Nodes"] if n["NodeType"] == "PythonScriptNode"), {}) @@ -43,8 +42,7 @@ class TestPythonNode(unittest.TestCase): mod_py.write(line) py = dyn2py.PythonFile(f"{OUTPUT_DIR}/single_node_mod.py") - py.read() - + node = dyn2py.PythonNode( node_id=py.header_data["py_id"], engine=py.header_data["py_engine"],