fix: Bugs related to geonodes, end-of-chain unit conversion

main
Sofus Albert Høgsbro Rose 2024-04-02 16:40:02 +02:00
parent e080d16893
commit 505a12fa25
Signed by: so-rose
GPG Key ID: AD901CB0F3701434
17 changed files with 890 additions and 1142 deletions

339
;
View File

@ -1,339 +0,0 @@
"""Provides for the linking and/or appending of geometry nodes trees from vendored libraries included in Blender maxwell."""
import enum
import typing as typ
from pathlib import Path
import bpy
import typing_extensions as typx
from .. import info
from ..utils import logger
log = logger.get(__name__)
BLOperatorStatus: typ.TypeAlias = set[
typx.Literal['RUNNING_MODAL', 'CANCELLED', 'FINISHED', 'PASS_THROUGH', 'INTERFACE']
]
####################
# - GeoNodes Specification
####################
class GeoNodes(enum.StrEnum):
"""Defines available GeoNodes groups vendored as part of Blender Maxwell.
The value of this StrEnum is both the name of the .blend file containing the GeoNodes group, and of the GeoNodes group itself.
"""
PrimitiveBox = 'box'
PrimitiveRing = 'ring'
PrimitiveSphere = 'sphere'
# GeoNodes Path Mapping
GN_PRIMITIVES_PATH = info.PATH_ASSETS / 'geonodes' / 'primitives'
GN_PARENT_PATHS: dict[GeoNodes, Path] = {
GeoNodes.PrimitiveBox: GN_PRIMITIVES_PATH,
GeoNodes.PrimitiveRing: GN_PRIMITIVES_PATH,
GeoNodes.PrimitiveSphere: GN_PRIMITIVES_PATH,
}
####################
# - Import GeoNodes (Link/Append)
####################
ImportMethod: typ.TypeAlias = typx.Literal['append', 'link']
def import_geonodes(
geonodes: GeoNodes,
import_method: ImportMethod,
force_import: bool = False,
) -> bpy.types.GeometryNodeGroup:
"""Given a pre-defined GeoNodes group packaged with Blender Maxwell.
The procedure is as follows:
- Link it to the current .blend file.
- Retrieve the node group and return it.
"""
if geonodes in bpy.data.node_groups and not force_import:
log.info(
'Found Existing GeoNodes Tree (name=%s)',
geonodes
)
return bpy.data.node_groups[geonodes]
filename = geonodes
filepath = str(
GN_PARENT_PATHS[geonodes] / (geonodes + '.blend') / 'NodeTree' / geonodes
)
directory = filepath.removesuffix(geonodes)
log.info(
'% GeoNodes Tree (filename=%s, directory=%s, filepath=%s)',
"Linking" if import_method == 'link' else "Appending"
filename,
directory,
filepath,
)
bpy.ops.wm.append(
filepath=filepath,
directory=directory,
filename=filename,
check_existing=False,
set_fake=True,
link=import_method == 'link',
)
return bpy.data.node_groups[geonodes]
####################
# - GeoNodes Asset Shelf
####################
# class GeoNodesAssetShelf(bpy.types.AssetShelf):
# bl_space_type = 'NODE_EDITOR'
# bl_idname = 'blender_maxwell.asset_shelf__geonodes'
# bl_options = {'NO_ASSET_DRAG'}
#
# @classmethod
# def poll(cls, context):
# return (
# (space := context.get('space_data'))
# and (node_tree := space.get('node_tree'))
# and (node_tree.bl_idname == 'MaxwellSimTreeType')
# )
#
# @classmethod
# def asset_poll(cls, asset: bpy.types.AssetRepresentation):
# return asset.id_type == 'NODETREE'
####################
# - GeoNodes Asset Shelf Panel for MaxwellSimTree
####################
class NodeAssetPanel(bpy.types.Panel):
bl_idname = 'blender_maxwell.panel__node_asset_panel'
bl_label = 'Node GeoNodes Asset Panel'
bl_space_type = 'NODE_EDITOR'
bl_region_type = 'UI'
bl_category = 'Assets'
# @classmethod
# def poll(cls, context):
# return (
# (space := context.get('space_data')) is not None
# and (node_tree := space.get('node_tree')) is not None
# and (node_tree.bl_idname == 'MaxwellSimTreeType')
# )
def draw(self, context):
layout = self.layout
workspace = context.workspace
wm = context.window_manager
# list_id must be unique otherwise behaviour gets weird when the template_asset_view is shown twice
# (drag operator stops working in AssetPanelDrag, clickable area of all Assets in AssetPanelNoDrag gets
# reduced to below the Asset name and clickable area of Current File Assets in AssetPanelDrag gets
# reduced as if it didn't have a drag operator)
_activate_op_props, _drag_op_props = layout.template_asset_view(
'geo_nodes_asset_shelf',
workspace,
'asset_library_reference',
wm,
'active_asset_list',
wm,
'active_asset_index',
drag_operator=AppendGeoNodes.bl_idname,
)
####################
# - Append GeoNodes Operator
####################
def get_view_location(region, coords, ui_scale):
x, y = region.view2d.region_to_view(*coords)
return x / ui_scale, y / ui_scale
class AppendGeoNodes(bpy.types.Operator):
"""Operator allowing the user to append a vendored GeoNodes tree for use in a simulation."""
bl_idname = 'blender_maxwell.blends__import_geo_nodes'
bl_label = 'Import GeoNode Tree'
bl_description = 'Append a geometry node tree from the Blender Maxwell plugin, either via linking or appending'
bl_options = frozenset({'REGISTER'})
####################
# - Properties
####################
_asset: bpy.types.AssetRepresentation | None = None
_start_drag_x: bpy.props.IntProperty()
_start_drag_y: bpy.props.IntProperty()
####################
# - UI
####################
def draw(self, _: bpy.types.Context) -> None:
"""Draws the UI of the operator."""
layout = self.layout
col = layout.column()
col.prop(self, 'geonodes_to_append', expand=True)
####################
# - Execution
####################
@classmethod
def poll(cls, context: bpy.types.Context) -> bool:
"""Defines when the operator can be run.
Returns:
Whether the operator can be run.
"""
return context.asset is not None
def invoke(self, context, event):
self._start_drag_x = event.mouse_x
self._start_drag_y = event.mouse_y
return self.execute(context)
def execute(self, context: bpy.types.Context) -> BLOperatorStatus:
"""Initializes the while-dragging modal handler, which executes custom logic when the mouse button is released.
Runs in response to drag_handler of a `UILayout.template_asset_view`.
"""
asset: bpy.types.AssetRepresentation = context.asset
log.info('Dragging Asset: %s', asset.name)
# Store Asset for Modal & Drag Start
self._asset = context.asset
# Register Modal Operator & Tag Area for Redraw
context.window_manager.modal_handler_add(self)
context.area.tag_redraw()
# Set Modal Cursor
context.window.cursor_modal_set('CROSS')
# Return Status of Running Modal
return {'RUNNING_MODAL'}
def modal(
self, context: bpy.types.Context, event: bpy.types.Event
) -> BLOperatorStatus:
"""When LMB is released, creates a GeoNodes Structure node.
Runs in response to events in the node editor while dragging an asset from the side panel.
"""
if (asset := self._asset) is None:
return {'PASS_THROUGH'}
if event.type == 'LEFTMOUSE' and event.value == 'RELEASE':
log.info('Released Dragged Asset: %s', asset.name)
area = context.area
editor_region = next(
region for region in area.regions.values() if region.type == 'WINDOW'
)
# Check if Mouse Coordinates are:
## - INSIDE of Node Editor
## - INSIDE of Node Editor's WINDOW Region
if (
(event.mouse_x >= area.x and event.mouse_x < area.x + area.width)
and (event.mouse_y >= area.y and event.mouse_y < area.y + area.height)
) and (
(
event.mouse_x >= editor_region.x
and event.mouse_x < editor_region.x + editor_region.width
)
and (
event.mouse_y >= editor_region.y
and event.mouse_y < editor_region.y + editor_region.height
)
):
log.info(
'Asset "%s" Released in Main Window of Node Editor', asset.name
)
space = context.space_data
node_tree = space.node_tree
ui_scale = context.preferences.system.ui_scale
node_location = get_view_location(
editor_region,
[
event.mouse_x - editor_region.x,
event.mouse_y - editor_region.y,
],
ui_scale,
)
# Create GeoNodes Structure Node
#space.cursor_location_from_region(*node_location)
log.info(
'Creating GeoNodes Structure Node at (%d, %d)',
*tuple(space.cursor_location),
)
bpy.ops.node.select_all(action='DESELECT')
structure_node = node_tree.nodes.new('GeoNodesStructureNodeType')
structure_node.select = True
structure_node.location.x = node_location[0]
structure_node.location.y = node_location[1]
context.area.tag_redraw()
print(structure_node.location)
# Import the GeoNodes Structure
geonodes = import_geonodes(asset.name, 'append')
# Create the GeoNodes Node
# Create a GeoNodes Structure w/Designated GeoNodes Group @ Mouse Position
context.window.cursor_modal_restore()
return {'FINISHED'}
return {'RUNNING_MODAL'}
####################
# - Blender Registration
####################
# def initialize_asset_libraries(_: bpy.types.Scene):
# bpy.app.handlers.load_post.append(initialize_asset_libraries)
## TODO: Move to top-level registration.
asset_libraries = bpy.context.preferences.filepaths.asset_libraries
if (
asset_library_idx := asset_libraries.find('Blender Maxwell')
) != -1 and asset_libraries['Blender Maxwell'].path != str(info.PATH_ASSETS):
bpy.ops.preferences.asset_library_remove(asset_library_idx)
if 'Blender Maxwell' not in asset_libraries:
bpy.ops.preferences.asset_library_add()
asset_library = asset_libraries[-1] ## Since the operator adds to the end
asset_library.name = 'Blender Maxwell'
asset_library.path = str(info.PATH_ASSETS)
bpy.types.WindowManager.active_asset_list = bpy.props.CollectionProperty(
type=bpy.types.AssetHandle
)
bpy.types.WindowManager.active_asset_index = bpy.props.IntProperty()
## TODO: Do something differently
BL_REGISTER = [
# GeoNodesAssetShelf,
NodeAssetPanel,
AppendGeoNodes,
]
BL_KEYMAP_ITEM_DEFS = [
# {
# '_': [
# AppendGeoNodes.bl_idname,
# 'LEFTMOUSE',
# 'CLICK_DRAG',
# ],
# 'ctrl': False,
# 'shift': False,
# 'alt': False,
# }
]

Binary file not shown.

View File

@ -1,28 +1,35 @@
"""Tools for translating between BLMaxwell sockets and pure Blender sockets.
Attributes:
SOCKET_DEFS: Maps BLMaxwell SocketType objects to their corresponding SocketDef.
BL_SOCKET_3D_TYPE_PREFIXES: Blender socket prefixes which indicate that the Blender socket has three values.
BL_SOCKET_4D_TYPE_PREFIXES: Blender socket prefixes which indicate that the Blender socket has four values.
"""
import functools
import typing as typ import typing as typ
import bpy import bpy
import sympy as sp import sympy as sp
import sympy.physics.units as spu
import typing_extensions as typx
from ...utils import extra_sympy_units as spux
from ...utils import logger as _logger from ...utils import logger as _logger
from . import contracts as ct from . import contracts as ct
from . import sockets as sck from . import sockets as sck
from .contracts import SocketType as ST from .contracts import SocketType as ST # noqa: N817
log = _logger.get(__name__) log = _logger.get(__name__)
# TODO: Caching? BLSocketType: typ.TypeAlias = str ## A Blender-Defined Socket Type
# TODO: Move the manual labor stuff to contracts BLSocketValue: typ.TypeAlias = typ.Any ## A Blender Socket Value
BLSocketSize: typ.TypeAlias = int
BLSocketType = str ## A Blender-Defined Socket Type DescType: typ.TypeAlias = str
BLSocketSize = int Unit: typ.TypeAlias = typ.Any ## Type of a valid unit
DescType = str
Unit = typ.Any ## Type of a valid unit
#################### ####################
# - Socket to SocketDef # - Socket to SocketDef
#################### ####################
## TODO: It's only smelly because of the way we bubble up SocketDefs
SOCKET_DEFS = { SOCKET_DEFS = {
socket_type: getattr( socket_type: getattr(
sck, sck,
@ -31,7 +38,6 @@ SOCKET_DEFS = {
for socket_type in ST for socket_type in ST
if hasattr(sck, socket_type.value.removesuffix('SocketType') + 'SocketDef') if hasattr(sck, socket_type.value.removesuffix('SocketType') + 'SocketDef')
} }
## TODO: Bit of a hack. Is it robust enough?
for socket_type in ST: for socket_type in ST:
if not hasattr( if not hasattr(
@ -53,9 +59,11 @@ BL_SOCKET_4D_TYPE_PREFIXES = {
} }
def size_from_bl_interface_socket( @functools.lru_cache(maxsize=4096)
bl_interface_socket: bpy.types.NodeTreeInterfaceSocket, def _size_from_bl_socket(
) -> typx.Literal[1, 2, 3, 4]: description: str,
bl_socket_type: BLSocketType,
):
"""Parses the `size`, aka. number of elements, contained within the `default_value` of a Blender interface socket. """Parses the `size`, aka. number of elements, contained within the `default_value` of a Blender interface socket.
Since there are no 2D sockets in Blender, the user can specify "2D" in the Blender socket's description to "promise" that only the first two values will be used. Since there are no 2D sockets in Blender, the user can specify "2D" in the Blender socket's description to "promise" that only the first two values will be used.
@ -65,15 +73,15 @@ def size_from_bl_interface_socket(
- For 3D sockets, a hard-coded list of Blender node socket types is used. - For 3D sockets, a hard-coded list of Blender node socket types is used.
- Else, it is a 1D socket type. - Else, it is a 1D socket type.
""" """
if bl_interface_socket.description.startswith('2D'): if description.startswith('2D'):
return 2 return 2
if any( if any(
bl_interface_socket.socket_type.startswith(bl_socket_3d_type_prefix) bl_socket_type.startswith(bl_socket_3d_type_prefix)
for bl_socket_3d_type_prefix in BL_SOCKET_3D_TYPE_PREFIXES for bl_socket_3d_type_prefix in BL_SOCKET_3D_TYPE_PREFIXES
): ):
return 3 return 3
if any( if any(
bl_interface_socket.socket_type.startswith(bl_socket_4d_type_prefix) bl_socket_type.startswith(bl_socket_4d_type_prefix)
for bl_socket_4d_type_prefix in BL_SOCKET_4D_TYPE_PREFIXES for bl_socket_4d_type_prefix in BL_SOCKET_4D_TYPE_PREFIXES
): ):
return 4 return 4
@ -84,176 +92,171 @@ def size_from_bl_interface_socket(
#################### ####################
# - BL Socket Type / Unit Parser # - BL Socket Type / Unit Parser
#################### ####################
def parse_bl_interface_socket( @functools.lru_cache(maxsize=4096)
bl_interface_socket: bpy.types.NodeTreeInterfaceSocket, def _socket_type_from_bl_socket(
) -> tuple[ST, sp.Expr | None]: description: str,
"""Parse a Blender interface socket by parsing its description, falling back to any direct type links. bl_socket_type: BLSocketType,
) -> ST:
"""Parse a Blender socket for a matching BLMaxwell socket type, relying on both the Blender socket type and user-generated hints in the description.
Arguments: Arguments:
bl_interface_socket: An interface socket associated with the global input to a node tree. description: The description from Blender socket, aka. `bl_socket.description`.
bl_socket_type: The Blender socket type, aka. `bl_socket.socket_type`.
Returns: Returns:
The type of a corresponding MaxwellSimSocket, as well as a unit (if a particular unit was requested by the Blender interface socket). The type of a MaxwellSimSocket that corresponds to the Blender socket.
""" """
size = size_from_bl_interface_socket(bl_interface_socket) size = _size_from_bl_socket(description, bl_socket_type)
# Determine Direct Socket Type # Determine Socket Type Directly
## The naive mapping from BL socket -> Maxwell socket may be good enough.
if ( if (
direct_socket_type := ct.BL_SOCKET_DIRECT_TYPE_MAP.get( direct_socket_type := ct.BL_SOCKET_DIRECT_TYPE_MAP.get((bl_socket_type, size))
(bl_interface_socket.socket_type, size)
)
) is None: ) is None:
msg = "Blender interface socket has no mapping among 'MaxwellSimSocket's." msg = "Blender interface socket has no mapping among 'MaxwellSimSocket's."
raise ValueError(msg) raise ValueError(msg)
# (Maybe) Return Direct Socket Type # (No Description) Return Direct Socket Type
## When there's no description, that's it; return. if ct.BL_SOCKET_DESCR_ANNOT_STRING not in description:
if not ct.BL_SOCKET_DESCR_ANNOT_STRING in bl_interface_socket.description: return direct_socket_type
return (direct_socket_type, None)
# Parse Description for Socket Type # Parse Description for Socket Type
tokens = ( ## The "2D" token is special; don't include it if it's there.
_tokens tokens = _tokens if (_tokens := description.split(' '))[0] != '2D' else _tokens[1:]
if (_tokens := bl_interface_socket.description.split(' '))[0] != '2D'
else _tokens[1:]
) ## Don't include the "2D" token, if defined.
if ( if (
socket_type := ct.BL_SOCKET_DESCR_TYPE_MAP.get( socket_type := ct.BL_SOCKET_DESCR_TYPE_MAP.get(
(tokens[0], bl_interface_socket.socket_type, size) (tokens[0], bl_socket_type, size)
) )
) is None: ) is None:
return ( msg = f'Socket description "{(tokens[0], bl_socket_type, size)}" doesn\'t map to a socket type + unit'
direct_socket_type, raise ValueError(msg)
None,
) ## Description doesn't map to anything
# Determine Socket Unit (to use instead of "unit system") return socket_type
## This is entirely OPTIONAL
socket_unit = None
if socket_type in ct.SOCKET_UNITS:
## Case: Unit is User-Defined
if len(tokens) > 1 and '(' in tokens[1] and ')' in tokens[1]:
# Compute (<unit_str>) as Unit Token
unit_token = tokens[1].removeprefix('(').removesuffix(')')
# Compare Unit Token to Valid Sympy-Printed Units
socket_unit = (
_socket_unit
if (
_socket_unit := [
unit
for unit in ct.SOCKET_UNITS[socket_type][
'values'
].values()
if str(unit) == unit_token
]
)
else ct.SOCKET_UNITS[socket_type]['values'][
ct.SOCKET_UNITS[socket_type]['default']
]
)
## TODO: Enforce abbreviated sympy printing here, not globally
return (socket_type, socket_unit)
#################### ####################
# - BL Socket Interface Definition # - BL Socket Interface Definition
#################### ####################
def socket_def_from_bl_interface_socket( @functools.lru_cache(maxsize=4096)
def _socket_def_from_bl_socket(
description: str,
bl_socket_type: BLSocketType,
) -> ST:
return SOCKET_DEFS[_socket_type_from_bl_socket(description, bl_socket_type)]
def socket_def_from_bl_socket(
bl_interface_socket: bpy.types.NodeTreeInterfaceSocket, bl_interface_socket: bpy.types.NodeTreeInterfaceSocket,
): ) -> ct.schemas.SocketDef:
"""Computes an appropriate (no-arg) SocketDef from the given `bl_interface_socket`, by parsing it.""" """Computes an appropriate (no-arg) SocketDef from the given `bl_interface_socket`, by parsing it."""
return SOCKET_DEFS[parse_bl_interface_socket(bl_interface_socket)[0]] return _socket_def_from_bl_socket(
bl_interface_socket.description, bl_interface_socket.socket_type
)
#################### ####################
# - Extract Default Interface Socket Value # - Extract Default Interface Socket Value
#################### ####################
def value_from_bl( @functools.lru_cache(maxsize=4096)
def _read_bl_socket_default_value(
description: str,
bl_socket_type: BLSocketType,
bl_socket_value: BLSocketValue,
unit_system: dict | None = None,
) -> typ.Any:
# Parse the BL Socket Type and Value
## The 'lambda' delays construction until size is determined.
socket_type = _socket_type_from_bl_socket(description, bl_socket_type)
parsed_socket_value = {
1: lambda: bl_socket_value,
2: lambda: sp.Matrix(tuple(bl_socket_value)[:2]),
3: lambda: sp.Matrix(tuple(bl_socket_value)),
4: lambda: sp.Matrix(tuple(bl_socket_value)),
}[_size_from_bl_socket(description, bl_socket_type)]()
# Add Unit-System Unit to Parsed
## Use the matching socket type to lookup the unit in the unit system.
if unit_system is not None:
if (unit := unit_system.get(socket_type)) is None:
msg = f'Unit system does not provide a unit for {socket_type}'
raise RuntimeError(msg)
if unit not in (valid_units := ct.SOCKET_UNITS[socket_type]['values'].values()):
msg = f'Unit system provided a unit "{unit}" that is invalid for socket type "{socket_type}" (valid units: {valid_units})'
raise RuntimeError(msg)
return parsed_socket_value * unit
return parsed_socket_value
def read_bl_socket_default_value(
bl_interface_socket: bpy.types.NodeTreeInterfaceSocket, bl_interface_socket: bpy.types.NodeTreeInterfaceSocket,
unit_system: dict | None = None, unit_system: dict | None = None,
) -> typ.Any: ) -> typ.Any:
"""Reads the value of any Blender socket, and writes its `default_value` to the `value` of any `MaxwellSimSocket`. """Reads the `default_value` of a Blender socket, guaranteeing a well-formed value consistent with the passed unit system.
- If the size of the Blender socket is >1, then `value` is written to as a `sympy.Matrix`.
- If a unit system is given, then the Blender socket is matched to a `MaxwellSimSocket`, which is used to lookup an appropriate unit in the given `unit_system`. Arguments:
bl_interface_socket: The Blender interface socket to analyze for description, socket type, and default value.
unit_system: The mapping from BLMaxwell SocketType to corresponding unit, used to apply the appropriate unit to the output.
Returns:
The parsed, well-formed version of `bl_socket.default_value`, of the appropriate form and unit.
""" """
## TODO: Consider sympy.S()'ing the default_value return _read_bl_socket_default_value(
parsed_bl_socket_value = { bl_interface_socket.description,
1: lambda: bl_interface_socket.default_value, bl_interface_socket.socket_type,
2: lambda: sp.Matrix(tuple(bl_interface_socket.default_value)[:2]), bl_interface_socket.default_value,
3: lambda: sp.Matrix(tuple(bl_interface_socket.default_value)), unit_system,
4: lambda: sp.Matrix(tuple(bl_interface_socket.default_value)), )
}[size_from_bl_interface_socket(bl_interface_socket)]()
## The 'lambda' delays construction until size is determined
socket_type, unit = parse_bl_interface_socket(bl_interface_socket)
# Add Unit to Parsed (if relevant)
if unit is not None:
parsed_bl_socket_value *= unit
elif unit_system is not None:
parsed_bl_socket_value *= unit_system[socket_type]
return parsed_bl_socket_value
#################### @functools.lru_cache(maxsize=4096)
# - Convert to Blender-Compatible Value def _writable_bl_socket_value(
#################### description: str,
def make_scalar_bl_compat(scalar: typ.Any) -> typ.Any: bl_socket_type: BLSocketType,
"""Blender doesn't accept ex. Sympy numbers as values.
Therefore, we need to do some conforming.
Currently hard-coded; this is probably best.
"""
if isinstance(scalar, sp.Integer):
return int(scalar)
elif isinstance(scalar, sp.Float):
return float(scalar)
elif isinstance(scalar, sp.Rational):
return float(scalar)
elif isinstance(scalar, sp.Expr):
return float(scalar.n())
## TODO: More?
return scalar
def value_to_bl(
bl_interface_socket: bpy.types.NodeSocket,
value: typ.Any, value: typ.Any,
unit_system: dict | None = None, unit_system: dict | None = None,
) -> typ.Any: ) -> typ.Any:
socket_type, unit = parse_bl_interface_socket(bl_interface_socket) socket_type = _socket_type_from_bl_socket(description, bl_socket_type)
# Set Socket # Retrieve Unit-System Unit
if unit is not None: if unit_system is not None:
bl_socket_value = spu.convert_to(value, unit) / unit if (unit := unit_system.get(socket_type)) is None:
elif unit_system is not None and socket_type in unit_system: msg = f'Unit system does not provide a unit for {socket_type}'
bl_socket_value = ( raise RuntimeError(msg)
spu.convert_to(value, unit_system[socket_type])
/ unit_system[socket_type] _bl_socket_value = spux.scale_to_unit(value, unit)
)
else: else:
bl_socket_value = value _bl_socket_value = value
return { # Compute Blender Socket Value
1: lambda: make_scalar_bl_compat(bl_socket_value), bl_socket_value = spux.sympy_to_python(_bl_socket_value)
2: lambda: tuple( if _size_from_bl_socket(description, bl_socket_type) == 2: # noqa: PLR2004
[ bl_socket_value = bl_socket_value[:2]
make_scalar_bl_compat(bl_socket_value[0]), return bl_socket_value
make_scalar_bl_compat(bl_socket_value[1]),
bl_interface_socket.default_value[2],
## Don't touch (unused) 3rd bl_socket coordinate def writable_bl_socket_value(
] bl_interface_socket: bpy.types.NodeTreeInterfaceSocket,
), value: typ.Any,
3: lambda: tuple( unit_system: dict | None = None,
[make_scalar_bl_compat(el) for el in bl_socket_value] ) -> typ.Any:
), """Processes a value to be ready-to-write to a Blender socket.
4: lambda: tuple(
[make_scalar_bl_compat(el) for el in bl_socket_value] Arguments:
), bl_interface_socket: The Blender interface socket to analyze
}[size_from_bl_interface_socket(bl_interface_socket)]() value: The value to prepare for writing to the given Blender socket.
## The 'lambda' delays construction until size is determined unit_system: The mapping from BLMaxwell SocketType to corresponding unit, used to scale the value to the the appropriate unit.
Returns:
A value corresponding to the input, which is guaranteed to be compatible with the Blender socket (incl. via a GeoNodes modifier), as well as correctly scaled with respect to the given unit system.
"""
return _writable_bl_socket_value(
bl_interface_socket.description,
bl_interface_socket.bl_socket_type,
value,
unit_system,
)

View File

@ -30,6 +30,8 @@ from .socket_units import SOCKET_UNITS
from .socket_colors import SOCKET_COLORS from .socket_colors import SOCKET_COLORS
from .socket_shapes import SOCKET_SHAPES from .socket_shapes import SOCKET_SHAPES
from .unit_systems import UNITS_BLENDER, UNITS_TIDY3D
from .socket_from_bl_desc import BL_SOCKET_DESCR_TYPE_MAP from .socket_from_bl_desc import BL_SOCKET_DESCR_TYPE_MAP
from .socket_from_bl_direct import BL_SOCKET_DIRECT_TYPE_MAP from .socket_from_bl_direct import BL_SOCKET_DIRECT_TYPE_MAP
@ -73,6 +75,8 @@ __all__ = [
'SOCKET_UNITS', 'SOCKET_UNITS',
'SOCKET_COLORS', 'SOCKET_COLORS',
'SOCKET_SHAPES', 'SOCKET_SHAPES',
'UNITS_BLENDER',
'UNITS_TIDY3D',
'BL_SOCKET_DESCR_TYPE_MAP', 'BL_SOCKET_DESCR_TYPE_MAP',
'BL_SOCKET_DIRECT_TYPE_MAP', 'BL_SOCKET_DIRECT_TYPE_MAP',
'BL_SOCKET_DESCR_ANNOT_STRING', 'BL_SOCKET_DESCR_ANNOT_STRING',

View File

@ -1,7 +1,7 @@
import sympy.physics.units as spu import sympy.physics.units as spu
from ....utils import extra_sympy_units as spux
from .socket_types import SocketType as ST from ....utils import extra_sympy_units as spux
from .socket_types import SocketType as ST # noqa: N817
SOCKET_UNITS = { SOCKET_UNITS = {
ST.PhysicalTime: { ST.PhysicalTime: {

View File

@ -0,0 +1,63 @@
import typing as typ
import sympy.physics.units as spu
from ....utils import extra_sympy_units as spux
from ....utils.pydantic_sympy import SympyExpr
from .socket_types import SocketType as ST # noqa: N817
from .socket_units import SOCKET_UNITS
def _socket_units(socket_type):
return SOCKET_UNITS[socket_type]['values']
UnitSystem: typ.TypeAlias = dict[ST, SympyExpr]
####################
# - Unit Systems
####################
UNITS_BLENDER: UnitSystem = {
socket_type: _socket_units(socket_type)[socket_unit_prop]
for socket_type, socket_unit_prop in {
ST.PhysicalTime: spu.picosecond,
ST.PhysicalAngle: spu.radian,
ST.PhysicalLength: spu.micrometer,
ST.PhysicalArea: spu.micrometer**2,
ST.PhysicalVolume: spu.micrometer**3,
ST.PhysicalPoint2D: spu.micrometer,
ST.PhysicalPoint3D: spu.micrometer,
ST.PhysicalSize2D: spu.micrometer,
ST.PhysicalSize3D: spu.micrometer,
ST.PhysicalMass: spu.microgram,
ST.PhysicalSpeed: spu.um / spu.second,
ST.PhysicalAccelScalar: spu.um / spu.second**2,
ST.PhysicalForceScalar: spu.micronewton,
ST.PhysicalAccel3D: spu.um / spu.second**2,
ST.PhysicalForce3D: spu.micronewton,
ST.PhysicalFreq: spu.terahertz,
ST.PhysicalPol: spu.radian,
}.items()
} ## TODO: Load (dynamically?) from addon preferences
UNITS_TIDY3D: UnitSystem = {
socket_type: _socket_units(socket_type)[socket_unit_prop]
for socket_type, socket_unit_prop in {
ST.PhysicalTime: spu.picosecond,
ST.PhysicalAngle: spu.radian,
ST.PhysicalLength: spu.micrometer,
ST.PhysicalArea: spu.micrometer**2,
ST.PhysicalVolume: spu.micrometer**3,
ST.PhysicalPoint2D: spu.micrometer,
ST.PhysicalPoint3D: spu.micrometer,
ST.PhysicalSize2D: spu.micrometer,
ST.PhysicalSize3D: spu.micrometer,
ST.PhysicalMass: spu.microgram,
ST.PhysicalSpeed: spu.um / spu.second,
ST.PhysicalAccelScalar: spu.um / spu.second**2,
ST.PhysicalForceScalar: spu.micronewton,
ST.PhysicalAccel3D: spu.um / spu.second**2,
ST.PhysicalForce3D: spu.micronewton,
ST.PhysicalFreq: spu.terahertz,
ST.PhysicalPol: spu.radian,
}.items()
}

View File

@ -1,8 +1,19 @@
from .managed_bl_empty import ManagedBLEmpty
from .managed_bl_image import ManagedBLImage from .managed_bl_image import ManagedBLImage
#from .managed_bl_collection import ManagedBLCollection #from .managed_bl_collection import ManagedBLCollection
#from .managed_bl_object import ManagedBLObject #from .managed_bl_object import ManagedBLObject
from .managed_bl_mesh import ManagedBLMesh from .managed_bl_mesh import ManagedBLMesh
from .managed_bl_empty import ManagedBLEmpty
#from .managed_bl_volume import ManagedBLVolume #from .managed_bl_volume import ManagedBLVolume
from .managed_bl_modifier import ManagedBLModifier from .managed_bl_modifier import ManagedBLModifier
__all__ = [
'ManagedBLEmpty',
'ManagedBLImage',
#'ManagedBLCollection',
#'ManagedBLObject',
'ManagedBLMesh',
#'ManagedBLVolume',
'ManagedBLModifier',
]

View File

@ -3,7 +3,6 @@ import contextlib
import bmesh import bmesh
import bpy import bpy
import numpy as np import numpy as np
import typing_extensions as typx
from ....utils import logger from ....utils import logger
from .. import contracts as ct from .. import contracts as ct
@ -98,21 +97,29 @@ class ManagedBLMesh(ct.schemas.ManagedObj):
If it's already included, do nothing. If it's already included, do nothing.
""" """
bl_object = self.bl_object() if (
if bl_object.name not in preview_collection().objects: bl_object := bpy.data.objects.get(self.name)
) is not None and bl_object.name not in preview_collection().objects:
log.info('Moving "%s" to Preview Collection', bl_object.name) log.info('Moving "%s" to Preview Collection', bl_object.name)
preview_collection().objects.link(bl_object) preview_collection().objects.link(bl_object)
msg = 'Managed BLMesh does not exist'
raise ValueError(msg)
def hide_preview(self) -> None: def hide_preview(self) -> None:
"""Removes the managed Blender object from the preview collection. """Removes the managed Blender object from the preview collection.
If it's already removed, do nothing. If it's already removed, do nothing.
""" """
bl_object = self.bl_object() if (
if bl_object.name not in preview_collection().objects: bl_object := bpy.data.objects.get(self.name)
) is not None and bl_object.name in preview_collection().objects:
log.info('Removing "%s" from Preview Collection', bl_object.name) log.info('Removing "%s" from Preview Collection', bl_object.name)
preview_collection.objects.unlink(bl_object) preview_collection.objects.unlink(bl_object)
msg = 'Managed BLMesh does not exist'
raise ValueError(msg)
def bl_select(self) -> None: def bl_select(self) -> None:
"""Selects the managed Blender object, causing it to be ex. outlined in the 3D viewport.""" """Selects the managed Blender object, causing it to be ex. outlined in the 3D viewport."""
if (bl_object := bpy.data.objects.get(self.name)) is not None: if (bl_object := bpy.data.objects.get(self.name)) is not None:
@ -125,7 +132,7 @@ class ManagedBLMesh(ct.schemas.ManagedObj):
#################### ####################
# - BLMesh Management # - BLMesh Management
#################### ####################
def bl_object(self): def bl_object(self, location: tuple[float, float, float] = (0, 0, 0)):
"""Returns the managed blender object.""" """Returns the managed blender object."""
# Create Object w/Appropriate Data Block # Create Object w/Appropriate Data Block
if not (bl_object := bpy.data.objects.get(self.name)): if not (bl_object := bpy.data.objects.get(self.name)):
@ -141,6 +148,10 @@ class ManagedBLMesh(ct.schemas.ManagedObj):
) )
managed_collection().objects.link(bl_object) managed_collection().objects.link(bl_object)
for i, coord in enumerate(location):
if bl_object.location[i] != coord:
bl_object.location[i] = coord
return bl_object return bl_object
#################### ####################

View File

@ -1,9 +1,10 @@
import typing as typ import typing as typ
import bpy import bpy
import typing_extensions as typx import typing_extensions as typx
from ....utils import analyze_geonodes from ....utils import analyze_geonodes, logger
from ....utils import logger from .. import bl_socket_map
from .. import contracts as ct from .. import contracts as ct
log = logger.get(__name__) log = logger.get(__name__)
@ -16,6 +17,7 @@ NodeTreeInterfaceID: typ.TypeAlias = str
class ModifierAttrsNODES(typ.TypedDict): class ModifierAttrsNODES(typ.TypedDict):
node_group: bpy.types.GeometryNodeTree node_group: bpy.types.GeometryNodeTree
unit_system: bpy.types.GeometryNodeTree
inputs: dict[NodeTreeInterfaceID, typ.Any] inputs: dict[NodeTreeInterfaceID, typ.Any]
@ -31,7 +33,7 @@ MODIFIER_NAMES = {
#################### ####################
# - Read/Write Modifier Attributes # - Read Modifier Information
#################### ####################
def read_modifier(bl_modifier: bpy.types.Modifier) -> ModifierAttrs: def read_modifier(bl_modifier: bpy.types.Modifier) -> ModifierAttrs:
if bl_modifier.type == 'NODES': if bl_modifier.type == 'NODES':
@ -44,6 +46,66 @@ def read_modifier(bl_modifier: bpy.types.Modifier) -> ModifierAttrs:
raise NotImplementedError raise NotImplementedError
####################
# - Write Modifier Information
####################
def write_modifier_geonodes(
bl_modifier: bpy.types.Modifier,
modifier_attrs: ModifierAttrsNODES,
) -> bool:
# Alter GeoNodes Group
if bl_modifier.node_group != modifier_attrs['node_group']:
log.info(
'Changing GeoNodes Modifier NodeTree from "%s" to "%s"',
str(bl_modifier.node_group),
str(modifier_attrs['node_group']),
)
bl_modifier.node_group = modifier_attrs['node_group']
modifier_altered = True
# Alter GeoNodes Modifier Inputs
## First we retrieve the interface items by-Socket Name
geonodes_interface = analyze_geonodes.interface(
bl_modifier.node_group, direct='INPUT'
)
for (
socket_name,
value,
) in modifier_attrs['inputs'].items():
# Compute Writable BL Socket Value
## Analyzes the socket and unitsys to prep a ready-to-write value.
## Writte directly to the modifier dict.
bl_socket_value = bl_socket_map.writable_bl_socket_value(
geonodes_interface[socket_name],
value,
modifier_attrs['unit_system'],
)
# Compute Interface ID from Socket Name
## We can't index the modifier by socket name; only by Interface ID.
## Still, we require that socket names are unique.
iface_id = geonodes_interface[socket_name].identifier
# IF List-Like: Alter Differing Elements
if isinstance(bl_socket_value, tuple):
for i, bl_socket_subvalue in enumerate(bl_socket_value):
if bl_modifier[iface_id][i] != bl_socket_subvalue:
bl_modifier[iface_id][i] = bl_socket_subvalue
# IF int/float Mismatch: Assign Float-Cast of Integer
## Blender is strict; only floats can set float vals.
## We are less strict; if the user passes an int, that's okay.
elif isinstance(bl_socket_value, int) and isinstance(
bl_modifier[iface_id],
float,
):
bl_modifier[iface_id] = float(bl_socket_value)
modifier_altered = True
else:
bl_modifier[iface_id] = bl_socket_value
modifier_altered = True
def write_modifier( def write_modifier(
bl_modifier: bpy.types.Modifier, bl_modifier: bpy.types.Modifier,
modifier_attrs: ModifierAttrs, modifier_attrs: ModifierAttrs,
@ -55,55 +117,7 @@ def write_modifier(
""" """
modifier_altered = False modifier_altered = False
if bl_modifier.type == 'NODES': if bl_modifier.type == 'NODES':
# Alter GeoNodes Group modifier_altered = write_modifier_geonodes(bl_modifier, modifier_attrs)
if bl_modifier.node_group != modifier_attrs['node_group']:
log.info(
'Changing GeoNodes Modifier NodeTree from "%s" to "%s"',
str(bl_modifier.node_group),
str(modifier_attrs['node_group']),
)
bl_modifier.node_group = modifier_attrs['node_group']
modifier_altered = True
# Alter GeoNodes Input (Interface) Socket Values
## The modifier's dict-like setter actually sets NodeTree interface vals
## By setting the interface value, this particular NodeTree will change
geonodes_interface = analyze_geonodes.interface(
bl_modifier.node_group, direct='INPUT'
)
for (
socket_name,
raw_value,
) in modifier_attrs['inputs'].items():
iface_id = geonodes_interface[socket_name].identifier
# Alter Interface Value
if bl_modifier[iface_id] != raw_value:
# Determine IDPropertyArray Equality
## The equality above doesn't work for IDPropertyArrays.
## BUT, IDPropertyArrays must have a 'to_list' method.
## To do the comparison, we tuple-ify the IDPropertyArray.
## raw_value is always a tuple if it's listy.
if (
hasattr(bl_modifier[iface_id], 'to_list')
and tuple(bl_modifier[iface_id].to_list()) == raw_value
):
continue
# Determine int/float Mismatch
## Blender is strict; only floats can set float vals.
## We are less strict; if the user passes an int, that's okay.
if isinstance(
bl_modifier[iface_id],
float,
) and isinstance(raw_value, int):
value = float(raw_value)
bl_modifier[iface_id] = value
modifier_altered = True
## TODO: Altering existing values is much better for performance.
## - GC churn is real!
## - Especially since this is in a hot path
elif bl_modifier.type == 'ARRAY': elif bl_modifier.type == 'ARRAY':
raise NotImplementedError raise NotImplementedError
else: else:
@ -144,8 +158,7 @@ class ManagedBLModifier(ct.schemas.ManagedObj):
# - Deallocation # - Deallocation
#################### ####################
def free(self): def free(self):
log.info('Freeing BLModifier w/Name "%s" (NOT IMPLEMENTED)', self.name) pass
## TODO: Implement
#################### ####################
# - Modifiers # - Modifiers
@ -161,6 +174,7 @@ class ManagedBLModifier(ct.schemas.ManagedObj):
- Modifier Type Names: <https://docs.blender.org/api/current/bpy_types_enum_items/object_modifier_type_items.html#rna-enum-object-modifier-type-items> - Modifier Type Names: <https://docs.blender.org/api/current/bpy_types_enum_items/object_modifier_type_items.html#rna-enum-object-modifier-type-items>
""" """
# Remove Mismatching Modifier # Remove Mismatching Modifier
modifier_was_removed = False
if ( if (
bl_modifier := bl_object.modifiers.get(self.name) bl_modifier := bl_object.modifiers.get(self.name)
) and bl_modifier.type != modifier_type: ) and bl_modifier.type != modifier_type:
@ -172,9 +186,10 @@ class ManagedBLModifier(ct.schemas.ManagedObj):
modifier_type, modifier_type,
) )
self.free() self.free()
modifier_was_removed = True
# Create Modifier # Create Modifier
if not (bl_modifier := bl_object.modifiers.get(self.name)): if bl_modifier is None or modifier_was_removed:
log.info( log.info(
'Creating BLModifier "%s" on BLObject "%s" with modifier_type "%s"', 'Creating BLModifier "%s" on BLObject "%s" with modifier_type "%s"',
self.name, self.name,

View File

@ -1,4 +1,3 @@
import inspect
import json import json
import typing as typ import typing as typ
import uuid import uuid
@ -43,16 +42,18 @@ class MaxwellSimNode(bpy.types.Node):
# Sockets # Sockets
_output_socket_methods: dict _output_socket_methods: dict
input_sockets: dict[str, ct.schemas.SocketDef] = {} input_sockets: typ.ClassVar[dict[str, ct.schemas.SocketDef]] = {}
output_sockets: dict[str, ct.schemas.SocketDef] = {} output_sockets: typ.ClassVar[dict[str, ct.schemas.SocketDef]] = {}
input_socket_sets: dict[str, dict[str, ct.schemas.SocketDef]] = {} input_socket_sets: typ.ClassVar[dict[str, dict[str, ct.schemas.SocketDef]]] = {}
output_socket_sets: dict[str, dict[str, ct.schemas.SocketDef]] = {} output_socket_sets: typ.ClassVar[dict[str, dict[str, ct.schemas.SocketDef]]] = {}
# Presets # Presets
presets = {} presets: typ.ClassVar = {}
# Managed Objects # Managed Objects
managed_obj_defs: dict[ct.ManagedObjName, ct.schemas.ManagedObjDef] = {} managed_obj_defs: typ.ClassVar[
dict[ct.ManagedObjName, ct.schemas.ManagedObjDef]
] = {}
#################### ####################
# - Initialization # - Initialization
@ -82,6 +83,14 @@ class MaxwellSimNode(bpy.types.Node):
update=(lambda self, context: self.sync_sim_node_name(context)), update=(lambda self, context: self.sync_sim_node_name(context)),
) )
# Setup "Previewing" Property for Node
cls.__annotations__['preview_active'] = bpy.props.BoolProperty(
name='Preview Active',
description='Whether the preview (if any) is currently active',
default='',
update=lambda self, context: self.sync_prop('preview_active', context),
)
# Setup Locked Property for Node # Setup Locked Property for Node
cls.__annotations__['locked'] = bpy.props.BoolProperty( cls.__annotations__['locked'] = bpy.props.BoolProperty(
name='Locked State', name='Locked State',
@ -96,34 +105,30 @@ class MaxwellSimNode(bpy.types.Node):
# Setup Callback Methods # Setup Callback Methods
cls._output_socket_methods = { cls._output_socket_methods = {
method._index_by: method (method.extra_data['output_socket_name'], method.extra_data['kind']): method
for attr_name in dir(cls) for attr_name in dir(cls)
if hasattr(method := getattr(cls, attr_name), '_callback_type') if hasattr(method := getattr(cls, attr_name), 'action_type')
and method._callback_type == 'computes_output_socket' and method.action_type == 'computes_output_socket'
and hasattr(method, 'extra_data')
and method.extra_data
} }
cls._on_value_changed_methods = { cls._on_value_changed_methods = {
method method
for attr_name in dir(cls) for attr_name in dir(cls)
if hasattr(method := getattr(cls, attr_name), '_callback_type') if hasattr(method := getattr(cls, attr_name), 'action_type')
and method._callback_type == 'on_value_changed' and method.action_type == 'on_value_changed'
}
cls._on_show_preview = {
method
for attr_name in dir(cls)
if hasattr(method := getattr(cls, attr_name), '_callback_type')
and method._callback_type == 'on_show_preview'
} }
cls._on_show_plot = { cls._on_show_plot = {
method method
for attr_name in dir(cls) for attr_name in dir(cls)
if hasattr(method := getattr(cls, attr_name), '_callback_type') if hasattr(method := getattr(cls, attr_name), 'action_type')
and method._callback_type == 'on_show_plot' and method.action_type == 'on_show_plot'
} }
cls._on_init = { cls._on_init = {
method method
for attr_name in dir(cls) for attr_name in dir(cls)
if hasattr(method := getattr(cls, attr_name), '_callback_type') if hasattr(method := getattr(cls, attr_name), 'action_type')
and method._callback_type == 'on_init' and method.action_type == 'on_init'
} }
# Setup Socket Set Dropdown # Setup Socket Set Dropdown
@ -135,7 +140,7 @@ class MaxwellSimNode(bpy.types.Node):
_input_socket_set_names := list(cls.input_socket_sets.keys()) _input_socket_set_names := list(cls.input_socket_sets.keys())
) + [ ) + [
output_socket_set_name output_socket_set_name
for output_socket_set_name in cls.output_socket_sets.keys() for output_socket_set_name in cls.output_socket_sets
if output_socket_set_name not in _input_socket_set_names if output_socket_set_name not in _input_socket_set_names
] ]
socket_set_ids = [ socket_set_ids = [
@ -160,9 +165,7 @@ class MaxwellSimNode(bpy.types.Node):
) )
], ],
default=socket_set_names[0], default=socket_set_names[0],
update=lambda self, context: self.sync_active_socket_set( update=lambda self, context: self.sync_active_socket_set(context),
context
),
) )
# Setup Preset Dropdown # Setup Preset Dropdown
@ -181,8 +184,8 @@ class MaxwellSimNode(bpy.types.Node):
) )
for preset_name, preset_def in cls.presets.items() for preset_name, preset_def in cls.presets.items()
], ],
default=list(cls.presets.keys())[0], default=next(cls.presets.keys()),
update=lambda self, context: (self.sync_active_preset()()), update=lambda self, _: (self.sync_active_preset()()),
) )
#################### ####################
@ -192,7 +195,7 @@ class MaxwellSimNode(bpy.types.Node):
self.sync_sockets() self.sync_sockets()
self.sync_prop('active_socket_set', context) self.sync_prop('active_socket_set', context)
def sync_sim_node_name(self, context): def sync_sim_node_name(self, _):
if (mobjs := CACHE[self.instance_id].get('managed_objs')) is None: if (mobjs := CACHE[self.instance_id].get('managed_objs')) is None:
return return
@ -212,7 +215,6 @@ class MaxwellSimNode(bpy.types.Node):
#################### ####################
@property @property
def managed_objs(self): def managed_objs(self):
global CACHE
if not CACHE.get(self.instance_id): if not CACHE.get(self.instance_id):
CACHE[self.instance_id] = {} CACHE[self.instance_id] = {}
@ -229,9 +231,7 @@ class MaxwellSimNode(bpy.types.Node):
# Fill w/Managed Objects by Name Socket # Fill w/Managed Objects by Name Socket
for mobj_id, mobj_def in self.managed_obj_defs.items(): for mobj_id, mobj_def in self.managed_obj_defs.items():
name = mobj_def.name_prefix + self.sim_node_name name = mobj_def.name_prefix + self.sim_node_name
CACHE[self.instance_id]['managed_objs'][mobj_id] = mobj_def.mk( CACHE[self.instance_id]['managed_objs'][mobj_id] = mobj_def.mk(name)
name
)
return CACHE[self.instance_id]['managed_objs'] return CACHE[self.instance_id]['managed_objs']
@ -253,9 +253,7 @@ class MaxwellSimNode(bpy.types.Node):
# Retrieve Active Socket Set Sockets # Retrieve Active Socket Set Sockets
socket_sets = ( socket_sets = (
self.input_socket_sets self.input_socket_sets if direc == 'input' else self.output_socket_sets
if direc == 'input'
else self.output_socket_sets
) )
active_socket_set_sockets = socket_sets.get(self.active_socket_set) active_socket_set_sockets = socket_sets.get(self.active_socket_set)
@ -265,24 +263,13 @@ class MaxwellSimNode(bpy.types.Node):
return active_socket_set_sockets return active_socket_set_sockets
def active_sockets(self, direc: typx.Literal['input', 'output']): def active_sockets(self, direc: typx.Literal['input', 'output']):
static_sockets = ( static_sockets = self.input_sockets if direc == 'input' else self.output_sockets
self.input_sockets if direc == 'input' else self.output_sockets
)
socket_sets = (
self.input_socket_sets
if direc == 'input'
else self.output_socket_sets
)
loose_sockets = ( loose_sockets = (
self.loose_input_sockets self.loose_input_sockets if direc == 'input' else self.loose_output_sockets
if direc == 'input'
else self.loose_output_sockets
) )
return ( return (
static_sockets static_sockets | self.active_socket_set_sockets(direc=direc) | loose_sockets
| self.active_socket_set_sockets(direc=direc)
| loose_sockets
) )
#################### ####################
@ -302,12 +289,8 @@ class MaxwellSimNode(bpy.types.Node):
) )
## Internal Serialization/Deserialization Methods (yuck) ## Internal Serialization/Deserialization Methods (yuck)
def _ser_loose_sockets( def _ser_loose_sockets(self, deser: dict[str, ct.schemas.SocketDef]) -> str:
self, deser: dict[str, ct.schemas.SocketDef] if not all(isinstance(model, pyd.BaseModel) for model in deser.values()):
) -> str:
if not all(
isinstance(model, pyd.BaseModel) for model in deser.values()
):
msg = 'Trying to deserialize loose sockets with invalid SocketDefs (they must be `pydantic` BaseModels).' msg = 'Trying to deserialize loose sockets with invalid SocketDefs (they must be `pydantic` BaseModels).'
raise ValueError(msg) raise ValueError(msg)
@ -325,9 +308,7 @@ class MaxwellSimNode(bpy.types.Node):
} }
) ## Big reliance on order-preservation of dicts here.) ) ## Big reliance on order-preservation of dicts here.)
def _deser_loose_sockets( def _deser_loose_sockets(self, ser: str) -> dict[str, ct.schemas.SocketDef]:
self, ser: str
) -> dict[str, ct.schemas.SocketDef]:
semi_deser = json.loads(ser) semi_deser = json.loads(ser)
return { return {
socket_name: getattr(sockets, socket_def_name)(**model_kwargs) socket_name: getattr(sockets, socket_def_name)(**model_kwargs)
@ -354,6 +335,11 @@ class MaxwellSimNode(bpy.types.Node):
self, self,
value: dict[str, ct.schemas.SocketDef], value: dict[str, ct.schemas.SocketDef],
) -> None: ) -> None:
log.info(
'Setting Loose Input Sockets on "%s" to "%s"',
self.bl_label,
str(value),
)
if not value: if not value:
self.ser_loose_input_sockets = _DEFAULT_LOOSE_SOCKET_SER self.ser_loose_input_sockets = _DEFAULT_LOOSE_SOCKET_SER
else: else:
@ -448,9 +434,7 @@ class MaxwellSimNode(bpy.types.Node):
# - Preset Management # - Preset Management
#################### ####################
def sync_active_preset(self) -> None: def sync_active_preset(self) -> None:
"""Applies the active preset by overwriting the value of """Applies the active preset by overwriting the value of preset-defined input sockets."""
preset-defined input sockets.
"""
if not (preset_def := self.presets.get(self.active_preset)): if not (preset_def := self.presets.get(self.active_preset)):
msg = f'Tried to apply active preset, but the active preset ({self.active_preset}) is not in presets ({self.presets})' msg = f'Tried to apply active preset, but the active preset ({self.active_preset}) is not in presets ({self.presets})'
raise RuntimeError(msg) raise RuntimeError(msg)
@ -507,7 +491,7 @@ class MaxwellSimNode(bpy.types.Node):
## TODO: Side panel buttons for fanciness. ## TODO: Side panel buttons for fanciness.
def draw_plot_settings(self, context, layout): def draw_plot_settings(self, _: bpy.types.Context, layout: bpy.types.UILayout):
if self.locked: if self.locked:
layout.enabled = False layout.enabled = False
@ -522,17 +506,15 @@ class MaxwellSimNode(bpy.types.Node):
"""Computes the data of an input socket, by socket name and data flow kind, by asking the socket nicely via `bl_socket.compute_data`. """Computes the data of an input socket, by socket name and data flow kind, by asking the socket nicely via `bl_socket.compute_data`.
Args: Args:
input_socket_name: The name of the input socket, as defined in input_socket_name: The name of the input socket, as defined in `self.input_sockets`.
`self.input_sockets`. kind: The kind of data flow to compute.
kind: The data flow kind to compute retrieve.
""" """
if not (bl_socket := self.inputs.get(input_socket_name)): if bl_socket := self.inputs.get(input_socket_name):
return None
# msg = f"Input socket name {input_socket_name} is not an active input sockets."
# raise ValueError(msg)
return bl_socket.compute_data(kind=kind) return bl_socket.compute_data(kind=kind)
msg = f'Input socket "{input_socket_name}" on "{self.bl_idname}" is not an active input socket'
raise ValueError(msg)
def compute_output( def compute_output(
self, self,
output_socket_name: ct.SocketName, output_socket_name: ct.SocketName,
@ -544,27 +526,25 @@ class MaxwellSimNode(bpy.types.Node):
This method is run to produce the value. This method is run to produce the value.
Args: Args:
output_socket_name: The name declaring the output socket, output_socket_name: The name declaring the output socket, for which this method computes the output.
for which this method computes the output. kind: The DataFlowKind to use when computing the output socket value.
Returns: Returns:
The value of the output socket, as computed by the dedicated method The value of the output socket, as computed by the dedicated method
registered using the `@computes_output_socket` decorator. registered using the `@computes_output_socket` decorator.
""" """
if not ( if output_socket_method := self._output_socket_methods.get(
output_socket_method := self._output_socket_methods.get(
(output_socket_name, kind) (output_socket_name, kind)
)
): ):
return output_socket_method(self)
msg = f'No output method for ({output_socket_name}, {str(kind.value)}' msg = f'No output method for ({output_socket_name}, {str(kind.value)}'
raise ValueError(msg) raise ValueError(msg)
return output_socket_method(self)
#################### ####################
# - Action Chain # - Action Chain
#################### ####################
def sync_prop(self, prop_name: str, context: bpy.types.Context): def sync_prop(self, prop_name: str, _: bpy.types.Context):
"""Called when a property has been updated.""" """Called when a property has been updated."""
if not hasattr(self, prop_name): if not hasattr(self, prop_name):
msg = f'Property {prop_name} not defined on socket {self}' msg = f'Property {prop_name} not defined on socket {self}'
@ -598,17 +578,12 @@ class MaxwellSimNode(bpy.types.Node):
if ( if (
( (
socket_name socket_name
and socket_name and socket_name in method.extra_data['changed_sockets']
in method._extra_data.get('changed_sockets')
)
or (
prop_name
and prop_name
in method._extra_data.get('changed_props')
) )
or (prop_name and prop_name in method.extra_data['changed_props'])
or ( or (
socket_name socket_name
and method._extra_data['changed_loose_input'] and method.extra_data['changed_loose_input']
and socket_name in self.loose_input_sockets and socket_name in self.loose_input_sockets
) )
): ):
@ -635,8 +610,11 @@ class MaxwellSimNode(bpy.types.Node):
elif action == 'show_preview': elif action == 'show_preview':
# Run User Callbacks # Run User Callbacks
for method in self._on_show_preview: ## "On Show Preview" callbacks are 'on_value_changed' callbacks...
method(self) ## ...which simply hook into the 'preview_active' property.
## By (maybe) altering 'preview_active', callbacks run as needed.
if not self.preview_active:
self.preview_active = True
## Propagate via Input Sockets ## Propagate via Input Sockets
for bl_socket in self.active_bl_sockets('input'): for bl_socket in self.active_bl_sockets('input'):
@ -648,7 +626,7 @@ class MaxwellSimNode(bpy.types.Node):
## ...because they can stop propagation, they should go first. ## ...because they can stop propagation, they should go first.
for method in self._on_show_plot: for method in self._on_show_plot:
method(self) method(self)
if method._extra_data['stop_propagation']: if method.extra_data['stop_propagation']:
return return
## Propagate via Input Sockets ## Propagate via Input Sockets
@ -669,8 +647,6 @@ class MaxwellSimNode(bpy.types.Node):
def init(self, context: bpy.types.Context): def init(self, context: bpy.types.Context):
"""Run (by Blender) on node creation.""" """Run (by Blender) on node creation."""
global CACHE
# Initialize Cache and Instance ID # Initialize Cache and Instance ID
self.instance_id = str(uuid.uuid4()) self.instance_id = str(uuid.uuid4())
CACHE[self.instance_id] = {} CACHE[self.instance_id] = {}
@ -695,7 +671,6 @@ class MaxwellSimNode(bpy.types.Node):
def free(self) -> None: def free(self) -> None:
"""Run (by Blender) when deleting the node.""" """Run (by Blender) when deleting the node."""
global CACHE
if not CACHE.get(self.instance_id): if not CACHE.get(self.instance_id):
CACHE[self.instance_id] = {} CACHE[self.instance_id] = {}
node_tree = self.id_data node_tree = self.id_data
@ -725,306 +700,3 @@ class MaxwellSimNode(bpy.types.Node):
# Finally: Free Instance Cache # Finally: Free Instance Cache
if self.instance_id in CACHE: if self.instance_id in CACHE:
del CACHE[self.instance_id] del CACHE[self.instance_id]
def chain_event_decorator(
callback_type: typ.Literal[
'computes_output_socket',
'on_value_changed',
'on_show_preview',
'on_show_plot',
'on_init',
],
index_by: typ.Any | None = None,
extra_data: dict[str, typ.Any] | None = None,
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
input_sockets: set[str] = set(), ## For now, presume
output_sockets: set[str] = set(), ## For now, presume
loose_input_sockets: bool = False,
loose_output_sockets: bool = False,
props: set[str] = set(),
managed_objs: set[str] = set(),
req_params: set[str] = set(),
):
def decorator(method: typ.Callable) -> typ.Callable:
# Check Function Signature Validity
func_sig = set(inspect.signature(method).parameters.keys())
## Too Little
if func_sig != req_params and func_sig.issubset(req_params):
msg = f'Decorated method {method.__name__} is missing arguments {req_params - func_sig}'
## Too Much
if func_sig != req_params and func_sig.issuperset(req_params):
msg = f'Decorated method {method.__name__} has superfluous arguments {func_sig - req_params}'
raise ValueError(msg)
## Just Right :)
# TODO: Check Function Annotation Validity
# - w/pydantic and/or socket capabilities
def decorated(node: MaxwellSimNode):
# Assemble Keyword Arguments
method_kw_args = {}
## Add Input Sockets
if input_sockets:
_input_sockets = {
input_socket_name: node._compute_input(
input_socket_name, kind
)
for input_socket_name in input_sockets
}
method_kw_args |= dict(input_sockets=_input_sockets)
## Add Output Sockets
if output_sockets:
_output_sockets = {
output_socket_name: node.compute_output(
output_socket_name, kind
)
for output_socket_name in output_sockets
}
method_kw_args |= dict(output_sockets=_output_sockets)
## Add Loose Sockets
if loose_input_sockets:
_loose_input_sockets = {
input_socket_name: node._compute_input(
input_socket_name, kind
)
for input_socket_name in node.loose_input_sockets
}
method_kw_args |= dict(
loose_input_sockets=_loose_input_sockets
)
if loose_output_sockets:
_loose_output_sockets = {
output_socket_name: node.compute_output(
output_socket_name, kind
)
for output_socket_name in node.loose_output_sockets
}
method_kw_args |= dict(
loose_output_sockets=_loose_output_sockets
)
## Add Props
if props:
_props = {
prop_name: getattr(node, prop_name) for prop_name in props
}
method_kw_args |= dict(props=_props)
## Add Managed Object
if managed_objs:
_managed_objs = {
managed_obj_name: node.managed_objs[managed_obj_name]
for managed_obj_name in managed_objs
}
method_kw_args |= dict(managed_objs=_managed_objs)
# Call Method
return method(
node,
**method_kw_args,
)
# Set Attributes for Discovery
decorated._callback_type = callback_type
if index_by:
decorated._index_by = index_by
if extra_data:
decorated._extra_data = extra_data
return decorated
return decorator
####################
# - Decorator: Output Socket
####################
def computes_output_socket(
output_socket_name: ct.SocketName,
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
input_sockets: set[str] = set(),
props: set[str] = set(),
managed_objs: set[str] = set(),
cacheable: bool = True,
):
"""Given a socket name, defines a function-that-makes-a-function (aka.
decorator) which has the name of the socket attached.
Must be used as a decorator, ex. `@compute_output_socket("name")`.
Args:
output_socket_name: The name of the output socket to attach the
decorated method to.
input_sockets: The values of these input sockets will be computed
using `_compute_input`, then passed to the decorated function
as `input_sockets: list[Any]`. If the input socket doesn't exist (ex. is contained in an inactive loose socket or socket set), then None is returned.
managed_objs: These managed objects will be passed to the
function as `managed_objs: list[Any]`.
kind: Requests for this `output_socket_name, DataFlowKind` pair will
be returned by the decorated function.
cacheable: The output of th
be returned by the decorated function.
Returns:
The decorator, which takes the output-socket-computing method
and returns a new output-socket-computing method, now annotated
and discoverable by the `MaxwellSimTreeNode`.
"""
req_params = (
{'self'}
| ({'input_sockets'} if input_sockets else set())
| ({'props'} if props else set())
| ({'managed_objs'} if managed_objs else set())
)
return chain_event_decorator(
callback_type='computes_output_socket',
index_by=(output_socket_name, kind),
kind=kind,
input_sockets=input_sockets,
props=props,
managed_objs=managed_objs,
req_params=req_params,
)
####################
# - Decorator: On Show Preview
####################
def on_value_changed(
socket_name: set[ct.SocketName] | ct.SocketName | None = None,
prop_name: set[str] | str | None = None,
any_loose_input_socket: bool = False,
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
input_sockets: set[str] = set(),
props: set[str] = set(),
managed_objs: set[str] = set(),
):
if (
sum(
[
int(socket_name is not None),
int(prop_name is not None),
int(any_loose_input_socket),
]
)
> 1
):
msg = 'Define only one of socket_name, prop_name or any_loose_input_socket'
raise ValueError(msg)
req_params = (
{'self'}
| ({'input_sockets'} if input_sockets else set())
| ({'loose_input_sockets'} if any_loose_input_socket else set())
| ({'props'} if props else set())
| ({'managed_objs'} if managed_objs else set())
)
return chain_event_decorator(
callback_type='on_value_changed',
extra_data={
'changed_sockets': (
socket_name if isinstance(socket_name, set) else {socket_name}
),
'changed_props': (
prop_name if isinstance(prop_name, set) else {prop_name}
),
'changed_loose_input': any_loose_input_socket,
},
kind=kind,
input_sockets=input_sockets,
loose_input_sockets=any_loose_input_socket,
props=props,
managed_objs=managed_objs,
req_params=req_params,
)
def on_show_preview(
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
input_sockets: set[str] = set(), ## For now, presume only same kind
output_sockets: set[str] = set(), ## For now, presume only same kind
props: set[str] = set(),
managed_objs: set[str] = set(),
):
req_params = (
{'self'}
| ({'input_sockets'} if input_sockets else set())
| ({'output_sockets'} if output_sockets else set())
| ({'props'} if props else set())
| ({'managed_objs'} if managed_objs else set())
)
return chain_event_decorator(
callback_type='on_show_preview',
kind=kind,
input_sockets=input_sockets,
output_sockets=output_sockets,
props=props,
managed_objs=managed_objs,
req_params=req_params,
)
def on_show_plot(
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
input_sockets: set[str] = set(),
output_sockets: set[str] = set(),
props: set[str] = set(),
managed_objs: set[str] = set(),
stop_propagation: bool = False,
):
req_params = (
{'self'}
| ({'input_sockets'} if input_sockets else set())
| ({'output_sockets'} if output_sockets else set())
| ({'props'} if props else set())
| ({'managed_objs'} if managed_objs else set())
)
return chain_event_decorator(
callback_type='on_show_plot',
extra_data={
'stop_propagation': stop_propagation,
},
kind=kind,
input_sockets=input_sockets,
output_sockets=output_sockets,
props=props,
managed_objs=managed_objs,
req_params=req_params,
)
def on_init(
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
input_sockets: set[str] = set(),
output_sockets: set[str] = set(),
props: set[str] = set(),
managed_objs: set[str] = set(),
):
req_params = (
{'self'}
| ({'input_sockets'} if input_sockets else set())
| ({'output_sockets'} if output_sockets else set())
| ({'props'} if props else set())
| ({'managed_objs'} if managed_objs else set())
)
return chain_event_decorator(
callback_type='on_init',
kind=kind,
input_sockets=input_sockets,
output_sockets=output_sockets,
props=props,
managed_objs=managed_objs,
req_params=req_params,
)

View File

@ -0,0 +1,300 @@
import enum
import inspect
import typing as typ
from types import MappingProxyType
from ....utils import sympy_extra_units as spux
from .. import contracts as ct
from .base import MaxwellSimNode
UnitSystemID = str
UnitSystem = dict[ct.SocketType, typ.Any]
class EventCallbackType(enum.StrEnum):
"""Names of actions that support callbacks."""
computes_output_socket = enum.auto()
on_value_changed = enum.auto()
on_show_plot = enum.auto()
on_init = enum.auto()
####################
# - Event Callback Information
####################
class EventCallbackData_ComputesOutputSocket(typ.TypedDict): # noqa: N801
"""Extra data used to select a method to compute output sockets."""
output_socket_name: ct.SocketName
kind: ct.DataFlowKind
class EventCallbackData_OnValueChanged(typ.TypedDict): # noqa: N801
"""Extra data used to select a method to compute output sockets."""
changed_sockets: set[ct.SocketName]
changed_props: set[str]
changed_loose_input: set[str]
class EventCallbackData_OnShowPlot(typ.TypedDict): # noqa: N801
"""Extra data in the callback, used when showing a plot."""
stop_propagation: bool
class EventCallbackData_OnInit(typ.TypedDict): # noqa: D101, N801
pass
EventCallbackData: typ.TypeAlias = (
EventCallbackData_ComputesOutputSocket
| EventCallbackData_OnValueChanged
| EventCallbackData_OnShowPlot
| EventCallbackData_OnInit
)
####################
# - Event Decorator
####################
ManagedObjName: typ.TypeAlias = str
PropName: typ.TypeAlias = str
def event_decorator(
action_type: EventCallbackType,
extra_data: EventCallbackData,
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
props: set[PropName] = frozenset(),
managed_objs: set[ManagedObjName] = frozenset(),
input_sockets: set[ct.SocketName] = frozenset(),
output_sockets: set[ct.SocketName] = frozenset(),
all_loose_input_sockets: bool = False,
all_loose_output_sockets: bool = False,
unit_systems: dict[UnitSystemID, UnitSystem] = MappingProxyType({}),
scale_input_sockets: dict[ct.SocketName, UnitSystemID] = MappingProxyType({}),
scale_output_sockets: dict[ct.SocketName, UnitSystemID] = MappingProxyType({}),
):
"""Returns a decorator for a method of `MaxwellSimNode`, declaring it as able respond to events passing through a node.
Parameters:
action_type: A name describing which event the decorator should respond to.
Set to `return_method.action_type`
extra_data: A dictionary that provides the caller with additional per-`action_type` information.
This might include parameters to help select the most appropriate method(s) to respond to an event with, or actions to take after running the callback.
kind: The `ct.DataFlowKind` used to compute all input and output socket data for methods with.
Only affects data passed to the decorated method; namely `input_sockets`, `output_sockets`, and their loose variants.
props: Set of `props` to compute, then pass to the decorated method.
managed_objs: Set of `managed_objs` to retrieve, then pass to the decorated method.
input_sockets: Set of `input_sockets` to compute, then pass to the decorated method.
output_sockets: Set of `output_sockets` to compute, then pass to the decorated method.
all_loose_input_sockets: Whether to compute all loose input sockets and pass them to the decorated method.
Used when the names of the loose input sockets are unknown, but all of their values are needed.
all_loose_output_sockets: Whether to compute all loose output sockets and pass them to the decorated method.
Used when the names of the loose output sockets are unknown, but all of their values are needed.
Returns:
A decorator, which can be applied to a method of `MaxwellSimNode`.
When a `MaxwellSimNode` subclass initializes, such a decorated method will be picked up on.
When the `action_type` action passes through the node, then `extra_data` is used to determine
"""
req_params = (
{'self'}
| ({'props'} if props else set())
| ({'managed_objs'} if managed_objs else set())
| ({'input_sockets'} if input_sockets else set())
| ({'output_sockets'} if output_sockets else set())
| ({'loose_input_sockets'} if all_loose_input_sockets else set())
| ({'loose_output_sockets'} if all_loose_output_sockets else set())
| ({'unit_systems'} if unit_systems else set())
)
# TODO: Check that all Unit System IDs referenced are also defined in 'unit_systems'.
## TODO: More ex. introspective checks and such, to make it really hard to write invalid methods.
def decorator(method: typ.Callable) -> typ.Callable:
# Check Function Signature Validity
func_sig = set(inspect.signature(method).parameters.keys())
## Too Few Arguments
if func_sig != req_params and func_sig.issubset(req_params):
msg = f'Decorated method {method.__name__} is missing arguments {req_params - func_sig}'
## Too Many Arguments
if func_sig != req_params and func_sig.issuperset(req_params):
msg = f'Decorated method {method.__name__} has superfluous arguments {func_sig - req_params}'
raise ValueError(msg)
# TODO: Check Function Annotation Validity
## - socket capabilities
def decorated(node: MaxwellSimNode):
method_kw_args = {} ## Keyword Arguments for Decorated Method
# Compute Requested Props
if props:
_props = {prop_name: getattr(node, prop_name) for prop_name in props}
method_kw_args |= {'props': _props}
# Retrieve Requested Managed Objects
if managed_objs:
_managed_objs = {
managed_obj_name: node.managed_objs[managed_obj_name]
for managed_obj_name in managed_objs
}
method_kw_args |= {'managed_objs': _managed_objs}
# Requested Sockets
## Compute Requested Input Sockets
if input_sockets:
_input_sockets = {
input_socket_name: node._compute_input(input_socket_name, kind)
for input_socket_name in input_sockets
}
# Scale Specified Input Sockets to Unit System
## First, scale the input socket value to the given unit system
## Then, convert the symbol-less sympy scalar to a python type.
for input_socket_name, unit_system_id in scale_input_sockets.items():
unit_system = unit_systems[unit_system_id]
_input_sockets[input_socket_name] = spux.sympy_to_python(
spux.scale_to_unit(
_input_sockets[input_socket_name],
unit_system[node.inputs[input_socket_name].socket_type],
)
)
method_kw_args |= {'input_sockets': _input_sockets}
## Compute Requested Output Sockets
if output_sockets:
_output_sockets = {
output_socket_name: node.compute_output(output_socket_name, kind)
for output_socket_name in output_sockets
}
# Scale Specified Output Sockets to Unit System
## First, scale the output socket value to the given unit system
## Then, convert the symbol-less sympy scalar to a python type.
for output_socket_name, unit_system_id in scale_output_sockets.items():
unit_system = unit_systems[unit_system_id]
_output_sockets[output_socket_name] = spux.sympy_to_python(
spux.scale_to_unit(
_output_sockets[output_socket_name],
unit_system[node.outputs[output_socket_name].socket_type],
)
)
method_kw_args |= {'output_sockets': _output_sockets}
# Loose Sockets
## Compute All Loose Input Sockets
if all_loose_input_sockets:
_loose_input_sockets = {
input_socket_name: node._compute_input(input_socket_name, kind)
for input_socket_name in node.loose_input_sockets
}
method_kw_args |= {'loose_input_sockets': _loose_input_sockets}
## Compute All Loose Output Sockets
if all_loose_output_sockets:
_loose_output_sockets = {
output_socket_name: node.compute_output(output_socket_name, kind)
for output_socket_name in node.loose_output_sockets
}
method_kw_args |= {'loose_output_sockets': _loose_output_sockets}
# Call Method
return method(
node,
**method_kw_args,
)
# Set Decorated Attributes and Return
## Fix Introspection + Documentation
decorated.__name__ = method.__name__
decorated.__module__ = method.__module__
decorated.__qualname__ = method.__qualname__
decorated.__doc__ = method.__doc__
## Add Spice
decorated.action_type = action_type
decorated.extra_data = extra_data
return decorated
return decorator
####################
# - Simplified Event Callbacks
####################
def computes_output_socket(
output_socket_name: ct.SocketName,
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
**kwargs,
):
return event_decorator(
action_type='computes_output_socket',
extra_data={
'output_socket_name': output_socket_name,
'kind': kind,
},
**kwargs,
)
## TODO: Consider changing socket_name and prop_name to more obvious names.
def on_value_changed(
socket_name: set[ct.SocketName] | ct.SocketName | None = None,
prop_name: set[str] | str | None = None,
any_loose_input_socket: bool = False,
**kwargs,
):
if (
sum(
[
int(socket_name is not None),
int(prop_name is not None),
int(any_loose_input_socket),
]
)
> 1
):
msg = 'Define only one of socket_name, prop_name or any_loose_input_socket'
raise ValueError(msg)
return event_decorator(
action_type=EventCallbackType.on_value_changed,
extra_data={
'changed_sockets': (
socket_name if isinstance(socket_name, set) else {socket_name}
),
'changed_props': (prop_name if isinstance(prop_name, set) else {prop_name}),
'changed_loose_input': any_loose_input_socket,
},
**kwargs,
)
def on_show_plot(
stop_propagation: bool = False,
**kwargs,
):
return event_decorator(
action_type=EventCallbackType.on_show_plot,
extra_data={
'stop_propagation': stop_propagation,
},
**kwargs,
)
def on_init(**kwargs):
return event_decorator(
action_type=EventCallbackType.on_init,
extra_data={},
**kwargs,
)

View File

@ -2,11 +2,13 @@ import typing as typ
import tidy3d as td import tidy3d as td
from .....utils import analyze_geonodes from .....utils import analyze_geonodes, logger
from ... import bl_socket_map, managed_objs, sockets from ... import bl_socket_map, managed_objs, sockets
from ... import contracts as ct from ... import contracts as ct
from .. import base from .. import base
log = logger.get(__name__)
class GeoNodesStructureNode(base.MaxwellSimNode): class GeoNodesStructureNode(base.MaxwellSimNode):
node_type = ct.NodeType.GeoNodesStructure node_type = ct.NodeType.GeoNodesStructure
@ -17,8 +19,8 @@ class GeoNodesStructureNode(base.MaxwellSimNode):
# - Sockets # - Sockets
#################### ####################
input_sockets: typ.ClassVar = { input_sockets: typ.ClassVar = {
'Unit System': sockets.PhysicalUnitSystemSocketDef(),
'Medium': sockets.MaxwellMediumSocketDef(), 'Medium': sockets.MaxwellMediumSocketDef(),
'Center': sockets.PhysicalPoint3DSocketDef(),
'GeoNodes': sockets.BlenderGeoNodesSocketDef(), 'GeoNodes': sockets.BlenderGeoNodesSocketDef(),
} }
output_sockets: typ.ClassVar = { output_sockets: typ.ClassVar = {
@ -26,36 +28,38 @@ class GeoNodesStructureNode(base.MaxwellSimNode):
} }
managed_obj_defs: typ.ClassVar = { managed_obj_defs: typ.ClassVar = {
'geometry': ct.schemas.ManagedObjDef( 'mesh': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name), mk=lambda name: managed_objs.ManagedBLMesh(name),
name_prefix='', ),
) 'modifier': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLModifier(name),
),
} }
#################### ####################
# - Output Socket Computation # - Event Methods
#################### ####################
@base.computes_output_socket( @base.computes_output_socket(
'Structure', 'Structure',
input_sockets={'Medium'}, input_sockets={'Medium'},
managed_objs={'geometry'}, managed_objs={'geometry'},
) )
def compute_structure( def compute_output(
self, self,
input_sockets: dict[str, typ.Any], input_sockets: dict[str, typ.Any],
managed_objs: dict[str, ct.schemas.ManagedObj], managed_objs: dict[str, ct.schemas.ManagedObj],
) -> td.Structure: ) -> td.Structure:
# Extract the Managed Blender Object # Simulate Input Value Change
mobj = managed_objs['geometry'] ## This ensures that the mesh has been re-computed.
self.on_input_changed()
# Extract Geometry as Arrays ## TODO: mesh_as_arrays might not take the Center into account.
geometry_as_arrays = mobj.mesh_as_arrays ## - Alternatively, Tidy3D might have a way to transform?
mesh_as_arrays = managed_objs['mesh'].mesh_as_arrays
# Return TriMesh Structure
return td.Structure( return td.Structure(
geometry=td.TriangleMesh.from_vertices_faces( geometry=td.TriangleMesh.from_vertices_faces(
geometry_as_arrays['verts'], mesh_as_arrays['verts'],
geometry_as_arrays['faces'], mesh_as_arrays['faces'],
), ),
medium=input_sockets['Medium'], medium=input_sockets['Medium'],
) )
@ -65,104 +69,87 @@ class GeoNodesStructureNode(base.MaxwellSimNode):
#################### ####################
@base.on_value_changed( @base.on_value_changed(
socket_name='GeoNodes', socket_name='GeoNodes',
managed_objs={'geometry'}, prop_name='preview_active',
any_loose_input_socket=True,
# Method Data
managed_objs={'mesh', 'modifier'},
input_sockets={'GeoNodes'}, input_sockets={'GeoNodes'},
# Unit System Scaling
unit_systems={'BlenderUnits': ct.UNITS_BLENDER},
) )
def on_value_changed__geonodes( def on_input_changed(
self, self,
props: dict,
managed_objs: dict[str, ct.schemas.ManagedObj], managed_objs: dict[str, ct.schemas.ManagedObj],
input_sockets: dict[str, typ.Any], input_sockets: dict,
loose_input_sockets: dict,
unit_systems: dict,
) -> None: ) -> None:
"""Called whenever the GeoNodes socket is changed. # No GeoNodes: Remove Modifier (if any)
if (geonodes := input_sockets['GeoNodes']) is None:
if (
managed_objs['modifier'].name
in managed_objs['mesh'].bl_object().modifiers
):
log.info(
'Removing Modifier "%s" from BLObject "%s"',
managed_objs['modifier'].name,
managed_objs['mesh'].name,
)
managed_objs['mesh'].bl_object().modifiers.remove(
managed_objs['modifier'].name
)
Refreshes the Loose Input Sockets, which map directly to the GeoNodes tree input sockets. # Reset Loose Input Sockets
"""
if not (geo_nodes := input_sockets['GeoNodes']):
managed_objs['geometry'].free()
self.loose_input_sockets = {} self.loose_input_sockets = {}
return return
# Analyze GeoNodes # No Loose Input Sockets: Create from GeoNodes Interface
## Extract Valid Inputs (via GeoNodes Tree "Interface") ## TODO: Other reasons to trigger re-filling loose_input_sockets.
geonodes_interface = analyze_geonodes.interface(geo_nodes, direc='INPUT') if not loose_input_sockets:
# Retrieve the GeoNodes Interface
geonodes_interface = analyze_geonodes.interface(
input_sockets['GeoNodes'], direc='INPUT'
)
# Set Loose Input Sockets # Fill the Loose Input Sockets
## Retrieve the appropriate SocketDef for the Blender Interface Socket log.info(
'Initializing GeoNodes Structure Node "%s" from GeoNodes Group "%s"',
self.bl_label,
str(geonodes),
)
self.loose_input_sockets = { self.loose_input_sockets = {
socket_name: bl_socket_map.socket_def_from_bl_interface_socket( socket_name: bl_socket_map.socket_def_from_bl_socket(iface_socket)()
bl_interface_socket for socket_name, iface_socket in geonodes_interface.items()
)() ## === <SocketType>SocketDef(), but with dynamic SocketDef
for socket_name, bl_interface_socket in geonodes_interface.items()
} }
## Set Loose `socket.value` from Interface `default_value` # Set Loose Input Sockets to Interface (Default) Values
## Changing socket.value invokes recursion of this function.
## The else: below ensures that only one push occurs.
## (well, one push per .value set, which simplifies to one push)
log.debug(
'Setting Loose Input Sockets of "%s" to GeoNodes Defaults',
self.bl_label,
)
for socket_name in self.loose_input_sockets: for socket_name in self.loose_input_sockets:
socket = self.inputs[socket_name] socket = self.inputs[socket_name]
bl_interface_socket = geonodes_interface[socket_name] socket.value = bl_socket_map.read_bl_socket_default_value(
geonodes_interface[socket_name]
socket.value = bl_socket_map.value_from_bl(bl_interface_socket)
## Implicitly triggers the loose-input `on_value_changed` for each.
@base.on_value_changed(
any_loose_input_socket=True,
managed_objs={'geometry'},
input_sockets={'Unit System', 'GeoNodes'},
) )
def on_value_changed__loose_inputs( else:
self, # Push Loose Input Values to GeoNodes Modifier
managed_objs: dict[str, ct.schemas.ManagedObj], managed_objs['modifier'].bl_modifier(
input_sockets: dict[str, typ.Any], managed_objs['mesh'].bl_object(location=input_sockets['Center']),
loose_input_sockets: dict[str, typ.Any], 'NODES',
): {
"""Called whenever a Loose Input Socket is altered. 'node_group': input_sockets['GeoNodes'],
'unit_system': unit_systems['BlenderUnits'],
Synchronizes the change to the actual GeoNodes modifier, so that the change is immediately visible. 'inputs': loose_input_sockets,
"""
# Retrieve Data
unit_system = input_sockets['Unit System']
mobj = managed_objs['geometry']
if not (geo_nodes := input_sockets['GeoNodes']):
return
# Analyze GeoNodes Interface (input direction)
## This retrieves NodeTreeSocketInterface elements
geonodes_interface = analyze_geonodes.interface(geo_nodes, direc='INPUT')
## TODO: Check that Loose Sockets matches the Interface
## - If the user deletes an interface socket, bad things will happen.
## - We will try to set an identifier that doesn't exist!
## - Instead, this should update the loose input sockets.
## Push Values to the GeoNodes Modifier
mobj.sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
bl_interface_socket.identifier: bl_socket_map.value_to_bl(
bl_interface_socket,
loose_input_sockets[socket_name],
unit_system,
)
for socket_name, bl_interface_socket in (geonodes_interface.items())
}, },
) )
# Push Preview State
#################### if props['preview_active']:
# - Event Methods managed_objs['mesh'].show_preview()
####################
@base.on_show_preview(
managed_objs={'geometry'},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
"""Called whenever a Loose Input Socket is altered.
Synchronizes the change to the actual GeoNodes modifier, so that the change is immediately visible.
"""
managed_objs['geometry'].show_preview('MESH')
#################### ####################

View File

@ -1,13 +1,11 @@
import typing as typ import typing as typ
import bpy
import sympy as sp import sympy as sp
import sympy.physics.units as spu import sympy.physics.units as spu
import tidy3d as td import tidy3d as td
from ......utils import analyze_geonodes
from .... import contracts as ct
from .....assets.import_geonodes import import_geonodes from .....assets.import_geonodes import import_geonodes
from .... import contracts as ct
from .... import managed_objs, sockets from .... import managed_objs, sockets
from ... import base from ... import base
@ -22,7 +20,7 @@ class BoxStructureNode(base.MaxwellSimNode):
#################### ####################
# - Sockets # - Sockets
#################### ####################
input_sockets = { input_sockets: typ.ClassVar = {
'Medium': sockets.MaxwellMediumSocketDef(), 'Medium': sockets.MaxwellMediumSocketDef(),
'Center': sockets.PhysicalPoint3DSocketDef(), 'Center': sockets.PhysicalPoint3DSocketDef(),
'Size': sockets.PhysicalSize3DSocketDef( 'Size': sockets.PhysicalSize3DSocketDef(
@ -36,78 +34,71 @@ class BoxStructureNode(base.MaxwellSimNode):
managed_obj_defs: typ.ClassVar = { managed_obj_defs: typ.ClassVar = {
'mesh': ct.schemas.ManagedObjDef( 'mesh': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLMesh(name), mk=lambda name: managed_objs.ManagedBLMesh(name),
name_prefix='',
), ),
'box': ct.schemas.ManagedObjDef( 'modifier': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLModifier(name), mk=lambda name: managed_objs.ManagedBLModifier(name),
name_prefix='',
), ),
} }
#################### ####################
# - Output Socket Computation # - Event Methods
#################### ####################
@base.computes_output_socket( @base.computes_output_socket(
'Structure', 'Structure',
input_sockets={'Medium', 'Center', 'Size'}, input_sockets={'Medium', 'Center', 'Size'},
unit_systems={'Tidy3DUnits': ct.UNITS_TIDY3D},
scale_input_sockets={
'Center': 'Tidy3DUnits',
'Size': 'Tidy3DUnits',
},
) )
def compute_structure(self, input_sockets: dict) -> td.Box: def compute_output(self, input_sockets: dict, unit_systems: dict) -> td.Box:
medium = input_sockets['Medium']
center = as_unit_system(input_sockets['Center'], 'tidy3d')
size = as_unit_system(input_sockets['Size'], 'tidy3d')
#_center = input_sockets['Center']
#_size = input_sockets['Size']
#center = tuple(spu.convert_to(_center, spu.um) / spu.um)
#size = tuple(spu.convert_to(_size, spu.um) / spu.um)
return td.Structure( return td.Structure(
geometry=td.Box( geometry=td.Box(
center=center, center=input_sockets['Center'],
size=size, size=input_sockets['Size'],
), ),
medium=medium, medium=input_sockets['Medium'],
) )
####################
# - Events
####################
@base.on_value_changed( @base.on_value_changed(
socket_name={'Center', 'Size'}, socket_name={'Center', 'Size'},
prop_name='preview_active',
# Method Data
input_sockets={'Center', 'Size'}, input_sockets={'Center', 'Size'},
managed_objs={'mesh', 'box'}, managed_objs={'mesh', 'modifier'},
) # Unit System Scaling
def on_value_changed__center_size( unit_systems={'BlenderUnits': ct.UNITS_BLENDER},
self, scale_input_sockets={
input_sockets: dict, 'Center': 'BlenderUnits',
managed_objs: dict[str, ct.schemas.ManagedObj],
):
center = as_unit_system(input_sockets['Center'], 'blender')
#center = tuple([float(el) for el in spu.convert_to(_center, spu.um) / spu.um])
## TODO: Implement + aggressively memoize as_unit_system
## - This should also understand that ex. Blender likes tuples, Tidy3D might like something else.
size = as_unit_system(input_sockets['Size'], 'blender')
#size = tuple([float(el) for el in spu.convert_to(_size, spu.um) / spu.um])
# Sync Attributes
managed_objs['mesh'].bl_object().location = center
managed_objs['box'].bl_modifier(managed_objs['mesh'].bl_object(), 'NODES', {
'node_group': import_geonodes(GEONODES_BOX, 'link'),
'inputs': {
'Size': size,
}, },
})
@base.on_show_preview(
managed_objs={'mesh'},
) )
def on_show_preview( def on_input_changed(
self, self,
props: dict,
managed_objs: dict[str, ct.schemas.ManagedObj], managed_objs: dict[str, ct.schemas.ManagedObj],
input_sockets: dict,
unit_systems: dict,
): ):
# Push Input Values to GeoNodes Modifier
managed_objs['modifier'].bl_modifier(
managed_objs['mesh'].bl_object(location=input_sockets['Center']),
'NODES',
{
'node_group': import_geonodes(GEONODES_BOX, 'link'),
'unit_system': unit_systems['BlenderUnits'],
'inputs': {
'Size': input_sockets['Size'],
},
},
)
# Push Preview State
if props['preview_active']:
managed_objs['mesh'].show_preview() managed_objs['mesh'].show_preview()
self.on_value_changed__center_size()
@base.on_init()
def on_init(self):
self.on_input_change()
#################### ####################

View File

@ -4,7 +4,6 @@ import functools
import bpy import bpy
import pydantic as pyd
import sympy as sp import sympy as sp
import sympy.physics.units as spu import sympy.physics.units as spu
from .. import contracts as ct from .. import contracts as ct
@ -303,9 +302,7 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
return self._compute_data(kind) return self._compute_data(kind)
## Linked: Compute Output of Linked Sockets ## Linked: Compute Output of Linked Sockets
linked_values = [ linked_values = [link.from_socket.compute_data(kind) for link in self.links]
link.from_socket.compute_data(kind) for link in self.links
]
## Return Single Value / List of Values ## Return Single Value / List of Values
if len(linked_values) == 1: if len(linked_values) == 1:

View File

@ -263,8 +263,7 @@ class PhysicalUnitSystemBLSocket(base.MaxwellSimSocket):
@property @property
def value(self) -> dict[ST, SympyExpr]: def value(self) -> dict[ST, SympyExpr]:
return { return {
socket_type: SU(socket_type)[socket_unit_prop] socket_type: SU(socket_type)[socket_unit_prop] for socket_type, socket_unit_prop in [
for socket_type, socket_unit_prop in [
(ST.PhysicalTime, self.unit_time), (ST.PhysicalTime, self.unit_time),
(ST.PhysicalAngle, self.unit_angle), (ST.PhysicalAngle, self.unit_angle),
(ST.PhysicalLength, self.unit_length), (ST.PhysicalLength, self.unit_length),

View File

@ -1,3 +1,4 @@
import bpy
import typing_extensions as typx import typing_extensions as typx
INVALID_BL_SOCKET_TYPES = { INVALID_BL_SOCKET_TYPES = {
@ -6,10 +7,11 @@ INVALID_BL_SOCKET_TYPES = {
def interface( def interface(
geo_nodes, ## TODO: bpy type geonodes: bpy.types.GeometryNodeTree, ## TODO: bpy type
direc: typx.Literal['INPUT', 'OUTPUT'], direc: typx.Literal['INPUT', 'OUTPUT'],
): ):
"""Returns 'valid' GeoNodes interface sockets, meaning that: """Returns 'valid' GeoNodes interface sockets.
- The Blender socket type is not something invalid (ex. "Geometry"). - The Blender socket type is not something invalid (ex. "Geometry").
- The socket has a default value. - The socket has a default value.
- The socket's direction (input/output) matches the requested direction. - The socket's direction (input/output) matches the requested direction.
@ -17,13 +19,11 @@ def interface(
return { return {
interface_item_name: bl_interface_socket interface_item_name: bl_interface_socket
for interface_item_name, bl_interface_socket in ( for interface_item_name, bl_interface_socket in (
geo_nodes.interface.items_tree.items() geonodes.interface.items_tree.items()
) )
if all( if (
[ bl_interface_socket.socket_type not in INVALID_BL_SOCKET_TYPES
bl_interface_socket.socket_type not in INVALID_BL_SOCKET_TYPES, and hasattr(bl_interface_socket, 'default_value')
hasattr(bl_interface_socket, 'default_value'), and bl_interface_socket.in_out == direc
bl_interface_socket.in_out == direc,
]
) )
} }

View File

@ -1,4 +1,6 @@
import functools import functools
import itertools
import typing as typ
from . import pydeps from . import pydeps
@ -10,9 +12,10 @@ with pydeps.syspath_from_bpy_prefs():
#################### ####################
# - Useful Methods # - Useful Methods
#################### ####################
@functools.lru_cache(maxsize=4096)
def uses_units(expression: sp.Expr) -> bool: def uses_units(expression: sp.Expr) -> bool:
## TODO: An LFU cache could do better than an LRU.
"""Checks if an expression uses any units (`Quantity`).""" """Checks if an expression uses any units (`Quantity`)."""
for arg in sp.preorder_traversal(expression): for arg in sp.preorder_traversal(expression):
if isinstance(arg, spu.Quantity): if isinstance(arg, spu.Quantity):
return True return True
@ -20,9 +23,10 @@ def uses_units(expression: sp.Expr) -> bool:
# Function to return a set containing all units used in the expression # Function to return a set containing all units used in the expression
@functools.lru_cache(maxsize=4096)
def get_units(expression: sp.Expr): def get_units(expression: sp.Expr):
## TODO: An LFU cache could do better than an LRU.
"""Gets all the units of an expression (as `Quantity`).""" """Gets all the units of an expression (as `Quantity`)."""
return { return {
arg arg
for arg in sp.preorder_traversal(expression) for arg in sp.preorder_traversal(expression)
@ -79,29 +83,59 @@ ALL_UNIT_SYMBOLS = {
unit.abbrev: unit unit.abbrev: unit
for unit in spu.__dict__.values() for unit in spu.__dict__.values()
if isinstance(unit, spu.Quantity) if isinstance(unit, spu.Quantity)
} | { } | {unit.abbrev: unit for unit in globals().values() if isinstance(unit, spu.Quantity)}
unit.abbrev: unit
for unit in globals().values()
if isinstance(unit, spu.Quantity)
}
@functools.lru_cache(maxsize=1024) @functools.lru_cache(maxsize=4096)
def parse_abbrev_symbols_to_units(expr: sp.Basic) -> sp.Basic: def parse_abbrev_symbols_to_units(expr: sp.Basic) -> sp.Basic:
print('IN ABBREV', expr)
return expr.subs(ALL_UNIT_SYMBOLS) return expr.subs(ALL_UNIT_SYMBOLS)
# def has_units(expr: sp.Expr): ####################
# return any( # - Units <-> Scalars
# symbol in ALL_UNIT_SYMBOLS ####################
# for symbol in expr.atoms(sp.Symbol) @functools.lru_cache(maxsize=8192)
# ) def scale_to_unit(expr: sp.Expr, unit: sp.Quantity) -> typ.Any:
# def is_exactly_expressed_as_unit(expr: sp.Expr, unit) -> bool: ## TODO: An LFU cache could do better than an LRU.
# #try: unitless_expr = spu.convert_to(expr, unit) / unit
# converted_expr = expr / unit if not uses_units(unitless_expr):
# return unitless_expr
# return (
# converted_expr.is_number msg = f'Expression "{expr}" was scaled to the unit "{unit}" with the expectation that the result would be unitless, but the result "{unitless_expr}" has units "{get_units(unitless_expr)}"'
# and not converted_expr.has(spu.Quantity) raise ValueError(msg)
# )
####################
# - Sympy <-> Scalars
####################
@functools.lru_cache(maxsize=8192)
def sympy_to_python(scalar: sp.Basic) -> int | float | complex | tuple | list:
"""Convert a scalar sympy expression to the directly corresponding Python type.
Arguments:
scalar: A sympy expression that has no symbols, but is expressed as a Sympy type.
For expressions that are equivalent to a scalar (ex. "(2a + a)/a"), you must simplify the expression with ex. `sp.simplify()` before passing to this parameter.
Returns:
A pure Python type that directly corresponds to the input scalar expression.
"""
## TODO: If there are symbols, we could simplify.
## - Someone has to do it somewhere, might as well be here.
## - ...Since we have all the information we need.
if isinstance(scalar, sp.MatrixBase):
list_2d = [[sympy_to_python(el) for el in row] for row in scalar.tolist()]
# Detect Row / Column Vector
## When it's "actually" a 1D structure, flatten and return as tuple.
if 1 in scalar.shape:
return tuple(itertools.from_iterable(list_2d))
return list_2d
if scalar.is_integer:
return int(scalar)
if scalar.is_rational or scalar.is_real:
return float(scalar)
if scalar.is_complex:
return complex(scalar)
msg = f'Cannot convert sympy scalar expression "{scalar}" to a Python type. Check the assumptions on the expr (current expr assumptions: "{scalar._assumptions}")' # noqa: SLF001
raise ValueError(msg)