feat: Demo-grade simulation feedback loop.

Sofus Albert Høgsbro Rose 2024-03-13 19:10:54 +01:00
parent 9a148d7d97
commit ed21617746
54 changed files with 2452 additions and 915 deletions

View File

@ -7,7 +7,7 @@
## Inputs
[x] Wave Constant
- [ ] Implement export of frequency / wavelength array/range.
- [x] Implement export of frequency / wavelength array/range.
[-] Unit System
- [ ] Implement presets, including "Tidy3D" and "Blender", shown in the label row.
@ -41,6 +41,9 @@
[x] Web Export / Tidy3D Web Exporter
- [ ] We need better ways of doing checks before uploading, like for monitor data size. Maybe a SimInfo node?
- [ ] We need to be able to "delete and re-upload" (or maybe just delete from the interface).
- [x] Implement estimation of monitor storage
- [x] Implement cost estimation
- [?] Merge with the Tidy3D File Import (since both are working with HDFs; the web one only really does downloading too).
[x] File Export / JSON File Export
[ ] File Import / Tidy3D File Export
@ -51,11 +54,8 @@
- [ ] Standardize 1D and 2D array loading/saving on numpy's savetxt with gzip enabled.
## Viz
[ ] Sim Info
- [ ] Implement estimation of monitor storage
- [ ] Implement cost estimation
[ ] Monitor Data Viz
- [ ] Implement dropdown to choose which monitor in the SimulationData should be visualized (based on which are available in the SimulationData), and implement visualization based on every kind of monitor-adjascent output data type (<https://docs.flexcompute.com/projects/tidy3d/en/latest/api/output_data.html>)
[x] Monitor Data Viz
- [x] Implement dropdown to choose which monitor in the SimulationData should be visualized (based on which are available in the SimulationData), and implement visualization based on every kind of monitor-adjascent output data type (<https://docs.flexcompute.com/projects/tidy3d/en/latest/api/output_data.html>)
- [ ] Project field values onto a plane object (managed)
## Sources
@ -107,20 +107,20 @@
- [x] Use the modifier itself as memory, via the ManagedObj
- [?] When GeoNodes themselves declare panels, implement a grid-like tab system to select which sockets should be exposed in the node at a given point in time.
[ ] Primitive Structures / Plane
[ ] Primitive Structures / Box Structure
[ ] Primitive Structures / Sphere
[ ] Primitive Structures / Cylinder
[ ] Primitive Structures / Ring
[ ] Primitive Structures / Capsule
[ ] Primitive Structures / Cone
[ ] Primitive Structures / Plane Structure
[x] Primitive Structures / Box Structure
[x] Primitive Structures / Sphere Structure
[ ] Primitive Structures / Cylinder Structure
[ ] Primitive Structures / Ring Structure
[ ] Primitive Structures / Capsule Structure
[ ] Primitive Structures / Cone Structure
## Monitors
- **ALL**: "Steady-State" / "Time Domain" (only if relevant).
[ ] E/H Field Monitor
- [ ] Monitor Domain as dropdown with Frequency or Time
- [ ] Axis-aligned planar 2D (pixel) and coord-aligned box 3D (voxel).
[x] E/H Field Monitor
- [x] Monitor Domain as dropdown with Frequency or Time
- [x] Axis-aligned planar 2D (pixel) and coord-aligned box 3D (voxel).
[ ] Field Power Flux Monitor
- [ ] Monitor Domain as dropdown with Frequency or Time
- [ ] Axis-aligned planar 2D (pixel) and coord-aligned box 3D (voxel).
@ -397,3 +397,10 @@
[ ] Test on Windows
## Node Tree Cache Semantics
# TIDY3D BUGS
- Directly running `SimulationTask.get()` is bugged - it doesn't return some fields, including `created_at`. Listing tasks by folder is not broken.

View File

@ -43,4 +43,7 @@ NODE_CAT_LABELS = {
NC.MAXWELLSIM_UTILITIES: "Utilities",
NC.MAXWELLSIM_UTILITIES_CONVERTERS: "Converters",
NC.MAXWELLSIM_UTILITIES_OPERATIONS: "Operations",
# Viz/
NC.MAXWELLSIM_VIZ: "Viz",
}

View File

@ -51,6 +51,9 @@ class NodeCategory(BlenderTypeEnum):
MAXWELLSIM_UTILITIES_CONVERTERS = enum.auto()
MAXWELLSIM_UTILITIES_OPERATIONS = enum.auto()
# Viz/
MAXWELLSIM_VIZ = enum.auto()
@classmethod
def get_tree(cls):
## TODO: Refactor

View File

@ -145,3 +145,8 @@ class NodeType(BlenderTypeEnum):
## Utilities / Operations
ArrayOperation = enum.auto()
# Viz
FDTDSimDataViz = enum.auto()

View File

@ -62,6 +62,7 @@ SOCKET_COLORS = {
ST.MaxwellBoundCond: (0.8, 0.7, 0.45, 1.0), # Medium Light Gold
ST.MaxwellMonitor: (0.7, 0.6, 0.4, 1.0), # Medium Gold
ST.MaxwellFDTDSim: (0.6, 0.5, 0.35, 1.0), # Medium Dark Gold
ST.MaxwellFDTDSimData: (0.6, 0.5, 0.35, 1.0), # Medium Dark Gold
ST.MaxwellSimGrid: (0.5, 0.4, 0.3, 1.0), # Dark Gold
ST.MaxwellSimGridAxis: (0.4, 0.3, 0.25, 1.0), # Darkest Gold
ST.MaxwellSimDomain: (0.4, 0.3, 0.25, 1.0), # Darkest Gold

View File

@ -4,64 +4,65 @@ SOCKET_SHAPES = {
# Basic
ST.Any: "CIRCLE",
ST.Bool: "CIRCLE",
ST.String: "SQUARE",
ST.FilePath: "SQUARE",
ST.String: "CIRCLE",
ST.FilePath: "CIRCLE",
# Number
ST.IntegerNumber: "CIRCLE",
ST.RationalNumber: "CIRCLE",
ST.RealNumber: "CIRCLE",
ST.ComplexNumber: "CIRCLE_DOT",
ST.ComplexNumber: "CIRCLE",
# Vector
ST.Integer2DVector: "SQUARE_DOT",
ST.Real2DVector: "SQUARE_DOT",
ST.Complex2DVector: "DIAMOND_DOT",
ST.Integer3DVector: "SQUARE_DOT",
ST.Real3DVector: "SQUARE_DOT",
ST.Complex3DVector: "DIAMOND_DOT",
ST.Integer2DVector: "CIRCLE",
ST.Real2DVector: "CIRCLE",
ST.Complex2DVector: "CIRCLE",
ST.Integer3DVector: "CIRCLE",
ST.Real3DVector: "CIRCLE",
ST.Complex3DVector: "CIRCLE",
# Physical
ST.PhysicalUnitSystem: "CIRCLE",
ST.PhysicalTime: "CIRCLE",
ST.PhysicalAngle: "DIAMOND",
ST.PhysicalLength: "SQUARE",
ST.PhysicalArea: "SQUARE",
ST.PhysicalVolume: "SQUARE",
ST.PhysicalPoint2D: "DIAMOND",
ST.PhysicalPoint3D: "DIAMOND",
ST.PhysicalSize2D: "SQUARE",
ST.PhysicalSize3D: "SQUARE",
ST.PhysicalAngle: "CIRCLE",
ST.PhysicalLength: "CIRCLE",
ST.PhysicalArea: "CIRCLE",
ST.PhysicalVolume: "CIRCLE",
ST.PhysicalPoint2D: "CIRCLE",
ST.PhysicalPoint3D: "CIRCLE",
ST.PhysicalSize2D: "CIRCLE",
ST.PhysicalSize3D: "CIRCLE",
ST.PhysicalMass: "CIRCLE",
ST.PhysicalSpeed: "CIRCLE",
ST.PhysicalAccelScalar: "CIRCLE",
ST.PhysicalForceScalar: "CIRCLE",
ST.PhysicalAccel3D: "SQUARE_DOT",
ST.PhysicalForce3D: "SQUARE_DOT",
ST.PhysicalPol: "DIAMOND",
ST.PhysicalAccel3D: "CIRCLE",
ST.PhysicalForce3D: "CIRCLE",
ST.PhysicalPol: "CIRCLE",
ST.PhysicalFreq: "CIRCLE",
# Blender
ST.BlenderObject: "SQUARE",
ST.BlenderCollection: "SQUARE",
ST.BlenderObject: "DIAMOND",
ST.BlenderCollection: "DIAMOND",
ST.BlenderImage: "DIAMOND",
ST.BlenderGeoNodes: "DIAMOND",
ST.BlenderText: "SQUARE",
ST.BlenderText: "DIAMOND",
# Maxwell
ST.MaxwellSource: "CIRCLE",
ST.MaxwellTemporalShape: "CIRCLE",
ST.MaxwellMedium: "CIRCLE",
ST.MaxwellMediumNonLinearity: "CIRCLE",
ST.MaxwellStructure: "SQUARE",
ST.MaxwellBoundConds: "SQUARE",
ST.MaxwellBoundCond: "DIAMOND",
ST.MaxwellStructure: "CIRCLE",
ST.MaxwellBoundConds: "CIRCLE",
ST.MaxwellBoundCond: "CIRCLE",
ST.MaxwellMonitor: "CIRCLE",
ST.MaxwellFDTDSim: "SQUARE",
ST.MaxwellSimGrid: "SQUARE",
ST.MaxwellSimGridAxis: "DIAMOND",
ST.MaxwellSimDomain: "SQUARE",
ST.MaxwellFDTDSim: "CIRCLE",
ST.MaxwellFDTDSimData: "CIRCLE",
ST.MaxwellSimGrid: "CIRCLE",
ST.MaxwellSimGridAxis: "CIRCLE",
ST.MaxwellSimDomain: "CIRCLE",
# Tidy3D
ST.Tidy3DCloudTask: "CIRCLE",
ST.Tidy3DCloudTask: "DIAMOND",
}

View File

@ -53,6 +53,7 @@ class SocketType(BlenderTypeEnum):
MaxwellMonitor = enum.auto()
MaxwellFDTDSim = enum.auto()
MaxwellFDTDSimData = enum.auto()
MaxwellSimDomain = enum.auto()
MaxwellSimGrid = enum.auto()
MaxwellSimGridAxis = enum.auto()

View File

@ -8,7 +8,8 @@ from . import structures
#from . import bounds
from . import monitors
from . import simulations
#from . import utilities
from . import utilities
from . import viz
BL_REGISTER = [
#*kitchen_sink.BL_REGISTER,
@ -20,7 +21,8 @@ BL_REGISTER = [
# *bounds.BL_REGISTER,
*monitors.BL_REGISTER,
*simulations.BL_REGISTER,
# *utilities.BL_REGISTER,
*utilities.BL_REGISTER,
*viz.BL_REGISTER,
]
BL_NODES = {
#**kitchen_sink.BL_NODES,
@ -32,5 +34,6 @@ BL_NODES = {
# **bounds.BL_NODES,
**monitors.BL_NODES,
**simulations.BL_NODES,
# **utilities.BL_NODES,
**utilities.BL_NODES,
**viz.BL_NODES,
}

View File

@ -14,6 +14,12 @@ from .. import sockets
CACHE: dict[str, typ.Any] = {} ## By Instance UUID
## NOTE: CACHE does not persist between file loads.
_DEFAULT_LOOSE_SOCKET_SER = json.dumps({
"socket_names": [],
"socket_def_names": [],
"models": [],
})
class MaxwellSimNode(bpy.types.Node):
# Fundamentals
node_type: ct.NodeType
@ -115,6 +121,14 @@ class MaxwellSimNode(bpy.types.Node):
"_callback_type"
) and method._callback_type == "on_show_plot"
}
cls._on_init = {
method
for attr_name in dir(cls)
if hasattr(
method := getattr(cls, attr_name),
"_callback_type"
) and method._callback_type == "on_init"
}
# Setup Socket Set Dropdown
if not len(cls.input_socket_sets) + len(cls.output_socket_sets) > 0:
@ -151,7 +165,7 @@ class MaxwellSimNode(bpy.types.Node):
)
],
default=socket_set_names[0],
update=(lambda self, _: self.sync_sockets()),
update=lambda self, context: self.sync_active_socket_set(context),
)
# Setup Preset Dropdown
@ -179,6 +193,10 @@ class MaxwellSimNode(bpy.types.Node):
####################
# - Generic Properties
####################
def sync_active_socket_set(self, context):
self.sync_sockets()
self.sync_prop("active_socket_set", context)
def sync_sim_node_name(self, context):
if (mobjs := CACHE[self.instance_id].get("managed_objs")) is None:
return
@ -276,11 +294,6 @@ class MaxwellSimNode(bpy.types.Node):
####################
# - Loose Sockets
####################
_DEFAULT_LOOSE_SOCKET_SER = json.dumps({
"socket_names": [],
"socket_def_names": [],
"models": [],
})
# Loose Sockets
## Only Blender props persist as instance data
ser_loose_input_sockets: bpy.props.StringProperty(
@ -336,7 +349,8 @@ class MaxwellSimNode(bpy.types.Node):
def loose_input_sockets(
self, value: dict[str, ct.schemas.SocketDef],
) -> None:
self.ser_loose_input_sockets = self._ser_loose_sockets(value)
if not value: self.ser_loose_input_sockets = _DEFAULT_LOOSE_SOCKET_SER
else: self.ser_loose_input_sockets = self._ser_loose_sockets(value)
# Synchronize Sockets
self.sync_sockets()
@ -346,7 +360,8 @@ class MaxwellSimNode(bpy.types.Node):
def loose_output_sockets(
self, value: dict[str, ct.schemas.SocketDef],
) -> None:
self.ser_loose_output_sockets = self._ser_loose_sockets(value)
if not value: self.ser_loose_output_sockets = _DEFAULT_LOOSE_SOCKET_SER
else: self.ser_loose_output_sockets = self._ser_loose_sockets(value)
# Synchronize Sockets
self.sync_sockets()
@ -457,7 +472,7 @@ class MaxwellSimNode(bpy.types.Node):
col = layout.column(align=False)
if self.use_sim_node_name:
row = col.row(align=True)
row.label(text="", icon="EVENT_N")
row.label(text="", icon="FILE_TEXT")
row.prop(self, "sim_node_name", text="")
# Draw Name
@ -638,8 +653,11 @@ class MaxwellSimNode(bpy.types.Node):
self.sync_sockets()
# Apply Default Preset
if self.active_preset:
self.sync_active_preset()
if self.active_preset: self.sync_active_preset()
# Callbacks
for method in self._on_init:
method(self)
def update(self) -> None:
pass
@ -652,6 +670,18 @@ class MaxwellSimNode(bpy.types.Node):
CACHE[self.instance_id] = {}
node_tree = self.id_data
# Unlock
## This is one approach to the "deleted locked nodes" problem.
## Essentially, deleting a locked node will unlock along input chain.
## It also counts if any of the input sockets are linked and locked.
## Thus, we prevent "dangling locks".
## TODO: Don't even allow deleting a locked node.
if self.locked or any(
bl_socket.is_linked and bl_socket.locked
for bl_socket in self.inputs.values()
):
self.trigger_action("disable_lock")
# Free Managed Objects
for managed_obj in self.managed_objs.values():
managed_obj.free()
@ -674,6 +704,7 @@ def chain_event_decorator(
"on_value_changed",
"on_show_preview",
"on_show_plot",
"on_init",
],
index_by: typ.Any | None = None,
extra_data: dict[str, typ.Any] | None = None,
@ -938,3 +969,30 @@ def on_show_plot(
managed_objs=managed_objs,
req_params=req_params,
)
def on_init(
kind: ct.DataFlowKind = ct.DataFlowKind.Value,
input_sockets: set[str] = set(),
output_sockets: set[str] = set(),
props: set[str] = set(),
managed_objs: set[str] = set(),
):
req_params = {"self"} | (
{"input_sockets"} if input_sockets else set()
) | (
{"output_sockets"} if output_sockets else set()
) | (
{"props"} if props else set()
) | (
{"managed_objs"} if managed_objs else set()
)
return chain_event_decorator(
callback_type="on_init",
kind=kind,
input_sockets=input_sockets,
output_sockets=output_sockets,
props=props,
managed_objs=managed_objs,
req_params=req_params,
)

View File

@ -3,6 +3,7 @@ import sympy as sp
import sympy.physics.units as spu
import scipy as sc
from .....utils import extra_sympy_units as spux
from ... import contracts as ct
from ... import sockets
from .. import base
@ -18,16 +19,31 @@ class WaveConstantNode(base.MaxwellSimNode):
bl_label = "Wave Constant"
input_socket_sets = {
# Single
"Vacuum WL": {
"WL": sockets.PhysicalLengthSocketDef(),
"WL": sockets.PhysicalLengthSocketDef(
default_value=500*spu.nm,
default_unit=spu.nm,
),
},
"Frequency": {
"Freq": sockets.PhysicalFreqSocketDef(),
"Freq": sockets.PhysicalFreqSocketDef(
default_value=500*spux.THz,
default_unit=spux.THz,
),
},
# Listy
"Vacuum WLs": {
"WLs": sockets.PhysicalLengthSocketDef(
is_list=True,
),
},
"Frequencies": {
"Freqs": sockets.PhysicalFreqSocketDef(
is_list=True,
),
},
}
output_sockets = {
"WL": sockets.PhysicalLengthSocketDef(),
"Freq": sockets.PhysicalFreqSocketDef(),
}
####################
@ -35,33 +51,107 @@ class WaveConstantNode(base.MaxwellSimNode):
####################
@base.computes_output_socket(
"WL",
kind=ct.DataFlowKind.Value,
input_sockets={"WL", "Freq"},
)
def compute_vac_wl(self, input_sockets: dict) -> sp.Expr:
if (vac_wl := input_sockets["WL"]):
if (vac_wl := input_sockets["WL"]) is not None:
return vac_wl
elif (freq := input_sockets["Freq"]):
elif (freq := input_sockets["Freq"]) is not None:
return spu.convert_to(
VAC_SPEED_OF_LIGHT / freq,
spu.meter,
)
raise RuntimeError("Vac WL and Freq are both non-truthy")
raise RuntimeError("Vac WL and Freq are both None")
@base.computes_output_socket(
"Freq",
input_sockets={"WL", "Freq"},
)
def compute_freq(self, input_sockets: dict) -> sp.Expr:
if (vac_wl := input_sockets["WL"]):
if (vac_wl := input_sockets["WL"]) is not None:
return spu.convert_to(
VAC_SPEED_OF_LIGHT / vac_wl,
spu.hertz,
)
elif (freq := input_sockets["Freq"]):
elif (freq := input_sockets["Freq"]) is not None:
return freq
raise RuntimeError("Vac WL and Freq are both None")
####################
# - Listy Callbacks
####################
@base.computes_output_socket(
"WLs",
input_sockets={"WLs", "Freqs"},
)
def compute_vac_wls(self, input_sockets: dict) -> sp.Expr:
if (vac_wls := input_sockets["WLs"]) is not None:
return vac_wls
elif (freqs := input_sockets["Freqs"]) is not None:
return [
spu.convert_to(
VAC_SPEED_OF_LIGHT / freq,
spu.meter,
)
for freq in freqs
][::-1]
raise RuntimeError("Vac WLs and Freqs are both None")
@base.computes_output_socket(
"Freqs",
input_sockets={"WLs", "Freqs"},
)
def compute_freqs(self, input_sockets: dict) -> sp.Expr:
if (vac_wls := input_sockets["WLs"]) is not None:
return [
spu.convert_to(
VAC_SPEED_OF_LIGHT / vac_wl,
spu.hertz,
)
for vac_wl in vac_wls
][::-1]
elif (freqs := input_sockets["Freqs"]) is not None:
return freqs
raise RuntimeError("Vac WLs and Freqs are both None")
####################
# - Callbacks
####################
@base.on_value_changed(
prop_name="active_socket_set",
props={"active_socket_set"}
)
def on_value_changed__active_socket_set(self, props: dict):
# Singular: Normal Output Sockets
if props["active_socket_set"] in {"Vacuum WL", "Frequency"}:
self.loose_output_sockets = {}
self.loose_output_sockets = {
"Freq": sockets.PhysicalFreqSocketDef(),
"WL": sockets.PhysicalLengthSocketDef(),
}
# Plural: Listy Output Sockets
elif props["active_socket_set"] in {"Vacuum WLs", "Frequencies"}:
self.loose_output_sockets = {}
self.loose_output_sockets = {
"Freqs": sockets.PhysicalFreqSocketDef(is_list=True),
"WLs": sockets.PhysicalLengthSocketDef(is_list=True),
}
else:
msg = f"Active socket set invalid for wave constant: {props['active_socket_set']}"
raise RuntimeError(msg)
@base.on_init()
def on_init(self):
self.on_value_changed__active_socket_set()
####################
# - Blender Registration
####################
@ -70,6 +160,6 @@ BL_REGISTER = [
]
BL_NODES = {
ct.NodeType.WaveConstant: (
ct.NodeCategory.MAXWELLSIM_INPUTS_CONSTANTS
ct.NodeCategory.MAXWELLSIM_INPUTS
)
}

View File

@ -8,17 +8,14 @@ import bpy
import sympy as sp
import pydantic as pyd
import tidy3d as td
import tidy3d.web as _td_web
import tidy3d.web as td_web
from ......utils.auth_td_web import g_td_web, is_td_web_authed
from ......utils import tdcloud
from .... import contracts as ct
from .... import sockets
from ... import base
@functools.cache
def task_status(task_id: str):
task = _td_web.api.webapi.get_info(task_id)
return task.status
CACHE = {}
####################
# - Node
@ -29,42 +26,78 @@ class Tidy3DWebImporterNode(base.MaxwellSimNode):
input_sockets = {
"Cloud Task": sockets.Tidy3DCloudTaskSocketDef(
task_exists=True,
should_exist=True,
),
"Cache Path": sockets.FilePathSocketDef(
default_path=Path("loaded_simulation.hdf5")
)
}
output_sockets = {}
####################
# - UI
####################
def draw_info(self, context, layout): pass
####################
# - Output Methods
####################
@base.computes_output_socket(
"FDTD Sim Data",
input_sockets={"Cloud Task", "Cache Path"},
)
def compute_fdtd_sim_data(self, input_sockets: dict) -> str:
global CACHE
if not CACHE.get(self.instance_id):
CACHE[self.instance_id] = {"fdtd_sim_data": None}
if CACHE[self.instance_id]["fdtd_sim_data"] is not None:
return CACHE[self.instance_id]["fdtd_sim_data"]
if not (
(cloud_task := input_sockets["Cloud Task"]) is not None
and isinstance(cloud_task, tdcloud.CloudTask)
and cloud_task.status == "success"
):
msg ="Won't attempt getting SimData"
raise RuntimeError(msg)
# Load the Simulation
cache_path = input_sockets["Cache Path"]
if cache_path is None:
print("CACHE PATH IS NONE WHY")
return ## I guess?
if cache_path.is_file():
sim_data = td.SimulationData.from_file(str(cache_path))
else:
sim_data = td_web.api.webapi.load(
cloud_task.task_id,
path=str(cache_path),
)
CACHE[self.instance_id]["fdtd_sim_data"] = sim_data
return sim_data
@base.computes_output_socket(
"FDTD Sim",
input_sockets={"Cloud Task"},
)
def compute_cloud_task(self, input_sockets: dict) -> str:
if not isinstance(task_id := input_sockets["Cloud Task"], str):
msg ="Input task does not exist"
raise ValueError(msg)
def compute_fdtd_sim(self, input_sockets: dict) -> str:
if not isinstance(
cloud_task := input_sockets["Cloud Task"],
tdcloud.CloudTask
):
msg ="Input cloud task does not exist"
raise RuntimeError(msg)
# Load the Simulation
td_web = g_td_web(None) ## Presume already auth'ed
with tempfile.NamedTemporaryFile(delete=False) as f:
_path_tmp = Path(f.name)
_path_tmp.rename(f.name + ".json")
path_tmp = Path(f.name + ".json")
cloud_sim = _td_web.api.webapi.load_simulation(
task_id,
sim = td_web.api.webapi.load_simulation(
cloud_task.task_id,
path=str(path_tmp),
)
) ## TODO: Don't use td_web directly. Only through tdcloud
Path(path_tmp).unlink()
return cloud_sim
return sim
####################
# - Update
@ -74,22 +107,22 @@ class Tidy3DWebImporterNode(base.MaxwellSimNode):
input_sockets={"Cloud Task"}
)
def on_value_changed__cloud_task(self, input_sockets: dict):
task_status.cache_clear()
if (
(task_id := input_sockets["Cloud Task"]) is None
or isinstance(task_id, dict)
or task_status(task_id) != "success"
or not is_td_web_authed
(cloud_task := input_sockets["Cloud Task"]) is not None
and isinstance(cloud_task, tdcloud.CloudTask)
and cloud_task.status == "success"
):
if self.loose_output_sockets: self.loose_output_sockets = {}
self.loose_output_sockets = {
"FDTD Sim Data": sockets.MaxwellFDTDSimDataSocketDef(),
"FDTD Sim": sockets.MaxwellFDTDSimSocketDef(),
}
return
td_web = g_td_web(None) ## Presume already auth'ed
self.loose_output_sockets = {}
self.loose_output_sockets = {
"FDTD Sim": sockets.MaxwellFDTDSimSocketDef(),
"FDTD Sim Data": sockets.AnySocketDef(),
}
@base.on_init()
def on_init(self):
self.on_value_changed__cloud_task()
####################

View File

@ -34,7 +34,7 @@ class LibraryMediumNode(base.MaxwellSimNode):
managed_obj_defs = {
"nk_plot": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLImage(name),
name_prefix="nkplot_",
name_prefix="",
)
}

View File

@ -1,17 +1,17 @@
from . import eh_field_monitor
#from . import field_power_flux_monitor
from . import field_power_flux_monitor
#from . import epsilon_tensor_monitor
#from . import diffraction_monitor
BL_REGISTER = [
*eh_field_monitor.BL_REGISTER,
# *field_power_flux_monitor.BL_REGISTER,
*field_power_flux_monitor.BL_REGISTER,
# *epsilon_tensor_monitor.BL_REGISTER,
# *diffraction_monitor.BL_REGISTER,
]
BL_NODES = {
**eh_field_monitor.BL_NODES,
# **field_power_flux_monitor.BL_NODES,
**field_power_flux_monitor.BL_NODES,
# **epsilon_tensor_monitor.BL_NODES,
# **diffraction_monitor.BL_NODES,
}

View File

@ -26,18 +26,27 @@ class EHFieldMonitorNode(base.MaxwellSimNode):
# - Sockets
####################
input_sockets = {
"Rec Start": sockets.PhysicalTimeSocketDef(),
"Rec Stop": sockets.PhysicalTimeSocketDef(
default_value=200*spux.fs
),
"Center": sockets.PhysicalPoint3DSocketDef(),
"Size": sockets.PhysicalSize3DSocketDef(),
"Samples/Space": sockets.Integer3DVectorSocketDef(
default_value=sp.Matrix([10, 10, 10])
),
"Samples/Time": sockets.IntegerNumberSocketDef(
default_value=100,
),
}
input_socket_sets = {
"Freq Domain": {
"Freqs": sockets.PhysicalFreqSocketDef(
is_list=True,
),
},
"Time Domain": {
"Rec Start": sockets.PhysicalTimeSocketDef(),
"Rec Stop": sockets.PhysicalTimeSocketDef(
default_value=200*spux.fs
),
"Samples/Time": sockets.IntegerNumberSocketDef(
default_value=100,
),
},
}
output_sockets = {
"Monitor": sockets.MaxwellMonitorSocketDef(),
@ -70,33 +79,49 @@ class EHFieldMonitorNode(base.MaxwellSimNode):
"Monitor",
input_sockets={
"Rec Start", "Rec Stop", "Center", "Size", "Samples/Space",
"Samples/Time",
"Samples/Time", "Freqs",
},
props={"sim_node_name"}
props={"active_socket_set", "sim_node_name"}
)
def compute_monitor(self, input_sockets: dict, props: dict) -> td.FieldTimeMonitor:
_rec_start = input_sockets["Rec Start"]
_rec_stop = input_sockets["Rec Stop"]
_center = input_sockets["Center"]
_size = input_sockets["Size"]
_samples_space = input_sockets["Samples/Space"]
samples_time = input_sockets["Samples/Time"]
rec_start = spu.convert_to(_rec_start, spu.second) / spu.second
rec_stop = spu.convert_to(_rec_stop, spu.second) / spu.second
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
size = tuple(spu.convert_to(_size, spu.um) / spu.um)
samples_space = tuple(_samples_space)
return td.FieldTimeMonitor(
center=center,
size=size,
name=props["sim_node_name"],
start=rec_start,
stop=rec_stop,
interval=samples_time,
interval_space=samples_space,
)
if props["active_socket_set"] == "Freq Domain":
freqs = input_sockets["Freqs"]
return td.FieldMonitor(
center=center,
size=size,
name=props["sim_node_name"],
interval_space=samples_space,
freqs=[
float(spu.convert_to(freq, spu.hertz) / spu.hertz)
for freq in freqs
],
)
else: ## Time Domain
_rec_start = input_sockets["Rec Start"]
_rec_stop = input_sockets["Rec Stop"]
samples_time = input_sockets["Samples/Time"]
rec_start = spu.convert_to(_rec_start, spu.second) / spu.second
rec_stop = spu.convert_to(_rec_stop, spu.second) / spu.second
return td.FieldTimeMonitor(
center=center,
size=size,
name=props["sim_node_name"],
start=rec_start,
stop=rec_stop,
interval=samples_time,
interval_space=samples_space,
)
####################
# - Preview - Changes to Input Sockets

View File

@ -1,6 +1,201 @@
import typing as typ
import functools
import bpy
import tidy3d as td
import sympy as sp
import sympy.physics.units as spu
import numpy as np
import scipy as sc
from .....utils import analyze_geonodes
from .....utils import extra_sympy_units as spux
from ... import contracts as ct
from ... import sockets
from ... import managed_objs
from .. import base
GEONODES_MONITOR_BOX = "monitor_flux_box"
class FieldPowerFluxMonitorNode(base.MaxwellSimNode):
node_type = ct.NodeType.FieldPowerFluxMonitor
bl_label = "Field Power Flux Monitor"
use_sim_node_name = True
####################
# - Sockets
####################
input_sockets = {
"Center": sockets.PhysicalPoint3DSocketDef(),
"Size": sockets.PhysicalSize3DSocketDef(),
"Samples/Space": sockets.Integer3DVectorSocketDef(
default_value=sp.Matrix([10, 10, 10])
),
"Direction": sockets.BoolSocketDef(),
}
input_socket_sets = {
"Freq Domain": {
"Freqs": sockets.PhysicalFreqSocketDef(
is_list=True,
),
},
"Time Domain": {
"Rec Start": sockets.PhysicalTimeSocketDef(),
"Rec Stop": sockets.PhysicalTimeSocketDef(
default_value=200*spux.fs
),
"Samples/Time": sockets.IntegerNumberSocketDef(
default_value=100,
),
},
}
output_sockets = {
"Monitor": sockets.MaxwellMonitorSocketDef(),
}
managed_obj_defs = {
"monitor_box": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix="",
)
}
####################
# - Properties
####################
####################
# - UI
####################
def draw_props(self, context, layout):
pass
def draw_info(self, context, col):
pass
####################
# - Output Sockets
####################
@base.computes_output_socket(
"Monitor",
input_sockets={
"Rec Start", "Rec Stop", "Center", "Size", "Samples/Space",
"Samples/Time", "Freqs", "Direction",
},
props={"active_socket_set", "sim_node_name"}
)
def compute_monitor(self, input_sockets: dict, props: dict) -> td.FieldTimeMonitor:
_center = input_sockets["Center"]
_size = input_sockets["Size"]
_samples_space = input_sockets["Samples/Space"]
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
size = tuple(spu.convert_to(_size, spu.um) / spu.um)
samples_space = tuple(_samples_space)
direction = "+" if input_sockets["Direction"] else "-"
if props["active_socket_set"] == "Freq Domain":
freqs = input_sockets["Freqs"]
return td.FluxMonitor(
center=center,
size=size,
name=props["sim_node_name"],
interval_space=samples_space,
freqs=[
float(spu.convert_to(freq, spu.hertz) / spu.hertz)
for freq in freqs
],
normal_dir=direction,
)
else: ## Time Domain
_rec_start = input_sockets["Rec Start"]
_rec_stop = input_sockets["Rec Stop"]
samples_time = input_sockets["Samples/Time"]
rec_start = spu.convert_to(_rec_start, spu.second) / spu.second
rec_stop = spu.convert_to(_rec_stop, spu.second) / spu.second
return td.FieldTimeMonitor(
center=center,
size=size,
name=props["sim_node_name"],
start=rec_start,
stop=rec_stop,
interval=samples_time,
interval_space=samples_space,
)
####################
# - Preview - Changes to Input Sockets
####################
@base.on_value_changed(
socket_name={"Center", "Size"},
input_sockets={"Center", "Size", "Direction"},
managed_objs={"monitor_box"},
)
def on_value_changed__center_size(
self,
input_sockets: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
_center = input_sockets["Center"]
center = tuple([
float(el)
for el in spu.convert_to(_center, spu.um) / spu.um
])
_size = input_sockets["Size"]
size = tuple([
float(el)
for el in spu.convert_to(_size, spu.um) / spu.um
])
## TODO: Preview unit system?? Presume um for now
# Retrieve Hard-Coded GeoNodes and Analyze Input
geo_nodes = bpy.data.node_groups[GEONODES_MONITOR_BOX]
geonodes_interface = analyze_geonodes.interface(
geo_nodes, direc="INPUT"
)
# Sync Modifier Inputs
managed_objs["monitor_box"].sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
geonodes_interface["Size"].identifier: size,
geonodes_interface["Direction"].identifier: input_sockets["Direction"],
## TODO: Use 'bl_socket_map.value_to_bl`!
## - This accounts for auto-conversion, unit systems, etc. .
## - We could keep it in the node base class...
## - ...But it needs aligning with Blender, too. Hmm.
}
)
# Sync Object Position
managed_objs["monitor_box"].bl_object("MESH").location = center
####################
# - Preview - Show Preview
####################
@base.on_show_preview(
managed_objs={"monitor_box"},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
managed_objs["monitor_box"].show_preview("MESH")
self.on_value_changed__center_size()
####################
# - Blender Registration
####################
BL_REGISTER = []
BL_NODES = {}
BL_REGISTER = [
FieldPowerFluxMonitorNode,
]
BL_NODES = {
ct.NodeType.FieldPowerFluxMonitor: (
ct.NodeCategory.MAXWELLSIM_MONITORS
)
}

View File

@ -11,232 +11,185 @@ import pydantic as pyd
import tidy3d as td
import tidy3d.web as _td_web
from ......utils.auth_td_web import g_td_web, is_td_web_authed
from ......utils import tdcloud
from .... import contracts as ct
from .... import sockets
from ... import base
####################
# - Task Getters
####################
## TODO: We should probably refactor this setup.
@functools.cache
def estimated_task_cost(task_id: str):
return _td_web.api.webapi.estimate_cost(task_id)
@functools.cache
def billed_task_cost(task_id: str):
return _td_web.api.webapi.real_cost(task_id)
@functools.cache
def task_status(task_id: str):
task = _td_web.api.webapi.get_info(task_id)
return task.status
####################
# - Progress Timer
####################
## TODO: We should probably refactor this too.
class Tidy3DTaskStatusModalOperator(bpy.types.Operator):
bl_idname = "blender_maxwell.tidy_3d_task_status_modal_operator"
bl_label = "Tidy3D Task Status Modal Operator"
_timer = None
_task_id = None
_node = None
_status = None
_reported_done = False
def modal(self, context, event):
# Retrieve New Status
task_status.cache_clear()
new_status = task_status(self._task_id)
if new_status != self._status:
task_status.cache_clear()
self._status = new_status
# Check Done Status
if self._status in {"success", "error"}:
# Report Done
if not self._reported_done:
self._node.trigger_action("value_changed")
self._reported_done = True
# Finish when Billing is Known
if not billed_task_cost(self._task_id):
billed_task_cost.cache_clear()
else:
return {'FINISHED'}
return {'PASS_THROUGH'}
def execute(self, context):
node = context.node
wm = context.window_manager
self._timer = wm.event_timer_add(2.0, window=context.window)
self._task_id = node.uploaded_task_id
self._node = node
self._status = task_status(self._task_id)
wm.modal_handler_add(self)
return {'RUNNING_MODAL'}
####################
# - Web Uploader / Loader / Runner / Releaser
####################
## TODO: We should probably refactor this too.
class Tidy3DWebUploadOperator(bpy.types.Operator):
bl_idname = "blender_maxwell.tidy_3d_web_upload_operator"
bl_label = "Tidy3D Web Upload Operator"
class UploadSimulation(bpy.types.Operator):
bl_idname = "blender_maxwell.nodes__upload_simulation"
bl_label = "Upload Tidy3D Simulation"
bl_description = "Upload the attached (locked) simulation, such that it is ready to run on the Tidy3D cloud"
@classmethod
def poll(cls, context):
space = context.space_data
return (
space.type == 'NODE_EDITOR'
and space.node_tree is not None
and space.node_tree.bl_idname == "MaxwellSimTreeType"
and is_td_web_authed()
and hasattr(context, "node")
hasattr(context, "node")
and hasattr(context.node, "node_type")
and context.node.node_type == ct.NodeType.Tidy3DWebExporter
and context.node.lock_tree
and tdcloud.IS_AUTHENTICATED
and not context.node.tracked_task_id
and context.node.inputs["FDTD Sim"].is_linked
)
def execute(self, context):
node = context.node
node.web_upload()
node.upload_sim()
return {'FINISHED'}
class Tidy3DLoadUploadedOperator(bpy.types.Operator):
bl_idname = "blender_maxwell.tidy_3d_load_uploaded_operator"
bl_label = "Tidy3D Load Uploaded Operator"
bl_description = "Load an already-uploaded simulation, as selected in the dropdown of the 'Cloud Task' socket"
class RunSimulation(bpy.types.Operator):
bl_idname = "blender_maxwell.nodes__run_simulation"
bl_label = "Run Tracked Tidy3D Sim"
bl_description = "Run the currently tracked simulation task"
@classmethod
def poll(cls, context):
space = context.space_data
return (
space.type == 'NODE_EDITOR'
and space.node_tree is not None
and space.node_tree.bl_idname == "MaxwellSimTreeType"
and is_td_web_authed()
and hasattr(context, "node")
and context.node.lock_tree
hasattr(context, "node")
and hasattr(context.node, "node_type")
and context.node.node_type == ct.NodeType.Tidy3DWebExporter
and tdcloud.IS_AUTHENTICATED
and context.node.tracked_task_id
and (task_info := tdcloud.TidyCloudTasks.task_info(
context.node.tracked_task_id
)) is not None
and task_info.status == "draft"
)
def execute(self, context):
node = context.node
node.load_uploaded_task()
# Load Simulation to Compare
## Load Local Sim
local_sim = node._compute_input("FDTD Sim")
## Load Cloud Sim
task_id = node.compute_output("Cloud Task")
with tempfile.NamedTemporaryFile(delete=False) as f:
_path_tmp = Path(f.name)
_path_tmp.rename(f.name + ".json")
path_tmp = Path(f.name + ".json")
cloud_sim = _td_web.api.webapi.load_simulation(task_id, path=str(path_tmp))
Path(path_tmp).unlink()
## Compare
if local_sim != cloud_sim:
node.release_uploaded_task()
msg = "Loaded simulation doesn't match input simulation"
raise ValueError(msg)
node.run_tracked_task()
return {'FINISHED'}
class RunUploadedTidy3DSim(bpy.types.Operator):
bl_idname = "blender_maxwell.run_uploaded_tidy_3d_sim"
bl_label = "Run Uploaded Tidy3D Sim"
bl_description = "Run the currently uploaded (and loaded) simulation"
class ReloadTrackedTask(bpy.types.Operator):
bl_idname = "blender_maxwell.nodes__reload_tracked_task"
bl_label = "Reload Tracked Tidy3D Cloud Task"
bl_description = "Reload the currently tracked simulation task"
@classmethod
def poll(cls, context):
space = context.space_data
return (
space.type == 'NODE_EDITOR'
and space.node_tree is not None
and space.node_tree.bl_idname == "MaxwellSimTreeType"
and is_td_web_authed()
and hasattr(context, "node")
and context.node.lock_tree
and context.node.uploaded_task_id
and task_status(context.node.uploaded_task_id) == "draft"
hasattr(context, "node")
and hasattr(context.node, "node_type")
and context.node.node_type == ct.NodeType.Tidy3DWebExporter
and tdcloud.IS_AUTHENTICATED
and context.node.tracked_task_id
)
def execute(self, context):
node = context.node
node.run_uploaded_task()
bpy.ops.blender_maxwell.tidy_3d_task_status_modal_operator()
if (
cloud_task := tdcloud.TidyCloudTasks.task(node.tracked_task_id)
) is None:
msg = "Tried to reload tracked task, but it doesn't exist"
raise RuntimeError(msg)
cloud_task = tdcloud.TidyCloudTasks.update_task(cloud_task)
return {'FINISHED'}
class ReleaseTidy3DExportOperator(bpy.types.Operator):
bl_idname = "blender_maxwell.release_tidy_3d_export_operator"
bl_label = "Release Tidy3D Export Operator"
class EstCostTrackedTask(bpy.types.Operator):
bl_idname = "blender_maxwell.nodes__est_cost_tracked_task"
bl_label = "Est Cost of Tracked Tidy3D Cloud Task"
bl_description = "Reload the currently tracked simulation task"
@classmethod
def poll(cls, context):
space = context.space_data
return (
space.type == 'NODE_EDITOR'
and space.node_tree is not None
and space.node_tree.bl_idname == "MaxwellSimTreeType"
and is_td_web_authed()
and hasattr(context, "node")
and context.node.lock_tree
and context.node.uploaded_task_id
hasattr(context, "node")
and hasattr(context.node, "node_type")
and context.node.node_type == ct.NodeType.Tidy3DWebExporter
and tdcloud.IS_AUTHENTICATED
and context.node.tracked_task_id
)
def execute(self, context):
node = context.node
node.release_uploaded_task()
if (
task_info := tdcloud.TidyCloudTasks.task_info(context.node.tracked_task_id)
) is None:
msg = "Tried to estimate cost of tracked task, but it doesn't exist"
raise RuntimeError(msg)
node.cache_est_cost = task_info.cost_est()
return {'FINISHED'}
class ReleaseTrackedTask(bpy.types.Operator):
bl_idname = "blender_maxwell.nodes__release_tracked_task"
bl_label = "Release Tracked Tidy3D Cloud Task"
bl_description = "Release the currently tracked simulation task"
@classmethod
def poll(cls, context):
return (
hasattr(context, "node")
and hasattr(context.node, "node_type")
and context.node.node_type == ct.NodeType.Tidy3DWebExporter
#and tdcloud.IS_AUTHENTICATED
and context.node.tracked_task_id
)
def execute(self, context):
node = context.node
node.tracked_task_id = ""
return {'FINISHED'}
####################
# - Web Exporter Node
# - Node
####################
class Tidy3DWebExporterNode(base.MaxwellSimNode):
node_type = ct.NodeType.Tidy3DWebExporter
bl_label = "Tidy3DWebExporter"
bl_label = "Tidy3D Web Exporter"
input_sockets = {
"FDTD Sim": sockets.MaxwellFDTDSimSocketDef(),
"Cloud Task": sockets.Tidy3DCloudTaskSocketDef(
task_exists=False,
),
}
output_sockets = {
"Cloud Task": sockets.Tidy3DCloudTaskSocketDef(
task_exists=True,
should_exist=False,
),
}
####################
# - Properties
####################
lock_tree: bpy.props.BoolProperty(
name="Whether to lock the attached tree",
description="Whether or not to lock the attached tree",
default=False,
update=(lambda self, context: self.sync_lock_tree(context)),
update=lambda self, context: self.sync_lock_tree(context),
)
uploaded_task_id: bpy.props.StringProperty(
name="Uploaded Task ID",
description="The uploaded task ID",
tracked_task_id: bpy.props.StringProperty(
name="Tracked Task ID",
description="The currently tracked task ID",
default="",
update=lambda self, context: self.sync_tracked_task_id(context),
)
# Cache
cache_total_monitor_data: bpy.props.FloatProperty(
name="(Cached) Total Monitor Data",
description="Required storage space by all monitors",
default=0.0,
)
cache_est_cost: bpy.props.FloatProperty(
name="(Cached) Estimated Total Cost",
description="Est. Cost in FlexCompute units",
default=-1.0,
)
####################
# - Sync Methods
####################
def sync_lock_tree(self, context):
node_tree = self.id_data
if self.lock_tree:
self.trigger_action("enable_lock")
self.locked = False
@ -247,106 +200,200 @@ class Tidy3DWebExporterNode(base.MaxwellSimNode):
else:
self.trigger_action("disable_lock")
self.sync_prop("lock_tree", context)
def sync_tracked_task_id(self, context):
# Select Tracked Task
if self.tracked_task_id:
cloud_task = tdcloud.TidyCloudTasks.task(self.tracked_task_id)
task_info = tdcloud.TidyCloudTasks.task_info(self.tracked_task_id)
self.loose_output_sockets = {
"Cloud Task": sockets.Tidy3DCloudTaskSocketDef(
should_exist=True,
),
}
self.inputs["Cloud Task"].locked = True
# Release Tracked Task
else:
self.cache_est_cost = -1.0
self.loose_output_sockets = {}
self.inputs["Cloud Task"].sync_prepare_new_task()
self.inputs["Cloud Task"].locked = False
self.sync_prop("tracked_task_id", context)
####################
# - Output Socket Callbacks
####################
def web_upload(self):
if not (sim := self._compute_input("FDTD Sim")):
raise ValueError("Must attach simulation")
def validate_sim(self):
if (sim := self._compute_input("FDTD Sim")) is None:
msg = "Tried to validate simulation, but none is attached"
raise ValueError(msg)
if not (new_task_dict := self._compute_input("Cloud Task")):
raise ValueError("No valid cloud task defined")
sim.validate_pre_upload(source_required = True)
td_web = g_td_web(None) ## Presume already auth'ed
def upload_sim(self):
if (sim := self._compute_input("FDTD Sim")) is None:
msg = "Tried to upload simulation, but none is attached"
raise ValueError(msg)
self.uploaded_task_id = td_web.api.webapi.upload(
sim,
**new_task_dict,
if (
(new_task := self._compute_input("Cloud Task")) is None
or isinstance(
new_task,
tdcloud.CloudTask,
)
):
msg = "Tried to upload simulation to new task, but existing task was selected"
raise ValueError(msg)
# Create Cloud Task
cloud_task = tdcloud.TidyCloudTasks.mk_task(
task_name=new_task[0],
cloud_folder=new_task[1],
sim=sim,
upload_progress_cb=lambda uploaded_bytes: None, ## TODO: Use!
verbose=True,
)
self.inputs["Cloud Task"].sync_task_loaded(self.uploaded_task_id)
# Declare to Cloud Task that it Exists Now
## This will change the UI to not allow free-text input.
## If the socket is linked, this errors.
self.inputs["Cloud Task"].sync_created_new_task(cloud_task)
def load_uploaded_task(self):
self.inputs["Cloud Task"].sync_task_loaded(None)
self.uploaded_task_id = self._compute_input("Cloud Task")
# Track the Newly Uploaded Task ID
self.tracked_task_id = cloud_task.task_id
self.trigger_action("value_changed")
def run_tracked_task(self):
if (
cloud_task := tdcloud.TidyCloudTasks.task(self.tracked_task_id)
) is None:
msg = "Tried to run tracked task, but it doesn't exist"
raise RuntimeError(msg)
def run_uploaded_task(self):
td_web = g_td_web(None) ## Presume already auth'ed
td_web.api.webapi.start(self.uploaded_task_id)
self.trigger_action("value_changed")
def release_uploaded_task(self):
self.uploaded_task_id = ""
self.inputs["Cloud Task"].sync_task_released(specify_new_task=True)
self.trigger_action("value_changed")
cloud_task.submit()
tdcloud.TidyCloudTasks.update_task(cloud_task) ## TODO: Check that status is actually immediately updated.
####################
# - UI
####################
def draw_operators(self, context, layout):
is_authed = is_td_web_authed()
has_uploaded_task_id = bool(self.uploaded_task_id)
# Row: Run Simulation
# Row: Upload Sim Buttons
row = layout.row(align=True)
if has_uploaded_task_id: row.enabled = False
row.operator(
Tidy3DWebUploadOperator.bl_idname,
text="Upload Sim",
UploadSimulation.bl_idname,
text="Upload",
)
tree_lock_icon = "LOCKED" if self.lock_tree else "UNLOCKED"
row.prop(self, "lock_tree", toggle=True, icon=tree_lock_icon, text="")
# Row: Run Simulation
# Row: Run Sim Buttons
row = layout.row(align=True)
if is_authed and has_uploaded_task_id:
run_sim_text = f"Run Sim (~{estimated_task_cost(self.uploaded_task_id):.3f} credits)"
else:
run_sim_text = f"Run Sim"
row.operator(
RunUploadedTidy3DSim.bl_idname,
text=run_sim_text,
RunSimulation.bl_idname,
text="Run",
)
if has_uploaded_task_id:
if self.tracked_task_id:
tree_lock_icon = "LOOP_BACK"
row.operator(
ReleaseTidy3DExportOperator.bl_idname,
ReleaseTrackedTask.bl_idname,
icon="LOOP_BACK",
text="",
)
else:
row.operator(
Tidy3DLoadUploadedOperator.bl_idname,
icon="TRIA_UP_BAR",
text="",
def draw_info(self, context, layout):
# Connection Info
auth_icon = "CHECKBOX_HLT" if tdcloud.IS_AUTHENTICATED else "CHECKBOX_DEHLT"
conn_icon = "CHECKBOX_HLT" if tdcloud.IS_ONLINE else "CHECKBOX_DEHLT"
row = layout.row()
row.alignment = "CENTER"
row.label(text="Cloud Status")
box = layout.box()
split = box.split(factor=0.85)
## Split: Left Column
col = split.column(align=False)
col.label(text="Authed")
col.label(text="Connected")
## Split: Right Column
col = split.column(align=False)
col.label(icon=auth_icon)
col.label(icon=conn_icon)
# Simulation Info
if self.inputs["FDTD Sim"].is_linked:
row = layout.row()
row.alignment = "CENTER"
row.label(text="Sim Info")
box = layout.box()
split = box.split(factor=0.4)
## Split: Left Column
col = split.column(align=False)
col.label(text="𝝨 Output")
## Split: Right Column
col = split.column(align=False)
col.alignment = "RIGHT"
col.label(text=f"{self.cache_total_monitor_data / 1_000_000:.2f}MB")
# Cloud Task Info
if self.tracked_task_id and tdcloud.IS_AUTHENTICATED:
task_info = tdcloud.TidyCloudTasks.task_info(
self.tracked_task_id
)
if task_info is None: return
# Row: Simulation Progress
if is_authed and has_uploaded_task_id:
progress = {
"draft": (0.0, "Waiting to Run..."),
"initialized": (0.0, "Initializing..."),
"queued": (0.0, "Queued..."),
"preprocessing": (0.05, "Pre-processing..."),
"running": (0.2, "Running..."),
"postprocess": (0.85, "Post-processing..."),
"success": (1.0, f"Success (={billed_task_cost(self.uploaded_task_id)} credits)"),
"error": (1.0, f"Error (={billed_task_cost(self.uploaded_task_id)} credits)"),
}[task_status(self.uploaded_task_id)]
## Header
row = layout.row()
row.alignment = "CENTER"
row.label(text="Task Info")
layout.separator()
## Progress Bar
row = layout.row(align=True)
row.progress(
factor=progress[0],
factor=0.0,
type="BAR",
text=progress[1],
text=f"Status: {task_info.status.capitalize()}",
)
row.operator(
ReloadTrackedTask.bl_idname,
text="",
icon="FILE_REFRESH",
)
row.operator(
EstCostTrackedTask.bl_idname,
text="",
icon="SORTTIME",
)
## Information
box = layout.box()
split = box.split(factor=0.4)
## Split: Left Column
col = split.column(align=False)
col.label(text="Status")
col.label(text="Est. Cost")
col.label(text="Real Cost")
## Split: Right Column
cost_est = f"{self.cache_est_cost:.2f}" if self.cache_est_cost >= 0 else "TBD"
cost_real = f"{task_info.cost_real:.2f}" if task_info.cost_real is not None else "TBD"
col = split.column(align=False)
col.alignment = "RIGHT"
col.label(text=task_info.status.capitalize())
col.label(text=f"{cost_est} creds")
col.label(text=f"{cost_real} creds")
# Connection Information
####################
# - Output Methods
@ -355,35 +402,40 @@ class Tidy3DWebExporterNode(base.MaxwellSimNode):
"Cloud Task",
input_sockets={"Cloud Task"},
)
def compute_cloud_task(self, input_sockets: dict) -> str | None:
if self.uploaded_task_id: return self.uploaded_task_id
def compute_cloud_task(self, input_sockets: dict) -> tdcloud.CloudTask | None:
if isinstance(
cloud_task := input_sockets["Cloud Task"],
tdcloud.CloudTask
):
return cloud_task
return None
####################
# - Update
# - Output Methods
####################
@base.on_value_changed(socket_name="FDTD Sim")
def on_value_changed__fdtd_sim(self):
estimated_task_cost.cache_clear()
task_status.cache_clear()
billed_task_cost.cache_clear()
@base.on_value_changed(
socket_name="FDTD Sim",
input_sockets={"FDTD Sim"},
)
def on_value_changed__fdtd_sim(self, input_sockets):
if (sim := self._compute_input("FDTD Sim")) is None:
self.cache_total_monitor_data = 0
return
@base.on_value_changed(socket_name="Cloud Task")
def on_value_changed__cloud_task(self):
estimated_task_cost.cache_clear()
task_status.cache_clear()
billed_task_cost.cache_clear()
sim.validate_pre_upload(source_required = True)
self.cache_total_monitor_data = sum(sim.monitors_data_size.values())
####################
# - Blender Registration
####################
BL_REGISTER = [
Tidy3DWebUploadOperator,
Tidy3DTaskStatusModalOperator,
RunUploadedTidy3DSim,
Tidy3DLoadUploadedOperator,
ReleaseTidy3DExportOperator,
UploadSimulation,
RunSimulation,
ReloadTrackedTask,
EstCostTrackedTask,
ReleaseTrackedTask,
Tidy3DWebExporterNode,
]
BL_NODES = {

View File

@ -16,9 +16,15 @@ class FDTDSimNode(base.MaxwellSimNode):
input_sockets = {
"Domain": sockets.MaxwellSimDomainSocketDef(),
"BCs": sockets.MaxwellBoundCondsSocketDef(),
"Sources": sockets.MaxwellSourceSocketDef(),
"Structures": sockets.MaxwellStructureSocketDef(),
"Monitors": sockets.MaxwellMonitorSocketDef(),
"Sources": sockets.MaxwellSourceSocketDef(
is_list=True,
),
"Structures": sockets.MaxwellStructureSocketDef(
is_list=True,
),
"Monitors": sockets.MaxwellMonitorSocketDef(
is_list=True,
),
}
output_sockets = {
"FDTD Sim": sockets.MaxwellFDTDSimSocketDef(),
@ -41,12 +47,12 @@ class FDTDSimNode(base.MaxwellSimNode):
bounds = input_sockets["BCs"]
monitors = input_sockets["Monitors"]
if not isinstance(sources, list):
sources = [sources]
if not isinstance(structures, list):
structures = [structures]
if not isinstance(monitors, list):
monitors = [monitors]
#if not isinstance(sources, list):
# sources = [sources]
#if not isinstance(structures, list):
# structures = [structures]
#if not isinstance(monitors, list):
# monitors = [monitors]
return td.Simulation(
**sim_domain, ## run_time=, size=, grid=, medium=

View File

@ -32,7 +32,7 @@ class SimDomainNode(base.MaxwellSimNode):
managed_obj_defs = {
"domain_box": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix="domain_box_",
name_prefix="",
)
}

View File

@ -31,7 +31,7 @@ class PointDipoleSourceNode(base.MaxwellSimNode):
managed_obj_defs = {
"sphere_empty": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix="point_dipole_",
name_prefix="",
)
}
@ -47,14 +47,20 @@ class PointDipoleSourceNode(base.MaxwellSimNode):
("EZ", "Ez", "Electric field in z-dir"),
],
default="EX",
update=(lambda self, context: self.sync_prop("pol_axis")),
update=(lambda self, context: self.sync_prop("pol_axis", context)),
)
####################
# - UI
####################
def draw_props(self, context, layout):
layout.prop(self, "pol_axis", text="Pol Axis")
split = layout.split(factor=0.6)
col = split.column()
col.label(text="Pol Axis")
col = split.column()
col.prop(self, "pol_axis", text="")
####################
# - Output Socket Computation
@ -117,6 +123,7 @@ class PointDipoleSourceNode(base.MaxwellSimNode):
"EMPTY",
empty_display_type="SPHERE",
)
managed_objs["sphere_empty"].bl_object("EMPTY").empty_display_size = 0.2

View File

@ -15,7 +15,6 @@ from ... import base
class GaussianPulseTemporalShapeNode(base.MaxwellSimNode):
node_type = ct.NodeType.GaussianPulseTemporalShape
bl_label = "Gaussian Pulse Temporal Shape"
#bl_icon = ...

View File

@ -35,7 +35,7 @@ class GeoNodesStructureNode(base.MaxwellSimNode):
managed_obj_defs = {
"geometry": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix="geonodes_",
name_prefix="",
)
}

View File

@ -1,14 +1,14 @@
from . import box_structure
#from . import cylinder_structure
#from . import sphere_structure
from . import sphere_structure
BL_REGISTER = [
*box_structure.BL_REGISTER,
# *cylinder_structure.BL_REGISTER,
# *sphere_structure.BL_REGISTER,
*sphere_structure.BL_REGISTER,
]
BL_NODES = {
**box_structure.BL_NODES,
# **cylinder_structure.BL_NODES,
# **sphere_structure.BL_NODES,
**sphere_structure.BL_NODES,
}

View File

@ -2,10 +2,16 @@ import tidy3d as td
import sympy as sp
import sympy.physics.units as spu
import bpy
from ......utils import analyze_geonodes
from .... import contracts as ct
from .... import sockets
from .... import managed_objs
from ... import base
GEONODES_STRUCTURE_BOX = "structure_box"
class BoxStructureNode(base.MaxwellSimNode):
node_type = ct.NodeType.BoxStructure
bl_label = "Box Structure"
@ -16,12 +22,21 @@ class BoxStructureNode(base.MaxwellSimNode):
input_sockets = {
"Medium": sockets.MaxwellMediumSocketDef(),
"Center": sockets.PhysicalPoint3DSocketDef(),
"Size": sockets.PhysicalSize3DSocketDef(),
"Size": sockets.PhysicalSize3DSocketDef(
default_value=sp.Matrix([500, 500, 500]) * spu.nm
),
}
output_sockets = {
"Structure": sockets.MaxwellStructureSocketDef(),
}
managed_obj_defs = {
"structure_box": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix="",
)
}
####################
# - Output Socket Computation
####################
@ -45,6 +60,66 @@ class BoxStructureNode(base.MaxwellSimNode):
medium=medium,
)
####################
# - Preview - Changes to Input Sockets
####################
@base.on_value_changed(
socket_name={"Center", "Size"},
input_sockets={"Center", "Size"},
managed_objs={"structure_box"},
)
def on_value_changed__center_size(
self,
input_sockets: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
_center = input_sockets["Center"]
center = tuple([
float(el)
for el in spu.convert_to(_center, spu.um) / spu.um
])
_size = input_sockets["Size"]
size = tuple([
float(el)
for el in spu.convert_to(_size, spu.um) / spu.um
])
## TODO: Preview unit system?? Presume um for now
# Retrieve Hard-Coded GeoNodes and Analyze Input
geo_nodes = bpy.data.node_groups[GEONODES_STRUCTURE_BOX]
geonodes_interface = analyze_geonodes.interface(
geo_nodes, direc="INPUT"
)
# Sync Modifier Inputs
managed_objs["structure_box"].sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
geonodes_interface["Size"].identifier: size,
## TODO: Use 'bl_socket_map.value_to_bl`!
## - This accounts for auto-conversion, unit systems, etc. .
## - We could keep it in the node base class...
## - ...But it needs aligning with Blender, too. Hmm.
}
)
# Sync Object Position
managed_objs["structure_box"].bl_object("MESH").location = center
####################
# - Preview - Show Preview
####################
@base.on_show_preview(
managed_objs={"structure_box"},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
managed_objs["structure_box"].show_preview("MESH")
self.on_value_changed__center_size()
####################

View File

@ -2,43 +2,52 @@ import tidy3d as td
import sympy as sp
import sympy.physics.units as spu
from .... import contracts
import bpy
from ......utils import analyze_geonodes
from .... import contracts as ct
from .... import sockets
from .... import managed_objs
from ... import base
class SphereStructureNode(base.MaxwellSimTreeNode):
node_type = contracts.NodeType.SphereStructure
GEONODES_STRUCTURE_SPHERE = "structure_sphere"
class SphereStructureNode(base.MaxwellSimNode):
node_type = ct.NodeType.SphereStructure
bl_label = "Sphere Structure"
#bl_icon = ...
####################
# - Sockets
####################
input_sockets = {
"medium": sockets.MaxwellMediumSocketDef(
label="Medium",
),
"center": sockets.PhysicalPoint3DSocketDef(
label="Center",
),
"radius": sockets.PhysicalLengthSocketDef(
label="Radius",
"Center": sockets.PhysicalPoint3DSocketDef(),
"Radius": sockets.PhysicalLengthSocketDef(
default_value=150*spu.nm,
),
"Medium": sockets.MaxwellMediumSocketDef(),
}
output_sockets = {
"structure": sockets.MaxwellStructureSocketDef(
label="Structure",
),
"Structure": sockets.MaxwellStructureSocketDef(),
}
managed_obj_defs = {
"structure_sphere": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix="",
)
}
####################
# - Output Socket Computation
####################
@base.computes_output_socket("structure")
def compute_simulation(self: contracts.NodeTypeProtocol) -> td.Box:
medium = self.compute_input("medium")
_center = self.compute_input("center")
_radius = self.compute_input("radius")
@base.computes_output_socket(
"Structure",
input_sockets={"Center", "Radius", "Medium"},
)
def compute_structure(self, input_sockets: dict) -> td.Box:
medium = input_sockets["Medium"]
_center = input_sockets["Center"]
_radius = input_sockets["Radius"]
center = tuple(spu.convert_to(_center, spu.um) / spu.um)
radius = spu.convert_to(_radius, spu.um) / spu.um
@ -51,6 +60,63 @@ class SphereStructureNode(base.MaxwellSimTreeNode):
medium=medium,
)
####################
# - Preview - Changes to Input Sockets
####################
@base.on_value_changed(
socket_name={"Center", "Radius"},
input_sockets={"Center", "Radius"},
managed_objs={"structure_sphere"},
)
def on_value_changed__center_radius(
self,
input_sockets: dict,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
_center = input_sockets["Center"]
center = tuple([
float(el)
for el in spu.convert_to(_center, spu.um) / spu.um
])
_radius = input_sockets["Radius"]
radius = float(spu.convert_to(_radius, spu.um) / spu.um)
## TODO: Preview unit system?? Presume um for now
# Retrieve Hard-Coded GeoNodes and Analyze Input
geo_nodes = bpy.data.node_groups[GEONODES_STRUCTURE_SPHERE]
geonodes_interface = analyze_geonodes.interface(
geo_nodes, direc="INPUT"
)
# Sync Modifier Inputs
managed_objs["structure_sphere"].sync_geonodes_modifier(
geonodes_node_group=geo_nodes,
geonodes_identifier_to_value={
geonodes_interface["Radius"].identifier: radius,
## TODO: Use 'bl_socket_map.value_to_bl`!
## - This accounts for auto-conversion, unit systems, etc. .
## - We could keep it in the node base class...
## - ...But it needs aligning with Blender, too. Hmm.
}
)
# Sync Object Position
managed_objs["structure_sphere"].bl_object("MESH").location = center
####################
# - Preview - Show Preview
####################
@base.on_show_preview(
managed_objs={"structure_sphere"},
)
def on_show_preview(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
):
managed_objs["structure_sphere"].show_preview("MESH")
self.on_value_changed__center_radius()
####################
@ -60,7 +126,7 @@ BL_REGISTER = [
SphereStructureNode,
]
BL_NODES = {
contracts.NodeType.SphereStructure: (
contracts.NodeCategory.MAXWELLSIM_STRUCTURES_PRIMITIVES
ct.NodeType.SphereStructure: (
ct.NodeCategory.MAXWELLSIM_STRUCTURES_PRIMITIVES
)
}

View File

@ -1,23 +1,16 @@
#from . import math
from . import combine
#from . import separate
from . import math
from . import operations
from . import converter
BL_REGISTER = [
# *math.BL_REGISTER,
*combine.BL_REGISTER,
#*separate.BL_REGISTER,
*converter.BL_REGISTER,
*math.BL_REGISTER,
*operations.BL_REGISTER,
]
BL_NODES = {
# **math.BL_NODES,
**combine.BL_NODES,
#**separate.BL_NODES,
**converter.BL_NODES,
**math.BL_NODES,
**operations.BL_NODES,
}

View File

@ -2,103 +2,167 @@ import sympy as sp
import sympy.physics.units as spu
import scipy as sc
from ... import contracts
import bpy
from ... import contracts as ct
from ... import sockets
from .. import base
MAX_AMOUNT = 20
class CombineNode(base.MaxwellSimNode):
node_type = contracts.NodeType.Combine
node_type = ct.NodeType.Combine
bl_label = "Combine"
#bl_icon = ...
####################
# - Sockets
####################
input_sockets = {}
input_socket_sets = {
"real_3d_vector": {
f"x_{i}": sockets.RealNumberSocketDef(
label=f"x_{i}"
)
"Maxwell Sources": {},
"Maxwell Structures": {},
"Maxwell Monitors": {},
"Real 3D Vector": {
f"x_{i}": sockets.RealNumberSocketDef()
for i in range(3)
},
"point_3d": {
axis: sockets.PhysicalLengthSocketDef(
label=axis
)
for i, axis in zip(
range(3),
["x", "y", "z"]
)
},
"size_3d": {
axis_key: sockets.PhysicalLengthSocketDef(
label=axis_label
)
for i, axis_key, axis_label in zip(
range(3),
["x_size", "y_size", "z_size"],
["X Size", "Y Size", "Z Size"],
)
},
#"Point 3D": {
# axis: sockets.PhysicalLengthSocketDef()
# for i, axis in zip(
# range(3),
# ["x", "y", "z"]
# )
#},
#"Size 3D": {
# axis_key: sockets.PhysicalLengthSocketDef()
# for i, axis_key, axis_label in zip(
# range(3),
# ["x_size", "y_size", "z_size"],
# ["X Size", "Y Size", "Z Size"],
# )
#},
}
output_sockets = {}
output_socket_sets = {
"real_3d_vector": {
"real_3d_vector": sockets.Real3DVectorSocketDef(
label="Real 3D Vector",
"Maxwell Sources": {
"Sources": sockets.MaxwellSourceSocketDef(
is_list=True,
),
},
"point_3d": {
"point_3d": sockets.PhysicalPoint3DSocketDef(
label="3D Point",
"Maxwell Structures": {
"Structures": sockets.MaxwellStructureSocketDef(
is_list=True,
),
},
"size_3d": {
"size_3d": sockets.PhysicalSize3DSocketDef(
label="3D Size",
"Maxwell Monitors": {
"Monitors": sockets.MaxwellMonitorSocketDef(
is_list=True,
),
},
"Real 3D Vector": {
"Real 3D Vector": sockets.Real3DVectorSocketDef(),
},
#"Point 3D": {
# "3D Point": sockets.PhysicalPoint3DSocketDef(),
#},
#"Size 3D": {
# "3D Size": sockets.PhysicalSize3DSocketDef(),
#},
}
amount: bpy.props.IntProperty(
name="# Objects to Combine",
description="Amount of Objects to Combine",
default=1,
min=1,
max=MAX_AMOUNT,
update=lambda self, context: self.sync_prop("amount", context)
)
####################
# - Draw
####################
def draw_props(self, context, layout):
layout.prop(self, "amount", text="#")
####################
# - Output Socket Computation
####################
@base.computes_output_socket("real_3d_vector")
def compute_real_3d_vector(self: contracts.NodeTypeProtocol) -> sp.Expr:
x1, x2, x3 = [
self.compute_input(f"x_{i}")
for i in range(3)
@base.computes_output_socket(
"Real 3D Vector",
input_sockets={"x_0", "x_1", "x_2"}
)
def compute_real_3d_vector(self, input_sockets) -> sp.Expr:
return sp.Matrix([input_sockets[f"x_{i}"] for i in range(3)])
@base.computes_output_socket(
"Sources",
input_sockets={f"Source #{i}" for i in range(MAX_AMOUNT)},
props={"amount"},
)
def compute_sources(self, input_sockets, props) -> sp.Expr:
return [
input_sockets[f"Source #{i}"]
for i in range(props["amount"])
]
return (x1, x2, x3)
@base.computes_output_socket("point_3d")
def compute_point_3d(self: contracts.NodeTypeProtocol) -> sp.Expr:
x, y, z = [
self.compute_input(axis)
#spu.convert_to(
# self.compute_input(axis),
# spu.meter,
#) / spu.meter
for axis in ["x", "y", "z"]
@base.computes_output_socket(
"Structures",
input_sockets={f"Structure #{i}" for i in range(MAX_AMOUNT)},
props={"amount"},
)
def compute_structures(self, input_sockets, props) -> sp.Expr:
return [
input_sockets[f"Structure #{i}"]
for i in range(props["amount"])
]
return sp.Matrix([x, y, z])# * spu.meter
@base.computes_output_socket("size_3d")
def compute_size_3d(self: contracts.NodeTypeProtocol) -> sp.Expr:
x_size, y_size, z_size = [
self.compute_input(axis)
#spu.convert_to(
# self.compute_input(axis),
# spu.meter,
#) / spu.meter
for axis in ["x_size", "y_size", "z_size"]
@base.computes_output_socket(
"Monitors",
input_sockets={f"Monitor #{i}" for i in range(MAX_AMOUNT)},
props={"amount"},
)
def compute_monitors(self, input_sockets, props) -> sp.Expr:
return [
input_sockets[f"Monitor #{i}"]
for i in range(props["amount"])
]
return sp.Matrix([x_size, y_size, z_size])# * spu.meter
####################
# - Input Socket Compilation
####################
@base.on_value_changed(
prop_name="active_socket_set",
props={"active_socket_set", "amount"},
)
def on_value_changed__active_socket_set(self, props):
if props["active_socket_set"] == "Maxwell Sources":
self.loose_input_sockets = {
f"Source #{i}": sockets.MaxwellSourceSocketDef()
for i in range(props["amount"])
}
elif props["active_socket_set"] == "Maxwell Structures":
self.loose_input_sockets = {
f"Structure #{i}": sockets.MaxwellStructureSocketDef()
for i in range(props["amount"])
}
elif props["active_socket_set"] == "Maxwell Monitors":
self.loose_input_sockets = {
f"Monitor #{i}": sockets.MaxwellMonitorSocketDef()
for i in range(props["amount"])
}
else:
self.loose_input_sockets = {}
@base.on_value_changed(
prop_name="amount",
)
def on_value_changed__amount(self):
self.on_value_changed__active_socket_set()
@base.on_init()
def on_init(self):
self.on_value_changed__active_socket_set()
####################
@ -108,7 +172,7 @@ BL_REGISTER = [
CombineNode,
]
BL_NODES = {
contracts.NodeType.Combine: (
contracts.NodeCategory.MAXWELLSIM_UTILITIES
ct.NodeType.Combine: (
ct.NodeCategory.MAXWELLSIM_UTILITIES
)
}

View File

@ -1,8 +0,0 @@
from . import wave_converter
BL_REGISTER = [
*wave_converter.BL_REGISTER,
]
BL_NODES = {
**wave_converter.BL_NODES,
}

View File

@ -1,82 +0,0 @@
import tidy3d as td
import sympy as sp
import sympy.physics.units as spu
import scipy as sc
from .... import contracts
from .... import sockets
from ... import base
class WaveConverterNode(base.MaxwellSimTreeNode):
node_type = contracts.NodeType.WaveConverter
bl_label = "Wave Converter"
#bl_icon = ...
####################
# - Sockets
####################
input_sockets = {}
input_socket_sets = {
"freq_to_vacwl": {
"freq": sockets.PhysicalFreqSocketDef(
label="Freq",
),
},
"vacwl_to_freq": {
"vacwl": sockets.PhysicalVacWLSocketDef(
label="Vac WL",
),
},
}
output_sockets = {}
output_socket_sets = {
"freq_to_vacwl": {
"vacwl": sockets.PhysicalVacWLSocketDef(
label="Vac WL",
),
},
"vacwl_to_freq": {
"freq": sockets.PhysicalFreqSocketDef(
label="Freq",
),
},
}
####################
# - Output Socket Computation
####################
@base.computes_output_socket("freq")
def compute_freq(self: contracts.NodeTypeProtocol) -> sp.Expr:
vac_speed_of_light = sc.constants.speed_of_light * spu.meter/spu.second
vacwl = self.compute_input("vacwl")
return spu.convert_to(
vac_speed_of_light / vacwl,
spu.hertz,
)
@base.computes_output_socket("vacwl")
def compute_vacwl(self: contracts.NodeTypeProtocol) -> sp.Expr:
vac_speed_of_light = sc.constants.speed_of_light * spu.meter/spu.second
freq = self.compute_input("freq")
return spu.convert_to(
vac_speed_of_light / freq,
spu.meter,
)
####################
# - Blender Registration
####################
BL_REGISTER = [
WaveConverterNode,
]
BL_NODES = {
contracts.NodeType.WaveConverter: (
contracts.NodeCategory.MAXWELLSIM_UTILITIES_CONVERTERS
)
}

View File

@ -1,8 +0,0 @@
from . import array_operation
BL_REGISTER = [
*array_operation.BL_REGISTER,
]
BL_NODES = {
**array_operation.BL_NODES,
}

View File

@ -1,5 +0,0 @@
####################
# - Blender Registration
####################
BL_REGISTER = []
BL_NODES = {}

View File

@ -0,0 +1,8 @@
from . import sim_data_viz
BL_REGISTER = [
*sim_data_viz.BL_REGISTER,
]
BL_NODES = {
**sim_data_viz.BL_NODES,
}

View File

@ -0,0 +1,332 @@
import typing as typ
import tidy3d as td
import numpy as np
import sympy as sp
import sympy.physics.units as spu
import bpy
from .....utils import analyze_geonodes
from ... import bl_socket_map
from ... import contracts as ct
from ... import sockets
from .. import base
from ... import managed_objs
CACHE = {}
class FDTDSimDataVizNode(base.MaxwellSimNode):
node_type = ct.NodeType.FDTDSimDataViz
bl_label = "FDTD Sim Data Viz"
####################
# - Sockets
####################
input_sockets = {
"FDTD Sim Data": sockets.MaxwellFDTDSimDataSocketDef(),
}
output_sockets= {
"Preview": sockets.AnySocketDef()
}
managed_obj_defs = {
"viz_plot": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLImage(name),
name_prefix="",
),
"viz_object": ct.schemas.ManagedObjDef(
mk=lambda name: managed_objs.ManagedBLObject(name),
name_prefix="",
),
}
####################
# - Properties
####################
viz_monitor_name: bpy.props.EnumProperty(
name="Viz Monitor Name",
description="Monitor to visualize within the attached SimData",
items=lambda self, context: self.retrieve_monitors(context),
update=(lambda self, context: self.sync_viz_monitor_name(context)),
)
cache_viz_monitor_type: bpy.props.StringProperty(
name="Viz Monitor Type",
description="Type of the viz monitor",
default=""
)
# Field Monitor Type
field_viz_component: bpy.props.EnumProperty(
name="Field Component",
description="Field component to visualize",
items=[
("E", "E", "Electric"),
#("H", "H", "Magnetic"),
#("S", "S", "Poynting"),
("Ex", "Ex", "Ex"),
("Ey", "Ey", "Ey"),
("Ez", "Ez", "Ez"),
#("Hx", "Hx", "Hx"),
#("Hy", "Hy", "Hy"),
#("Hz", "Hz", "Hz"),
],
default="E",
update=lambda self, context: self.sync_prop("field_viz_component", context),
)
field_viz_part: bpy.props.EnumProperty(
name="Field Part",
description="Field part to visualize",
items=[
("real", "Real", "Electric"),
("imag", "Imaginary", "Imaginary"),
("abs", "Abs", "Abs"),
("abs^2", "Squared Abs", "Square Abs"),
("phase", "Phase", "Phase"),
],
default="real",
update=lambda self, context: self.sync_prop("field_viz_part", context),
)
field_viz_scale: bpy.props.EnumProperty(
name="Field Scale",
description="Field scale to visualize in, Linear or Log",
items=[
("lin", "Linear", "Linear Scale"),
("dB", "Log (dB)", "Logarithmic (dB) Scale"),
],
default="lin",
update=lambda self, context: self.sync_prop("field_viz_scale", context),
)
field_viz_structure_visibility: bpy.props.FloatProperty(
name="Field Viz Plot: Structure Visibility",
description="Visibility of structes",
default=0.2,
min=0.0,
max=1.0,
update=lambda self, context: self.sync_prop("field_viz_plot_fixed_f", context),
)
field_viz_plot_fix_x: bpy.props.BoolProperty(
name="Field Viz Plot: Fix X",
description="Fix the x-coordinate on the plot",
default=False,
update=lambda self, context: self.sync_prop("field_viz_plot_fix_x", context),
)
field_viz_plot_fix_y: bpy.props.BoolProperty(
name="Field Viz Plot: Fix Y",
description="Fix the y coordinate on the plot",
default=False,
update=lambda self, context: self.sync_prop("field_viz_plot_fix_y", context),
)
field_viz_plot_fix_z: bpy.props.BoolProperty(
name="Field Viz Plot: Fix Z",
description="Fix the z coordinate on the plot",
default=False,
update=lambda self, context: self.sync_prop("field_viz_plot_fix_z", context),
)
field_viz_plot_fix_f: bpy.props.BoolProperty(
name="Field Viz Plot: Fix Freq",
description="Fix the frequency coordinate on the plot",
default=False,
update=lambda self, context: self.sync_prop("field_viz_plot_fix_f", context),
)
field_viz_plot_fixed_x: bpy.props.FloatProperty(
name="Field Viz Plot: Fix X",
description="Fix the x-coordinate on the plot",
default=0.0,
update=lambda self, context: self.sync_prop("field_viz_plot_fixed_x", context),
)
field_viz_plot_fixed_y: bpy.props.FloatProperty(
name="Field Viz Plot: Fixed Y",
description="Fix the y coordinate on the plot",
default=0.0,
update=lambda self, context: self.sync_prop("field_viz_plot_fixed_y", context),
)
field_viz_plot_fixed_z: bpy.props.FloatProperty(
name="Field Viz Plot: Fixed Z",
description="Fix the z coordinate on the plot",
default=0.0,
update=lambda self, context: self.sync_prop("field_viz_plot_fixed_z", context),
)
field_viz_plot_fixed_f: bpy.props.FloatProperty(
name="Field Viz Plot: Fixed Freq (Thz)",
description="Fix the frequency coordinate on the plot",
default=0.0,
update=lambda self, context: self.sync_prop("field_viz_plot_fixed_f", context),
)
####################
# - Derived Properties
####################
def sync_viz_monitor_name(self, context):
if (sim_data := self._compute_input("FDTD Sim Data")) is None:
return
self.cache_viz_monitor_type = sim_data.monitor_data[
self.viz_monitor_name
].type
self.sync_prop("viz_monitor_name", context)
def retrieve_monitors(self, context) -> list[tuple]:
global CACHE
if not CACHE.get(self.instance_id):
sim_data = self._compute_input("FDTD Sim Data")
if sim_data is not None:
CACHE[self.instance_id] = {"monitors": list(
sim_data.monitor_data.keys()
)}
else:
return [("NONE", "None", "No monitors")]
monitor_names = CACHE[self.instance_id]["monitors"]
# Check for No Monitors
if not monitor_names:
return [("NONE", "None", "No monitors")]
return [
(
monitor_name,
monitor_name,
f"Monitor '{monitor_name}' recorded by the FDTD Sim",
)
for monitor_name in monitor_names
]
####################
# - UI
####################
def draw_props(self, context, layout):
row = layout.row()
row.prop(self, "viz_monitor_name", text="")
if self.cache_viz_monitor_type == "FieldData":
# Array Selection
split = layout.split(factor=0.45)
col = split.column(align=False)
col.label(text="Component")
col.label(text="Part")
col.label(text="Scale")
col = split.column(align=False)
col.prop(self, "field_viz_component", text="")
col.prop(self, "field_viz_part", text="")
col.prop(self, "field_viz_scale", text="")
# Coordinate Fixing
split = layout.split(factor=0.45)
col = split.column(align=False)
col.prop(self, "field_viz_plot_fix_x", text="Fix x (um)")
col.prop(self, "field_viz_plot_fix_y", text="Fix y (um)")
col.prop(self, "field_viz_plot_fix_z", text="Fix z (um)")
col.prop(self, "field_viz_plot_fix_f", text="Fix f (THz)")
col = split.column(align=False)
col.prop(self, "field_viz_plot_fixed_x", text="")
col.prop(self, "field_viz_plot_fixed_y", text="")
col.prop(self, "field_viz_plot_fixed_z", text="")
col.prop(self, "field_viz_plot_fixed_f", text="")
####################
# - On Value Changed Methods
####################
@base.on_value_changed(
socket_name="FDTD Sim Data",
managed_objs={"viz_object"},
input_sockets={"FDTD Sim Data"},
)
def on_value_changed__fdtd_sim_data(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
input_sockets: dict[str, typ.Any],
) -> None:
global CACHE
if (sim_data := input_sockets["FDTD Sim Data"]) is None:
CACHE.pop(self.instance_id, None)
return
CACHE[self.instance_id] = {"monitors": list(
sim_data.monitor_data.keys()
)}
####################
# - Plotting
####################
@base.on_show_plot(
managed_objs={"viz_plot"},
props={
"viz_monitor_name", "field_viz_component",
"field_viz_part", "field_viz_scale",
"field_viz_structure_visibility",
"field_viz_plot_fix_x", "field_viz_plot_fix_y",
"field_viz_plot_fix_z", "field_viz_plot_fix_f",
"field_viz_plot_fixed_x", "field_viz_plot_fixed_y",
"field_viz_plot_fixed_z", "field_viz_plot_fixed_f",
},
input_sockets={"FDTD Sim Data"},
stop_propagation=True,
)
def on_show_plot(
self,
managed_objs: dict[str, ct.schemas.ManagedObj],
input_sockets: dict[str, typ.Any],
props: dict[str, typ.Any],
):
if (
(sim_data := input_sockets["FDTD Sim Data"]) is None
or (monitor_name := props["viz_monitor_name"]) == "NONE"
):
return
coord_fix = {}
for coord in ["x", "y", "z", "f"]:
if props[f"field_viz_plot_fix_{coord}"]:
coord_fix |= {
coord: props[f"field_viz_plot_fixed_{coord}"],
}
if "f" in coord_fix:
coord_fix["f"] *= 1e12
managed_objs["viz_plot"].mpl_plot_to_image(
lambda ax: sim_data.plot_field(
monitor_name,
props["field_viz_component"],
val=props["field_viz_part"],
scale=props["field_viz_scale"],
eps_alpha=props["field_viz_structure_visibility"],
phase=0,
**coord_fix,
ax=ax,
),
bl_select=True,
)
#@base.on_show_preview(
# managed_objs={"viz_object"},
#)
#def on_show_preview(
# self,
# managed_objs: dict[str, ct.schemas.ManagedObj],
#):
# """Called whenever a Loose Input Socket is altered.
#
# Synchronizes the change to the actual GeoNodes modifier, so that the change is immediately visible.
# """
# managed_objs["viz_object"].show_preview("MESH")
####################
# - Blender Registration
####################
BL_REGISTER = [
FDTDSimDataVizNode,
]
BL_NODES = {
ct.NodeType.FDTDSimDataViz: (
ct.NodeCategory.MAXWELLSIM_VIZ
)
}

View File

@ -52,6 +52,7 @@ MaxwellTemporalShapeSocketDef = maxwell.MaxwellTemporalShapeSocketDef
MaxwellStructureSocketDef = maxwell.MaxwellStructureSocketDef
MaxwellMonitorSocketDef = maxwell.MaxwellMonitorSocketDef
MaxwellFDTDSimSocketDef = maxwell.MaxwellFDTDSimSocketDef
MaxwellFDTDSimDataSocketDef = maxwell.MaxwellFDTDSimDataSocketDef
MaxwellSimGridSocketDef = maxwell.MaxwellSimGridSocketDef
MaxwellSimGridAxisSocketDef = maxwell.MaxwellSimGridAxisSocketDef
MaxwellSimDomainSocketDef = maxwell.MaxwellSimDomainSocketDef

View File

@ -19,12 +19,17 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
"CIRCLE", "SQUARE", "DIAMOND", "CIRCLE_DOT", "SQUARE_DOT",
"DIAMOND_DOT",
]
## We use the following conventions for shapes:
## - CIRCLE: Single Value.
## - SQUARE: Container of Value.
## - DIAMOND: Pointer Value.
## - +DOT: Uses Units
socket_color: tuple
# Options
#link_limit: int = 0
use_units: bool = False
#list_like: bool = False
use_prelock: bool = False
# Computed
bl_idname: str
@ -52,8 +57,19 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
cls.socket_color = ct.SOCKET_COLORS[cls.socket_type]
cls.socket_shape = ct.SOCKET_SHAPES[cls.socket_type]
# Setup List
cls.__annotations__["is_list"] = bpy.props.BoolProperty(
name="Is List",
description="Whether or not a particular socket is a list type socket",
default=False,
update=lambda self, context: self.sync_is_list(context)
)
# Configure Use of Units
if cls.use_units:
# Set Shape :)
cls.socket_shape += "_DOT"
if not (socket_units := ct.SOCKET_UNITS.get(cls.socket_type)):
msg = "Tried to `use_units` on {cls.bl_idname} socket, but `SocketType` has no units defined in `contracts.SOCKET_UNITS`"
raise RuntimeError(msg)
@ -123,6 +139,17 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
####################
# - Action Chain: Event Handlers
####################
def sync_is_list(self, context: bpy.types.Context):
"""Called when the "is_list_ property has been updated.
"""
if self.is_list:
if self.use_units:
self.display_shape = "SQUARE_DOT"
else:
self.display_shape = "SQUARE"
self.trigger_action("value_changed")
def sync_prop(self, prop_name: str, context: bpy.types.Context):
"""Called when a property has been updated.
"""
@ -166,11 +193,17 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
@property
def value(self) -> typ.Any:
raise NotImplementedError
@value.setter
def value(self, value: typ.Any) -> None:
raise NotImplementedError
@property
def value_list(self) -> typ.Any:
return [self.value]
@value_list.setter
def value_list(self, value: typ.Any) -> None:
raise NotImplementedError
def value_as_unit_system(
self,
unit_system: dict,
@ -187,11 +220,17 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
@property
def lazy_value(self) -> None:
raise NotImplementedError
@lazy_value.setter
def lazy_value(self, lazy_value: typ.Any) -> None:
raise NotImplementedError
@property
def lazy_value_list(self) -> typ.Any:
return [self.lazy_value]
@lazy_value_list.setter
def lazy_value_list(self, value: typ.Any) -> None:
raise NotImplementedError
@property
def capabilities(self) -> None:
raise NotImplementedError
@ -205,11 +244,15 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
**NOTE**: Low-level method. Use `compute_data` instead.
"""
if kind == ct.DataFlowKind.Value:
return self.value
if kind == ct.DataFlowKind.LazyValue:
if self.is_list: return self.value_list
else: return self.value
elif kind == ct.DataFlowKind.LazyValue:
if self.is_list: return self.lazy_value_list
else: return self.lazy_value
return self.lazy_value
if kind == ct.DataFlowKind.Capabilities:
elif kind == ct.DataFlowKind.Capabilities:
return self.capabilities
return None
def compute_data(
@ -222,8 +265,11 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
- If output socket, ask node for data.
"""
# Compute Output Socket
## List-like sockets guarantee that a list of a thing is passed.
if self.is_output:
return self.node.compute_output(self.name, kind=kind)
res = self.node.compute_output(self.name, kind=kind)
if self.is_list and not isinstance(res, list): return [res]
return res
# Compute Input Socket
## Unlinked: Retrieve Socket Value
@ -334,13 +380,21 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
) -> None:
"""Called by Blender to draw the socket UI.
"""
if self.locked: layout.enabled = False
if self.is_output:
self.draw_output(context, layout, node, text)
else:
self.draw_input(context, layout, node, text)
def draw_prelock(
self,
context: bpy.types.Context,
col: bpy.types.UILayout,
node: bpy.types.Node,
text: str,
) -> None:
pass
def draw_input(
self,
context: bpy.types.Context,
@ -350,18 +404,20 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
) -> None:
"""Draws the socket UI, when the socket is an input socket.
"""
# Draw Linked Input: Label Row
if self.is_linked:
layout.label(text=text)
return
# Parent Column
col = layout.column(align=False)
# Draw Label Row
row = col.row(align=True)
# Label Row
row = col.row(align=False)
if self.locked: row.enabled = False
## Linked Label
if self.is_linked:
row.label(text=text)
return
## User Label Row (incl. Units)
if self.use_units:
split = row.split(factor=0.65, align=True)
split = row.split(factor=0.6, align=True)
_row = split.row(align=True)
self.draw_label_row(_row, text)
@ -371,8 +427,25 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
else:
self.draw_label_row(row, text)
# Draw Value Row(s)
self.draw_value(col)
# Prelock Row
row = col.row(align=False)
if self.use_prelock:
_col = row.column(align=False)
_col.enabled = True
self.draw_prelock(context, _col, node, text)
if self.locked:
row = col.row(align=False)
row.enabled = False
else:
if self.locked: row.enabled = False
# Value Column(s)
col = row.column(align=True)
if self.is_list:
self.draw_value_list(col)
else:
self.draw_value(col)
def draw_output(
self,
@ -406,3 +479,10 @@ class MaxwellSimSocket(bpy.types.NodeSocket):
"""
pass
def draw_value_list(self, col: bpy.types.UILayout) -> None:
"""Called to draw the value list column in unlinked input sockets.
Can be overridden.
"""
pass

View File

@ -49,18 +49,18 @@ class BlenderGeoNodesBLSocket(base.MaxwellSimSocket):
####################
# - UI
####################
def draw_label_row(self, label_col_row, text):
label_col_row.label(text=text)
if not self.raw_value: return
op = label_col_row.operator(
BlenderMaxwellResetGeoNodesSocket.bl_idname,
text="",
icon="FILE_REFRESH",
)
op.socket_name = self.name
op.node_name = self.node.name
op.node_tree_name = self.node.id_data.name
#def draw_label_row(self, label_col_row, text):
# label_col_row.label(text=text)
# if not self.raw_value: return
#
# op = label_col_row.operator(
# BlenderMaxwellResetGeoNodesSocket.bl_idname,
# text="",
# icon="FILE_REFRESH",
# )
# op.socket_name = self.name
# op.node_name = self.node.name
# op.node_tree_name = self.node.id_data.name
####################
# - UI

View File

@ -20,10 +20,12 @@ from . import monitor
MaxwellMonitorSocketDef = monitor.MaxwellMonitorSocketDef
from . import fdtd_sim
from . import fdtd_sim_data
from . import sim_grid
from . import sim_grid_axis
from . import sim_domain
MaxwellFDTDSimSocketDef = fdtd_sim.MaxwellFDTDSimSocketDef
MaxwellFDTDSimDataSocketDef = fdtd_sim_data.MaxwellFDTDSimDataSocketDef
MaxwellSimGridSocketDef = sim_grid.MaxwellSimGridSocketDef
MaxwellSimGridAxisSocketDef = sim_grid_axis.MaxwellSimGridAxisSocketDef
MaxwellSimDomainSocketDef = sim_domain.MaxwellSimDomainSocketDef
@ -39,6 +41,7 @@ BL_REGISTER = [
*structure.BL_REGISTER,
*monitor.BL_REGISTER,
*fdtd_sim.BL_REGISTER,
*fdtd_sim_data.BL_REGISTER,
*sim_grid.BL_REGISTER,
*sim_grid_axis.BL_REGISTER,
*sim_domain.BL_REGISTER,

View File

@ -11,6 +11,10 @@ class MaxwellFDTDSimBLSocket(base.MaxwellSimSocket):
socket_type = ct.SocketType.MaxwellFDTDSim
bl_label = "Maxwell FDTD Simulation"
@property
def value(self) -> None:
return None
####################
# - Socket Configuration
####################

View File

@ -0,0 +1,32 @@
import typing as typ
import bpy
import pydantic as pyd
import tidy3d as td
from .. import base
from ... import contracts as ct
class MaxwellFDTDSimDataBLSocket(base.MaxwellSimSocket):
socket_type = ct.SocketType.MaxwellFDTDSimData
bl_label = "Maxwell FDTD Simulation"
@property
def value(self):
return None
####################
# - Socket Configuration
####################
class MaxwellFDTDSimDataSocketDef(pyd.BaseModel):
socket_type: ct.SocketType = ct.SocketType.MaxwellFDTDSimData
def init(self, bl_socket: MaxwellFDTDSimDataBLSocket) -> None:
pass
####################
# - Blender Registration
####################
BL_REGISTER = [
MaxwellFDTDSimDataBLSocket,
]

View File

@ -9,11 +9,6 @@ import scipy as sc
from .. import base
from ... import contracts as ct
VAC_SPEED_OF_LIGHT = (
sc.constants.speed_of_light
* spu.meter/spu.second
)
class MaxwellMonitorBLSocket(base.MaxwellSimSocket):
socket_type = ct.SocketType.MaxwellMonitor
bl_label = "Maxwell Monitor"
@ -24,8 +19,10 @@ class MaxwellMonitorBLSocket(base.MaxwellSimSocket):
class MaxwellMonitorSocketDef(pyd.BaseModel):
socket_type: ct.SocketType = ct.SocketType.MaxwellMonitor
is_list: bool = False
def init(self, bl_socket: MaxwellMonitorBLSocket) -> None:
pass
bl_socket.is_list = self.is_list
####################
# - Blender Registration

View File

@ -17,8 +17,10 @@ class MaxwellSourceBLSocket(base.MaxwellSimSocket):
class MaxwellSourceSocketDef(pyd.BaseModel):
socket_type: ct.SocketType = ct.SocketType.MaxwellSource
is_list: bool = False
def init(self, bl_socket: MaxwellSourceBLSocket) -> None:
pass
bl_socket.is_list = self.is_list
####################
# - Blender Registration

View File

@ -16,8 +16,10 @@ class MaxwellStructureBLSocket(base.MaxwellSimSocket):
class MaxwellStructureSocketDef(pyd.BaseModel):
socket_type: ct.SocketType = ct.SocketType.MaxwellStructure
is_list: bool = False
def init(self, bl_socket: MaxwellStructureBLSocket) -> None:
pass
bl_socket.is_list = self.is_list
####################
# - Blender Registration

View File

@ -1,9 +1,13 @@
import typing as typ
import json
import numpy as np
import bpy
import sympy as sp
import sympy.physics.units as spu
import pydantic as pyd
from .....utils import extra_sympy_units as spux
from .....utils.pydantic_sympy import SympyExpr
from .. import base
from ... import contracts as ct
@ -27,38 +31,103 @@ class PhysicalFreqBLSocket(base.MaxwellSimSocket):
update=(lambda self, context: self.sync_prop("raw_value", context)),
)
min_freq: bpy.props.FloatProperty(
name="Min Frequency",
description="Lowest frequency",
default=0.0,
precision=4,
update=(lambda self, context: self.sync_prop("min_freq", context)),
)
max_freq: bpy.props.FloatProperty(
name="Max Frequency",
description="Highest frequency",
default=0.0,
precision=4,
update=(lambda self, context: self.sync_prop("max_freq", context)),
)
steps: bpy.props.IntProperty(
name="Frequency Steps",
description="# of steps between min and max",
default=2,
update=(lambda self, context: self.sync_prop("steps", context)),
)
####################
# - Socket UI
####################
def draw_value(self, col: bpy.types.UILayout) -> None:
col.prop(self, "raw_value", text="")
def draw_value_list(self, col: bpy.types.UILayout) -> None:
col.prop(self, "min_freq", text="Min")
col.prop(self, "max_freq", text="Max")
col.prop(self, "steps", text="Steps")
####################
# - Default Value
####################
@property
def value(self) -> SympyExpr:
return self.raw_value * self.unit
@value.setter
def value(self, value: SympyExpr) -> None:
self.raw_value = spu.convert_to(value, self.unit) / self.unit
@property
def value_list(self) -> list[SympyExpr]:
return [
el * self.unit
for el in np.linspace(self.min_freq, self.max_freq, self.steps)
]
@value_list.setter
def value_list(self, value: tuple[SympyExpr, SympyExpr, int]):
self.min_freq, self.max_freq, self.steps = [
spu.convert_to(el, self.unit) / self.unit
for el in value[:2]
] + [value[2]]
def sync_unit_change(self) -> None:
if self.is_list:
self.value_list = (
spu.convert_to(
self.min_freq * self.prev_unit,
self.unit
),
spu.convert_to(
self.max_freq * self.prev_unit,
self.unit
),
self.steps,
)
else:
self.value = self.value / self.unit * self.prev_unit
self.prev_active_unit = self.active_unit
####################
# - Socket Configuration
####################
class PhysicalFreqSocketDef(pyd.BaseModel):
socket_type: ct.SocketType = ct.SocketType.PhysicalFreq
default_value: SympyExpr | None = None
default_value: SympyExpr = 500*spux.terahertz
default_unit: SympyExpr | None = None
is_list: bool = False
min_freq: SympyExpr = 400.0*spux.terahertz
max_freq: SympyExpr = 600.0*spux.terahertz
steps: SympyExpr = 50
def init(self, bl_socket: PhysicalFreqBLSocket) -> None:
if self.default_value:
bl_socket.value = self.default_value
bl_socket.value = self.default_value
bl_socket.is_list = self.is_list
if self.default_unit:
bl_socket.unit = self.default_unit
if self.is_list:
bl_socket.value_list = (self.min_freq, self.max_freq, self.steps)
####################
# - Blender Registration
####################

View File

@ -2,6 +2,7 @@ import typing as typ
import bpy
import sympy.physics.units as spu
import numpy as np
import pydantic as pyd
from .....utils.pydantic_sympy import SympyExpr
@ -27,35 +28,103 @@ class PhysicalLengthBLSocket(base.MaxwellSimSocket):
update=(lambda self, context: self.sync_prop("raw_value", context)),
)
min_len: bpy.props.FloatProperty(
name="Min Length",
description="Lowest length",
default=0.0,
precision=4,
update=(lambda self, context: self.sync_prop("min_len", context)),
)
max_len: bpy.props.FloatProperty(
name="Max Length",
description="Highest length",
default=0.0,
precision=4,
update=(lambda self, context: self.sync_prop("max_len", context)),
)
steps: bpy.props.IntProperty(
name="Length Steps",
description="# of steps between min and max",
default=2,
update=(lambda self, context: self.sync_prop("steps", context)),
)
####################
# - Socket UI
####################
def draw_value(self, col: bpy.types.UILayout) -> None:
col.prop(self, "raw_value", text="")
def draw_value_list(self, col: bpy.types.UILayout) -> None:
col.prop(self, "min_len", text="Min")
col.prop(self, "max_len", text="Max")
col.prop(self, "steps", text="Steps")
####################
# - Default Value
####################
@property
def value(self) -> SympyExpr:
return self.raw_value * self.unit
@value.setter
def value(self, value: SympyExpr) -> None:
self.raw_value = spu.convert_to(value, self.unit) / self.unit
@property
def value_list(self) -> list[SympyExpr]:
return [
el * self.unit
for el in np.linspace(self.min_len, self.max_len, self.steps)
]
@value_list.setter
def value_list(self, value: tuple[SympyExpr, SympyExpr, int]):
self.min_len, self.max_len, self.steps = [
spu.convert_to(el, self.unit) / self.unit
for el in value[:2]
] + [value[2]]
def sync_unit_change(self) -> None:
if self.is_list:
self.value_list = (
spu.convert_to(
self.min_len * self.prev_unit,
self.unit
),
spu.convert_to(
self.max_len * self.prev_unit,
self.unit
),
self.steps,
)
else:
self.value = self.value / self.unit * self.prev_unit
self.prev_active_unit = self.active_unit
####################
# - Socket Configuration
####################
class PhysicalLengthSocketDef(pyd.BaseModel):
socket_type: ct.SocketType = ct.SocketType.PhysicalLength
default_value: SympyExpr = 1*spu.um
default_unit: SympyExpr | None = None
is_list: bool = False
min_len: SympyExpr = 400.0*spu.nm
max_len: SympyExpr = 600.0*spu.nm
steps: SympyExpr = 50
def init(self, bl_socket: PhysicalLengthBLSocket) -> None:
bl_socket.value = self.default_value
bl_socket.is_list = self.is_list
if self.default_unit:
bl_socket.unit = self.default_unit
if self.is_list:
bl_socket.value_list = (self.min_len, self.max_len, self.steps)
####################
# - Blender Registration
####################

View File

@ -49,9 +49,11 @@ class PhysicalSize3DBLSocket(base.MaxwellSimSocket):
class PhysicalSize3DSocketDef(pyd.BaseModel):
socket_type: ct.SocketType = ct.SocketType.PhysicalSize3D
default_value: SympyExpr = sp.Matrix([1, 1, 1]) * spu.um
default_unit: SympyExpr | None = None
def init(self, bl_socket: PhysicalSize3DBLSocket) -> None:
bl_socket.value = self.default_value
if self.default_unit:
bl_socket.unit = self.default_unit

View File

@ -6,91 +6,74 @@ import pydantic as pyd
import tidy3d as td
import tidy3d.web as _td_web
from .....utils.auth_td_web import g_td_web, is_td_web_authed
from .....utils import tdcloud
from .. import base
from ... import contracts as ct
####################
# - Tidy3D Folder/Task Management
# - Operators
####################
TD_FOLDERS = None
## TODO: Keep this data serialized in each node, so it works offline and saves/loads correctly (then we can try/except when the network fails).
## - We should consider adding some kind of serialization-backed instance data to the node base class...
## - We could guard it behind a feature, 'use_node_data_store' for example.
def g_td_folders():
global TD_FOLDERS
if TD_FOLDERS is not None: return TD_FOLDERS
# Populate Folders Cache & Return
TD_FOLDERS = {
cloud_folder.folder_name: None
for cloud_folder in _td_web.core.task_core.Folder.list()
}
return TD_FOLDERS
def g_td_tasks(cloud_folder_name: str):
global TD_FOLDERS
# Retrieve Cached Tasks
if (_tasks := TD_FOLDERS.get(cloud_folder_name)) is not None:
return _tasks
# Retrieve Cloud Folder (if exists)
try:
cloud_folder = _td_web.core.task_core.Folder.get(cloud_folder_name)
except AttributeError as err:
# Folder Doesn't Exist
TD_FOLDERS = None
return []
# Return Tasks as List (also empty)
if (tasks := cloud_folder.list_tasks()) is None:
tasks = []
# Populate Cloud-Folder Cache & Return
TD_FOLDERS[cloud_folder_name] = [
task
for task in tasks
]
return TD_FOLDERS[cloud_folder_name]
class BlenderMaxwellRefreshTDFolderList(bpy.types.Operator):
bl_idname = "blender_maxwell.refresh_td_folder_list"
bl_label = "Refresh Tidy3D Folder List"
bl_description = "Refresh the cached Tidy3D folder list"
bl_options = {'REGISTER'}
class ReloadFolderList(bpy.types.Operator):
bl_idname = "blender_maxwell.sockets__reload_folder_list"
bl_label = "Reload Tidy3D Folder List"
bl_description = "Reload the the cached Tidy3D folder list"
@classmethod
def poll(cls, context):
space = context.space_data
return (
space.type == 'NODE_EDITOR'
and space.node_tree is not None
and space.node_tree.bl_idname == "MaxwellSimTreeType"
and is_td_web_authed()
tdcloud.IS_AUTHENTICATED
and hasattr(context, "socket")
and hasattr(context.socket, "socket_type")
and context.socket.socket_type == ct.SocketType.Tidy3DCloudTask
)
def execute(self, context):
global TD_FOLDERS
socket = context.socket
tdcloud.TidyCloudFolders.update_folders()
tdcloud.TidyCloudTasks.update_tasks(socket.existing_folder_id)
TD_FOLDERS = None
return {'FINISHED'}
class Authenticate(bpy.types.Operator):
bl_idname = "blender_maxwell.sockets__authenticate"
bl_label = "Authenticate Tidy3D"
bl_description = "Authenticate the Tidy3D Web API from a Cloud Task socket"
@classmethod
def poll(cls, context):
return (
not tdcloud.IS_AUTHENTICATED
and hasattr(context, "socket")
and hasattr(context.socket, "socket_type")
and context.socket.socket_type == ct.SocketType.Tidy3DCloudTask
)
def execute(self, context):
bl_socket = context.socket
if not tdcloud.check_authentication():
tdcloud.authenticate_with_api_key(bl_socket.api_key)
bl_socket.api_key = ""
return {'FINISHED'}
####################
# - Socket
####################
class Tidy3DCloudTaskBLSocket(base.MaxwellSimSocket):
socket_type = ct.SocketType.Tidy3DCloudTask
bl_label = "Tidy3D Cloud Sim"
bl_label = "Tidy3D Cloud Task"
use_prelock = True
####################
# - Properties
####################
task_exists: bpy.props.BoolProperty(
name="Cloud Task Should Exist",
description="Whether or not the cloud task referred to should exist",
default=False,
)
# Authentication
api_key: bpy.props.StringProperty(
name="API Key",
description="API Key for the Tidy3D Cloud",
@ -99,11 +82,19 @@ class Tidy3DCloudTaskBLSocket(base.MaxwellSimSocket):
subtype="PASSWORD",
)
existing_folder_name: bpy.props.EnumProperty(
# Task Existance Presumption
should_exist: bpy.props.BoolProperty(
name="Cloud Task Should Exist",
description="Whether or not the cloud task should already exist",
default=False,
)
# Identifiers
existing_folder_id: bpy.props.EnumProperty(
name="Folder of Cloud Tasks",
description="An existing folder on the Tidy3D Cloud",
items=lambda self, context: self.retrieve_folders(context),
update=(lambda self, context: self.sync_prop("existing_folder_name", context)),
update=(lambda self, context: self.sync_prop("existing_folder_id", context)),
)
existing_task_id: bpy.props.EnumProperty(
name="Existing Cloud Task",
@ -111,35 +102,49 @@ class Tidy3DCloudTaskBLSocket(base.MaxwellSimSocket):
items=lambda self, context: self.retrieve_tasks(context),
update=(lambda self, context: self.sync_prop("existing_task_id", context)),
)
# (Potential) New Task
new_task_name: bpy.props.StringProperty(
name="New Cloud Task Name",
description="Name of a new task to submit to the Tidy3D Cloud",
default="",
update=(lambda self, context: self.sync_new_task(context)),
update=(lambda self, context: self.sync_prop("new_task_name", context)),
)
lock_nonauth_interface: bpy.props.BoolProperty(
name="Lock the non-Auth Interface",
description="Declares that the non-auth interface should be locked",
default=False,
)
####################
# - Property Methods
####################
def sync_existing_folder_id(self, context):
folder_task_ids = self.retrieve_tasks(context)
self.existing_task_id = folder_task_ids[0][0]
## There's guaranteed to at least be one element, even if it's "NONE".
self.sync_prop("existing_folder_id", context)
def retrieve_folders(self, context) -> list[tuple]:
if not is_td_web_authed: return []
## What if there are no folders?
folders = tdcloud.TidyCloudFolders.folders()
if not folders:
return [("NONE", "None", "No folders")]
return [
(
folder_name,
folder_name,
folder_name,
cloud_folder.folder_id,
cloud_folder.folder_name,
f"Folder 'cloud_folder.folder_name' with ID {folder_id}",
)
for folder_name in g_td_folders()
for folder_id, cloud_folder in folders.items()
]
def retrieve_tasks(self, context) -> list[tuple]:
if not is_td_web_authed: return []
if not (cloud_tasks := g_td_tasks(self.existing_folder_name)):
if (cloud_folder := tdcloud.TidyCloudFolders.folders().get(
self.existing_folder_id
)) is None:
return [("NONE", "None", "Folder doesn't exist")]
tasks = tdcloud.TidyCloudTasks.tasks(cloud_folder)
if not tasks:
return [("NONE", "None", "No tasks in folder")]
return [
@ -156,81 +161,66 @@ class Tidy3DCloudTaskBLSocket(base.MaxwellSimSocket):
]),
## Task Description
{
"draft": "Task has been uploaded, but not run",
"initialized": "Task is initializing",
"queued": "Task is queued for simulation",
"preprocessing": "Task is pre-processing",
"running": "Task is currently running",
"postprocess": "Task is post-processing",
"success": "Task ran successfully, costing {task.real_flex_unit} credits",
"error": "Task ran, but an error occurred",
}[task.status],
f"Task Status: {task.status}",
## Status Icon
{
_icon if (_icon := {
"draft": "SEQUENCE_COLOR_08",
"initialized": "SHADING_SOLID",
"queued": "SEQUENCE_COLOR_03",
"preprocessing": "SEQUENCE_COLOR_02",
"running": "SEQUENCE_COLOR_05",
"postprocess": "SEQUENCE_COLOR_06",
"postprocessing": "SEQUENCE_COLOR_06",
"success": "SEQUENCE_COLOR_04",
"error": "SEQUENCE_COLOR_01",
}[task.status],
}.get(task.status)) else "SEQUENCE_COLOR_09",
## Unique Number
i,
)
for i, task in enumerate(
sorted(cloud_tasks, key=lambda el: el.created_at, reverse=True)
sorted(
tasks.values(),
key=lambda el: el.created_at,
reverse=True,
)
)
]
####################
# - Task Sync Methods
####################
def sync_new_task(self, context):
if self.new_task_name == "": return
def sync_created_new_task(self, cloud_task):
"""Called whenever the task specified in `new_task_name` has been actually created.
if self.new_task_name in {
task.taskName
for task in g_td_tasks(self.existing_folder_name)
}:
self.new_task_name = ""
This changes the socket somewhat: Folder/task IDs are set, and the socket is switched to presume that the task exists.
self.sync_prop("new_task_name", context)
def sync_task_loaded(self, loaded_task_id: str | None):
"""Called whenever a particular task has been loaded.
This resets the 'new_task_name' (if any), sets the dropdown to the new loaded task (which must be in the already-selected folder) (or, if input is None, leaves the selection alone), locks the socket UI (though NEVER the API authentication interface), and declares that the specified task exists.
If the socket is linked, then an error is raised.
"""
global TD_FOLDERS
## TODO: This doesn't work with a linked socket. It should.
# Propagate along Link
if self.is_linked:
msg = f"Cannot sync newly created task to linked Cloud Task socket."
raise ValueError(msg)
## TODO: A little aggressive. Is there a good use case?
if not (TD_FOLDERS is None):
TD_FOLDERS[self.existing_folder_name] = None
# Synchronize w/New Task Information
self.existing_folder_id = cloud_task.folder_id
self.existing_task_id = cloud_task.task_id
self.should_exist = True
if loaded_task_id is not None:
self.existing_task_id = loaded_task_id
def sync_prepare_new_task(self):
"""Called to switch the socket to no longer presume that the task it specifies exists (yet).
self.new_task_name = ""
self.lock_nonauth_interface = True
self.task_exists = True
If the socket is linked, then an error is raised.
"""
# Propagate along Link
if self.is_linked:
msg = f"Cannot sync newly created task to linked Cloud Task socket."
raise ValueError(msg)
## TODO: A little aggressive. Is there a good use case?
def sync_task_status_change(self, running_task_id: str):
global TD_FOLDERS
## TODO: This doesn't work with a linked socket. It should.
if not (TD_FOLDERS is None):
TD_FOLDERS[self.existing_folder_name] = None
def sync_task_released(self, specify_new_task: bool = False):
## TODO: This doesn't work with a linked socket. It should.
self.new_task_name = ""
self.lock_nonauth_interface = False
self.task_exists = not specify_new_task
# Synchronize w/New Task Information
self.should_exist = False
####################
# - Socket UI
@ -238,78 +228,106 @@ class Tidy3DCloudTaskBLSocket(base.MaxwellSimSocket):
def draw_label_row(self, row: bpy.types.UILayout, text: str):
row.label(text=text)
auth_icon = "CHECKBOX_HLT" if is_td_web_authed() else "CHECKBOX_DEHLT"
auth_icon = "LOCKVIEW_ON" if tdcloud.IS_AUTHENTICATED else "LOCKVIEW_OFF"
row.operator(
"blender_maxwell.refresh_td_auth",
Authenticate.bl_idname,
text="",
icon=auth_icon,
)
def draw_value(self, col: bpy.types.UILayout) -> None:
if is_td_web_authed():
if self.lock_nonauth_interface: col.enabled = False
else: col.enabled = True
def draw_prelock(
self,
context: bpy.types.Context,
col: bpy.types.UILayout,
node: bpy.types.Node,
text: str,
) -> None:
if not tdcloud.IS_AUTHENTICATED:
row = col.row()
row.label(icon="FILE_FOLDER")
row.prop(self, "existing_folder_name", text="")
row.operator(
BlenderMaxwellRefreshTDFolderList.bl_idname,
text="",
icon="FILE_REFRESH",
)
if not self.task_exists:
row = col.row()
row.label(icon="SEQUENCE_COLOR_04")
row.prop(self, "new_task_name", text="")
if self.task_exists:
row = col.row()
else:
col.separator(factor=1.0)
box = col.box()
row = box.row()
row.label(icon="NETWORK_DRIVE")
row.prop(self, "existing_task_id", text="")
else:
col.enabled = True
row = col.row()
row.alignment="CENTER"
row.alignment = "CENTER"
row.label(text="Tidy3D API Key")
row = col.row()
row.prop(self, "api_key", text="")
@property
def value(self) -> str | None:
if self.task_exists:
if self.existing_task_id == "NONE": return None
return self.existing_task_id
row = col.row()
row.operator(
Authenticate.bl_idname,
text="Connect",
)
return dict(
task_name=self.new_task_name,
folder_name=self.existing_folder_name,
def draw_value(self, col: bpy.types.UILayout) -> None:
if not tdcloud.IS_AUTHENTICATED: return
# Cloud Folder Selector
row = col.row()
row.label(icon="FILE_FOLDER")
row.prop(self, "existing_folder_id", text="")
row.operator(
ReloadFolderList.bl_idname,
text="",
icon="FILE_REFRESH",
)
# New Task Name Selector
row = col.row()
if not self.should_exist:
row = col.row()
row.label(icon="NETWORK_DRIVE")
row.prop(self, "new_task_name", text="")
col.separator(factor=1.0)
box = col.box()
row = box.row()
row.prop(self, "existing_task_id", text="")
@property
def value(self) -> tuple[tdcloud.CloudTaskName, tdcloud.CloudFolder] | tdcloud.CloudTask | None:
# Retrieve Folder
## Authentication is presumed OK
if (cloud_folder := tdcloud.TidyCloudFolders.folders().get(
self.existing_folder_id
)) is None:
msg = "Selected folder doesn't exist (it was probably deleted elsewhere)"
raise RuntimeError(msg)
# No Tasks in Folder
## The UI should set to "NONE" when there are no tasks in a folder
if self.existing_task_id == "NONE": return None
# Retrieve Task
if self.should_exist:
if (cloud_task := tdcloud.TidyCloudTasks.tasks(
cloud_folder
).get(self.existing_task_id)) is None:
msg = "Selected task doesn't exist (it was probably deleted elsewhere)"
raise RuntimeError(msg)
return cloud_task
return (self.new_task_name, cloud_folder)
####################
# - Socket Configuration
####################
class Tidy3DCloudTaskSocketDef(pyd.BaseModel):
socket_type: ct.SocketType = ct.SocketType.Tidy3DCloudTask
task_exists: bool
should_exist: bool
def init(self, bl_socket: Tidy3DCloudTaskBLSocket) -> None:
bl_socket.task_exists = self.task_exists
bl_socket.should_exist = self.should_exist
####################
# - Blender Registration
####################
BL_REGISTER = [
BlenderMaxwellRefreshTDFolderList,
ReloadFolderList,
Authenticate,
Tidy3DCloudTaskBLSocket,
]

View File

@ -1,13 +1,11 @@
from . import install_deps
from . import uninstall_deps
from . import connect_viewer
from . import refresh_td_auth
BL_REGISTER = [
*install_deps.BL_REGISTER,
*uninstall_deps.BL_REGISTER,
*connect_viewer.BL_REGISTER,
*refresh_td_auth.BL_REGISTER,
]
BL_KMI_REGISTER = [
*connect_viewer.BL_KMI_REGISTER,

View File

@ -1,30 +0,0 @@
import bpy
from ..utils.auth_td_web import is_td_web_authed
class BlenderMaxwellRefreshTDAuth(bpy.types.Operator):
bl_idname = "blender_maxwell.refresh_td_auth"
bl_label = "Refresh Tidy3D Auth"
bl_description = "Refresh the authentication of Tidy3D Web API"
bl_options = {'REGISTER'}
@classmethod
def poll(cls, context):
space = context.space_data
return (
space.type == 'NODE_EDITOR'
and space.node_tree is not None
and space.node_tree.bl_idname == "MaxwellSimTreeType"
)
def invoke(self, context, event):
is_td_web_authed(force_check=True)
return {'FINISHED'}
####################
# - Blender Registration
####################
BL_REGISTER = [
BlenderMaxwellRefreshTDAuth,
]
BL_KMI_REGISTER = []

View File

@ -3,7 +3,7 @@ import bpy
from .operators import types as operators_types
class BlenderMaxwellAddonPreferences(bpy.types.AddonPreferences):
bl_idname = "blender_maxwell_preferences"
bl_idname = "blender_maxwell"
def draw(self, context):
layout = self.layout

View File

@ -3,3 +3,5 @@ pydantic==2.6.0
sympy==1.12
scipy==1.12.0
trimesh==4.1.4
networkx==3.2.1
Rtree==1.2.0

View File

@ -1,57 +0,0 @@
import types
import tidy3d.web as td_web
AUTHENTICATED = False
def td_auth(api_key: str):
# Check for API Key
if api_key:
msg = "API Key must be defined to authenticate"
raise ValueError(msg)
# Perform Authentication
td_web.configure(api_key)
try:
td_web.test()
except:
msg = "Tidy3D Cloud Authentication Failed"
raise ValueError(msg)
AUTHENTICATED = True
def is_td_web_authed(force_check: bool = False) -> bool:
"""Checks whether `td_web` is authenticated, using the cache.
The result is heuristically accurate.
If accuracy must be guaranteed, an aliveness-check can be performed by setting `force_check=True`.
This comes at a performance penalty, as a web request must be made; thus, `force_check` is not appropriate for hot-paths like `draw` functions.
If a check is performed
"""
global AUTHENTICATED
# Return Cached Authentication
if not force_check:
return AUTHENTICATED
# Re-Check Authentication
try:
td_web.test()
AUTHENTICATED = True ## Guarantee cache value to True.
return True
except:
AUTHENTICATED = False ## Guarantee cache value to False.
return False
def g_td_web(api_key: str, force_check: bool = False) -> types.ModuleType:
"""Returns a `tidy3d.web` module object that is already authenticated using the given API key.
The authentication status is cached using a global module-level variable, `AUTHENTICATED`.
"""
global AUTHENTICATED
# Check Cached Authentication
if not is_td_web_authed(force_check=force_check):
td_auth(api_key)
return td_web

View File

@ -1,3 +1,5 @@
import functools
import sympy as sp
import sympy.physics.units as spu
@ -67,11 +69,21 @@ exahertz.set_global_relative_scale_factor(spu.exa, spu.hertz)
####################
# - Sympy Expression Typing
####################
#ALL_UNIT_SYMBOLS = {
# unit
# for unit in spu.__dict__.values()
# if isinstance(unit, spu.Quantity)
#}
ALL_UNIT_SYMBOLS = {
unit.abbrev: unit
for unit in spu.__dict__.values()
if isinstance(unit, spu.Quantity)
} | {
unit.abbrev: unit
for unit in globals().values()
if isinstance(unit, spu.Quantity)
}
@functools.lru_cache(maxsize=1024)
def parse_abbrev_symbols_to_units(expr: sp.Basic) -> sp.Basic:
print("IN ABBREV", expr)
return expr.subs(ALL_UNIT_SYMBOLS)
#def has_units(expr: sp.Expr):
# return any(
# symbol in ALL_UNIT_SYMBOLS

View File

@ -6,12 +6,12 @@ from pydantic_core import core_schema as pyd_core_schema
import sympy as sp
import sympy.physics.units as spu
from . import extra_sympy_units as spuex
from . import extra_sympy_units as spux
####################
# - Missing Basics
####################
AllowedSympyExprs = sp.Expr | sp.MatrixBase
AllowedSympyExprs = sp.Expr | sp.MatrixBase | sp.MutableDenseMatrix
Complex = typx.Annotated[
complex,
pyd.GetPydanticSchema(
@ -36,11 +36,13 @@ class _SympyExpr:
return value
try:
return sp.sympify(value)
expr = sp.sympify(value)
except ValueError as ex:
msg = f"Value {value} is not a `sympify`able string"
raise ValueError(msg) from ex
return expr.subs(spux.ALL_UNIT_SYMBOLS)
def validate_from_expr(value: AllowedSympyExprs) -> AllowedSympyExprs:
if not (
isinstance(value, sp.Expr)
@ -108,7 +110,7 @@ def ConstrSympyExpr(
# Validate Feature Class
if (not allow_variables) and (len(expr.free_symbols) > 0):
msgs.add(f"allow_variables={allow_variables} does not match expression {expr}.")
if (not allow_units) and spuex.uses_units(expr):
if (not allow_units) and spux.uses_units(expr):
msgs.add(f"allow_units={allow_units} does not match expression {expr}.")
# Validate Structure Class
@ -134,7 +136,7 @@ def ConstrSympyExpr(
# Validate Element Class
if allowed_symbols and expr.free_symbols.issubset(allowed_symbols):
msgs.add(f"allowed_symbols={allowed_symbols} does not match expression {expr}")
if allowed_units and spuex.get_units(expr).issubset(allowed_units):
if allowed_units and spux.get_units(expr).issubset(allowed_units):
msgs.add(f"allowed_units={allowed_units} does not match expression {expr}")
# Validate Shape Class

View File

@ -0,0 +1,407 @@
"""Defines a sane interface to the Tidy3D cloud, as constructed by reverse-engineering the official open-source `tidy3d` client library.
- SimulationTask: <https://github.com/flexcompute/tidy3d/blob/453055e89dcff6d619597120b47817e996f1c198/tidy3d/web/core/task_core.py>
- Tidy3D Stub: <https://github.com/flexcompute/tidy3d/blob/453055e89dcff6d619597120b47817e996f1c198/tidy3d/web/api/tidy3d_stub.py>
"""
from dataclasses import dataclass
import typing as typ
import functools
import datetime as dt
import tidy3d as td
import tidy3d.web as td_web
CloudFolderID = str
CloudFolderName = str
CloudFolder = td_web.core.task_core.Folder
CloudTaskID = str
CloudTaskName = str
CloudTask = td_web.core.task_core.SimulationTask
FileUploadCallback = typ.Callable[[float], None]
## Takes "uploaded bytes" as argument.
####################
# - Module-Level Globals
####################
IS_ONLINE = False
IS_AUTHENTICATED = False
def is_online():
global IS_ONLINE
return IS_ONLINE
def set_online():
global IS_ONLINE
IS_ONLINE = True
def set_offline():
global IS_ONLINE
IS_ONLINE = False
####################
# - Cloud Authentication
####################
def check_authentication() -> bool:
global IS_AUTHENTICATED
global IS_ONLINE
# Check Previous Authentication
## If we authenticated once, we presume that it'll work again.
## TODO: API keys can change... It would just look like "offline" for now.
if IS_AUTHENTICATED:
return True
api_key = td_web.core.http_util.api_key()
if api_key is not None:
try:
td_web.test()
set_online()
except td.exceptions.WebError:
set_offline()
return False
IS_AUTHENTICATED = True
return True
return False
def authenticate_with_api_key(api_key: str) -> bool:
td_web.configure(api_key)
return check_authentication()
####################
# - Cloud Folder
####################
class TidyCloudFolders:
cache_folders: dict[CloudFolderID, CloudFolder] | None = None
####################
# - Folders
####################
@classmethod
def folders(cls) -> dict[CloudFolderID, CloudFolder]:
"""Get all cloud folders as a dict, indexed by ID.
"""
if cls.cache_folders is not None: return cls.cache_folders
try:
cloud_folders = td_web.core.task_core.Folder.list()
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to get cloud folders, but cannot connect to cloud"
raise RuntimeError(msg)
folders = {
cloud_folder.folder_id: cloud_folder
for cloud_folder in cloud_folders
}
cls.cache_folders = folders
return folders
@classmethod
def mk_folder(cls, folder_name: CloudFolderName) -> CloudFolder:
"""Create a cloud folder, raising an exception if it exists.
"""
folders = cls.update_folders()
if folder_name not in {
cloud_folder.folder_name
for cloud_folder in folders.values()
}:
try:
cloud_folder = td_web.core.task_core.Folder.create(folder_name)
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to create cloud folder, but cannot connect to cloud"
raise RuntimeError(msg)
if cls.cache_folders is None: cls.cache_folders = {}
cls.cache_folders[cloud_folder.folder_id] = cloud_folder
return cloud_folder
msg = f"Cannot create cloud folder: Folder '{folder_name}' already exists"
raise ValueError(msg)
@classmethod
def update_folders(cls) -> dict[CloudFolderID, CloudFolder]:
"""Get all cloud folders as a dict, forcing a re-check with the web service.
"""
cls.cache_folders = None
return cls.folders()
## TODO: Support removing folders. Unsure of the semantics (does it recursively delete tasks too?)
####################
# - Cloud Task
####################
@dataclass
class CloudTaskInfo:
"""Toned-down, simplified `dataclass` variant of TaskInfo.
See TaskInfo for more: <https://github.com/flexcompute/tidy3d/blob/453055e89dcff6d619597120b47817e996f1c198/tidy3d/web/core/task_info.py>)
"""
task_name: str
status: str
created_at: dt.datetime
cost_est: typ.Callable[[], float | None]
run_info: typ.Callable[[], tuple[float | None, float | None] | None]
# Timing
completed_at: dt.datetime | None = None ## completedAt
# Cost
cost_real: float | None = None ## realCost
# Sim Properties
task_type: str | None = None ## solverVersion
version_solver: str | None = None ## solverVersion
callback_url: str | None = None ## callbackUrl
class TidyCloudTasks:
"""Greatly simplifies working with Tidy3D Tasks in the Cloud, specifically, via the lowish-level `tidy3d.web.core.task_core.SimulationTask` object.
In particular, cache mechanics ensure that web-requests are only made when absolutely needed.
This greatly improves performance in ex. UI functions.
In particular, `update_task` updates only one task with a single request.
Of particular note are the `SimulationTask` methods that are not abstracted:
- `cloud_task.taskName`: Undocumented, but it works (?)
- `cloud_task.submit()`: Starts the running of a drafted task.
- `cloud_task.real_flex_unit`: `None` until available. Just repeat `update_task` until not None.
- `cloud_task.get_running_info()`: GETs % and field-decay of a running task.
- `cloud_task.get_log(path)`: GET the run log. Remember to use `NamedTemporaryFile` if a stringified log is desired.
"""
cache_tasks: dict[CloudTaskID, CloudTask] = {}
cache_folder_tasks: dict[CloudFolderID, set[CloudTaskID]] = {}
cache_task_info: dict[CloudTaskID, CloudTaskInfo] = {}
@classmethod
def clear_cache(cls):
cls.cache_tasks = {}
####################
# - Task Getters
####################
@classmethod
def task(cls, task_id: CloudTaskID) -> CloudTask | None:
return cls.cache_tasks.get(task_id)
@classmethod
def task_info(cls, task_id: CloudTaskID) -> CloudTaskInfo | None:
return cls.cache_task_info.get(task_id)
@classmethod
def tasks(cls, cloud_folder: CloudFolder) -> dict[CloudTaskID, CloudTask]:
"""Get all cloud tasks within a particular cloud folder as a set.
"""
# Retrieve Cached Tasks
if (task_ids := cls.cache_folder_tasks.get(cloud_folder.folder_id)) is not None:
return {
task_id: cls.cache_tasks[task_id]
for task_id in task_ids
}
# Retrieve Tasks by-Folder
try:
folder_tasks = cloud_folder.list_tasks()
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to get tasks of a cloud folder, but cannot access cloud"
raise RuntimeError(msg)
# No Tasks: Empty Set
if folder_tasks is None:
cls.cache_folder_tasks[cloud_folder.folder_id] = set()
return {}
# Populate Caches
## Direct Task Cache
cloud_tasks = {
cloud_task.task_id: cloud_task
for cloud_task in folder_tasks
}
cls.cache_tasks |= cloud_tasks
## Task Info Cache
for task_id, cloud_task in cloud_tasks.items():
cls.cache_task_info[task_id] = CloudTaskInfo(
task_name=cloud_task.taskName,
status=cloud_task.status,
created_at=cloud_task.created_at,
cost_est=functools.partial(td_web.estimate_cost, cloud_task.task_id),
run_info=cloud_task.get_running_info,
callback_url=cloud_task.callback_url,
)
## Task by-Folder Cache
cls.cache_folder_tasks[cloud_folder.folder_id] = {
task_id
for task_id in cloud_tasks
}
return cloud_tasks
####################
# - Task Create/Delete
####################
@classmethod
def mk_task(
cls,
task_name: CloudTaskName,
cloud_folder: CloudFolder,
sim: td.Simulation,
upload_progress_cb: FileUploadCallback | None = None,
verbose: bool = True,
) -> CloudTask:
"""Creates a `CloudTask` of the given `td.Simulation`.
Presume that `sim.validate_pre_upload()` has already been run, so that the simulation is good to go.
"""
# Create "Stub"
## Minimal Tidy3D object that can be turned into a file for upload
## Has "type" in {"Simulation", "ModeSolver", "HeatSimulation"}
stub = td_web.api.tidy3d_stub.Tidy3dStub(simulation=sim)
# Create Cloud Task
## So far, this is a boring, empty task with no data
## May overlay by name with other tasks - then makes a new "version"
try:
cloud_task = td_web.core.task_core.SimulationTask.create(
task_type=stub.get_type(),
task_name=task_name,
folder_name=cloud_folder.folder_name,
)
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to create cloud task, but cannot access cloud"
raise RuntimeError(msg)
# Upload Simulation to Cloud Task
if not upload_progress_cb is None:
upload_progress_cb = lambda uploaded_bytes: None
try:
cloud_task.upload_simulation(
stub,
verbose=verbose,
#progress_callback=upload_progress_cb,
)
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to upload simulation to cloud task, but cannot access cloud"
raise RuntimeError(msg)
# Populate Caches
## Direct Task Cache
cls.cache_tasks[cloud_task.task_id] = cloud_task
## Task Info Cache
cls.cache_task_info[cloud_task.task_id] = CloudTaskInfo(
task_name=cloud_task.taskName,
status=cloud_task.status,
created_at=cloud_task.created_at,
cost_est=functools.partial(td_web.estimate_cost, cloud_task.task_id),
run_info=cloud_task.get_running_info,
callback_url=cloud_task.callback_url,
)
## Task by-Folder Cache
if cls.cache_folder_tasks.get(cloud_task.folder_id):
cls.cache_folder_tasks[cloud_task.folder_id].add(cloud_task.task_id)
else:
cls.cache_folder_tasks[cloud_task.folder_id] = {cloud_task.task_id}
return cloud_task
####################
# - Task Update/Delete
####################
@classmethod
def rm_task(
cls,
cloud_task: CloudTask,
) -> CloudTask:
"""Deletes a cloud task.
"""
## TODO: Abort first?
task_id = cloud_task.task_id
folder_id = cloud_task.folder_id
try:
cloud_task.delete()
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to delete cloud task, but cannot access cloud"
raise RuntimeError(msg)
# Populate Caches
## Direct Task Cache
cls.cache_tasks.pop(task_id, None)
## Task Info Cache
cls.cache_task_info.pop(task_id, None)
## Task by-Folder Cache
cls.cache_folder_tasks[folder_id].remove(task_id)
@classmethod
def update_task(cls, cloud_task: CloudTask) -> CloudTask:
"""Updates the CloudTask to the latest ex. status attributes.
"""
# BUG: td_web.core.task_core.SimulationTask.get(task_id) doesn't return the `created_at` field.
## Therefore, we unfortunately need to get all tasks for the folder ID just to update one.
# Retrieve Folder
task_id = cloud_task.task_id
folder_id = cloud_task.folder_id
cloud_folder = TidyCloudFolders.folders()[folder_id]
# Repopulate All Caches
## By deleting the folder ID, all tasks within will be reloaded
del cls.cache_folder_tasks[folder_id]
folder_tasks = cls.tasks(cloud_folder)
return cls.tasks(cloud_folder)[task_id]
@classmethod
def update_tasks(cls, folder_id: CloudFolderID) -> dict[CloudTaskID, CloudTask]:
"""Updates the CloudTask to the latest ex. status attributes.
"""
# BUG: td_web.core.task_core.SimulationTask.get(task_id) doesn't return the `created_at` field.
## Therefore, we unfortunately need to get all tasks for the folder ID just to update one.
# Retrieve Folder
cloud_folder = TidyCloudFolders.folders()[folder_id]
# Repopulate All Caches
## By deleting the folder ID, all tasks within will be reloaded
del cls.cache_folder_tasks[folder_id]
folder_tasks = cls.tasks(cloud_folder)
return {
task_id: cls.cache_tasks[task_id]
for task_id in cls.cache_folder_tasks[folder_id]
}
@classmethod
def abort_task(cls, cloud_task: CloudTask) -> CloudTask:
"""Aborts a running CloudTask to the latest ex. status attributes.
"""
## TODO: Check status?
new_cloud_task = cls.update_task(cloud_task)
try:
new_cloud_task.abort()
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to abort cloud task, but cannot access cloud"
raise RuntimeError(msg)
return cls.update_task(cloud_task)