oscillode/blender_maxwell/utils/tdcloud.py

408 lines
12 KiB
Python
Raw Normal View History

"""Defines a sane interface to the Tidy3D cloud, as constructed by reverse-engineering the official open-source `tidy3d` client library.
- SimulationTask: <https://github.com/flexcompute/tidy3d/blob/453055e89dcff6d619597120b47817e996f1c198/tidy3d/web/core/task_core.py>
- Tidy3D Stub: <https://github.com/flexcompute/tidy3d/blob/453055e89dcff6d619597120b47817e996f1c198/tidy3d/web/api/tidy3d_stub.py>
"""
from dataclasses import dataclass
import typing as typ
import functools
import datetime as dt
import tidy3d as td
import tidy3d.web as td_web
CloudFolderID = str
CloudFolderName = str
CloudFolder = td_web.core.task_core.Folder
CloudTaskID = str
CloudTaskName = str
CloudTask = td_web.core.task_core.SimulationTask
FileUploadCallback = typ.Callable[[float], None]
## Takes "uploaded bytes" as argument.
####################
# - Module-Level Globals
####################
IS_ONLINE = False
IS_AUTHENTICATED = False
def is_online():
global IS_ONLINE
return IS_ONLINE
def set_online():
global IS_ONLINE
IS_ONLINE = True
def set_offline():
global IS_ONLINE
IS_ONLINE = False
####################
# - Cloud Authentication
####################
def check_authentication() -> bool:
global IS_AUTHENTICATED
global IS_ONLINE
# Check Previous Authentication
## If we authenticated once, we presume that it'll work again.
## TODO: API keys can change... It would just look like "offline" for now.
if IS_AUTHENTICATED:
return True
api_key = td_web.core.http_util.api_key()
if api_key is not None:
try:
td_web.test()
set_online()
except td.exceptions.WebError:
set_offline()
return False
IS_AUTHENTICATED = True
return True
return False
def authenticate_with_api_key(api_key: str) -> bool:
td_web.configure(api_key)
return check_authentication()
####################
# - Cloud Folder
####################
class TidyCloudFolders:
cache_folders: dict[CloudFolderID, CloudFolder] | None = None
####################
# - Folders
####################
@classmethod
def folders(cls) -> dict[CloudFolderID, CloudFolder]:
"""Get all cloud folders as a dict, indexed by ID.
"""
if cls.cache_folders is not None: return cls.cache_folders
try:
cloud_folders = td_web.core.task_core.Folder.list()
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to get cloud folders, but cannot connect to cloud"
raise RuntimeError(msg)
folders = {
cloud_folder.folder_id: cloud_folder
for cloud_folder in cloud_folders
}
cls.cache_folders = folders
return folders
@classmethod
def mk_folder(cls, folder_name: CloudFolderName) -> CloudFolder:
"""Create a cloud folder, raising an exception if it exists.
"""
folders = cls.update_folders()
if folder_name not in {
cloud_folder.folder_name
for cloud_folder in folders.values()
}:
try:
cloud_folder = td_web.core.task_core.Folder.create(folder_name)
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to create cloud folder, but cannot connect to cloud"
raise RuntimeError(msg)
if cls.cache_folders is None: cls.cache_folders = {}
cls.cache_folders[cloud_folder.folder_id] = cloud_folder
return cloud_folder
msg = f"Cannot create cloud folder: Folder '{folder_name}' already exists"
raise ValueError(msg)
@classmethod
def update_folders(cls) -> dict[CloudFolderID, CloudFolder]:
"""Get all cloud folders as a dict, forcing a re-check with the web service.
"""
cls.cache_folders = None
return cls.folders()
## TODO: Support removing folders. Unsure of the semantics (does it recursively delete tasks too?)
####################
# - Cloud Task
####################
@dataclass
class CloudTaskInfo:
"""Toned-down, simplified `dataclass` variant of TaskInfo.
See TaskInfo for more: <https://github.com/flexcompute/tidy3d/blob/453055e89dcff6d619597120b47817e996f1c198/tidy3d/web/core/task_info.py>)
"""
task_name: str
status: str
created_at: dt.datetime
cost_est: typ.Callable[[], float | None]
run_info: typ.Callable[[], tuple[float | None, float | None] | None]
# Timing
completed_at: dt.datetime | None = None ## completedAt
# Cost
cost_real: float | None = None ## realCost
# Sim Properties
task_type: str | None = None ## solverVersion
version_solver: str | None = None ## solverVersion
callback_url: str | None = None ## callbackUrl
class TidyCloudTasks:
"""Greatly simplifies working with Tidy3D Tasks in the Cloud, specifically, via the lowish-level `tidy3d.web.core.task_core.SimulationTask` object.
In particular, cache mechanics ensure that web-requests are only made when absolutely needed.
This greatly improves performance in ex. UI functions.
In particular, `update_task` updates only one task with a single request.
Of particular note are the `SimulationTask` methods that are not abstracted:
- `cloud_task.taskName`: Undocumented, but it works (?)
- `cloud_task.submit()`: Starts the running of a drafted task.
- `cloud_task.real_flex_unit`: `None` until available. Just repeat `update_task` until not None.
- `cloud_task.get_running_info()`: GETs % and field-decay of a running task.
- `cloud_task.get_log(path)`: GET the run log. Remember to use `NamedTemporaryFile` if a stringified log is desired.
"""
cache_tasks: dict[CloudTaskID, CloudTask] = {}
cache_folder_tasks: dict[CloudFolderID, set[CloudTaskID]] = {}
cache_task_info: dict[CloudTaskID, CloudTaskInfo] = {}
@classmethod
def clear_cache(cls):
cls.cache_tasks = {}
####################
# - Task Getters
####################
@classmethod
def task(cls, task_id: CloudTaskID) -> CloudTask | None:
return cls.cache_tasks.get(task_id)
@classmethod
def task_info(cls, task_id: CloudTaskID) -> CloudTaskInfo | None:
return cls.cache_task_info.get(task_id)
@classmethod
def tasks(cls, cloud_folder: CloudFolder) -> dict[CloudTaskID, CloudTask]:
"""Get all cloud tasks within a particular cloud folder as a set.
"""
# Retrieve Cached Tasks
if (task_ids := cls.cache_folder_tasks.get(cloud_folder.folder_id)) is not None:
return {
task_id: cls.cache_tasks[task_id]
for task_id in task_ids
}
# Retrieve Tasks by-Folder
try:
folder_tasks = cloud_folder.list_tasks()
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to get tasks of a cloud folder, but cannot access cloud"
raise RuntimeError(msg)
# No Tasks: Empty Set
if folder_tasks is None:
cls.cache_folder_tasks[cloud_folder.folder_id] = set()
return {}
# Populate Caches
## Direct Task Cache
cloud_tasks = {
cloud_task.task_id: cloud_task
for cloud_task in folder_tasks
}
cls.cache_tasks |= cloud_tasks
## Task Info Cache
for task_id, cloud_task in cloud_tasks.items():
cls.cache_task_info[task_id] = CloudTaskInfo(
task_name=cloud_task.taskName,
status=cloud_task.status,
created_at=cloud_task.created_at,
cost_est=functools.partial(td_web.estimate_cost, cloud_task.task_id),
run_info=cloud_task.get_running_info,
callback_url=cloud_task.callback_url,
)
## Task by-Folder Cache
cls.cache_folder_tasks[cloud_folder.folder_id] = {
task_id
for task_id in cloud_tasks
}
return cloud_tasks
####################
# - Task Create/Delete
####################
@classmethod
def mk_task(
cls,
task_name: CloudTaskName,
cloud_folder: CloudFolder,
sim: td.Simulation,
upload_progress_cb: FileUploadCallback | None = None,
verbose: bool = True,
) -> CloudTask:
"""Creates a `CloudTask` of the given `td.Simulation`.
Presume that `sim.validate_pre_upload()` has already been run, so that the simulation is good to go.
"""
# Create "Stub"
## Minimal Tidy3D object that can be turned into a file for upload
## Has "type" in {"Simulation", "ModeSolver", "HeatSimulation"}
stub = td_web.api.tidy3d_stub.Tidy3dStub(simulation=sim)
# Create Cloud Task
## So far, this is a boring, empty task with no data
## May overlay by name with other tasks - then makes a new "version"
try:
cloud_task = td_web.core.task_core.SimulationTask.create(
task_type=stub.get_type(),
task_name=task_name,
folder_name=cloud_folder.folder_name,
)
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to create cloud task, but cannot access cloud"
raise RuntimeError(msg)
# Upload Simulation to Cloud Task
if not upload_progress_cb is None:
upload_progress_cb = lambda uploaded_bytes: None
try:
cloud_task.upload_simulation(
stub,
verbose=verbose,
#progress_callback=upload_progress_cb,
)
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to upload simulation to cloud task, but cannot access cloud"
raise RuntimeError(msg)
# Populate Caches
## Direct Task Cache
cls.cache_tasks[cloud_task.task_id] = cloud_task
## Task Info Cache
cls.cache_task_info[cloud_task.task_id] = CloudTaskInfo(
task_name=cloud_task.taskName,
status=cloud_task.status,
created_at=cloud_task.created_at,
cost_est=functools.partial(td_web.estimate_cost, cloud_task.task_id),
run_info=cloud_task.get_running_info,
callback_url=cloud_task.callback_url,
)
## Task by-Folder Cache
if cls.cache_folder_tasks.get(cloud_task.folder_id):
cls.cache_folder_tasks[cloud_task.folder_id].add(cloud_task.task_id)
else:
cls.cache_folder_tasks[cloud_task.folder_id] = {cloud_task.task_id}
return cloud_task
####################
# - Task Update/Delete
####################
@classmethod
def rm_task(
cls,
cloud_task: CloudTask,
) -> CloudTask:
"""Deletes a cloud task.
"""
## TODO: Abort first?
task_id = cloud_task.task_id
folder_id = cloud_task.folder_id
try:
cloud_task.delete()
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to delete cloud task, but cannot access cloud"
raise RuntimeError(msg)
# Populate Caches
## Direct Task Cache
cls.cache_tasks.pop(task_id, None)
## Task Info Cache
cls.cache_task_info.pop(task_id, None)
## Task by-Folder Cache
cls.cache_folder_tasks[folder_id].remove(task_id)
@classmethod
def update_task(cls, cloud_task: CloudTask) -> CloudTask:
"""Updates the CloudTask to the latest ex. status attributes.
"""
# BUG: td_web.core.task_core.SimulationTask.get(task_id) doesn't return the `created_at` field.
## Therefore, we unfortunately need to get all tasks for the folder ID just to update one.
# Retrieve Folder
task_id = cloud_task.task_id
folder_id = cloud_task.folder_id
cloud_folder = TidyCloudFolders.folders()[folder_id]
# Repopulate All Caches
## By deleting the folder ID, all tasks within will be reloaded
del cls.cache_folder_tasks[folder_id]
folder_tasks = cls.tasks(cloud_folder)
return cls.tasks(cloud_folder)[task_id]
@classmethod
def update_tasks(cls, folder_id: CloudFolderID) -> dict[CloudTaskID, CloudTask]:
"""Updates the CloudTask to the latest ex. status attributes.
"""
# BUG: td_web.core.task_core.SimulationTask.get(task_id) doesn't return the `created_at` field.
## Therefore, we unfortunately need to get all tasks for the folder ID just to update one.
# Retrieve Folder
cloud_folder = TidyCloudFolders.folders()[folder_id]
# Repopulate All Caches
## By deleting the folder ID, all tasks within will be reloaded
del cls.cache_folder_tasks[folder_id]
folder_tasks = cls.tasks(cloud_folder)
return {
task_id: cls.cache_tasks[task_id]
for task_id in cls.cache_folder_tasks[folder_id]
}
@classmethod
def abort_task(cls, cloud_task: CloudTask) -> CloudTask:
"""Aborts a running CloudTask to the latest ex. status attributes.
"""
## TODO: Check status?
new_cloud_task = cls.update_task(cloud_task)
try:
new_cloud_task.abort()
set_online()
except td.exceptions.WebError:
set_offline()
msg = "Tried to abort cloud task, but cannot access cloud"
raise RuntimeError(msg)
return cls.update_task(cloud_task)