Files
dyn2py/dyn2py/files.py
2023-04-24 20:05:54 +02:00

747 lines
26 KiB
Python

from __future__ import annotations
import simplejson as json
import hashlib
import pathlib
import logging
import os
from datetime import datetime
from decimal import Decimal
from pathvalidate import sanitize_filename
from importlib_metadata import metadata
from dyn2py.options import Options
# Distribution metadata of the installed "dyn2py" package; its "Version" entry
# is written into the header of generated python files.
METADATA = metadata("dyn2py")
# A line of 60 asterisks. It delimits the sections of the generated header in
# python files and is matched literally when such a file is parsed again.
HEADER_SEPARATOR = "*" * 60
class File():
    """Base class for managing files.

    Creating a ``File`` for a ``.dyn``/``.dyf`` or ``.py`` path turns the new
    object into a ``DynamoFile`` or ``PythonFile`` respectively.
    """

    open_files: set[File] = set()
    """A set of open files."""

    def __init__(self, filepath: pathlib.Path | str, read_from_disk: bool = True) -> None:
        """Generate a file object. If the path is correct it will become a DynamoFile or PythonFile object.

        Calls DynamoFile.read_file() and PythonFile.read_file()

        Args:
            filepath (pathlib.Path | str): Path to the python file or Dynamo graph
            read_from_disk (bool, optional): Read the file from disk. False to get only metadata. Defaults to True.
        """
        self.filepath: pathlib.Path
        """Path to the file as a pathlib.Path object"""
        if isinstance(filepath, str):
            self.filepath = pathlib.Path(filepath)
        else:
            self.filepath = filepath

        self.basename: str = self.filepath.stem
        """Only the name of the file, without path or extension"""
        self.dirpath: pathlib.Path = self.filepath.parent
        """Containing folder"""
        self.realpath: pathlib.Path = self.filepath.resolve()
        """Full resolved path to the file"""
        self.mtime: float = 0.0
        """Modification time. 0 if does not exist"""
        self.mtimeiso: str = ""
        """Modification time as an iso formatted string"""
        self.exists: bool = self.filepath.exists()
        """If the file exists"""
        self.extension: str = self.filepath.suffix
        """File extension as string"""
        self.modified: bool = False
        """If an existing file was modified"""

        # Change class if extension is correct:
        if self.is_dynamo_file():
            self.__class__ = DynamoFile
            # Read DynamoFiles, they should exist:
            if read_from_disk:
                self.read_file()
        elif self.is_python_file():
            self.__class__ = PythonFile
            # Python files can be virtual:
            if self.exists and read_from_disk:
                self.read_file()

        if self.exists:
            logging.debug(f"File exists: {self.filepath}")
            self.mtime = self.filepath.stat().st_mtime
            self.mtimeiso = datetime.fromtimestamp(self.mtime).isoformat()

    def read_file(self):
        """Should be implemented in subclasses"""
        pass

    def is_newer(self, other_file: File) -> bool:
        """Check if this file is newer than the other file

        Args:
            other_file (File): The other file

        Returns:
            bool: True if this file is newer or the other doesn't exist
        """
        # A file without modification time does not exist, it can't be newer:
        if not self.mtime:
            return False
        # This one exists, the other doesn't:
        if not other_file.mtime:
            return True
        return bool(self.mtime > other_file.mtime)

    def is_dynamo_file(self) -> bool:
        """Check if this is a Dynamo file

        Returns:
            bool: True if it's Dynamo file
        """
        return bool(self.extension in [".dyn", ".dyf"])

    def is_python_file(self) -> bool:
        """Check if this is a python file

        Returns:
            bool: True if it's python file
        """
        return bool(self.extension == ".py")

    def write(self, options: Options | None = None, **option_args) -> None:
        """Prepare writing file to the disk:
        create backup, process dry-run, call filetype specific write_file() methods
        Should be called on subclasses!

        Nothing is written if the file is not marked as modified.

        Args:
            options (Options | None, optional): Run options. Defaults to None.
            **option_args: Options() arguments

        Raises:
            ValueError: Both options and other arguments given
            TypeError: If called on a File object
            File.Error: Target folder does not exist
        """
        if not options:
            options = Options.from_kwargs(kwargs=option_args)
        elif option_args:
            # Should not give both options and arguments:
            raise ValueError("Options object and extra arguments!")

        # This should only work on subclasses. Compare the class itself,
        # not its name, so an unrelated class called "File" can't confuse it:
        if type(self) is File:
            raise TypeError("This method shouldn't be called on File objects!")

        if not self.modified:
            logging.debug("File not modified, not saving")
            return

        # Create backup:
        if not options.dry_run and self.filepath.exists() and options.backup:
            # Stamp the backup with the modification time of the file that is
            # on the disk right now. The cached self.mtimeiso can be stale or
            # empty if the file was created or changed after this object:
            backup_mtimeiso = datetime.fromtimestamp(
                self.filepath.stat().st_mtime).isoformat()
            backup_filename = sanitize_filename(
                filename=f"{self.basename}_{backup_mtimeiso}{self.extension}")
            backup_path = self.dirpath.joinpath(backup_filename)
            logging.info(f"Creating backup to {backup_path}")
            self.filepath.rename(backup_path)
            if options.loglevel == "HEADLESS":
                print(backup_path)

        # Call filetype specific methods:
        if options.dry_run:
            logging.info(
                f"Should write file, but it's a dry-run: {self.filepath}")
        else:
            if not self.dirpath.exists():
                raise File.Error("File dir does not exist!", self)
            logging.info(f"Writing file: {self.filepath}")
            self._write_file()
            if options.loglevel == "HEADLESS":
                print(self.filepath)

    def _write_file(self):
        """Should be implemented in subclasses

        Raises:
            NotImplementedError: If called on a File object
        """
        raise NotImplementedError(
            "Should be called only on DynamoFile and PythonFile objects!")

    @classmethod
    def get_open_files(cls) -> set:
        """Get open files of this class and subclasses

        Returns:
            set: A set of open files
        """
        return {f for f in File.open_files if isinstance(f, cls)}

    @classmethod
    def write_open_files(cls, options: Options | None = None, **option_args) -> None:
        """Write open files of this class and subclasses

        Args:
            options (Options | None, optional): Run options. Defaults to None.
            **option_args: Options() arguments

        Raises:
            ValueError: Both options and other arguments given
        """
        if not options:
            options = Options.from_kwargs(kwargs=option_args)
        elif option_args:
            # Should not give both options and arguments:
            raise ValueError("Options object and extra arguments!")

        for f in cls.get_open_files():
            f.write(options)

    @classmethod
    def close_open_files(cls) -> None:
        """Close open files of this class and subclasses"""
        File.open_files = File.open_files - cls.get_open_files()

    class Error(Exception):
        def __init__(self, message: str, file: File) -> None:
            """There is some problem with this file

            Args:
                message (str): The message to display
                file (File): The problem File
            """
            super().__init__(message)
            self.file = file
class DynamoFile(File):
    """A Dynamo file, subclass of File()"""

    full_dict: dict
    """The contents of the Dynamo file, as dict."""
    uuid: str
    """The uuid of the graph."""
    name: str
    """The name of the graph, read from the file, not the filename"""
    python_nodes: set[PythonNode]
    """Python node objects, read from this file."""

    def extract_python(self, options: Options | None = None, **option_args) -> list[PythonFile]:
        """Extract python files from Dynamo graphs, add them to open_files

        A python file that already exists and is newer than this graph is
        skipped and removed from open_files, unless options.force is set.

        Args:
            options (Options | None, optional): Run options. Defaults to None.
            **option_args: Options() arguments

        Raises:
            ValueError: Both options and other arguments given

        Returns:
            list[PythonFile]: List of PythonFile objects extracted from this DynamoFile
        """
        if not options:
            options = Options.from_kwargs(kwargs=option_args)
        elif option_args:
            # Should not give both options and arguments:
            raise ValueError("Options object and extra arguments!")

        logging.info(f"Extracting from file: {self.filepath}")
        python_files = []

        # Go through nodes in the file:
        for python_node in self.python_nodes:
            if options.python_folder:
                python_file_path = options.python_folder.joinpath(
                    python_node.filename)
            else:
                python_file_path = python_node.filepath

            python_file = PythonFile(
                filepath=python_file_path,
                dynamo_file=self,
                python_node=python_node
            )

            if python_file.is_newer(self) and not options.force:
                # The constructor registered it as open, undo that:
                PythonFile.open_files.remove(python_file)
                logging.warning(
                    f"Existing file is newer, skipping: {python_file.filepath}")
                continue

            python_files.append(python_file)

        return python_files

    def read_file(self, reread: bool = False) -> None:
        """Read Dynamo graph to parameters. Automatically called by __init__()

        Args:
            reread (bool, optional): Reread the file, even if it was read already. Defaults to False.

        Raises:
            FileNotFoundError: The file does not exist
            DynamoFile.Error: If the file is a Dynamo 1 file
            json.JSONDecodeError: If there are any other problem with the file
            DynamoFile.PythonNodeNotFound: No python nodes in the file
        """
        if not self.exists:
            raise FileNotFoundError(self.filepath)

        # Only read if it's not already open:
        if self not in self.open_files or reread:
            logging.debug(f"Reading file: {self.filepath}")

            # Read the json:
            try:
                with open(self.filepath, "r", encoding="utf-8") as input_json:
                    self.full_dict = json.load(input_json,
                                               use_decimal=True)
            except json.JSONDecodeError:
                # Dynamo 1 graphs are xml, recognize them by their first line:
                with open(self.filepath, "r", encoding="utf-8") as input_json:
                    if input_json.readline().startswith("<Workspace Version="):
                        raise self.Error("This is a Dynamo 1 file!", self)
                # Not a Dynamo 1 file, re-raise the original decode error:
                raise

            # Parameters:
            self.uuid = self.full_dict["Uuid"]
            self.name = self.full_dict["Name"]
            self.open_files.add(self)

            full_python_nodes = [n for n in self.full_dict["Nodes"]
                                 if n["NodeType"] == "PythonScriptNode"]

            if not full_python_nodes:
                raise self.PythonNodeNotFound(
                    "No python nodes in this file!", self, "")

            self.python_nodes = set()

            # Create PythonNodes from the dict:
            for p_node in full_python_nodes:
                python_node = PythonNode(
                    node_dict_from_dyn=p_node,
                    dynamo_file=self)
                self.python_nodes.add(python_node)

    def get_python_node_by_id(self, node_id: str) -> PythonNode:
        """Get a PythonNode object from this Dynamo graph, by its id

        Args:
            node_id (str): The id of the python node as string

        Returns:
            PythonNode: The PythonNode with the given id

        Raises:
            DynamoFile.PythonNodeNotFound: No python node with this id
        """
        python_node = next((
            p for p in self.python_nodes if p.id == node_id
        ), None)

        if not python_node:
            raise self.PythonNodeNotFound(
                "Node not found", self, node_id)

        return python_node

    def update_python_node(self, python_node: PythonNode) -> None:
        """Update the code of a PythonNode in this file

        Replaces the node object in python_nodes, updates the "Code" of the
        matching node in full_dict and marks the file as modified.

        Args:
            python_node (PythonNode): The new node

        Raises:
            DynamoFile.PythonNodeNotFound: Existing node not found
        """
        # Find the old node:
        python_node_in_file = self.get_python_node_by_id(python_node.id)
        node_dict = next((
            n for n in self.full_dict["Nodes"] if n["Id"] == python_node.id
        ), {})

        if not node_dict:
            raise self.PythonNodeNotFound(
                "Existing node not found in file", self, python_node.id)

        # Remove the old and add the new:
        self.python_nodes.remove(python_node_in_file)
        self.python_nodes.add(python_node)

        # Update the dict, Dyn files are always CRLF:
        node_dict["Code"] = "\r\n".join(python_node.code)
        self.modified = True

    def _write_file(self) -> None:
        """Write this file to the disk. Should be called only from File.write()"""
        with open(self.filepath, "w", encoding="utf-8", newline="") as output_file:
            json.dump(self.full_dict, output_file, indent=2, use_decimal=True)

    def get_related_python_files(self, options: Options | None = None, **option_args) -> list[PythonFile]:
        """Get python files exported from this Dynamo file

        Python files are looked up in options.python_folder, or next to this
        file if that is not set. A python file is related if the "dyn_uuid" in
        its header equals the uuid of this graph. Python files without such a
        header entry are ignored.

        Args:
            options (Options | None, optional): Run options. Defaults to None.
            **option_args: Options() arguments

        Raises:
            ValueError: Both options and other arguments given

        Returns:
            list[PythonFile]: A list of PythonFile objects
        """
        if not options:
            options = Options.from_kwargs(kwargs=option_args)
        elif option_args:
            # Should not give both options and arguments:
            raise ValueError("Options object and extra arguments!")

        # Find the folder of the python files
        if options.python_folder:
            python_folder = options.python_folder
        else:
            python_folder = self.dirpath

        related_python_files = []

        for candidate in python_folder.iterdir():
            # A folder can also be called "something.py", it can't be read:
            if not candidate.is_file():
                continue
            if not File(candidate, read_from_disk=False).is_python_file():
                continue

            python_file = PythonFile(candidate)

            # Compare the uuid from the header directly. Hand written python
            # files have no "dyn_uuid", and the source graphs of unrelated
            # files don't have to be opened just to rule them out:
            if python_file.header_data.get("dyn_uuid") == self.uuid:
                related_python_files.append(python_file)

        return related_python_files

    @staticmethod
    def get_open_file_by_uuid(uuid: str) -> DynamoFile | None:
        """Get an open Dynamo graph by its uuid

        Args:
            uuid (str): Uuid of the file

        Returns:
            DynamoFile: The file. None if not found
        """
        f = next((d for d in DynamoFile.get_open_files() if d.uuid == uuid), None)

        if f:
            logging.debug(f"Found open file {f.uuid}")

        return f

    class PythonNodeNotFound(Exception):
        def __init__(self, message: str, file: DynamoFile, node_id: str) -> None:
            """Python node not found with this id

            Args:
                message (str): The message to display
                file (DynamoFile): The problem DynamoFile
                node_id (str): The missing id
            """
            super().__init__(message)
            self.file = file
            self.node_id = node_id
class PythonFile(File):
    """A Python file, subclass of File()"""

    code: list[str]
    """The python code. Lines as list items, without newlines."""
    header_data: dict
    """Parsed dict from the header of a python file."""
    text: str
    """Full contents of the file."""

    def __init__(self,
                 filepath: pathlib.Path | str,
                 dynamo_file: DynamoFile | None = None,
                 python_node: PythonNode | None = None
                 ) -> None:
        """Generate a PythonFile. If both dynamo_file and python_node given, generate the text of the file, do not read from disk

        The new object is always added to open_files.

        Args:
            filepath (pathlib.Path | str): Path to the python file
            dynamo_file (DynamoFile | None, optional): The source dynamo file. Defaults to None.
            python_node (PythonNode | None, optional): The python node to write. Defaults to None.
        """
        # Generate the text, if dynamo file and python node were given:
        if python_node and dynamo_file:
            # Do not read from disk:
            super().__init__(filepath, read_from_disk=False)

            header_notice = os.linesep.join([
                "This file was generated with dyn2py from a Dynamo graph.",
                "Do not edit this section, if you want to update the Dynamo graph!"
            ])

            # Calculate relative path, change to forward slash
            dyn_path_string = os.path.relpath(
                dynamo_file.filepath, self.dirpath)
            if "\\" in dyn_path_string:
                dyn_path_string = dyn_path_string.replace("\\", "/")

            self.header_data = {
                "dyn2py_version": METADATA["Version"],
                "dyn2py_extracted": datetime.now().isoformat(),
                "dyn_uuid": dynamo_file.uuid,
                "dyn_name": dynamo_file.name,
                "dyn_path": dyn_path_string,
                "dyn_modified": dynamo_file.mtimeiso,
                "py_id": python_node.id,
                "py_engine": python_node.engine
            }

            # One "key:value" line for each header entry:
            header_string = os.linesep.join(
                [f"{k}:{self.header_data[k]}" for k in self.header_data])
            header_wrapper = '"""'

            # The header is a module level docstring, the code follows it:
            self.text = os.linesep.join([
                header_wrapper,
                HEADER_SEPARATOR,
                header_notice,
                HEADER_SEPARATOR,
                header_string,
                HEADER_SEPARATOR,
                header_wrapper,
                os.linesep.join(python_node.code)
            ])

            self.code = python_node.code
            self.modified = True
        else:
            # Try to read from disk:
            super().__init__(filepath, read_from_disk=True)

        self.open_files.add(self)

    def read_file(self, reread: bool = False) -> None:
        """Read python script to parameters

        Fills text, header_data and code. If the file has no dyn2py header,
        header_data is empty and code contains every line.

        Args:
            reread (bool, optional): Reread the file, even if it was read already. Defaults to False.

        Raises:
            FileNotFoundError: The file does not exist
            PythonFile.Error: Some error reading the file
        """
        if not self.exists:
            raise FileNotFoundError(self.filepath)

        # Only read if it's not already open:
        if self not in self.open_files or reread:
            logging.info(f"Reading file: {self.filepath}")

            # newline="" keeps the line endings as they are, remove them by hand:
            with open(self.filepath, mode="r", newline="", encoding="utf-8") as input_py:
                python_lines = [line.strip("\r\n")
                                for line in input_py.readlines()]

            self.text = os.linesep.join(python_lines)
            self.header_data = {}
            header_separator_count = 0
            code_start_line = 0

            for i, line in enumerate(python_lines):
                logging.debug(f"Reading line: {line}")

                # Skip the first lines, up to the second separator.
                # The header data starts after that:
                if header_separator_count < 2:
                    if line == HEADER_SEPARATOR:
                        header_separator_count += 1
                    continue
                # It's the last line of the header:
                elif line == HEADER_SEPARATOR:
                    # Also skip the line closing the docstring:
                    code_start_line = i+2
                    break
                else:
                    # Find the location of the separator
                    sl = line.find(":")
                    if sl == -1:
                        raise self.Error("Error reading header!", self)
                    self.header_data[line[0:sl]] = line[sl+1:]

            self.code = python_lines[code_start_line:]
            self.open_files.add(self)

            logging.debug(f"Header data from python file: {self.header_data}")
            # logging.debug(f"Code from python file: {self.code}")

    def update_dynamo(self, options: Options | None = None, **option_args) -> None:
        """Update a the source Dynamo graph from this python script

        Nothing is updated if the code did not change, or if the Dynamo graph
        is newer than this file and options.force is not set.

        Args:
            options (Options | None, optional): Run options. Defaults to None.
            **option_args: Options() arguments

        Raises:
            ValueError: Both options and other arguments given
        """
        if not options:
            options = Options.from_kwargs(kwargs=option_args)
        elif option_args:
            # Should not give both options and arguments:
            raise ValueError("Options object and extra arguments!")

        dynamo_file = self.get_source_dynamo_file()
        new_python_node = PythonNode(python_file=self)
        old_python_node = dynamo_file.get_python_node_by_id(
            self.header_data["py_id"])

        # Check checksum:
        if new_python_node.checksum == old_python_node.checksum:
            logging.info("Python file not changed, skipping")
            return

        if dynamo_file.is_newer(self) and not options.force:
            logging.info("Dynamo graph is newer, skipping")
            return

        logging.info(f"Dynamo graph will be updated: {dynamo_file.filepath}")
        dynamo_file.update_python_node(new_python_node)

    def get_source_dynamo_file(self) -> DynamoFile:
        """Get the source Dynamo file of this PythonFile

        An already open Dynamo file with the uuid from the header is reused.
        Otherwise the file at "dyn_path" from the header is opened, the path
        is relative to the folder of this python file.

        Raises:
            DynamoFile.Error: The uuid of the dynamo file changed

        Returns:
            DynamoFile: The DynamoFile
        """
        # Check if it was already opened:
        dynamo_file = DynamoFile.get_open_file_by_uuid(
            self.header_data["dyn_uuid"])

        # Open if it's the first time:
        if not dynamo_file:
            # Resolve against the folder of this file by joining the paths.
            # Don't change the working directory for this: that affects the
            # whole process and is not restored if something fails:
            dynpath = os.path.realpath(
                os.path.join(self.dirpath, self.header_data["dyn_path"]))
            logging.debug(f"Resolved path: {dynpath}")

            dynamo_file = DynamoFile(pathlib.Path(dynpath))

            # Check if uuid is ok:
            if not dynamo_file.uuid == self.header_data["dyn_uuid"]:
                raise DynamoFile.Error(
                    "Dynamo graph uuid changed!", dynamo_file)

        return dynamo_file

    def _write_file(self) -> None:
        """Write this file to the disk. Should be called only from File.write()"""
        # newline="" writes the line endings of self.text without translation:
        with open(self.filepath, "w", encoding="utf-8", newline="") as output_file:
            output_file.write(self.text)
class PythonNode():
    """A Python node with all data"""

    id: str
    """The id of the node"""
    engine: str
    """The engine of the node, IronPython2 or CPython3"""
    code: list[str]
    """The full code. Lines as list items, without newlines."""
    checksum: str
    """The checksum of the code, for checking changes"""
    name: str
    """The name of the node. Empty if not known"""
    filename: pathlib.Path | str
    """The filename the node should be saved as, including the .py extension"""
    filepath: pathlib.Path
    """The full path the node should be saved as"""

    def __init__(self,
                 node_dict_from_dyn: dict | None = None,
                 dynamo_file: DynamoFile | None = None,
                 python_file: PythonFile | None = None,
                 ) -> None:
        """A PythonNode object. Add dict and dynamo_file, or only python_file.

        Args:
            node_dict_from_dyn (dict | None, optional): The dict of the node from a dyn file. Defaults to None.
            dynamo_file (DynamoFile, optional): The file the node is from. Defaults to None.
            python_file (PythonFile, optional): The python file to be converted to node. Defaults to None.

        Raises:
            PythonNode.Error: Wrong arguments were given
        """
        # Initialize from dynamo file:
        if node_dict_from_dyn and dynamo_file and not python_file:
            self.id = node_dict_from_dyn["Id"]

            # Older dynamo files doesn't have "Engine" property, fall back to IronPython2
            if "Engine" in node_dict_from_dyn:
                self.engine = node_dict_from_dyn["Engine"]
            else:
                self.engine = "IronPython2"

            # It's read from a dynamo file, so separator is always CRLF
            self.code = node_dict_from_dyn["Code"].split("\r\n")

            # Get the name of the node:
            self.name = next(
                (v["Name"] for v in dynamo_file.full_dict["View"]["NodeViews"]
                 if v["Id"] == node_dict_from_dyn["Id"]), "")

            # Generate the filename:
            filename_parts = [dynamo_file.basename, self.id]

            # Only add the name of the node if it's changed:
            if self.name and self.name != "Python Script":
                filename_parts.append(self.name)

            logging.debug(f"Generating filename from: {filename_parts}")
            self.filename = sanitize_filename(
                "_".join(filename_parts) + ".py")
            self.filepath = dynamo_file.dirpath.joinpath(self.filename)

        # Initialize from a python file:
        elif python_file and not node_dict_from_dyn and not dynamo_file:
            self.id = python_file.header_data["py_id"]
            self.engine = python_file.header_data["py_engine"]
            self.code = python_file.code
            # The header of the python file does not store the name of the
            # node. Set it to empty, so the attribute always exists:
            self.name = ""
            self.filename = python_file.basename + ".py"
            self.filepath = python_file.filepath

        elif python_file and node_dict_from_dyn and dynamo_file:
            raise self.Error("Too much arguments given!")
        elif not python_file and not node_dict_from_dyn and not dynamo_file:
            raise self.Error("No arguments given!")
        else:
            raise self.Error("Something wrong!")

        # Calculate checksum: hash every line, then hash the joined hashes.
        # It's only for detecting changes, not for security:
        checksums = [hashlib.md5(line.encode()).hexdigest()
                     for line in self.code]
        self.checksum = hashlib.md5("".join(checksums).encode()).hexdigest()

    class Error(Exception):
        """Something wrong with this node"""
        pass