commit 9a294607127c6205df2ff86a3eb145bc7f8141f3 Author: infeeeee Date: Fri Mar 3 03:09:13 2023 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..61ad37e --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.venv +example +__pycache__ +dyn2py.egg-info \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..a44eddf --- /dev/null +++ b/README.md @@ -0,0 +1,72 @@ +# dyn2py + +Extract python nodes from Dynamo graphs + +## Installation + +*TODO* + + + +## Usage + +```shell +> dyn2py --help +usage: dyn2py [-h] [-l LOGLEVEL] [-n] [-F] [-b] [-f {py,dyn}] [-u] [-p path/to/folder] source + +Extract python code from Dynamo graphs + +positional arguments: + source path to a Dynamo graph, a python script or a folder containing them + +options: + -h, --help show this help message and exit + -l LOGLEVEL, --loglevel LOGLEVEL + set log level, possible options: CRITICAL, ERROR, WARNING, INFO, DEBUG + -n, --dry-run do not modify files, only show log + -F, --force overwrite even if the files are older + -b, --backup create a backup for updated files + -f {py,dyn}, --filter {py,dyn} + only check python or Dynamo graphs, skip the others, useful for folders + +dynamo options, only for processing Dynamo graphs: + -u, --update update Dynamo graph from python scripts in the same folder + -p path/to/folder, --python-folder path/to/folder + extract python scripts to this folder, read python scripts from here with --update + +The script by default overwrites older files with newer files. +Do not move the source Dynamo graphs, or update won't work with them later. +``` + + + + +## Development + +### Installation + +Requirements: git, pip + +```shell +git clone https://github.com/infeeeee/dyn2py +cd dyn2py +py -m pip install -e . +``` + +With venv: + +```shell +git clone https://github.com/infeeeee/dyn2py +cd dyn2py +py -m venv .venv +. ./.venv/bin/activate +py -m pip install -e . +``` + diff --git a/dyn2py/__init__.py b/dyn2py/__init__.py new file mode 100644 index 0000000..431b2be --- /dev/null +++ b/dyn2py/__init__.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 + +import argparse +import pathlib +from importlib_metadata import metadata +import textwrap +import logging +from dyn2py.classes import * + + +def run(): + + METADATA = metadata("dyn2py") + + parser = argparse.ArgumentParser( + prog=METADATA["Name"], + description=METADATA["Summary"], + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=textwrap.dedent("""\ + The script by default overwrites older files with newer files. + Do not move the source Dynamo graphs, or update won't work with them later. + """) + ) + + parser.add_argument("-l", "--loglevel", + metavar="LOGLEVEL", + choices=["CRITICAL", + "ERROR", + "WARNING", + "INFO", + "DEBUG"], + default="INFO", + help="set log level, possible options: CRITICAL, ERROR, WARNING, INFO, DEBUG ") + + parser.add_argument("-n", "--dry-run", + help="do not modify files, only show log", + action="store_true" + ) + + parser.add_argument("-F", "--force", + help="overwrite even if the files are older", + action="store_true") + + parser.add_argument("-b", "--backup", + help="create a backup for updated files", + action="store_true") + + parser.add_argument("-f", "--filter", + choices=["py", "dyn"], + help="only check python or Dynamo graphs, skip the others, useful for folders" + ) + + dynamo_options = parser.add_argument_group( + title="dynamo options, only for processing Dynamo graphs") + + dynamo_options.add_argument("-u", "--update", + help="update Dynamo graph from python scripts in the same folder", + action="store_true") + + dynamo_options.add_argument("-p", "--python-folder", + metavar="path/to/folder", + help="extract python scripts to this folder, read python scripts from here with --update", + type=pathlib.Path) + + parser.add_argument("source", + type=pathlib.Path, + help="path to a Dynamo graph, a python script or a folder containing them", + action="append" + ) + + args = parser.parse_args() + + # Set up logging: + logging.basicConfig(format='%(levelname)s: %(message)s', + level=args.loglevel) + + logging.debug(args) + + logging.debug(f"Parsed arguments: {vars(args)}") + + source_files = [] + + for source in args.source: + + if not source.exists(): + raise FileNotFoundError(f"Source file does not exist!") + + # Get files from folder: + elif source.is_dir(): + logging.debug(f"Source is a folder") + + for f in source.iterdir(): + source_files.append(f) + + # It's a single file: + else: + source_files.append(source) + + # Dynamo files come first: + files = [File(f) for f in source_files] + files.sort(key=lambda f: f.extension) + + # Filters: + if args.filter == "py": + files = [f for f in files if f.is_python_file()] + elif args.filter == "dyn": + files = [f for f in files if f.is_dynamo_file()] + + # Update mode: + elif args.update: + dynamo_files = [DynamoFile(f.filepath) + for f in files if f.is_dynamo_file()] + for d in dynamo_files: + try: + logging.info(f"Reading file for update: {d.filepath}") + d.read() + DynamoFileStorage().update_files.append(d) + except json.JSONDecodeError: + logging.error( + "File is not correctly formatted. Is it a Dynamo2 file?") + + # Find python files' folders and remove duplicates: + if args.python_folder: + python_folders = [args.python_folder] + else: + python_folders = [] + [python_folders.append(f.dirpath) + for f in dynamo_files if f.dirpath not in python_folders] + + # Add python files: + source_files = [] + [source_files.extend(pf.iterdir()) for pf in python_folders] + files = [PythonFile(fp) + for fp in source_files if File(fp).is_python_file()] + + # Cycle through files: + for f in files: + + if f.is_dynamo_file(): + logging.debug("Source is a Dynamo file") + dynamo_file = DynamoFile(f.filepath) + dynamo_file.extract_python(args) + + elif f.is_python_file(): + logging.debug("Source is a Python file") + python_file = PythonFile(f.filepath) + python_file.update_dynamo(args) + + DynamoFileStorage().write_open_files(args) diff --git a/dyn2py/__main__.py b/dyn2py/__main__.py new file mode 100644 index 0000000..dfc7bbc --- /dev/null +++ b/dyn2py/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 + +from dyn2py import run + +if __name__ == "__main__": + run() \ No newline at end of file diff --git a/dyn2py/classes.py b/dyn2py/classes.py new file mode 100644 index 0000000..61f87c1 --- /dev/null +++ b/dyn2py/classes.py @@ -0,0 +1,477 @@ +import simplejson as json +import hashlib +import pathlib +import textwrap +import logging +import argparse +from datetime import datetime +from decimal import Decimal +from pathvalidate import sanitize_filename +from importlib_metadata import metadata + + +METADATA = metadata("dyn2py") +HEADER_SEPARATOR = "*" * 60 + + +class DynamoFileException(Exception): + pass + + +class PythonNodeNotFoundException(Exception): + pass + + +class PythonFileException(Exception): + pass + + +class File(): + """Base class for managing files""" + + def __init__(self, filepath: pathlib.Path) -> None: + + self.filepath = filepath + # basename: only the name of the file, without extension + self.basename = filepath.stem + # dirpath: containing folder + self.dirpath = filepath.parent + self.realpath = filepath.resolve() + self.mtime = False + self.mtimeiso = False + self.exists = self.filepath.exists() + self.extension = self.filepath.suffix + + if self.exists: + logging.debug(f"File exists: {self.filepath}") + self.mtime = self.filepath.stat().st_mtime + self.mtimeiso = datetime.fromtimestamp(self.mtime).isoformat() + + def is_newer(self, other_file: "File") -> bool: + """Check if this file is newer than the other file + + Args: + other_file (File): The other file + + Returns: + bool: True if this file is newer or the other doesn't exist + """ + if self.mtime and other_file.mtime: + return bool(self.mtime > other_file.mtime) + elif self.mtime: + return True + else: + return False + + def is_dynamo_file(self) -> bool: + """Check if this is a Dynamo file + + Returns: + bool: True if it's Dynamo file + """ + return bool(self.extension in [".dyn", ".dyf"]) + + def is_python_file(self) -> bool: + """Check if this is a python file + + Returns: + bool: True if it's python file + """ + return bool(self.extension == ".py") + + def write(self, args: argparse.Namespace) -> None: + """Prepare writing file to the disk: + create backup, process dry-run, call filetype specific write_file() methods + + Args: + args (argparse.Namespace): parsed arguments + """ + # Create backup: + if not args.dry_run and self.filepath.exists() and args.backup: + backup_filename = sanitize_filename( + f"{self.basename}_{self.mtimeiso}{self.extension}") + backup_path = self.dirpath.joinpath(backup_filename) + logging.info(f"Creating backup to {backup_path}") + self.filepath.rename(backup_path) + + # Call filetype specific methods: + if args.dry_run: + logging.info( + f"Should write file, but it's a dry-run: {self.filepath}") + else: + logging.info(f"Writing file: {self.filepath}") + self.write_file() + + def write_file(self): + """Should be implemented in subclasses""" + pass + + +class DynamoFile(File): + + def extract_python(self, args: argparse.Namespace) -> None: + """Extract and write python files + + Args: + args (argparse.Namespace): parsed arguments + """ + + logging.info(f"Extracting from file: {self.filepath}") + + try: + self.read() + + # Go through nodes in the file: + for python_node in self.get_python_nodes(): + if args.python_folder: + python_file_path = args.python_folder.joinpath( + python_node.filename) + else: + python_file_path = python_node.filepath + + python_file = PythonFile(python_file_path) + python_file.generate_text( + dynamo_file=self, python_node=python_node) + + if python_file.is_newer(self) and not args.force: + logging.info( + f"Python file is newer, skipping: {python_file.filepath}") + continue + + python_file.write(args) + + except DynamoFileException as e: + logging.warn(e) + return + except json.JSONDecodeError: + logging.error( + "File is not correctly formatted. Is it a Dynamo2 file?") + return + + def read(self) -> None: + """Read Dynamo graph to parameters""" + logging.debug(f"Reading file: {self.filepath}") + + with open(self.filepath, "r", encoding="utf-8") as input_json: + self.full_dict = json.load(input_json, + use_decimal=True) + self.uuid = self.full_dict["Uuid"] + self.name = self.full_dict["Name"] + + def get_python_nodes(self) -> list["PythonNode"]: + """Get python nodes from the Dynamo graph + + Returns: + list[PythonNode]: A list of PythonNodes in the file + """ + full_python_nodes = [n for n in self.full_dict["Nodes"] + if n["NodeType"] == "PythonScriptNode"] + + python_nodes = [] + + for p_node in full_python_nodes: + # The name of the node is stored here: + node_views = self.full_dict["View"]["NodeViews"] + python_node = PythonNode( + node_dict_from_dyn=p_node, + full_nodeviews_dict_from_dyn=node_views, + source_dynamo_file=self) + python_nodes.append(python_node) + + if not python_nodes: + raise DynamoFileException("No python nodes in this file!") + + return python_nodes + + def get_python_node_by_id(self, node_id: str) -> "PythonNode": + """Get a PythonNode object from this Dynamo graph, by its id + + Args: + node_id (str): The id of the python node as string + + Returns: + PythonNode: The PythonNode with the given id + """ + python_node_dict = next(( + n for n in self.full_dict["Nodes"] if n["Id"] == node_id + ), {}) + if not python_node_dict: + raise PythonNodeNotFoundException( + f"Node not found with id {node_id}") + + python_node = PythonNode( + node_dict_from_dyn=python_node_dict) + + return python_node + + def update_python_node(self, python_node: "PythonNode") -> None: + """Update the code of a PythonNode in this file + + Args: + python_node (PythonNode): The new node data + """ + node_dict = next(( + n for n in self.full_dict["Nodes"] if n["Id"] == python_node.id + ), {}) + if not node_dict: + raise PythonNodeNotFoundException() + else: + node_dict["Code"] = python_node.code + + def write_file(self) -> None: + """Write this file to the disk. Should be called only from File.write()""" + with open(self.filepath, "w", encoding="utf-8") as output_file: + json.dump(self.full_dict, output_file, indent=2, use_decimal=True) + + +class PythonFile(File): + + def generate_text(self, dynamo_file: DynamoFile, python_node: "PythonNode") -> None: + """Generate full text to write with header + + Args: + dynamo_file (DynamoFile): The source dynamo file + python_node (PythonNode): The python node to write + """ + + header_notice = """\ + This file was generated with dyn2py from a Dynamo graph. + Do not edit this section, if you want to update the Dynamo graph!\ + """ + + self.header_data = { + "dyn2py_version": METADATA["Version"], + "dyn2py_extracted": datetime.now().isoformat(), + "dyn_uuid": dynamo_file.uuid, + "dyn_name": dynamo_file.name, + "dyn_path": dynamo_file.realpath, + "dyn_modified": dynamo_file.mtimeiso, + "py_id": python_node.id, + "py_engine": python_node.engine + } + + header_string = "\r\n".join( + [f"{k}:{self.header_data[k]}" for k in self.header_data]) + header_wrapper = '"""' + + self.text = "\r\n".join([ + header_wrapper, + HEADER_SEPARATOR, + textwrap.dedent(header_notice), + HEADER_SEPARATOR, + header_string, + HEADER_SEPARATOR, + header_wrapper, + python_node.code + ]) + + def update_dynamo(self, args: argparse.Namespace) -> None: + """Update a the source Dynamo graph from this python script + + Args: + args (argparse.Namespace): parsed arguments + """ + self.read() + + dynamo_file_storage = DynamoFileStorage() + + # Update mode, check if needed: + if args.update: + if not dynamo_file_storage.is_uuid_on_update_list(self.header_data["dyn_uuid"]): + logging.info( + "Dynamo graph of this script shouldn't be updated") + return + + # Check if it was already opened: + logging.debug(f"Open files: {dynamo_file_storage.open_files}") + dynamo_file = dynamo_file_storage.get_open_file_by_uuid( + self.header_data["dyn_uuid"]) + + # Open and read if it's the first time: + if not dynamo_file: + dynamo_file = DynamoFile( + pathlib.Path(self.header_data["dyn_path"])) + + if not dynamo_file.exists: + raise FileNotFoundError( + f"Dynamo graph not found: {dynamo_file.filepath}") + + dynamo_file.read() + + # Check if uuid is ok: + if not dynamo_file.uuid == self.header_data["dyn_uuid"]: + raise DynamoFileException(f"Dynamo graph uuid changed!") + + new_python_node = PythonNode( + node_id=self.header_data["py_id"], + engine=self.header_data["py_engine"], + code=self.code, + checksum=hashlib.md5(self.code.encode()).hexdigest() + ) + + old_python_node = dynamo_file.get_python_node_by_id( + self.header_data["py_id"]) + + # Check checksum: + if new_python_node.checksum == old_python_node.checksum: + logging.info("Python file not changed, skipping") + return + + if dynamo_file.is_newer(self) and not args.force: + logging.info("Dynamo graph is newer, skipping") + return + + logging.info(f"Dynamo graph will be updated: {dynamo_file.filepath}") + dynamo_file.update_python_node(new_python_node) + dynamo_file_storage.append_open_file(dynamo_file) + + def read(self) -> None: + """Read python script to parameters""" + logging.info(f"Reading file: {self.filepath}") + with open(self.filepath, mode="r", newline="\r\n", encoding="utf-8") as input_py: + python_lines = input_py.readlines() + + self.header_data = {} + header_separator_count = 0 + code_start_line = 0 + + for i, line in enumerate(python_lines): + line = line.strip() + logging.debug(f"Reading line: {line}") + + # Skip the first lines: + if header_separator_count < 2: + if line == HEADER_SEPARATOR: + header_separator_count += 1 + continue + # It's the last line of the header: + elif line == HEADER_SEPARATOR: + code_start_line = i+2 + break + + else: + # Find the location of the separator + sl = line.find(":") + if sl == -1: + raise PythonFileException("Error reading header!") + self.header_data[line[0:sl]] = line[sl+1:] + + self.code = "".join(python_lines[code_start_line:]) + + logging.debug(f"Header data from python file: {self.header_data}") + # logging.debug(f"Code from python file: {self.code}") + + def write_file(self) -> None: + """Write this file to the disk. Should be called only from File.write()""" + with open(self.filepath, "w", encoding="utf-8") as output_file: + output_file.write(self.text) + + +class PythonNode(): + """A Python node with all data""" + + def __init__(self, node_dict_from_dyn: dict = {}, full_nodeviews_dict_from_dyn: dict = {}, + node_id: str = "", engine: str = "IronPython2", code: str = "", checksum: str = "", name: str = "", + source_dynamo_file: DynamoFile = None) -> None: # type: ignore + """A PythonNode object + + Args: + node_dict_from_dyn (dict, optional): The dict of the node from a dyn file. + If this is given, string parameters are ignored. Defaults to {}. + full_nodeviews_dict_from_dyn (dict, optional): The full nodeviews dict from a dyn file. Defaults to {}. + node_id (str, optional): Id of the node. Defaults to "". + engine (str, optional): Engine of the node. Defaults to "". + code (str, optional): The code text. Defaults to "". + checksum (str, optional): Checksum of the code . Defaults to "". + name (str, optional): The name of the node. Defaults to "". + source_dynamo_file (DynamoFile, optional): The file the node is from, to generate filename and filepath. Defaults to None. + """ + if node_dict_from_dyn: + self.id = node_dict_from_dyn["Id"] + # Older dynamo files doesn't have "Engine" property, fall back to the default + if "Engine" in node_dict_from_dyn: + self.engine = node_dict_from_dyn["Engine"] + else: + self.engine = engine + self.code = node_dict_from_dyn["Code"] + self.checksum = hashlib.md5(self.code.encode()).hexdigest() + if full_nodeviews_dict_from_dyn: + self.name = next( + (v["Name"] for v in full_nodeviews_dict_from_dyn if v["Id"] == node_dict_from_dyn["Id"]), None) + else: + self.name = name + else: + self.id = node_id + self.engine = engine + self.code = code + self.checksum = checksum + self.name = name + + # Generate filename and filepath if source is given: + if source_dynamo_file: + filename_parts = [source_dynamo_file.basename, self.id] + + # Only add the name of the node if it's changed: + if self.name and self.name != "Python Script": + filename_parts.append(self.name) + + logging.debug(f"Generating filename from: {filename_parts}") + self.filename = sanitize_filename( + "_".join(filename_parts) + ".py") + self.filepath = source_dynamo_file.dirpath.joinpath(self.filename) + + +class DynamoFileStorage(): + open_files: list[DynamoFile] = [] + update_files: list[DynamoFile] = [] + + # This is a singleton: + def __new__(cls): + if not hasattr(cls, 'instance'): + cls.instance = super(DynamoFileStorage, cls).__new__(cls) + return cls.instance + + def get_open_file_by_uuid(self, uuid: str) -> DynamoFile: + """Get an open Dynamo graph by its uuid + + Args: + uuid (str): Uuid of the file + + Returns: + DynamoFile: The file. None if not found + """ + f = next((d for d in self.open_files if d.uuid == uuid), None) + if f: + logging.debug(f"Found open file {f.uuid}") + return f # type: ignore + + def is_uuid_on_update_list(self, uuid: str) -> bool: + """Check if this file is on the list of files to update + + Args: + uuid (str): Uuid of the file + + Returns: + bool: True, if the file is on the list + """ + f = next((d for d in self.update_files if d.uuid == uuid), False) + return bool(f) + + def append_open_file(self, dynamo_file: DynamoFile) -> None: + """Add a file to the list of open files + + Args: + dynamo_file (DynamoFile): The file to add + """ + if not dynamo_file in self.open_files: + self.open_files.append(dynamo_file) + logging.debug("Dynamo file added to open files") + + def write_open_files(self, args: argparse.Namespace) -> None: + """Save open files to disk + + Args: + args (argparse.Namespace): parsed arguments + """ + for f in self.open_files: + f.write(args) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..459388b --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,28 @@ +[project] +name = "dyn2py" +version = "0.0.1" +description = "Extract python code from Dynamo graphs" +readme = "README.md" +requires-python = ">=3.7" +license = { file = "COPYING" } + + +authors = [{ name = "infeeeee", email = "gyetpet@mailbox.org" }] +maintainers = [{ name = "infeeeee", email = "gyetpet@mailbox.org" }] +classifiers = [] + +dependencies = ["importlib_metadata", "pathvalidate", "simplejson"] + +[project.urls] +homepage = "https://github.com/infeeeee/dyn2py" +documentation = "https://github.com/infeeeee/dyn2py" +repository = "https://github.com/infeeeee/dyn2py" +changelog = "https://github.com/infeeeee/dyn2py/releases" + + +[project.scripts] +dyn2py = "dyn2py:run" + + +[build-system] +requires = ["setuptools", "wheel"]