Basic support for running as a module

This commit is contained in:
2023-03-08 00:33:59 +01:00
parent 69a49d530f
commit f7b6b99129
7 changed files with 451 additions and 255 deletions

1
.gitignore vendored
View File

@@ -4,3 +4,4 @@ __pycache__
dyn2py.egg-info dyn2py.egg-info
build build
dist dist
docs

View File

@@ -2,6 +2,11 @@
Extract python nodes from Dynamo graphs Extract python nodes from Dynamo graphs
Use cases:
- Track changes in python nodes in source control systems like git
- Work on python code in your favorite code editor outside Dynamo. `dyn2py` can also update Dynamo graphs from the previously exported python files.
## Installation ## Installation
*TODO* *TODO*
@@ -17,6 +22,8 @@ Extract python nodes from Dynamo graphs
## Usage ## Usage
### As a standalone command line program
``` ```
> dyn2py --help > dyn2py --help
usage: dyn2py [-h] [-l LOGLEVEL] [-n] [-F] [-b] [-f {py,dyn}] [-u] [-p path/to/folder] source usage: dyn2py [-h] [-l LOGLEVEL] [-n] [-F] [-b] [-f {py,dyn}] [-u] [-p path/to/folder] source
@@ -45,6 +52,41 @@ The script by default overwrites older files with newer files.
Do not move the source Dynamo graphs, or update won't work with them later. Do not move the source Dynamo graphs, or update won't work with them later.
``` ```
### As a python module
API documentation available here: *TODO*
Most basic example to extract all nodes next to a dynamo file:
```python
import dyn2py
dynamo_file = dyn2py.DynamoFile("path/to/dynamofile.dyn")
dynamo_file.extract_python()
```
Change options like with the command line switches with the `Options` class:
```python
import dyn2py
# Create a backup on overwrite, read python files from a different folder:
options = dyn2py.Options(
backup=True,
python_folder="path/to/pythonfiles")
dynamo_file = dyn2py.DynamoFile("path/to/dynamofile")
python_files = dynamo_file.get_related_python_files(options)
# Read python files and update the graph:
if python_files:
for python_file in python_files:
python_file.update_dynamo(options)
# Don't forget to save at the end:
dynamo_file.write(options)
```
## Development ## Development
### Installation ### Installation
@@ -54,7 +96,7 @@ Requirements: git, pip
``` ```
git clone https://github.com/infeeeee/dyn2py git clone https://github.com/infeeeee/dyn2py
cd dyn2py cd dyn2py
py -m pip install -e . pip install -e .
``` ```
With venv: With venv:
@@ -62,14 +104,21 @@ With venv:
``` ```
git clone https://github.com/infeeeee/dyn2py git clone https://github.com/infeeeee/dyn2py
cd dyn2py cd dyn2py
py -m venv .venv venv .venv
. ./.venv/bin/activate . ./.venv/bin/activate
py -m pip install -e . pip install -e .
``` ```
Build: ### Build
```shell ```
pip install -e .[build] pip install -e .[build]
pyinstaller dyn2py.spec pyinstaller dyn2py.spec
``` ```
### Generate module documentation
```
pip install -e .[doc]
pdoc -d google -o docs dyn2py
```

View File

@@ -1,16 +1,43 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
.. include:: ../README.md
"""
import argparse import argparse
import pathlib import pathlib
from importlib_metadata import metadata from importlib_metadata import metadata
import textwrap import textwrap
import logging import logging
from dyn2py.classes import * from dyn2py.files import *
from dyn2py.options import *
def run():
METADATA = metadata("dyn2py") METADATA = metadata("dyn2py")
__version__ = METADATA["Version"]
__all__ = [
"run",
"Options",
"File",
"DynamoFile",
"PythonFile",
"PythonNode",
"DynamoFileException",
"PythonNodeNotFoundException",
"PythonFileException"
]
def run(options: Options | None = None) -> None:
"""Run an extraction as from the command line
Args:
options (Options): Options as from the command line.
Raises:
FileNotFoundError: If the source file does not exist
"""
if not options:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog=METADATA["Name"], prog=METADATA["Name"],
@@ -24,13 +51,9 @@ def run():
parser.add_argument("-l", "--loglevel", parser.add_argument("-l", "--loglevel",
metavar="LOGLEVEL", metavar="LOGLEVEL",
choices=["CRITICAL", choices=LOGLEVELS,
"ERROR", default=DEFAULT_LOGLEVEL,
"WARNING", help=f"set log level, possible options: {', '.join(LOGLEVELS)} ")
"INFO",
"DEBUG"],
default="INFO",
help="set log level, possible options: CRITICAL, ERROR, WARNING, INFO, DEBUG ")
parser.add_argument("-n", "--dry-run", parser.add_argument("-n", "--dry-run",
help="do not modify files, only show log", help="do not modify files, only show log",
@@ -46,7 +69,7 @@ def run():
action="store_true") action="store_true")
parser.add_argument("-f", "--filter", parser.add_argument("-f", "--filter",
choices=["py", "dyn"], choices=FILTERS,
help="only check python or Dynamo graphs, skip the others, useful for folders" help="only check python or Dynamo graphs, skip the others, useful for folders"
) )
@@ -68,19 +91,17 @@ def run():
action="append" action="append"
) )
args = parser.parse_args() options = parser.parse_args(namespace=Options())
# Set up logging: # Set up logging:
logging.basicConfig(format='%(levelname)s: %(message)s', logging.basicConfig(format='%(levelname)s: %(message)s',
level=args.loglevel) level=options.loglevel)
logging.debug(options)
logging.debug(args) logging.debug(f"Parsed arguments: {vars(options)}")
logging.debug(f"Parsed arguments: {vars(args)}")
# Set up sources:
source_files = [] source_files = []
for source in options.source:
for source in args.source:
if not source.exists(): if not source.exists():
raise FileNotFoundError(f"Source file does not exist!") raise FileNotFoundError(f"Source file does not exist!")
@@ -96,42 +117,28 @@ def run():
else: else:
source_files.append(source) source_files.append(source)
# Dynamo files come first: # Dynamo files come first, sort sources:
files = [File(f) for f in source_files] files = [File(f) for f in source_files]
files.sort(key=lambda f: f.extension) files.sort(key=lambda f: f.extension)
# Filters: # Filters:
if args.filter == "py": if options.filter == "py":
files = [f for f in files if f.is_python_file()] files = [f for f in files if f.is_python_file()]
elif args.filter == "dyn": elif options.filter == "dyn":
files = [f for f in files if f.is_dynamo_file()] files = [f for f in files if f.is_dynamo_file()]
# Update mode: # Update mode:
elif args.update: elif options.update:
dynamo_files = [DynamoFile(f.filepath) dynamo_files = [DynamoFile(f.filepath)
for f in files if f.is_dynamo_file()] for f in files if f.is_dynamo_file()]
for d in dynamo_files: python_files = set()
try:
logging.info(f"Reading file for update: {d.filepath}")
d.read()
DynamoFileStorage().update_files.append(d)
except json.JSONDecodeError:
logging.error(
"File is not correctly formatted. Is it a Dynamo2 file?")
# Find python files' folders and remove duplicates: for dynamo_file in dynamo_files:
if args.python_folder: p = dynamo_file.get_related_python_files(options)
python_folders = [args.python_folder] if p:
else: python_files.update(p)
python_folders = []
[python_folders.append(f.dirpath)
for f in dynamo_files if f.dirpath not in python_folders]
# Add python files: files = list(python_files)
source_files = []
[source_files.extend(pf.iterdir()) for pf in python_folders]
files = [PythonFile(fp)
for fp in source_files if File(fp).is_python_file()]
# Cycle through files: # Cycle through files:
for f in files: for f in files:
@@ -139,11 +146,13 @@ def run():
if f.is_dynamo_file(): if f.is_dynamo_file():
logging.debug("Source is a Dynamo file") logging.debug("Source is a Dynamo file")
dynamo_file = DynamoFile(f.filepath) dynamo_file = DynamoFile(f.filepath)
dynamo_file.extract_python(args) dynamo_file.extract_python(options)
elif f.is_python_file(): elif f.is_python_file():
logging.debug("Source is a Python file") logging.debug("Source is a Python file")
python_file = PythonFile(f.filepath) python_file = PythonFile(f.filepath)
python_file.update_dynamo(args) python_file.update_dynamo(options)
DynamoFileStorage().write_open_files(args) # Dynamo files are written only at the end, so they don't get updated too frequently
for f in DynamoFile.open_files:
f.write(options)

10
dyn2py/exceptions.py Normal file
View File

@@ -0,0 +1,10 @@
class DynamoFileException(Exception):
pass
class PythonNodeNotFoundException(Exception):
pass
class PythonFileException(Exception):
pass

View File

@@ -3,44 +3,53 @@ import hashlib
import pathlib import pathlib
import textwrap import textwrap
import logging import logging
import argparse
from datetime import datetime from datetime import datetime
from decimal import Decimal from decimal import Decimal
from pathvalidate import sanitize_filename from pathvalidate import sanitize_filename
from importlib_metadata import metadata from importlib_metadata import metadata
from dyn2py.exceptions import *
from dyn2py.options import Options
METADATA = metadata("dyn2py") METADATA = metadata("dyn2py")
HEADER_SEPARATOR = "*" * 60 HEADER_SEPARATOR = "*" * 60
class DynamoFileException(Exception):
pass
class PythonNodeNotFoundException(Exception):
pass
class PythonFileException(Exception):
pass
class File(): class File():
"""Base class for managing files""" """Base class for managing files"""
def __init__(self, filepath: pathlib.Path) -> None: def __init__(self, filepath: pathlib.Path | str) -> None:
"""Generate a file object.
Args:
filepath (pathlib.Path | str): Path to the python file or Dynamo graph
"""
self.filepath: pathlib.Path
"""Path to the file as a pathlib.Path object"""
if isinstance(filepath, str):
self.filepath = pathlib.Path(filepath)
else:
self.filepath = filepath self.filepath = filepath
# basename: only the name of the file, without extension
self.basename = filepath.stem self.basename: str = self.filepath.stem
# dirpath: containing folder """Only the name of the file, without path or extension"""
self.dirpath = filepath.parent self.dirpath: pathlib.Path = self.filepath.parent
self.realpath = filepath.resolve() """Containing folder"""
self.mtime = False self.realpath: pathlib.Path = self.filepath.resolve()
self.mtimeiso = False """Full resolved path to the file"""
self.exists = self.filepath.exists() self.mtime: float = 0.0
self.extension = self.filepath.suffix """Modification time. 0 if does not exist"""
self.mtimeiso: str = ""
"""Modification time as an iso formatted string"""
self.exists: bool = self.filepath.exists()
"""If the file exists"""
self.extension: str = self.filepath.suffix
"""File extension as string"""
self.modified: bool = False
"""If an existing file was modified"""
if self.exists: if self.exists:
logging.debug(f"File exists: {self.filepath}") logging.debug(f"File exists: {self.filepath}")
@@ -79,15 +88,20 @@ class File():
""" """
return bool(self.extension == ".py") return bool(self.extension == ".py")
def write(self, args: argparse.Namespace) -> None: def write(self, options: Options) -> None:
"""Prepare writing file to the disk: """Prepare writing file to the disk:
create backup, process dry-run, call filetype specific write_file() methods create backup, process dry-run, call filetype specific write_file() methods
Args: Args:
args (argparse.Namespace): parsed arguments options(Options): Run options.
""" """
if not self.modified:
logging.debug("File not modified, not saving")
return
# Create backup: # Create backup:
if not args.dry_run and self.filepath.exists() and args.backup: if not options.dry_run and self.filepath.exists() and options.backup:
backup_filename = sanitize_filename( backup_filename = sanitize_filename(
f"{self.basename}_{self.mtimeiso}{self.extension}") f"{self.basename}_{self.mtimeiso}{self.extension}")
backup_path = self.dirpath.joinpath(backup_filename) backup_path = self.dirpath.joinpath(backup_filename)
@@ -95,7 +109,7 @@ class File():
self.filepath.rename(backup_path) self.filepath.rename(backup_path)
# Call filetype specific methods: # Call filetype specific methods:
if args.dry_run: if options.dry_run:
logging.info( logging.info(
f"Should write file, but it's a dry-run: {self.filepath}") f"Should write file, but it's a dry-run: {self.filepath}")
else: else:
@@ -108,14 +122,28 @@ class File():
class DynamoFile(File): class DynamoFile(File):
"""A Dynamo file, subclass of File()"""
def extract_python(self, args: argparse.Namespace) -> None: full_dict: dict
"""The contents of the Dynamo file, as dict."""
uuid: str
"""The uuid of the graph"""
name: str
"""The name of the graph, read from the file. Not the name of the file"""
open_files: set["DynamoFile"] = set()
"""A set of open Dynamo files, before saving"""
def extract_python(self, options: Options | None = None) -> None:
"""Extract and write python files """Extract and write python files
Args: Args:
args (argparse.Namespace): parsed arguments options(Options | None, optional): Run options. Defaults to None.
""" """
if not options:
options = Options()
logging.info(f"Extracting from file: {self.filepath}") logging.info(f"Extracting from file: {self.filepath}")
try: try:
@@ -123,8 +151,8 @@ class DynamoFile(File):
# Go through nodes in the file: # Go through nodes in the file:
for python_node in self.get_python_nodes(): for python_node in self.get_python_nodes():
if args.python_folder: if options.python_folder:
python_file_path = args.python_folder.joinpath( python_file_path = options.python_folder.joinpath(
python_node.filename) python_node.filename)
else: else:
python_file_path = python_node.filepath python_file_path = python_node.filepath
@@ -133,12 +161,12 @@ class DynamoFile(File):
python_file.generate_text( python_file.generate_text(
dynamo_file=self, python_node=python_node) dynamo_file=self, python_node=python_node)
if python_file.is_newer(self) and not args.force: if python_file.is_newer(self) and not options.force:
logging.info( logging.info(
f"Python file is newer, skipping: {python_file.filepath}") f"Existing file is newer, skipping: {python_file.filepath}")
continue continue
python_file.write(args) python_file.write(options)
except DynamoFileException as e: except DynamoFileException as e:
logging.warn(e) logging.warn(e)
@@ -150,13 +178,16 @@ class DynamoFile(File):
def read(self) -> None: def read(self) -> None:
"""Read Dynamo graph to parameters""" """Read Dynamo graph to parameters"""
logging.debug(f"Reading file: {self.filepath}") # Only read if it's not already open:
if not self in self.open_files:
logging.debug(f"Reading file: {self.filepath}")
with open(self.filepath, "r", encoding="utf-8") as input_json: with open(self.filepath, "r", encoding="utf-8") as input_json:
self.full_dict = json.load(input_json, self.full_dict = json.load(input_json,
use_decimal=True) use_decimal=True)
self.uuid = self.full_dict["Uuid"] self.uuid = self.full_dict["Uuid"]
self.name = self.full_dict["Name"] self.name = self.full_dict["Name"]
self.open_files.add(self)
def get_python_nodes(self) -> list["PythonNode"]: def get_python_nodes(self) -> list["PythonNode"]:
"""Get python nodes from the Dynamo graph """Get python nodes from the Dynamo graph
@@ -217,14 +248,66 @@ class DynamoFile(File):
raise PythonNodeNotFoundException() raise PythonNodeNotFoundException()
else: else:
node_dict["Code"] = python_node.code node_dict["Code"] = python_node.code
self.modified = True
def write_file(self) -> None: def write_file(self) -> None:
"""Write this file to the disk. Should be called only from File.write()""" """Write this file to the disk. Should be called only from File.write()"""
with open(self.filepath, "w", encoding="utf-8") as output_file: with open(self.filepath, "w", encoding="utf-8") as output_file:
json.dump(self.full_dict, output_file, indent=2, use_decimal=True) json.dump(self.full_dict, output_file, indent=2, use_decimal=True)
def get_related_python_files(self, options: Options | None = None) -> list["PythonFile"]:
"""Get python files exported from this Dynamo file
Returns:
list[PythonFile]: A list of PythonFile objects
"""
if not options:
options = Options()
self.read()
# Find the folder of the python files
if options.python_folder:
python_folder = options.python_folder
else:
python_folder = self.dirpath
python_files_in_folder = [PythonFile(f) for f in python_folder.iterdir()
if File(f).is_python_file()]
[p.read() for p in python_files_in_folder]
related_python_files = [
p for p in python_files_in_folder if p.get_source_dynamo_file().uuid == self.uuid]
return related_python_files
@staticmethod
def get_open_file_by_uuid(uuid: str) -> "DynamoFile | None":
"""Get an open Dynamo graph by its uuid
Args:
uuid(str): Uuid of the file
Returns:
DynamoFile: The file. None if not found
"""
f = next((d for d in DynamoFile.open_files if d.uuid == uuid), None)
if f:
logging.debug(f"Found open file {f.uuid}")
return f
class PythonFile(File): class PythonFile(File):
"""A Python file, subclass of File()"""
code: str
"""The python code as a string"""
header_data: dict
"""Parsed dict from the header of a python file"""
text: str
"""Full contents of the file before writing"""
open_files: set["PythonFile"] = set()
"""A set of open Python files"""
def generate_text(self, dynamo_file: DynamoFile, python_node: "PythonNode") -> None: def generate_text(self, dynamo_file: DynamoFile, python_node: "PythonNode") -> None:
"""Generate full text to write with header """Generate full text to write with header
@@ -265,42 +348,27 @@ class PythonFile(File):
python_node.code python_node.code
]) ])
def update_dynamo(self, args: argparse.Namespace) -> None: self.modified = True
def update_dynamo(self, options: Options | None = None) -> None:
"""Update a the source Dynamo graph from this python script """Update a the source Dynamo graph from this python script
Args: Args:
args (argparse.Namespace): parsed arguments options (Options | None, optional): Run options. Defaults to None.
""" """
if not options:
options = Options()
self.read() self.read()
dynamo_file_storage = DynamoFileStorage()
# Update mode, check if needed:
if args.update:
if not dynamo_file_storage.is_uuid_on_update_list(self.header_data["dyn_uuid"]):
logging.info(
"Dynamo graph of this script shouldn't be updated")
return
# Check if it was already opened: # Check if it was already opened:
logging.debug(f"Open files: {dynamo_file_storage.open_files}") dynamo_file = DynamoFile.get_open_file_by_uuid(
dynamo_file = dynamo_file_storage.get_open_file_by_uuid(
self.header_data["dyn_uuid"]) self.header_data["dyn_uuid"])
# Open and read if it's the first time: # Open and read if it's the first time:
if not dynamo_file: if not dynamo_file:
dynamo_file = DynamoFile( dynamo_file = self.get_source_dynamo_file()
pathlib.Path(self.header_data["dyn_path"]))
if not dynamo_file.exists:
raise FileNotFoundError(
f"Dynamo graph not found: {dynamo_file.filepath}")
dynamo_file.read()
# Check if uuid is ok:
if not dynamo_file.uuid == self.header_data["dyn_uuid"]:
raise DynamoFileException(f"Dynamo graph uuid changed!")
new_python_node = PythonNode( new_python_node = PythonNode(
node_id=self.header_data["py_id"], node_id=self.header_data["py_id"],
@@ -317,16 +385,44 @@ class PythonFile(File):
logging.info("Python file not changed, skipping") logging.info("Python file not changed, skipping")
return return
if dynamo_file.is_newer(self) and not args.force: if dynamo_file.is_newer(self) and not options.force:
logging.info("Dynamo graph is newer, skipping") logging.info("Dynamo graph is newer, skipping")
return return
logging.info(f"Dynamo graph will be updated: {dynamo_file.filepath}") logging.info(f"Dynamo graph will be updated: {dynamo_file.filepath}")
dynamo_file.update_python_node(new_python_node) dynamo_file.update_python_node(new_python_node)
dynamo_file_storage.append_open_file(dynamo_file)
def get_source_dynamo_file(self) -> DynamoFile:
"""Get the source Dynamo file of this PythonFile
Raises:
FileNotFoundError: The dynamo file not found
DynamoFileException: The uuid of the dynamo file changed
Returns:
DynamoFile: The DynamoFile
"""
dynamo_file = DynamoFile(
pathlib.Path(self.header_data["dyn_path"]))
if not dynamo_file.exists:
raise FileNotFoundError(
f"Dynamo graph not found: {dynamo_file.filepath}")
dynamo_file.read()
# Check if uuid is ok:
if not dynamo_file.uuid == self.header_data["dyn_uuid"]:
raise DynamoFileException(f"Dynamo graph uuid changed!")
return dynamo_file
def read(self) -> None: def read(self) -> None:
"""Read python script to parameters""" """Read python script to parameters"""
# Only read if it's not already open:
if not self in self.open_files:
logging.info(f"Reading file: {self.filepath}") logging.info(f"Reading file: {self.filepath}")
with open(self.filepath, mode="r", newline="\r\n", encoding="utf-8") as input_py: with open(self.filepath, mode="r", newline="\r\n", encoding="utf-8") as input_py:
python_lines = input_py.readlines() python_lines = input_py.readlines()
@@ -357,6 +453,7 @@ class PythonFile(File):
self.header_data[line[0:sl]] = line[sl+1:] self.header_data[line[0:sl]] = line[sl+1:]
self.code = "".join(python_lines[code_start_line:]) self.code = "".join(python_lines[code_start_line:])
self.open_files.add(self)
logging.debug(f"Header data from python file: {self.header_data}") logging.debug(f"Header data from python file: {self.header_data}")
# logging.debug(f"Code from python file: {self.code}") # logging.debug(f"Code from python file: {self.code}")
@@ -370,14 +467,27 @@ class PythonFile(File):
class PythonNode(): class PythonNode():
"""A Python node with all data""" """A Python node with all data"""
id: str
"""The id of the node"""
engine: str
"""The engine of the node, IronPython2 or CPython3"""
code: str
"""The full code"""
checksum: str
"""The checksum of the code, for checking changes"""
name: str
"""The name of the node"""
filename: pathlib.Path | str
"""The filename the node should be saved as, including the .py extension"""
filepath: pathlib.Path
def __init__(self, node_dict_from_dyn: dict = {}, full_nodeviews_dict_from_dyn: dict = {}, def __init__(self, node_dict_from_dyn: dict = {}, full_nodeviews_dict_from_dyn: dict = {},
node_id: str = "", engine: str = "IronPython2", code: str = "", checksum: str = "", name: str = "", node_id: str = "", engine: str = "IronPython2", code: str = "", checksum: str = "", name: str = "",
source_dynamo_file: DynamoFile = None) -> None: # type: ignore source_dynamo_file: DynamoFile | None = None) -> None:
"""A PythonNode object """A PythonNode object. If node_dict_view is given, string parameters are ignored.
Args: Args:
node_dict_from_dyn (dict, optional): The dict of the node from a dyn file. node_dict_from_dyn (dict, optional): The dict of the node from a dyn file. Defaults to {}.
If this is given, string parameters are ignored. Defaults to {}.
full_nodeviews_dict_from_dyn (dict, optional): The full nodeviews dict from a dyn file. Defaults to {}. full_nodeviews_dict_from_dyn (dict, optional): The full nodeviews dict from a dyn file. Defaults to {}.
node_id (str, optional): Id of the node. Defaults to "". node_id (str, optional): Id of the node. Defaults to "".
engine (str, optional): Engine of the node. Defaults to "". engine (str, optional): Engine of the node. Defaults to "".
@@ -397,7 +507,7 @@ class PythonNode():
self.checksum = hashlib.md5(self.code.encode()).hexdigest() self.checksum = hashlib.md5(self.code.encode()).hexdigest()
if full_nodeviews_dict_from_dyn: if full_nodeviews_dict_from_dyn:
self.name = next( self.name = next(
(v["Name"] for v in full_nodeviews_dict_from_dyn if v["Id"] == node_dict_from_dyn["Id"]), None) (v["Name"] for v in full_nodeviews_dict_from_dyn if v["Id"] == node_dict_from_dyn["Id"]), "")
else: else:
self.name = name self.name = name
else: else:
@@ -419,59 +529,3 @@ class PythonNode():
self.filename = sanitize_filename( self.filename = sanitize_filename(
"_".join(filename_parts) + ".py") "_".join(filename_parts) + ".py")
self.filepath = source_dynamo_file.dirpath.joinpath(self.filename) self.filepath = source_dynamo_file.dirpath.joinpath(self.filename)
class DynamoFileStorage():
open_files: list[DynamoFile] = []
update_files: list[DynamoFile] = []
# This is a singleton:
def __new__(cls):
if not hasattr(cls, 'instance'):
cls.instance = super(DynamoFileStorage, cls).__new__(cls)
return cls.instance
def get_open_file_by_uuid(self, uuid: str) -> DynamoFile:
"""Get an open Dynamo graph by its uuid
Args:
uuid (str): Uuid of the file
Returns:
DynamoFile: The file. None if not found
"""
f = next((d for d in self.open_files if d.uuid == uuid), None)
if f:
logging.debug(f"Found open file {f.uuid}")
return f # type: ignore
def is_uuid_on_update_list(self, uuid: str) -> bool:
"""Check if this file is on the list of files to update
Args:
uuid (str): Uuid of the file
Returns:
bool: True, if the file is on the list
"""
f = next((d for d in self.update_files if d.uuid == uuid), False)
return bool(f)
def append_open_file(self, dynamo_file: DynamoFile) -> None:
"""Add a file to the list of open files
Args:
dynamo_file (DynamoFile): The file to add
"""
if not dynamo_file in self.open_files:
self.open_files.append(dynamo_file)
logging.debug("Dynamo file added to open files")
def write_open_files(self, args: argparse.Namespace) -> None:
"""Save open files to disk
Args:
args (argparse.Namespace): parsed arguments
"""
for f in self.open_files:
f.write(args)

74
dyn2py/options.py Normal file
View File

@@ -0,0 +1,74 @@
import argparse
import pathlib
LOGLEVELS = ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]
DEFAULT_LOGLEVEL = "INFO"
FILTERS = ["py", "dyn"]
class Options(argparse.Namespace):
"""Class for options for running a conversion like from the command line"""
def __init__(
self,
source: list[pathlib.Path | str] = [],
loglevel: str = DEFAULT_LOGLEVEL,
dry_run: bool = False,
backup: bool = False,
filter: str = "",
update: bool = False,
python_folder: pathlib.Path | str | None = None
) -> None:
"""Generate an option object for running it like from the command line
Args:
source (list[pathlib.Path | str], optional): List of files to run on. Defaults to [].
loglevel (str, optional): log level. Defaults to DEFAULT_LOGLEVEL.
dry_run (bool, optional): If it's a dry run. Defaults to False.
backup (bool, optional): Create backup of modified files. Defaults to False.
filter (str, optional): 'dyn' or 'py' file filter for running on folders. Defaults to "".
update (bool, optional): Update mode, like inverse on Dynamo files. Defaults to False.
python_folder (pathlib.Path | str | None, optional): Path to export python files to, or import from there. Defaults to None.
Raises:
ValueError: If loglevel or filter is invalid
"""
self.source = []
for s in source:
if isinstance(s, str):
self.source.append(pathlib.Path(s))
else:
self.source.append(s)
if loglevel.upper() in LOGLEVELS:
self.loglevel = loglevel.upper()
else:
raise ValueError
self.dry_run = dry_run
self.backup = backup
if not filter or filter in FILTERS:
self.filter = filter
else:
raise ValueError
self.update = update
if isinstance(python_folder, str):
self.python_folder = pathlib.Path(python_folder)
else:
self.python_folder = python_folder
# super().__init__(
# source=self.source,
# loglevel=self.loglevel,
# dry_run=self.dry_run,
# backup=self.backup,
# filter=self.filter,
# update=self.update,
# python_folder=self.python_folder,
# )

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "dyn2py" name = "dyn2py"
version = "0.0.1" version = "0.1.0"
description = "Extract python code from Dynamo graphs" description = "Extract python code from Dynamo graphs"
readme = "README.md" readme = "README.md"
requires-python = ">=3.7" requires-python = ">=3.7"
@@ -14,9 +14,8 @@ classifiers = []
dependencies = ["importlib_metadata", "pathvalidate", "simplejson"] dependencies = ["importlib_metadata", "pathvalidate", "simplejson"]
[project.optional-dependencies] [project.optional-dependencies]
build = [ build = ["pyinstaller"]
"pyinstaller" doc = ["pdoc"]
]
[project.urls] [project.urls]
homepage = "https://github.com/infeeeee/dyn2py" homepage = "https://github.com/infeeeee/dyn2py"