Move read() methods to __init__(), new tests

This commit is contained in:
2023-03-13 19:04:24 +01:00
parent 4acc0e5da2
commit c40ff88bcf
8 changed files with 248 additions and 217 deletions

View File

@@ -100,7 +100,7 @@ def run(options: Options | None = None) -> None:
logging.debug(f"Parsed arguments: {vars(options)}")
# Set up sources:
source_files = []
files = []
for source in options.source:
if not source.exists():
@@ -111,14 +111,13 @@ def run(options: Options | None = None) -> None:
logging.debug(f"Source is a folder")
for f in source.iterdir():
source_files.append(f)
files.append(File(f))
# It's a single file:
else:
source_files.append(source)
files.append(File(source))
# Dynamo files come first, sort sources:
files = [File(f) for f in source_files]
files.sort(key=lambda f: f.extension)
# Filters:
@@ -145,17 +144,15 @@ def run(options: Options | None = None) -> None:
if f.is_dynamo_file():
logging.debug("Source is a Dynamo file")
dynamo_file = DynamoFile(f.filepath)
try:
dynamo_file.extract_python(options)
f.extract_python(options)
except DynamoFileException as e:
logging.error(f"{e} Skipping")
elif f.is_python_file():
logging.debug("Source is a Python file")
python_file = PythonFile(f.filepath)
python_file.update_dynamo(options)
f.update_dynamo(options)
# Dynamo files are written only at the end, so they don't get updated too frequently
for f in DynamoFile.open_files:

View File

@@ -20,11 +20,12 @@ HEADER_SEPARATOR = "*" * 60
class File():
"""Base class for managing files"""
def __init__(self, filepath: pathlib.Path | str) -> None:
"""Generate a file object.
def __init__(self, filepath: pathlib.Path | str, read_from_disk: bool = True) -> None:
"""Generate a file object. If the path is correct it will become a DynamoFile or PythonFile object
Args:
filepath (pathlib.Path | str): Path to the python file or Dynamo graph
read_from_disk (bool, optional): Read the file from disk. Useful for new PythonFiles. Defaults to True.
"""
self.filepath: pathlib.Path
@@ -52,11 +53,29 @@ class File():
self.modified: bool = False
"""If an existing file was modified"""
if self.exists:
# Change class if extension is correct:
if self.is_dynamo_file():
self.__class__ = DynamoFile
# Always read DynamoFiles, they should exist:
self.read_file()
elif self.is_python_file():
self.__class__ = PythonFile
# Python files can be virtual:
if self.exists and read_from_disk:
self.read_file()
if self.exists and read_from_disk:
logging.debug(f"File exists: {self.filepath}")
self.mtime = self.filepath.stat().st_mtime
self.mtimeiso = datetime.fromtimestamp(self.mtime).isoformat()
def read_file(self):
"""Should be implemented in subclasses"""
pass
def is_newer(self, other_file: "File") -> bool:
"""Check if this file is newer than the other file
@@ -134,15 +153,15 @@ class DynamoFile(File):
"""A Dynamo file, subclass of File()"""
full_dict: dict
"""The contents of the Dynamo file, as dict. Initialized with read()"""
"""The contents of the Dynamo file, as dict."""
uuid: str
"""The uuid of the graph. Initialized with read()"""
"""The uuid of the graph."""
name: str
"""The name of the graph, read from the file. Initialized with read()"""
python_nodes: list["PythonNode"]
"""Python node objects, read from this file. Initialized with get_python_nodes()"""
"""The name of the graph, read from the file, not the filename"""
python_nodes: set[PythonNode]
"""Python node objects, read from this file."""
open_files: set["DynamoFile"] = set()
open_files: set[DynamoFile] = set()
"""A set of open Dynamo files, before saving. Self added by read()"""
def extract_python(self, options: Options | None = None) -> None:
@@ -157,19 +176,19 @@ class DynamoFile(File):
logging.info(f"Extracting from file: {self.filepath}")
self.read()
# Go through nodes in the file:
for python_node in self.get_python_nodes():
for python_node in self.python_nodes:
if options.python_folder:
python_file_path = options.python_folder.joinpath(
python_node.filename)
else:
python_file_path = python_node.filepath
python_file = PythonFile(python_file_path)
python_file.generate_text(
dynamo_file=self, python_node=python_node)
python_file = PythonFile(
filepath=python_file_path,
dynamo_file=self,
python_node=python_node
)
if python_file.is_newer(self) and not options.force:
logging.info(
@@ -178,29 +197,31 @@ class DynamoFile(File):
python_file.write(options)
def read(self) -> None:
"""Read Dynamo graph to parameters
def read_file(self, reread: bool = False) -> None:
"""Read Dynamo graph to parameters. Automatically called by __init__()
Args:
reread (bool, optional): Reread the file, even if it was read already. Defaults to False.
Raises:
FileNotFoundError: The file does not exist
DynamoFileException: If the file is a Dynamo 1 file
json.JSONDecodeError: If there are any other problem with the file
PythonNodeNotFoundException: No python nodes in the file
"""
if not self.exists:
raise FileNotFoundError
# Only read if it's not already open:
if not self in self.open_files:
if not self in self.open_files or reread:
logging.debug(f"Reading file: {self.filepath}")
# Read the json:
try:
with open(self.filepath, "r", encoding="utf-8") as input_json:
self.full_dict = json.load(input_json,
use_decimal=True)
self.uuid = self.full_dict["Uuid"]
self.name = self.full_dict["Name"]
self.open_files.add(self)
except json.JSONDecodeError as e:
with open(self.filepath, "r", encoding="utf-8") as input_json:
if input_json.readline().startswith("<Workspace Version="):
@@ -208,41 +229,29 @@ class DynamoFile(File):
else:
raise e
def get_python_nodes(self) -> list["PythonNode"]:
"""Get python nodes from the Dynamo graph
# Parameters:
self.uuid = self.full_dict["Uuid"]
self.name = self.full_dict["Name"]
self.open_files.add(self)
Returns:
list[PythonNode]: A list of PythonNodes in the file
full_python_nodes = [n for n in self.full_dict["Nodes"]
if n["NodeType"] == "PythonScriptNode"]
# The name of the node is stored here:
node_views = self.full_dict["View"]["NodeViews"]
Raises:
DynamoFileException: If no Python nodes in the file
"""
if not self in self.open_files:
self.read()
if not full_python_nodes:
raise PythonNodeNotFoundException(
"No python nodes in this file!")
full_python_nodes = [n for n in self.full_dict["Nodes"]
if n["NodeType"] == "PythonScriptNode"]
self.python_nodes = set()
# Check if it was already read:
try:
self.python_nodes
except AttributeError:
self.python_nodes = []
if not self.python_nodes:
# Create PythonNodes from the dict:
for p_node in full_python_nodes:
# The name of the node is stored here:
node_views = self.full_dict["View"]["NodeViews"]
python_node = PythonNode(
node_dict_from_dyn=p_node,
full_nodeviews_dict_from_dyn=node_views,
source_dynamo_file=self)
self.python_nodes.append(python_node)
if not self.python_nodes:
raise DynamoFileException("No python nodes in this file!")
return self.python_nodes
self.python_nodes.add(python_node)
def get_python_node_by_id(self, node_id: str) -> "PythonNode":
"""Get a PythonNode object from this Dynamo graph, by its id
@@ -252,46 +261,49 @@ class DynamoFile(File):
Returns:
PythonNode: The PythonNode with the given id
Raises:
PythonNodeNotFoundException: No python node with this id
"""
if not self in self.open_files:
self.read()
# Find the node, if the nodes are not read yet:
if not self.python_nodes:
python_node_dict = next((
n for n in self.full_dict["Nodes"] if n["Id"] == node_id
), {})
if not python_node_dict:
raise PythonNodeNotFoundException(
f"Node not found with id {node_id}")
python_node = next((
p for p in self.python_nodes if p.id == node_id
), None)
python_node = PythonNode(
node_dict_from_dyn=python_node_dict)
else:
python_node = next((
p for p in self.python_nodes if p.id == node_id
), None)
if not python_node:
raise PythonNodeNotFoundException(
f"Node not found with id {node_id}")
if not python_node:
raise PythonNodeNotFoundException(
f"Node not found with id {node_id}")
return python_node
def update_python_node(self, python_node: "PythonNode") -> None:
def update_python_node(self, python_node: PythonNode) -> None:
"""Update the code of a PythonNode in this file
Args:
python_node(PythonNode): The new node data
python_node(PythonNode): The new node
Raises:
PythonNodeNotFoundException: Existing node not found
"""
# Find the old node:
python_node_in_file = self.get_python_node_by_id(python_node.id)
node_dict = next((
n for n in self.full_dict["Nodes"] if n["Id"] == python_node.id
), {})
if not node_dict:
if not node_dict or not python_node_in_file:
raise PythonNodeNotFoundException()
else:
node_dict["Code"] = python_node.code
self.modified = True
# Remove the old and add the new:
self.python_nodes.remove(python_node_in_file)
self.python_nodes.add(python_node)
# Update the dict:
node_dict["Code"] = python_node.code
self.modified = True
def write_file(self) -> None:
"""Write this file to the disk. Should be called only from File.write()"""
@@ -301,14 +313,15 @@ class DynamoFile(File):
def get_related_python_files(self, options: Options | None = None) -> list["PythonFile"]:
"""Get python files exported from this Dynamo file
Args:
options(Options | None, optional): Run options. Defaults to None.
Returns:
list[PythonFile]: A list of PythonFile objects
"""
if not options:
options = Options()
self.read()
# Find the folder of the python files
if options.python_folder:
python_folder = options.python_folder
@@ -318,7 +331,6 @@ class DynamoFile(File):
python_files_in_folder = [PythonFile(f) for f in python_folder.iterdir()
if File(f).is_python_file()]
[p.read() for p in python_files_in_folder]
related_python_files = [
p for p in python_files_in_folder if p.get_source_dynamo_file().uuid == self.uuid]
@@ -343,60 +355,127 @@ class PythonFile(File):
"""A Python file, subclass of File()"""
code: str
"""The python code as a string. Initialized with read()"""
"""The python code as a string."""
header_data: dict
"""Parsed dict from the header of a python file. Initialized with read()"""
"""Parsed dict from the header of a python file."""
text: str
"""Full contents of the file before writing. Initialized with generate_text()"""
"""Full contents of the file before writing."""
open_files: set["PythonFile"] = set()
"""A set of open Python files. Self added by read()"""
"""A set of open Python files."""
def generate_text(self, dynamo_file: DynamoFile, python_node: "PythonNode") -> None:
"""Generate full text to write with header
def __init__(self,
filepath: pathlib.Path | str,
dynamo_file: DynamoFile | None = None,
python_node: PythonNode | None = None
) -> None:
"""Generate a PythonFile. If both dynamo_file and python_node given, generate the text of the file, do not read from disk
Args:
dynamo_file(DynamoFile): The source dynamo file
python_node(PythonNode): The python node to write
filepath (pathlib.Path | str): Path to the python file
dynamo_file (DynamoFile | None, optional): The source dynamo file. Defaults to None.
python_node (PythonNode | None, optional): The python node to write. Defaults to None.
"""
header_notice = """\
# Generate the text, if dynamo file and python node were given:
if python_node and dynamo_file:
# Do not read from disk:
super().__init__(filepath, read_from_disk=False)
header_notice = """\
This file was generated with dyn2py from a Dynamo graph.
Do not edit this section, if you want to update the Dynamo graph!\
"""
# Double escape path:
dyn_path_string = str(dynamo_file.realpath)
if "\\" in dyn_path_string:
dyn_path_string = dyn_path_string.replace("\\", "\\\\")
# Double escape path:
dyn_path_string = str(dynamo_file.realpath)
if "\\" in dyn_path_string:
dyn_path_string = dyn_path_string.replace("\\", "\\\\")
self.header_data = {
"dyn2py_version": METADATA["Version"],
"dyn2py_extracted": datetime.now().isoformat(),
"dyn_uuid": dynamo_file.uuid,
"dyn_name": dynamo_file.name,
"dyn_path": dyn_path_string,
"dyn_modified": dynamo_file.mtimeiso,
"py_id": python_node.id,
"py_engine": python_node.engine
}
self.header_data = {
"dyn2py_version": METADATA["Version"],
"dyn2py_extracted": datetime.now().isoformat(),
"dyn_uuid": dynamo_file.uuid,
"dyn_name": dynamo_file.name,
"dyn_path": dyn_path_string,
"dyn_modified": dynamo_file.mtimeiso,
"py_id": python_node.id,
"py_engine": python_node.engine
}
header_string = "\r\n".join(
[f"{k}:{self.header_data[k]}" for k in self.header_data])
header_wrapper = '"""'
header_string = "\r\n".join(
[f"{k}:{self.header_data[k]}" for k in self.header_data])
header_wrapper = '"""'
self.text = "\r\n".join([
header_wrapper,
HEADER_SEPARATOR,
textwrap.dedent(header_notice),
HEADER_SEPARATOR,
header_string,
HEADER_SEPARATOR,
header_wrapper,
python_node.code
])
self.text = "\r\n".join([
header_wrapper,
HEADER_SEPARATOR,
textwrap.dedent(header_notice),
HEADER_SEPARATOR,
header_string,
HEADER_SEPARATOR,
header_wrapper,
python_node.code
])
self.modified = True
self.modified = True
else:
# Try to read from disk:
super().__init__(filepath, read_from_disk=True)
self.open_files.add(self)
def read_file(self, reread: bool = False) -> None:
"""Read python script to parameters
Args:
reread (bool, optional): Reread the file, even if it was read already. Defaults to False.
Raises:
FileNotFoundError: The file does not exist
PythonFileException: Some error reading the file
"""
if not self.exists:
raise FileNotFoundError
# Only read if it's not already open:
if not self in self.open_files or reread:
logging.info(f"Reading file: {self.filepath}")
with open(self.filepath, mode="r", newline="", encoding="utf-8") as input_py:
python_lines = input_py.readlines()
self.header_data = {}
header_separator_count = 0
code_start_line = 0
for i, line in enumerate(python_lines):
line = line.strip()
logging.debug(f"Reading line: {line}")
# Skip the first lines:
if header_separator_count < 2:
if line == HEADER_SEPARATOR:
header_separator_count += 1
continue
# It's the last line of the header:
elif line == HEADER_SEPARATOR:
code_start_line = i+2
break
else:
# Find the location of the separator
sl = line.find(":")
if sl == -1:
raise PythonFileException("Error reading header!")
self.header_data[line[0:sl]] = line[sl+1:]
self.code = "".join(python_lines[code_start_line:])
self.open_files.add(self)
logging.debug(f"Header data from python file: {self.header_data}")
# logging.debug(f"Code from python file: {self.code}")
def update_dynamo(self, options: Options | None = None) -> None:
"""Update a the source Dynamo graph from this python script
@@ -408,13 +487,11 @@ class PythonFile(File):
if not options:
options = Options()
self.read()
# Check if it was already opened:
dynamo_file = DynamoFile.get_open_file_by_uuid(
self.header_data["dyn_uuid"])
# Open and read if it's the first time:
# Open if it's the first time:
if not dynamo_file:
dynamo_file = self.get_source_dynamo_file()
@@ -457,55 +534,12 @@ class PythonFile(File):
raise FileNotFoundError(
f"Dynamo graph not found: {dynamo_file.filepath}")
dynamo_file.read()
# Check if uuid is ok:
if not dynamo_file.uuid == self.header_data["dyn_uuid"]:
raise DynamoFileException(f"Dynamo graph uuid changed!")
return dynamo_file
def read(self) -> None:
"""Read python script to parameters"""
# Only read if it's not already open:
if not self in self.open_files:
logging.info(f"Reading file: {self.filepath}")
with open(self.filepath, mode="r", newline="", encoding="utf-8") as input_py:
python_lines = input_py.readlines()
self.header_data = {}
header_separator_count = 0
code_start_line = 0
for i, line in enumerate(python_lines):
line = line.strip()
logging.debug(f"Reading line: {line}")
# Skip the first lines:
if header_separator_count < 2:
if line == HEADER_SEPARATOR:
header_separator_count += 1
continue
# It's the last line of the header:
elif line == HEADER_SEPARATOR:
code_start_line = i+2
break
else:
# Find the location of the separator
sl = line.find(":")
if sl == -1:
raise PythonFileException("Error reading header!")
self.header_data[line[0:sl]] = line[sl+1:]
self.code = "".join(python_lines[code_start_line:])
self.open_files.add(self)
logging.debug(f"Header data from python file: {self.header_data}")
# logging.debug(f"Code from python file: {self.code}")
def write_file(self) -> None:
"""Write this file to the disk. Should be called only from File.write()"""
with open(self.filepath, "w", encoding="utf-8", newline="") as output_file: