fix: Broken GN unit evaluation

main
Sofus Albert Høgsbro Rose 2024-04-03 10:13:10 +02:00
parent a282d1e7ef
commit c2db40ca6d
Signed by: so-rose
GPG Key ID: AD901CB0F3701434
25 changed files with 536 additions and 547 deletions

Binary file not shown.

View File

@ -59,7 +59,6 @@ def import_geonodes(
- Retrieve the node group and return it.
"""
if geonodes in bpy.data.node_groups and not force_import:
log.info('Found Existing GeoNodes Tree (name=%s)', geonodes)
return bpy.data.node_groups[geonodes]
filename = geonodes

View File

@ -101,13 +101,19 @@ def _socket_type_from_bl_socket(
# Parse Description for Socket Type
## The "2D" token is special; don't include it if it's there.
tokens = _tokens if (_tokens := description.split(' '))[0] != '2D' else _tokens[1:]
descr_params = description.split(ct.BL_SOCKET_DESCR_ANNOT_STRING)[0]
directive = (
_tokens[0] if (_tokens := descr_params.split(' '))[0] != '2D' else _tokens[1]
)
if directive == 'Preview':
return direct_socket_type ## TODO: Preview element handling
if (
socket_type := ct.BL_SOCKET_DESCR_TYPE_MAP.get(
(tokens[0], bl_socket_type, size)
(directive, bl_socket_type, size)
)
) is None:
msg = f'Socket description "{(tokens[0], bl_socket_type, size)}" doesn\'t map to a socket type + unit'
msg = f'Socket description "{(directive, bl_socket_type, size)}" doesn\'t map to a socket type + unit'
raise ValueError(msg)
return socket_type
@ -129,19 +135,19 @@ def socket_def_from_bl_socket(
) -> ct.schemas.SocketDef:
"""Computes an appropriate (no-arg) SocketDef from the given `bl_interface_socket`, by parsing it."""
return _socket_def_from_bl_socket(
bl_interface_socket.description, bl_interface_socket.socket_type
bl_interface_socket.description, bl_interface_socket.bl_socket_idname
)
####################
# - Extract Default Interface Socket Value
####################
@functools.lru_cache(maxsize=4096)
def _read_bl_socket_default_value(
description: str,
bl_socket_type: BLSocketType,
bl_socket_value: BLSocketValue,
unit_system: dict | None = None,
allow_unit_not_in_unit_system: bool = False,
) -> typ.Any:
# Parse the BL Socket Type and Value
## The 'lambda' delays construction until size is determined.
@ -157,21 +163,20 @@ def _read_bl_socket_default_value(
## Use the matching socket type to lookup the unit in the unit system.
if unit_system is not None:
if (unit := unit_system.get(socket_type)) is None:
if allow_unit_not_in_unit_system:
return parsed_socket_value
msg = f'Unit system does not provide a unit for {socket_type}'
raise RuntimeError(msg)
if unit not in (valid_units := ct.SOCKET_UNITS[socket_type]['values'].values()):
msg = f'Unit system provided a unit "{unit}" that is invalid for socket type "{socket_type}" (valid units: {valid_units})'
raise RuntimeError(msg)
return parsed_socket_value * unit
return parsed_socket_value
def read_bl_socket_default_value(
bl_interface_socket: bpy.types.NodeTreeInterfaceSocket,
unit_system: dict | None = None,
allow_unit_not_in_unit_system: bool = False,
) -> typ.Any:
"""Reads the `default_value` of a Blender socket, guaranteeing a well-formed value consistent with the passed unit system.
@ -185,33 +190,41 @@ def read_bl_socket_default_value(
"""
return _read_bl_socket_default_value(
bl_interface_socket.description,
bl_interface_socket.socket_type,
bl_interface_socket.bl_socket_idname,
bl_interface_socket.default_value,
unit_system,
unit_system=unit_system,
allow_unit_not_in_unit_system=allow_unit_not_in_unit_system,
)
@functools.lru_cache(maxsize=4096)
def _writable_bl_socket_value(
description: str,
bl_socket_type: BLSocketType,
value: typ.Any,
unit_system: dict | None = None,
allow_unit_not_in_unit_system: bool = False,
) -> typ.Any:
socket_type = _socket_type_from_bl_socket(description, bl_socket_type)
# Retrieve Unit-System Unit
if unit_system is not None:
if (unit := unit_system.get(socket_type)) is None:
msg = f'Unit system does not provide a unit for {socket_type}'
raise RuntimeError(msg)
_bl_socket_value = spux.scale_to_unit(value, unit)
if allow_unit_not_in_unit_system:
_bl_socket_value = value
else:
msg = f'Unit system does not provide a unit for {socket_type}'
raise RuntimeError(msg)
else:
_bl_socket_value = spux.scale_to_unit(value, unit)
else:
_bl_socket_value = value
# Compute Blender Socket Value
bl_socket_value = spux.sympy_to_python(_bl_socket_value)
if isinstance(_bl_socket_value, sp.Basic):
bl_socket_value = spux.sympy_to_python(_bl_socket_value)
else:
bl_socket_value = _bl_socket_value
if _size_from_bl_socket(description, bl_socket_type) == 2: # noqa: PLR2004
bl_socket_value = bl_socket_value[:2]
return bl_socket_value
@ -221,6 +234,7 @@ def writable_bl_socket_value(
bl_interface_socket: bpy.types.NodeTreeInterfaceSocket,
value: typ.Any,
unit_system: dict | None = None,
allow_unit_not_in_unit_system: bool = False,
) -> typ.Any:
"""Processes a value to be ready-to-write to a Blender socket.
@ -234,7 +248,8 @@ def writable_bl_socket_value(
"""
return _writable_bl_socket_value(
bl_interface_socket.description,
bl_interface_socket.bl_socket_type,
bl_interface_socket.bl_socket_idname,
value,
unit_system,
unit_system=unit_system,
allow_unit_not_in_unit_system=allow_unit_not_in_unit_system,
)

View File

@ -37,7 +37,8 @@ UNITS_BLENDER: UnitSystem = {
} ## TODO: Load (dynamically?) from addon preferences
UNITS_TIDY3D: UnitSystem = {
ST.PhysicalTime: spu.picosecond,
## https://docs.flexcompute.com/projects/tidy3d/en/latest/faq/docs/faq/What-are-the-units-used-in-the-simulation.html
ST.PhysicalTime: spu.second,
ST.PhysicalAngle: spu.radian,
ST.PhysicalLength: spu.micrometer,
ST.PhysicalArea: spu.micrometer**2,
@ -52,6 +53,6 @@ UNITS_TIDY3D: UnitSystem = {
ST.PhysicalForceScalar: spux.micronewton,
ST.PhysicalAccel3D: spu.um / spu.second**2,
ST.PhysicalForce3D: spux.micronewton,
ST.PhysicalFreq: spux.terahertz,
ST.PhysicalFreq: spu.hertz,
ST.PhysicalPol: spu.radian,
}

View File

@ -1,4 +1,4 @@
from .managed_bl_empty import ManagedBLEmpty
#from .managed_bl_empty import ManagedBLEmpty
from .managed_bl_image import ManagedBLImage
# from .managed_bl_collection import ManagedBLCollection
@ -9,7 +9,7 @@ from .managed_bl_mesh import ManagedBLMesh
from .managed_bl_modifier import ManagedBLModifier
__all__ = [
'ManagedBLEmpty',
#'ManagedBLEmpty',
'ManagedBLImage',
#'ManagedBLCollection',
#'ManagedBLObject',

View File

@ -1,3 +1,5 @@
import functools
import bpy
from ....utils import logger
@ -11,6 +13,7 @@ PREVIEW_COLLECTION_NAME = 'BLMaxwell Visible'
####################
# - Global Collection Handling
####################
@functools.cache
def collection(collection_name: str, view_layer_exclude: bool) -> bpy.types.Collection:
# Init the "Managed Collection"
# Ensure Collection exists (and is in the Scene collection)
@ -32,8 +35,8 @@ def collection(collection_name: str, view_layer_exclude: bool) -> bpy.types.Coll
def managed_collection() -> bpy.types.Collection:
return collection(MANAGED_COLLECTION_NAME, view_layer_exclude=False)
return collection(MANAGED_COLLECTION_NAME, view_layer_exclude=True)
def preview_collection() -> bpy.types.Collection:
return collection(PREVIEW_COLLECTION_NAME, view_layer_exclude=True)
return collection(PREVIEW_COLLECTION_NAME, view_layer_exclude=False)

View File

@ -31,7 +31,7 @@ class ManagedBLMesh(ct.schemas.ManagedObj):
'Changing BLMesh w/Name "%s" to Name "%s"', self._bl_object_name, value
)
if not bpy.data.objects.get(value):
if (bl_object := bpy.data.objects.get(value)) is None:
log.info(
'Desired BLMesh Name "%s" Not Taken',
value,
@ -42,7 +42,7 @@ class ManagedBLMesh(ct.schemas.ManagedObj):
'Set New BLMesh Name to "%s"',
value,
)
elif bl_object := bpy.data.objects.get(self._bl_object_name):
elif (bl_object := bpy.data.objects.get(self._bl_object_name)) is not None:
log.info(
'Changed BLMesh Name to "%s"',
value,
@ -97,28 +97,26 @@ class ManagedBLMesh(ct.schemas.ManagedObj):
If it's already included, do nothing.
"""
if (
bl_object := bpy.data.objects.get(self.name)
) is not None and bl_object.name not in preview_collection().objects:
log.info('Moving "%s" to Preview Collection', bl_object.name)
preview_collection().objects.link(bl_object)
msg = 'Managed BLMesh does not exist'
raise ValueError(msg)
if (bl_object := bpy.data.objects.get(self.name)) is not None:
if bl_object.name not in preview_collection().objects:
log.info('Moving "%s" to Preview Collection', bl_object.name)
preview_collection().objects.link(bl_object)
else:
msg = 'Managed BLMesh does not exist'
raise ValueError(msg)
def hide_preview(self) -> None:
"""Removes the managed Blender object from the preview collection.
If it's already removed, do nothing.
"""
if (
bl_object := bpy.data.objects.get(self.name)
) is not None and bl_object.name in preview_collection().objects:
log.info('Removing "%s" from Preview Collection', bl_object.name)
preview_collection.objects.unlink(bl_object)
msg = 'Managed BLMesh does not exist'
raise ValueError(msg)
if (bl_object := bpy.data.objects.get(self.name)) is not None:
if bl_object.name in preview_collection().objects:
log.info('Removing "%s" from Preview Collection', bl_object.name)
preview_collection().objects.unlink(bl_object)
else:
msg = 'Managed BLMesh does not exist'
raise ValueError(msg)
def bl_select(self) -> None:
"""Selects the managed Blender object, causing it to be ex. outlined in the 3D viewport."""
@ -138,7 +136,7 @@ class ManagedBLMesh(ct.schemas.ManagedObj):
if not (bl_object := bpy.data.objects.get(self.name)):
log.info(
'Creating BLMesh Object "%s"',
bl_object.name,
self.name,
)
bl_data = bpy.data.meshes.new(self.name)
bl_object = bpy.data.objects.new(self.name, bl_data)

View File

@ -53,6 +53,7 @@ def write_modifier_geonodes(
bl_modifier: bpy.types.Modifier,
modifier_attrs: ModifierAttrsNODES,
) -> bool:
modifier_altered = False
# Alter GeoNodes Group
if bl_modifier.node_group != modifier_attrs['node_group']:
log.info(
@ -66,7 +67,7 @@ def write_modifier_geonodes(
# Alter GeoNodes Modifier Inputs
## First we retrieve the interface items by-Socket Name
geonodes_interface = analyze_geonodes.interface(
bl_modifier.node_group, direct='INPUT'
bl_modifier.node_group, direc='INPUT'
)
for (
socket_name,
@ -74,11 +75,12 @@ def write_modifier_geonodes(
) in modifier_attrs['inputs'].items():
# Compute Writable BL Socket Value
## Analyzes the socket and unitsys to prep a ready-to-write value.
## Writte directly to the modifier dict.
## Write directly to the modifier dict.
bl_socket_value = bl_socket_map.writable_bl_socket_value(
geonodes_interface[socket_name],
value,
modifier_attrs['unit_system'],
unit_system=modifier_attrs['unit_system'],
allow_unit_not_in_unit_system=True,
)
# Compute Interface ID from Socket Name
@ -91,6 +93,7 @@ def write_modifier_geonodes(
for i, bl_socket_subvalue in enumerate(bl_socket_value):
if bl_modifier[iface_id][i] != bl_socket_subvalue:
bl_modifier[iface_id][i] = bl_socket_subvalue
modifier_altered = True
# IF int/float Mismatch: Assign Float-Cast of Integer
## Blender is strict; only floats can set float vals.
@ -105,6 +108,8 @@ def write_modifier_geonodes(
bl_modifier[iface_id] = bl_socket_value
modifier_altered = True
return modifier_altered
def write_modifier(
bl_modifier: bpy.types.Modifier,
@ -144,7 +149,7 @@ class ManagedBLModifier(ct.schemas.ManagedObj):
def name(self, value: str) -> None:
## TODO: Handle name conflict within same BLObject
log.info(
'Changing BLModifier w/Name "%s" to Name "%s"', self._bl_object_name, value
'Changing BLModifier w/Name "%s" to Name "%s"', self._modifier_name, value
)
self._modifier_name = value

View File

@ -2,7 +2,11 @@ import typing as typ
import bpy
from ...utils import logger
from . import contracts as ct
from .managed_objs.managed_bl_collection import preview_collection
log = logger.get(__name__)
####################
# - Cache Management
@ -73,6 +77,15 @@ class MaxwellSimTree(bpy.types.NodeTree):
for bl_socket in [*node.inputs, *node.outputs]:
bl_socket.locked = False
def unpreview_all(self):
log.info('Disabling All 3D Previews')
for node in self.nodes:
if node.preview_active:
node.preview_active = False
for bl_object in preview_collection().objects.values():
preview_collection().objects.unlink(bl_object)
####################
# - Init Methods
####################

View File

@ -87,8 +87,8 @@ class MaxwellSimNode(bpy.types.Node):
cls.__annotations__['preview_active'] = bpy.props.BoolProperty(
name='Preview Active',
description='Whether the preview (if any) is currently active',
default='',
update=lambda self, context: self.sync_prop('preview_active', context),
default=False,
update=lambda self, context: self.sync_preview_active(context),
)
# Setup Locked Property for Node
@ -211,6 +211,21 @@ class MaxwellSimNode(bpy.types.Node):
## - If altered, set the 'sim_node_name' to the altered name.
## - This will cause recursion, but only once.
def sync_preview_active(self, _: bpy.types.Context):
log.info(
'Changed Preview Active in "%s" to "%s"',
self.name,
self.preview_active,
)
for method in self._on_value_changed_methods:
if 'preview_active' in method.extra_data['changed_props']:
log.info(
'Running Previewer Callback "%s" in "%s")',
method.__name__,
self.name,
)
method(self)
####################
# - Managed Object Properties
####################
@ -571,6 +586,13 @@ class MaxwellSimNode(bpy.types.Node):
Invalidates (recursively) the cache of any managed object or
output socket method that implicitly depends on this input socket.
"""
#log.debug(
# 'Action "%s" Triggered in "%s" (socket_name="%s", prop_name="%s")',
# action,
# self.name,
# socket_name,
# prop_name,
#)
# Forwards Chains
if action == 'value_changed':
# Run User Callbacks
@ -589,6 +611,11 @@ class MaxwellSimNode(bpy.types.Node):
and socket_name in self.loose_input_sockets
)
):
#log.debug(
# 'Running Value-Change Callback "%s" in "%s")',
# method.__name__,
# self.name,
#)
method(self)
# Propagate via Output Sockets
@ -616,6 +643,10 @@ class MaxwellSimNode(bpy.types.Node):
## ...which simply hook into the 'preview_active' property.
## By (maybe) altering 'preview_active', callbacks run as needed.
if not self.preview_active:
log.info(
'Activating Preview in "%s")',
self.name,
)
self.preview_active = True
## Propagate via Input Sockets

View File

@ -3,10 +3,13 @@ import inspect
import typing as typ
from types import MappingProxyType
from ....utils import sympy_extra_units as spux
from ....utils import extra_sympy_units as spux
from ....utils import logger
from .. import contracts as ct
from .base import MaxwellSimNode
log = logger.get(__name__)
UnitSystemID = str
UnitSystem = dict[ct.SocketType, typ.Any]
@ -206,6 +209,10 @@ def event_decorator(
}
method_kw_args |= {'loose_output_sockets': _loose_output_sockets}
# Unit Systems
if unit_systems:
method_kw_args |= {'unit_systems': unit_systems}
# Call Method
return method(
node,
@ -253,19 +260,6 @@ def on_value_changed(
any_loose_input_socket: bool = False,
**kwargs,
):
if (
sum(
[
int(socket_name is not None),
int(prop_name is not None),
int(any_loose_input_socket),
]
)
> 1
):
msg = 'Define only one of socket_name, prop_name or any_loose_input_socket'
raise ValueError(msg)
return event_decorator(
action_type=EventCallbackType.on_value_changed,
extra_data={

View File

@ -1,18 +1,18 @@
import bpy
import typing as typ
import sympy as sp
import sympy.physics.units as spu
import tidy3d as td
from .....utils import analyze_geonodes, logger
from .....assets.import_geonodes import GeoNodes, import_geonodes
from .....utils import extra_sympy_units as spux
from .....utils import logger
from ... import contracts as ct
from ... import managed_objs, sockets
from .. import base, events
log = logger.get(__name__)
GEONODES_MONITOR_BOX = 'monitor_box'
class EHFieldMonitorNode(base.MaxwellSimNode):
"""Node providing for the monitoring of electromagnetic fields within a given planar region or volume."""
@ -24,14 +24,14 @@ class EHFieldMonitorNode(base.MaxwellSimNode):
####################
# - Sockets
####################
input_sockets = {
input_sockets: typ.ClassVar = {
'Center': sockets.PhysicalPoint3DSocketDef(),
'Size': sockets.PhysicalSize3DSocketDef(),
'Samples/Space': sockets.Integer3DVectorSocketDef(
default_value=sp.Matrix([10, 10, 10])
),
}
input_socket_sets = {
input_socket_sets: typ.ClassVar = {
'Freq Domain': {
'Freqs': sockets.PhysicalFreqSocketDef(
is_list=True,
@ -45,15 +45,17 @@ class EHFieldMonitorNode(base.MaxwellSimNode):
),
},
}
output_sockets = {
output_sockets: typ.ClassVar = {
'Monitor': sockets.MaxwellMonitorSocketDef(),
}
managed_obj_defs = {
'monitor_box': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix='',
)
managed_obj_defs: typ.ClassVar = {
'mesh': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLMesh(name),
),
'modifier': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLModifier(name),
),
}
####################
@ -61,6 +63,7 @@ class EHFieldMonitorNode(base.MaxwellSimNode):
####################
@events.computes_output_socket(
'Monitor',
props={'active_socket_set', 'sim_node_name'},
input_sockets={
'Rec Start',
'Rec Stop',
@ -70,60 +73,52 @@ class EHFieldMonitorNode(base.MaxwellSimNode):
'Samples/Time',
'Freqs',
},
props={'active_socket_set', 'sim_node_name'},
unit_systems={'Tidy3DUnits': ct.UNITS_TIDY3D},
scale_input_sockets={
'Center': 'Tidy3DUnits',
'Size': 'Tidy3DUnits',
'Samples/Space': 'Tidy3DUnits',
'Rec Start': 'Tidy3DUnits',
'Rec Stop': 'Tidy3DUnits',
'Samples/Time': 'Tidy3DUnits',
},
)
def compute_monitor(
self, input_sockets: dict, props: dict
self, input_sockets: dict, props: dict, unit_systems: dict,
) -> td.FieldMonitor | td.FieldTimeMonitor:
"""Computes the value of the 'Monitor' output socket, which the user can select as being either a `td.FieldMonitor` or `td.FieldTimeMonitor`."""
_center = input_sockets['Center']
_size = input_sockets['Size']
_samples_space = input_sockets['Samples/Space']
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
size = tuple(spu.convert_to(_size, spu.um) / spu.um)
samples_space = tuple(_samples_space)
if props['active_socket_set'] == 'Freq Domain':
freqs = input_sockets['Freqs']
log.info(
'Computing FieldMonitor (name=%s) with center=%s, size=%s',
'Computing FieldMonitor (name="%s") with center="%s", size="%s"',
props['sim_node_name'],
center,
size,
input_sockets['Center'],
input_sockets['Size'],
)
return td.FieldMonitor(
center=center,
size=size,
center=input_sockets['Center'],
size=input_sockets['Size'],
name=props['sim_node_name'],
interval_space=samples_space,
interval_space=input_sockets['Samples/Space'],
freqs=[
float(spu.convert_to(freq, spu.hertz) / spu.hertz) for freq in freqs
],
)
## Time Domain
_rec_start = input_sockets['Rec Start']
_rec_stop = input_sockets['Rec Stop']
samples_time = input_sockets['Samples/Time']
rec_start = spu.convert_to(_rec_start, spu.second) / spu.second
rec_stop = spu.convert_to(_rec_stop, spu.second) / spu.second
log.info(
'Computing FieldTimeMonitor (name=%s) with center=%s, size=%s',
props['sim_node_name'],
center,
size,
input_sockets['Center'],
input_sockets['Size'],
)
return td.FieldTimeMonitor(
center=center,
size=size,
center=input_sockets['Center'],
size=input_sockets['Size'],
name=props['sim_node_name'],
start=rec_start,
stop=rec_stop,
interval=samples_time,
interval_space=samples_space,
start=input_sockets['Rec Start'],
stop=input_sockets['Rec Stop'],
interval=input_sockets['Samples/Time'],
interval_space=input_sockets['Samples/Space'],
)
####################
@ -131,49 +126,37 @@ class EHFieldMonitorNode(base.MaxwellSimNode):
####################
@events.on_value_changed(
socket_name={'Center', 'Size'},
prop_name='preview_active',
props={'preview_active'},
input_sockets={'Center', 'Size'},
managed_objs={'monitor_box'},
managed_objs={'mesh', 'modifier'},
unit_systems={'BlenderUnits': ct.UNITS_BLENDER},
scale_input_sockets={
'Center': 'BlenderUnits',
},
)
def on_value_changed__center_size(
def on_inputs_changed(
self,
input_sockets: dict,
props: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
input_sockets: dict,
unit_systems: dict,
):
"""Alters the managed 3D preview objects whenever the center or size input sockets are changed."""
_center = input_sockets['Center']
center = tuple([float(el) for el in spu.convert_to(_center, spu.um) / spu.um])
_size = input_sockets['Size']
size = tuple([float(el) for el in spu.convert_to(_size, spu.um) / spu.um])
# Retrieve Hard-Coded GeoNodes and Analyze Input
geo_nodes = bpy.data.node_groups[GEONODES_MONITOR_BOX]
geonodes_interface = analyze_geonodes.interface(geo_nodes, direc='INPUT')
# Sync Modifier Inputs
managed_objs['monitor_box'].sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
geonodes_interface['Size'].identifier: size,
# Push Input Values to GeoNodes Modifier
managed_objs['modifier'].bl_modifier(
managed_objs['mesh'].bl_object(location=input_sockets['Center']),
'NODES',
{
'node_group': import_geonodes(GeoNodes.PrimitiveBox, 'link'),
'unit_system': unit_systems['BlenderUnits'],
'inputs': {
'Size': input_sockets['Size'],
},
},
)
# Sync Object Position
managed_objs['monitor_box'].bl_object('MESH').location = center
####################
# - Preview - Show Preview
####################
@events.on_show_preview(
managed_objs={'monitor_box'},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
"""Requests that the managed object be previewed in response to a user request to show the preview."""
managed_objs['monitor_box'].show_preview('MESH')
self.on_value_changed__center_size()
# Push Preview State
if props['preview_active']:
managed_objs['mesh'].show_preview()
####################

View File

@ -1,15 +1,18 @@
import typing as typ
import bpy
import sympy as sp
import sympy.physics.units as spu
import tidy3d as td
from .....utils import analyze_geonodes
from .....assets.import_geonodes import GeoNodes, import_geonodes
from .....utils import extra_sympy_units as spux
from .....utils import logger
from ... import contracts as ct
from ... import managed_objs, sockets
from .. import base, events
GEONODES_MONITOR_BOX = 'monitor_flux_box'
log = logger.get(__name__)
class FieldPowerFluxMonitorNode(base.MaxwellSimNode):
@ -20,7 +23,7 @@ class FieldPowerFluxMonitorNode(base.MaxwellSimNode):
####################
# - Sockets
####################
input_sockets = {
input_sockets: typ.ClassVar = {
'Center': sockets.PhysicalPoint3DSocketDef(),
'Size': sockets.PhysicalSize3DSocketDef(),
'Samples/Space': sockets.Integer3DVectorSocketDef(
@ -28,7 +31,7 @@ class FieldPowerFluxMonitorNode(base.MaxwellSimNode):
),
'Direction': sockets.BoolSocketDef(),
}
input_socket_sets = {
input_socket_sets: typ.ClassVar = {
'Freq Domain': {
'Freqs': sockets.PhysicalFreqSocketDef(
is_list=True,
@ -42,35 +45,25 @@ class FieldPowerFluxMonitorNode(base.MaxwellSimNode):
),
},
}
output_sockets = {
output_sockets: typ.ClassVar = {
'Monitor': sockets.MaxwellMonitorSocketDef(),
}
managed_obj_defs = {
'monitor_box': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix='',
)
managed_obj_defs: typ.ClassVar = {
'mesh': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLMesh(name),
),
'modifier': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLModifier(name),
),
}
####################
# - Properties
####################
####################
# - UI
####################
def draw_props(self, context, layout):
pass
def draw_info(self, context, col):
pass
####################
# - Output Sockets
# - Event Methods: Computation
####################
@events.computes_output_socket(
'Monitor',
props={'active_socket_set', 'sim_node_name'},
input_sockets={
'Rec Start',
'Rec Stop',
@ -81,102 +74,86 @@ class FieldPowerFluxMonitorNode(base.MaxwellSimNode):
'Freqs',
'Direction',
},
props={'active_socket_set', 'sim_node_name'},
unit_systems={'Tidy3DUnits': ct.UNITS_TIDY3D},
scale_input_sockets={
'Center': 'Tidy3DUnits',
'Size': 'Tidy3DUnits',
'Samples/Space': 'Tidy3DUnits',
'Rec Start': 'Tidy3DUnits',
'Rec Stop': 'Tidy3DUnits',
'Samples/Time': 'Tidy3DUnits',
},
)
def compute_monitor(self, input_sockets: dict, props: dict) -> td.FieldTimeMonitor:
_center = input_sockets['Center']
_size = input_sockets['Size']
_samples_space = input_sockets['Samples/Space']
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
size = tuple(spu.convert_to(_size, spu.um) / spu.um)
samples_space = tuple(_samples_space)
direction = '+' if input_sockets['Direction'] else '-'
if props['active_socket_set'] == 'Freq Domain':
freqs = input_sockets['Freqs']
log.info(
'Computing FluxMonitor (name="%s") with center="%s", size="%s"',
props['sim_node_name'],
input_sockets['Center'],
input_sockets['Size'],
)
return td.FluxMonitor(
center=center,
size=size,
center=input_sockets['Center'],
size=input_sockets['Size'],
name=props['sim_node_name'],
interval_space=samples_space,
interval_space=input_sockets['Samples/Space'],
freqs=[
float(spu.convert_to(freq, spu.hertz) / spu.hertz) for freq in freqs
],
normal_dir=direction,
)
else: ## Time Domain
_rec_start = input_sockets['Rec Start']
_rec_stop = input_sockets['Rec Stop']
samples_time = input_sockets['Samples/Time']
rec_start = spu.convert_to(_rec_start, spu.second) / spu.second
rec_stop = spu.convert_to(_rec_stop, spu.second) / spu.second
return td.FieldTimeMonitor(
center=center,
size=size,
name=props['sim_node_name'],
start=rec_start,
stop=rec_stop,
interval=samples_time,
interval_space=samples_space,
)
return td.FluxTimeMonitor(
center=input_sockets['Center'],
size=input_sockets['Size'],
name=props['sim_node_name'],
start=input_sockets['Rec Start'],
stop=input_sockets['Rec Stop'],
interval=input_sockets['Samples/Time'],
interval_space=input_sockets['Samples/Space'],
normal_dir=direction,
)
####################
# - Preview - Changes to Input Sockets
####################
@events.on_value_changed(
socket_name={'Center', 'Size'},
input_sockets={'Center', 'Size', 'Direction'},
managed_objs={'monitor_box'},
prop_name='preview_active',
props={'preview_active'},
input_sockets={'Center', 'Size'},
managed_objs={'mesh', 'modifier'},
unit_systems={'BlenderUnits': ct.UNITS_BLENDER},
scale_input_sockets={
'Center': 'BlenderUnits',
},
)
def on_value_changed__center_size(
def on_inputs_changed(
self,
input_sockets: dict,
props: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
input_sockets: dict,
unit_systems: dict,
):
_center = input_sockets['Center']
center = tuple([float(el) for el in spu.convert_to(_center, spu.um) / spu.um])
_size = input_sockets['Size']
size = tuple([float(el) for el in spu.convert_to(_size, spu.um) / spu.um])
## TODO: Preview unit system?? Presume um for now
# Retrieve Hard-Coded GeoNodes and Analyze Input
geo_nodes = bpy.data.node_groups[GEONODES_MONITOR_BOX]
geonodes_interface = analyze_geonodes.interface(geo_nodes, direc='INPUT')
# Sync Modifier Inputs
managed_objs['monitor_box'].sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
geonodes_interface['Size'].identifier: size,
geonodes_interface['Direction'].identifier: input_sockets['Direction'],
## TODO: Use 'bl_socket_map.value_to_bl`!
## - This accounts for auto-conversion, unit systems, etc. .
## - We could keep it in the node base class...
## - ...But it needs aligning with Blender, too. Hmm.
# Push Input Values to GeoNodes Modifier
managed_objs['modifier'].bl_modifier(
managed_objs['mesh'].bl_object(location=input_sockets['Center']),
'NODES',
{
'node_group': import_geonodes(GeoNodes.PrimitiveBox, 'link'),
'unit_system': unit_systems['BlenderUnits'],
'inputs': {
'Size': input_sockets['Size'],
},
},
)
# Sync Object Position
managed_objs['monitor_box'].bl_object('MESH').location = center
####################
# - Preview - Show Preview
####################
@events.on_show_preview(
managed_objs={'monitor_box'},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
managed_objs['monitor_box'].show_preview('MESH')
self.on_value_changed__center_size()
# Push Preview State
if props['preview_active']:
managed_objs['mesh'].show_preview()
####################

View File

@ -1,10 +1,12 @@
import typing as typ
import bpy
import sympy as sp
from .....utils import logger
from ... import contracts as ct
from ... import sockets
from ...managed_objs import managed_bl_object
from ...managed_objs.managed_bl_collection import preview_collection
from .. import base, events
log = logger.get(__name__)
@ -46,7 +48,7 @@ class ViewerNode(base.MaxwellSimNode):
node_type = ct.NodeType.Viewer
bl_label = 'Viewer'
input_sockets = {
input_sockets: typ.ClassVar = {
'Data': sockets.AnySocketDef(),
}
@ -67,6 +69,12 @@ class ViewerNode(base.MaxwellSimNode):
update=lambda self, context: self.sync_prop('auto_3d_preview', context),
)
cache__data_was_unlinked: bpy.props.BoolProperty(
name='Data Was Unlinked',
description="Whether the Data input was unlinked last time it was checked.",
default=True,
)
####################
# - UI
####################
@ -111,46 +119,39 @@ class ViewerNode(base.MaxwellSimNode):
console.print(data)
####################
# - Updates
# - Event Methods
####################
@events.on_value_changed(
socket_name='Data',
props={'auto_plot'},
)
def on_changed_2d_data(self, props):
# Show Plot
## Don't have to un-show other plots.
if self.inputs['Data'].is_linked and props['auto_plot']:
self.trigger_action('show_plot')
@events.on_value_changed(
socket_name='Data',
props={'auto_3d_preview'},
)
def on_value_changed__data(self, props):
# Show Plot
## Don't have to un-show other plots.
if self.auto_plot:
self.trigger_action('show_plot')
def on_changed_3d_data(self, props):
# Data Not Attached
if not self.inputs['Data'].is_linked:
self.cache__data_was_unlinked = True
# Remove Anything Previewed
preview_collection = managed_bl_object.bl_collection(
managed_bl_object.PREVIEW_COLLECTION_NAME,
view_layer_exclude=False,
)
for bl_object in preview_collection.objects.values():
preview_collection.objects.unlink(bl_object)
# Data Just Attached
elif self.cache__data_was_unlinked:
node_tree = self.id_data
# Preview Anything that Should be Previewed (maybe)
if props['auto_3d_preview']:
self.trigger_action('show_preview')
# Unpreview Everything
node_tree.unpreview_all()
@events.on_value_changed(
prop_name='auto_3d_preview',
props={'auto_3d_preview'},
)
def on_value_changed__auto_3d_preview(self, props):
# Remove Anything Previewed
preview_collection = managed_bl_object.bl_collection(
managed_bl_object.PREVIEW_COLLECTION_NAME,
view_layer_exclude=False,
)
for bl_object in preview_collection.objects.values():
preview_collection.objects.unlink(bl_object)
# Preview Anything that Should be Previewed (maybe)
if props['auto_3d_preview']:
self.trigger_action('show_preview')
# Enable Previews in Tree
if props['auto_3d_preview']:
log.info('Enabling 3D Previews from "%s"', self.name)
self.trigger_action('show_preview')
self.cache__data_was_unlinked = False
####################

View File

@ -1,116 +1,102 @@
import bpy
import typing as typ
import sympy as sp
import sympy.physics.units as spu
from .....utils import analyze_geonodes
from .....assets.import_geonodes import GeoNodes, import_geonodes
from ... import contracts as ct
from ... import managed_objs, sockets
from .. import base, events
GEONODES_DOMAIN_BOX = 'simdomain_box'
class SimDomainNode(base.MaxwellSimNode):
node_type = ct.NodeType.SimDomain
bl_label = 'Sim Domain'
use_sim_node_name = True
input_sockets = {
input_sockets: typ.ClassVar = {
'Duration': sockets.PhysicalTimeSocketDef(
default_value=5 * spu.ps,
default_unit=spu.ps,
),
'Center': sockets.PhysicalSize3DSocketDef(),
'Center': sockets.PhysicalPoint3DSocketDef(),
'Size': sockets.PhysicalSize3DSocketDef(),
'Grid': sockets.MaxwellSimGridSocketDef(),
'Ambient Medium': sockets.MaxwellMediumSocketDef(),
}
output_sockets = {
output_sockets: typ.ClassVar = {
'Domain': sockets.MaxwellSimDomainSocketDef(),
}
managed_obj_defs = {
'domain_box': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
managed_obj_defs: typ.ClassVar = {
'mesh': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLMesh(name),
name_prefix='',
)
),
'modifier': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLModifier(name),
),
}
####################
# - Callbacks
# - Event Methods
####################
@events.computes_output_socket(
'Domain',
input_sockets={'Duration', 'Center', 'Size', 'Grid', 'Ambient Medium'},
unit_systems={'Tidy3DUnits': ct.UNITS_TIDY3D},
scale_input_sockets={
'Duration': 'Tidy3DUnits',
'Center': 'Tidy3DUnits',
'Size': 'Tidy3DUnits',
},
)
def compute_sim_domain(self, input_sockets: dict) -> sp.Expr:
if all(
[
(_duration := input_sockets['Duration']),
(_center := input_sockets['Center']),
(_size := input_sockets['Size']),
(grid := input_sockets['Grid']),
(medium := input_sockets['Ambient Medium']),
]
):
duration = spu.convert_to(_duration, spu.second) / spu.second
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
size = tuple(spu.convert_to(_size, spu.um) / spu.um)
return dict(
run_time=duration,
center=center,
size=size,
grid_spec=grid,
medium=medium,
)
def compute_output(self, input_sockets: dict) -> sp.Expr:
return {
'run_time': input_sockets['Duration'],
'center': input_sockets['Center'],
'size': input_sockets['Size'],
'grid_spec': input_sockets['Grid'],
'medium': input_sockets['Ambient Medium'],
}
####################
# - Preview
####################
@events.on_value_changed(
socket_name={'Center', 'Size'},
prop_name='preview_active',
props={'preview_active'},
input_sockets={'Center', 'Size'},
managed_objs={'domain_box'},
managed_objs={'mesh', 'modifier'},
unit_systems={'BlenderUnits': ct.UNITS_BLENDER},
scale_input_sockets={
'Center': 'BlenderUnits',
},
)
def on_value_changed__center_size(
def on_input_changed(
self,
input_sockets: dict,
props: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
input_sockets: dict,
unit_systems: dict,
):
_center = input_sockets['Center']
center = tuple([float(el) for el in spu.convert_to(_center, spu.um) / spu.um])
_size = input_sockets['Size']
size = tuple([float(el) for el in spu.convert_to(_size, spu.um) / spu.um])
## TODO: Preview unit system?? Presume um for now
# Retrieve Hard-Coded GeoNodes and Analyze Input
geo_nodes = bpy.data.node_groups[GEONODES_DOMAIN_BOX]
geonodes_interface = analyze_geonodes.interface(geo_nodes, direc='INPUT')
# Sync Modifier Inputs
managed_objs['domain_box'].sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
geonodes_interface['Size'].identifier: size,
## TODO: Use 'bl_socket_map.value_to_bl`!
## - This accounts for auto-conversion, unit systems, etc. .
## - We could keep it in the node base class...
## - ...But it needs aligning with Blender, too. Hmm.
# Push Input Values to GeoNodes Modifier
managed_objs['modifier'].bl_modifier(
managed_objs['mesh'].bl_object(location=input_sockets['Center']),
'NODES',
{
'node_group': import_geonodes(GeoNodes.PrimitiveBox, 'link'),
'unit_system': unit_systems['BlenderUnits'],
'inputs': {
'Size': input_sockets['Size'],
},
},
)
# Push Preview State
if props['preview_active']:
managed_objs['mesh'].show_preview()
# Sync Object Position
managed_objs['domain_box'].bl_object('MESH').location = center
@events.on_show_preview(
managed_objs={'domain_box'},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
managed_objs['domain_box'].show_preview('MESH')
self.on_value_changed__center_size()
@events.on_init()
def on_init(self):
self.on_input_change()
####################

View File

@ -1,17 +1,13 @@
import math
import typing as typ
import bpy
import sympy as sp
import sympy.physics.units as spu
import tidy3d as td
from .....utils import analyze_geonodes
from ... import contracts as ct
from ... import managed_objs, sockets
from .. import base, events
GEONODES_PLANE_WAVE = 'source_plane_wave'
def convert_vector_to_spherical(
v: sp.MatrixBase,
@ -50,17 +46,17 @@ class PlaneWaveSourceNode(base.MaxwellSimNode):
####################
# - Sockets
####################
input_sockets = {
input_sockets: typ.ClassVar = {
'Temporal Shape': sockets.MaxwellTemporalShapeSocketDef(),
'Center': sockets.PhysicalPoint3DSocketDef(),
'Direction': sockets.Real3DVectorSocketDef(default_value=sp.Matrix([0, 0, -1])),
'Pol Angle': sockets.PhysicalAngleSocketDef(),
}
output_sockets = {
output_sockets: typ.ClassVar = {
'Source': sockets.MaxwellSourceSocketDef(),
}
managed_obj_defs = {
managed_obj_defs: typ.ClassVar = {
'plane_wave_source': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix='',
@ -73,26 +69,29 @@ class PlaneWaveSourceNode(base.MaxwellSimNode):
@events.computes_output_socket(
'Source',
input_sockets={'Temporal Shape', 'Center', 'Direction', 'Pol Angle'},
unit_systems={'Tidy3DUnits': ct.UNITS_TIDY3D},
scale_input_sockets={
'Center': 'Tidy3DUnits',
},
)
def compute_source(self, input_sockets: dict):
temporal_shape = input_sockets['Temporal Shape']
_center = input_sockets['Center']
direction = input_sockets['Direction']
pol_angle = input_sockets['Pol Angle']
injection_axis, dir_sgn, theta, phi = convert_vector_to_spherical(direction)
injection_axis, dir_sgn, theta, phi = convert_vector_to_spherical(
input_sockets['Direction']
)
size = {
'x': (0, math.inf, math.inf),
'y': (math.inf, 0, math.inf),
'z': (math.inf, math.inf, 0),
}[injection_axis]
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
# Display the results
return td.PlaneWave(
center=center,
source_time=temporal_shape,
center=input_sockets['Center'],
source_time=input_sockets['Temporal Shape'],
size=size,
direction=dir_sgn,
angle_theta=theta,
@ -100,54 +99,54 @@ class PlaneWaveSourceNode(base.MaxwellSimNode):
pol_angle=pol_angle,
)
####################
# - Preview
####################
@events.on_value_changed(
socket_name={'Center', 'Direction'},
input_sockets={'Center', 'Direction'},
managed_objs={'plane_wave_source'},
)
def on_value_changed__center_direction(
self,
input_sockets: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
_center = input_sockets['Center']
center = tuple([float(el) for el in spu.convert_to(_center, spu.um) / spu.um])
#####################
## - Preview
#####################
# @events.on_value_changed(
# socket_name={'Center', 'Direction'},
# input_sockets={'Center', 'Direction'},
# managed_objs={'plane_wave_source'},
# )
# def on_value_changed__center_direction(
# self,
# input_sockets: dict,
# managed_objs: dict[str, ct.schemas.ManagedObj],
# ):
# _center = input_sockets['Center']
# center = tuple([float(el) for el in spu.convert_to(_center, spu.um) / spu.um])
_direction = input_sockets['Direction']
direction = tuple([float(el) for el in _direction])
## TODO: Preview unit system?? Presume um for now
# _direction = input_sockets['Direction']
# direction = tuple([float(el) for el in _direction])
# ## TODO: Preview unit system?? Presume um for now
# Retrieve Hard-Coded GeoNodes and Analyze Input
geo_nodes = bpy.data.node_groups[GEONODES_PLANE_WAVE]
geonodes_interface = analyze_geonodes.interface(geo_nodes, direc='INPUT')
# # Retrieve Hard-Coded GeoNodes and Analyze Input
# geo_nodes = bpy.data.node_groups[GEONODES_PLANE_WAVE]
# geonodes_interface = analyze_geonodes.interface(geo_nodes, direc='INPUT')
# Sync Modifier Inputs
managed_objs['plane_wave_source'].sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
geonodes_interface['Direction'].identifier: direction,
## TODO: Use 'bl_socket_map.value_to_bl`!
## - This accounts for auto-conversion, unit systems, etc. .
## - We could keep it in the node base class...
## - ...But it needs aligning with Blender, too. Hmm.
},
)
# # Sync Modifier Inputs
# managed_objs['plane_wave_source'].sync_geonodes_modifier(
# geonodes_node_group=geo_nodes,
# geonodes_identifier_to_value={
# geonodes_interface['Direction'].identifier: direction,
# ## TODO: Use 'bl_socket_map.value_to_bl`!
# ## - This accounts for auto-conversion, unit systems, etc. .
# ## - We could keep it in the node base class...
# ## - ...But it needs aligning with Blender, too. Hmm.
# },
# )
# Sync Object Position
managed_objs['plane_wave_source'].bl_object('MESH').location = center
# # Sync Object Position
# managed_objs['plane_wave_source'].bl_object('MESH').location = center
@events.on_show_preview(
managed_objs={'plane_wave_source'},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
managed_objs['plane_wave_source'].show_preview('MESH')
self.on_value_changed__center_direction()
# @events.on_show_preview(
# managed_objs={'plane_wave_source'},
# )
# def on_show_preview(
# self,
# managed_objs: dict[str, ct.schemas.ManagedObj],
# ):
# managed_objs['plane_wave_source'].show_preview('MESH')
# self.on_value_changed__center_direction()
####################

View File

@ -68,6 +68,10 @@ class PointDipoleSourceNode(base.MaxwellSimNode):
'Source',
input_sockets={'Temporal Shape', 'Center', 'Interpolate'},
props={'pol_axis'},
unit_systems={'Tidy3DUnits': ct.UNITS_TIDY3D},
scale_input_sockets={
'Center': 'Tidy3DUnits',
},
)
def compute_source(
self, input_sockets: dict[str, typ.Any], props: dict[str, typ.Any]
@ -78,53 +82,46 @@ class PointDipoleSourceNode(base.MaxwellSimNode):
'EZ': 'Ez',
}[props['pol_axis']]
temporal_shape = input_sockets['Temporal Shape']
_center = input_sockets['Center']
interpolate = input_sockets['Interpolate']
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
_res = td.PointDipole(
center=center,
source_time=temporal_shape,
interpolate=interpolate,
return td.PointDipole(
center=input_sockets['Center'],
source_time=input_sockets['Temporal Shape'],
interpolate=input_sockets['Interpolate'],
polarization=pol_axis,
)
return _res
####################
# - Preview
####################
@events.on_value_changed(
socket_name='Center',
input_sockets={'Center'},
managed_objs={'sphere_empty'},
)
def on_value_changed__center(
self,
input_sockets: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
_center = input_sockets['Center']
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
## TODO: Preview unit system?? Presume um for now
#####################
## - Preview
#####################
# @events.on_value_changed(
# socket_name='Center',
# input_sockets={'Center'},
# managed_objs={'sphere_empty'},
# )
# def on_value_changed__center(
# self,
# input_sockets: dict,
# managed_objs: dict[str, ct.schemas.ManagedObj],
# ):
# _center = input_sockets['Center']
# center = tuple(spu.convert_to(_center, spu.um) / spu.um)
# ## TODO: Preview unit system?? Presume um for now
mobj = managed_objs['sphere_empty']
bl_object = mobj.bl_object('EMPTY')
bl_object.location = center # tuple([float(el) for el in center])
# mobj = managed_objs['sphere_empty']
# bl_object = mobj.bl_object('EMPTY')
# bl_object.location = center # tuple([float(el) for el in center])
@events.on_show_preview(
managed_objs={'sphere_empty'},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
managed_objs['sphere_empty'].show_preview(
'EMPTY',
empty_display_type='SPHERE',
)
managed_objs['sphere_empty'].bl_object('EMPTY').empty_display_size = 0.2
# @events.on_show_preview(
# managed_objs={'sphere_empty'},
# )
# def on_show_preview(
# self,
# managed_objs: dict[str, ct.schemas.ManagedObj],
# ):
# managed_objs['sphere_empty'].show_preview(
# 'EMPTY',
# empty_display_type='SPHERE',
# )
# managed_objs['sphere_empty'].bl_object('EMPTY').empty_display_size = 0.2
####################

View File

@ -71,10 +71,10 @@ class GeoNodesStructureNode(base.MaxwellSimNode):
socket_name='GeoNodes',
prop_name='preview_active',
any_loose_input_socket=True,
# Method Data
props={'preview_active'},
managed_objs={'mesh', 'modifier'},
input_sockets={'GeoNodes'},
# Unit System Scaling
input_sockets={'Center', 'GeoNodes'},
all_loose_input_sockets=True,
unit_systems={'BlenderUnits': ct.UNITS_BLENDER},
)
def on_input_changed(
@ -127,15 +127,22 @@ class GeoNodesStructureNode(base.MaxwellSimNode):
## Changing socket.value invokes recursion of this function.
## The else: below ensures that only one push occurs.
## (well, one push per .value set, which simplifies to one push)
log.debug(
log.info(
'Setting Loose Input Sockets of "%s" to GeoNodes Defaults',
self.bl_label,
)
for socket_name in self.loose_input_sockets:
socket = self.inputs[socket_name]
socket.value = bl_socket_map.read_bl_socket_default_value(
geonodes_interface[socket_name]
geonodes_interface[socket_name],
unit_systems['BlenderUnits'],
allow_unit_not_in_unit_system=True,
)
log.info(
'Set Loose Input Sockets of "%s" to: %s',
self.bl_label,
str(self.loose_input_sockets),
)
else:
# Push Loose Input Values to GeoNodes Modifier
managed_objs['modifier'].bl_modifier(

View File

@ -4,13 +4,11 @@ import sympy as sp
import sympy.physics.units as spu
import tidy3d as td
from .....assets.import_geonodes import import_geonodes
from ......assets.import_geonodes import GeoNodes, import_geonodes
from .... import contracts as ct
from .... import managed_objs, sockets
from ... import base, events
GEONODES_BOX = 'box'
class BoxStructureNode(base.MaxwellSimNode):
node_type = ct.NodeType.BoxStructure
@ -64,16 +62,15 @@ class BoxStructureNode(base.MaxwellSimNode):
@events.on_value_changed(
socket_name={'Center', 'Size'},
prop_name='preview_active',
# Method Data
props={'preview_active'},
input_sockets={'Center', 'Size'},
managed_objs={'mesh', 'modifier'},
# Unit System Scaling
unit_systems={'BlenderUnits': ct.UNITS_BLENDER},
scale_input_sockets={
'Center': 'BlenderUnits',
},
)
def on_input_changed(
def on_inputs_changed(
self,
props: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
@ -85,7 +82,7 @@ class BoxStructureNode(base.MaxwellSimNode):
managed_objs['mesh'].bl_object(location=input_sockets['Center']),
'NODES',
{
'node_group': import_geonodes(GEONODES_BOX, 'link'),
'node_group': import_geonodes(GeoNodes.PrimitiveBox, 'link'),
'unit_system': unit_systems['BlenderUnits'],
'inputs': {
'Size': input_sockets['Size'],
@ -98,7 +95,7 @@ class BoxStructureNode(base.MaxwellSimNode):
@events.on_init()
def on_init(self):
self.on_input_change()
self.on_inputs_changed()
####################

View File

@ -1,38 +1,40 @@
import bpy
import typing as typ
import sympy.physics.units as spu
import tidy3d as td
from ......utils import analyze_geonodes
from ......assets.import_geonodes import GeoNodes, import_geonodes
from .... import contracts as ct
from .... import managed_objs, sockets
from ... import base, events
GEONODES_STRUCTURE_SPHERE = 'structure_sphere'
class SphereStructureNode(base.MaxwellSimNode):
node_type = ct.NodeType.SphereStructure
bl_label = 'Sphere Structure'
use_sim_node_name = True
####################
# - Sockets
####################
input_sockets = {
input_sockets: typ.ClassVar = {
'Center': sockets.PhysicalPoint3DSocketDef(),
'Radius': sockets.PhysicalLengthSocketDef(
default_value=150 * spu.nm,
),
'Medium': sockets.MaxwellMediumSocketDef(),
}
output_sockets = {
output_sockets: typ.ClassVar = {
'Structure': sockets.MaxwellStructureSocketDef(),
}
managed_obj_defs = {
'structure_sphere': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix='',
)
managed_obj_defs: typ.ClassVar = {
'mesh': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLMesh(name),
),
'modifier': ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLModifier(name),
),
}
####################
@ -41,21 +43,19 @@ class SphereStructureNode(base.MaxwellSimNode):
@events.computes_output_socket(
'Structure',
input_sockets={'Center', 'Radius', 'Medium'},
unit_systems={'Tidy3DUnits': ct.UNITS_TIDY3D},
scale_input_sockets={
'Center': 'Tidy3DUnits',
'Radius': 'Tidy3DUnits',
},
)
def compute_structure(self, input_sockets: dict) -> td.Box:
medium = input_sockets['Medium']
_center = input_sockets['Center']
_radius = input_sockets['Radius']
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
radius = spu.convert_to(_radius, spu.um) / spu.um
return td.Structure(
geometry=td.Sphere(
radius=radius,
center=center,
radius=input_sockets['Radius'],
center=input_sockets['Center'],
),
medium=medium,
medium=input_sockets['Medium'],
)
####################
@ -63,52 +63,42 @@ class SphereStructureNode(base.MaxwellSimNode):
####################
@events.on_value_changed(
socket_name={'Center', 'Radius'},
prop_name='preview_active',
props={'preview_active'},
input_sockets={'Center', 'Radius'},
managed_objs={'structure_sphere'},
managed_objs={'mesh', 'modifier'},
unit_systems={'BlenderUnits': ct.UNITS_BLENDER},
scale_input_sockets={
'Center': 'Tidy3DUnits',
'Radius': 'Tidy3DUnits',
},
)
def on_value_changed__center_radius(
def on_inputs_changed(
self,
input_sockets: dict,
props: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
input_sockets: dict,
unit_systems: dict,
):
_center = input_sockets['Center']
center = tuple([float(el) for el in spu.convert_to(_center, spu.um) / spu.um])
_radius = input_sockets['Radius']
radius = float(spu.convert_to(_radius, spu.um) / spu.um)
## TODO: Preview unit system?? Presume um for now
# Retrieve Hard-Coded GeoNodes and Analyze Input
geo_nodes = bpy.data.node_groups[GEONODES_STRUCTURE_SPHERE]
geonodes_interface = analyze_geonodes.interface(geo_nodes, direc='INPUT')
# Sync Modifier Inputs
managed_objs['structure_sphere'].sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
geonodes_interface['Radius'].identifier: radius,
## TODO: Use 'bl_socket_map.value_to_bl`!
## - This accounts for auto-conversion, unit systems, etc. .
## - We could keep it in the node base class...
## - ...But it needs aligning with Blender, too. Hmm.
# Push Input Values to GeoNodes Modifier
managed_objs['modifier'].bl_modifier(
managed_objs['mesh'].bl_object(location=input_sockets['Center']),
'NODES',
{
'node_group': import_geonodes(GeoNodes.PrimitiveSphere, 'link'),
'unit_system': unit_systems['BlenderUnits'],
'inputs': {
'Radius': input_sockets['Radius'],
},
},
)
# Push Preview State
if props['preview_active']:
managed_objs['mesh'].show_preview()
# Sync Object Position
managed_objs['structure_sphere'].bl_object('MESH').location = center
####################
# - Preview - Show Preview
####################
@events.on_show_preview(
managed_objs={'structure_sphere'},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
managed_objs['structure_sphere'].show_preview('MESH')
self.on_value_changed__center_radius()
@events.on_init()
def on_init(self):
self.on_inputs_changed()
####################

View File

@ -42,7 +42,7 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
# - Initialization
####################
def __init_subclass__(cls, **kwargs: typ.Any):
super().__init_subclass__(**kwargs) ## Yucky superclass setup.
super().__init_subclass__(**kwargs)
# Setup Blender ID for Node
if not hasattr(cls, 'socket_type'):
@ -266,15 +266,12 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
if kind == ct.DataFlowKind.Value:
if self.is_list:
return self.value_list
else:
return self.value
elif kind == ct.DataFlowKind.LazyValue:
return self.value
if kind == ct.DataFlowKind.LazyValue:
if self.is_list:
return self.lazy_value_list
else:
return self.lazy_value
return self.lazy_value
elif kind == ct.DataFlowKind.Capabilities:
if kind == ct.DataFlowKind.Capabilities:
return self.capabilities
return None

View File

@ -3,10 +3,14 @@ import numpy as np
import pydantic as pyd
import sympy.physics.units as spu
from .....utils import logger
from .....utils import extra_sympy_units as spux
from .....utils.pydantic_sympy import SympyExpr
from ... import contracts as ct
from .. import base
log = logger.get(__name__)
####################
# - Blender Socket
@ -20,8 +24,8 @@ class PhysicalLengthBLSocket(base.MaxwellSimSocket):
# - Properties
####################
raw_value: bpy.props.FloatProperty(
name='Unitless Force',
description='Represents the unitless part of the force',
name='Unitless Length',
description='Represents the unitless part of the length',
default=0.0,
precision=6,
update=(lambda self, context: self.sync_prop('raw_value', context)),
@ -68,7 +72,7 @@ class PhysicalLengthBLSocket(base.MaxwellSimSocket):
@value.setter
def value(self, value: SympyExpr) -> None:
self.raw_value = spu.convert_to(value, self.unit) / self.unit
self.raw_value = spux.sympy_to_python(spux.scale_to_unit(value, self.unit))
@property
def value_list(self) -> list[SympyExpr]:

View File

@ -12,7 +12,6 @@ with pydeps.syspath_from_bpy_prefs():
####################
# - Useful Methods
####################
@functools.lru_cache(maxsize=4096)
def uses_units(expression: sp.Expr) -> bool:
## TODO: An LFU cache could do better than an LRU.
"""Checks if an expression uses any units (`Quantity`)."""
@ -23,7 +22,6 @@ def uses_units(expression: sp.Expr) -> bool:
# Function to return a set containing all units used in the expression
@functools.lru_cache(maxsize=4096)
def get_units(expression: sp.Expr):
## TODO: An LFU cache could do better than an LRU.
"""Gets all the units of an expression (as `Quantity`)."""
@ -94,7 +92,6 @@ def parse_abbrev_symbols_to_units(expr: sp.Basic) -> sp.Basic:
####################
# - Units <-> Scalars
####################
@functools.lru_cache(maxsize=8192)
def scale_to_unit(expr: sp.Expr, unit: spu.Quantity) -> typ.Any:
## TODO: An LFU cache could do better than an LRU.
unitless_expr = spu.convert_to(expr, unit) / unit
@ -108,7 +105,6 @@ def scale_to_unit(expr: sp.Expr, unit: spu.Quantity) -> typ.Any:
####################
# - Sympy <-> Scalars
####################
@functools.lru_cache(maxsize=8192)
def sympy_to_python(scalar: sp.Basic) -> int | float | complex | tuple | list:
"""Convert a scalar sympy expression to the directly corresponding Python type.
@ -128,7 +124,7 @@ def sympy_to_python(scalar: sp.Basic) -> int | float | complex | tuple | list:
# Detect Row / Column Vector
## When it's "actually" a 1D structure, flatten and return as tuple.
if 1 in scalar.shape:
return tuple(itertools.from_iterable(list_2d))
return tuple(itertools.chain.from_iterable(list_2d))
return list_2d
if scalar.is_integer:

View File

@ -111,28 +111,24 @@ def ConstrSympyExpr(
allowed_sets
and isinstance(expr, sp.Expr)
and not any(
[
{
'integer': expr.is_integer,
'rational': expr.is_rational,
'real': expr.is_real,
'complex': expr.is_complex,
}[allowed_set]
for allowed_set in allowed_sets
]
{
'integer': expr.is_integer,
'rational': expr.is_rational,
'real': expr.is_real,
'complex': expr.is_complex,
}[allowed_set]
for allowed_set in allowed_sets
)
):
msgs.add(
f"allowed_sets={allowed_sets} does not match expression {expr} (remember to add assumptions to symbols, ex. `x = sp.Symbol('x', real=True))"
)
if allowed_structures and not any(
[
{
'matrix': isinstance(expr, sp.MatrixBase),
}[allowed_set]
for allowed_set in allowed_structures
if allowed_structures != 'scalar'
]
{
'matrix': isinstance(expr, sp.MatrixBase),
}[allowed_set]
for allowed_set in allowed_structures
if allowed_structures != 'scalar'
):
msgs.add(
f"allowed_structures={allowed_structures} does not match expression {expr} (remember to add assumptions to symbols, ex. `x = sp.Symbol('x', real=True))"