Source code for forge.io

"""Contains routines for reading-from and writing-to files.

Copyright 2025-2026 Chris Marsden

This file is part of FORGE.

FORGE is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

FORGE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with FORGE.  If not, see <http://www.gnu.org/licenses/>.
"""

import copy
import json
import pickle
from pathlib import Path
from typing import Any, Union

import numpy as np
from freeqdsk import geqdsk


[docs] def read_geqdsk( path_to_geqdsk = None ): """Reads data from a GEQDSK file. Function to read data from a GEQDSK file. Makes use of the FreeQDSK package's GEQDSK reader. Some additional fields not directly returned by FreeQDSK are derived, and some fields are ignored as they are not relevant to FORGE. Parameters ---------- path_to_geqdsk : str Path to the GEQDSK file to be read. Returns ------- eq_data : dict Dictionary of data extracted from the GEQDSK file. """ if not isinstance(path_to_geqdsk, str): raise TypeError( "Error. path_to_geqdsk was not of the required type - str. Type passed was ", str(type(path_to_geqdsk)) ) # Load the GEQDSK with open(path_to_geqdsk,"r") as f: data = geqdsk.read(f) # Extract the R, Z grid R_min = data["rleft"] R_max = R_min + data["rdim"] nR = data["nx"] R_1D = np.linspace(R_min,R_max,nR,endpoint=True) dR = abs(R_1D[1] - R_1D[0]) Z_mid = data["zmid"] nZ = data["ny"] Z_dim = data["zdim"] Z_min = Z_mid - 0.5 * Z_dim Z_max = Z_mid + 0.5 * Z_dim Z_1D = np.linspace(Z_min,Z_max,nZ,endpoint=True) dZ = abs(Z_1D[1] - Z_1D[0]) # Generate the 2D R,Z grid R_2D, Z_2D = np.meshgrid(R_1D,Z_1D,indexing="ij") # Extract the wall wall_R = data["rlim"].tolist() wall_Z = data["zlim"].tolist() # Check if the wall is joined up if not (wall_R[-1] == wall_R[0] and wall_Z[-1] == wall_Z[0]): wall_R.append(wall_R[0]) wall_Z.append(wall_Z[0]) # Get the 1D psin data for the plasma profiles psin_data = np.linspace(0.0,1.0,nR,endpoint=True) # Get fvac - the toroidal field function in vacuum. # This is constant outside the plasma, wherein fvac = R * B_toroidal. # The GEQDSK stores the toroidal field at a reference point. fvac = data["rcentr"] * data["bcentr"] # Store data a dictionary eq_data = { "R_min": R_min, "R_max": R_max, "nR": nR, "R_1D": R_1D, "dR": dR, "R_2D": R_2D, "Z_min": Z_min, "Z_max": Z_max, "nZ": nZ, "Z_1D": Z_1D, "dZ": dZ, "Z_2D": Z_2D, "wall_R": wall_R, "wall_Z": wall_Z, "psi_2D": data["psi"], "psi_lcfs": data["sibdry"], "psi_axis": data["simagx"], "psin_data": psin_data, "pprime_data": data["pprime"], "ffprime_data": data["ffprime"], "q_data": data["qpsi"], "pressure_data": data["pres"], "fpol_data": data["fpol"], "plasma_current": data["cpasma"], "fvac": fvac } return eq_data
[docs] def read_magnets( path_to_magnets = None # Path to magnets file [str] ): """Reads a PF coil set from a JSON file. Function to read data from a JSON file containing data on the PF coil set. Parameters ---------- path_to_magnets : str Path to the JSON file to be read. Returns ------- magnets_data :dict Dictionary of data extracted from the JSON file [dict]. """ if not isinstance(path_to_magnets, str): raise TypeError( "Error. path_to_magnets was not of the required type - str. Type passed was ", str(type(path_to_magnets)) ) with open(path_to_magnets, 'r') as f: magnets_data = json.load(f) # In addition to loading the magnets data, we will identify which coils appear to have # upper/lower corresponding pairs based off of their names (e.g. P1U and P1L), and produce # a list of suggested circuits [("P1U","P1L"),("P2U","P2L")]. # Extract the coil data coils_data = magnets_data["coils"] # Extract the circuits data - the user may not have defined any circuits, hence it may not be present. try: circuits_data = magnets_data["circuits"] except KeyError: circuits_data = None # Create some suggest circuits suggested_circuits = {} coil_names = list(coils_data.keys()) indeces_checked = [] for coil_name in coil_names: # Get the circuit name, which is just the coil name with the last character dropped circuit_name = coil_name[0:-1] circuit_coils_list = [] if len(circuit_name) > 1: # Get names of all of the coils in this circuit for i in range(len(coil_names)): if i not in indeces_checked: # Potential coil to check coil_name_to_check = coil_names[i] if circuit_name in coil_name_to_check: circuit_coils_list.append(coil_name_to_check) indeces_checked.append(i) if len(circuit_coils_list) > 1: # There were more than 1 coils containing the circuit name, hence this is a viable circuit. suggested_circuits[circuit_name] = circuit_coils_list return coils_data, circuits_data, suggested_circuits
[docs] def write_magnets(machine, path): """Save a Machine's coils and circuits to a JSON file. The output format matches the magnets JSON schema used by :func:`read_magnets`, so a saved file can be loaded back directly. Current values are read from the live coilset, reflecting any post-optimisation changes. Parameters ---------- machine : forge.machine.Machine The machine to serialise. path : str or Path Output file path. """ save_fancy_json(machine.to_dict(), path)
[docs] def write_geqdsk( eq, path_to_geqdsk, ): """Takes a forge.equilibrium Equilibrium object and creates a GEQDSK file. Parameters ---------- eq : forge.equilibrium.Equilibrium object Equilibrium to be output as a GEQDSK. path_to_geqdsk : str Output path for the GEQDSK file. """ # Define a reference radius rcentr = 1.0 data = { 'nx': eq.nR, 'ny': eq.nZ, 'rdim': eq.R_max - eq.R_min, 'zdim': eq.Z_max - eq.Z_min, 'rcentr': rcentr, 'rleft': eq.R_min, 'zmid': 0.5 * (eq.Z_min + eq.Z_max), 'rmagx': eq.R_mag, 'zmagx': eq.Z_mag, 'simagx': eq.psi_axis, 'sibdry': eq.psi_lcfs, 'bcentr': eq.fvac / rcentr, 'cpasma': eq.plasma_current, 'fpol': eq.fpol_data, 'pres': eq.pressure_data, 'ffprime': eq.ffprime_data, 'pprime': eq.pprime_data, 'psi': eq.psi_2D, 'qpsi': eq.q_data, 'nbdry': len(eq.R_lcfs), 'nlim': len(eq.wall_R), 'rbdry': eq.R_lcfs, 'zbdry': eq.Z_lcfs, 'rlim': eq.wall_R, 'zlim': eq.wall_Z, } time = int(0) with open(path_to_geqdsk,"w+") as f: geqdsk.write(data,f,"FORGE",0,time) f.close()
[docs] def fancy_json_string(data: Any, indent: int = 2) -> str: """Serialize *data* to a JSON string with compact arrays and pretty dicts. Produces JSON where: - Dictionary entries split across new lines according to `indent`. - Lists/tuples/NumPy arrays kept on a single line (compact). - NumPy scalars converted to Python scalars. - Dicts that appear inside lists serialized in a compact (minified) form so the parent list remains single-line. Parameters ---------- data : Any The (typically dict) object to serialize. indent : int Number of spaces used for indenting dictionary entries. Returns ------- str The formatted JSON string. Notes ----- - This function prefers valid JSON. NaN and ±Inf are converted to null. - Large lists will be written on a single (long) line by design. - Key order follows the insertion order of the input dict (Python 3.7+). Examples -------- >>> d = { ... "meta": {"shot": 12345, "machine": "MAST-U"}, ... "channels": ["A", "B", "C"], ... "numbers": [1, 2, 3.5], ... } >>> save_json_compact_arrays(d, "out.json", indent=4) # Result (schematic): # { # "meta": { # "shot": 12345, # "machine": "MAST-U" # }, # "channels": ["A", "B", "C"], # "numbers": [1, 2, 3.5] # } """ if not isinstance(indent, int) or indent < 0: raise ValueError("indent must be a non-negative integer") def _to_python_basic(obj: Any) -> Any: """Convert NumPy types to native Python types; leave others unchanged.""" if isinstance(obj, np.ndarray): return obj.tolist() if isinstance(obj, (np.integer,)): return int(obj) if isinstance(obj, (np.floating,)): val = float(obj) # Keep valid JSON: map NaN/Inf to null if val != val or val in (float("inf"), float("-inf")): return None return val if isinstance(obj, (np.bool_,)): return bool(obj) if obj is np.nan or obj is np.inf or obj is -np.inf: return None return obj def _encode_atomic(x: Any) -> str: """Encode non-container JSON atomics (str, int, float, bool, null) with no extra spaces. Throws TypeError if a container is passed. """ x = _to_python_basic(x) if isinstance(x, (dict, list, tuple)): raise TypeError("encode_atomic called with container type") # ensure_ascii=False keeps unicode; separators minify; allow_nan=False enforces valid JSON return json.dumps(x, ensure_ascii=False, separators=(",", ":"), allow_nan=False) def _encode(obj: Any, level: int = 0, compact: bool = False) -> str: """Recursive encoder. - Dicts: * pretty with indentation/newlines unless compact=True - Lists/Tuples: * always single-line * elements encoded compactly; dict elements inside lists are minified - Atomics: * minified via _encode_atomic """ obj = _to_python_basic(obj) sp = " " * indent # Dictionaries if isinstance(obj, dict): if not obj: return "{}" if compact: # Minified dict (no newlines) — used when dict appears inside a list parts = [] for k, v in obj.items(): key = json.dumps(str(k), ensure_ascii=False, separators=(",", ":")) # Inside a compact dict, encode children compactly too val = _encode(v, level=0, compact=True) parts.append(f"{key}:{val}") return "{" + ",".join(parts) + "}" else: # Pretty dict with newlines/indentation lines = [] for k, v in obj.items(): key = json.dumps(str(k), ensure_ascii=False, separators=(",", ":")) # If the child is a list/tuple, force compact=True to keep it single-line. v_basic = _to_python_basic(v) if isinstance(v_basic, (list, tuple)): val_str = _encode(v, level + 1, compact=True) else: val_str = _encode(v, level + 1, compact=False) lines.append(f"{sp * (level + 1)}{key}: {val_str}") return "{\n" + ",\n".join(lines) + "\n" + sp * level + "}" # Lists/Tuples — always one line if isinstance(obj, (list, tuple)): elems = [] for el in obj: el_basic = _to_python_basic(el) if isinstance(el_basic, dict): elems.append(_encode(el, level=level, compact=True)) # minified dict in list elif isinstance(el_basic, (list, tuple)): elems.append(_encode(el, level=level, compact=True)) # nested list stays one line else: elems.append(_encode_atomic(el)) return "[" + ", ".join(elems) + "]" # Atomics return _encode_atomic(obj) text = _encode(data, level=0, compact=False) return text
[docs] def save_fancy_json(data: Any, out_path: Union[str, Path], indent: int = 2) -> None: """Write *data* to a JSON file using :func:`fancy_json_string` formatting. Parameters ---------- data : Any The object to serialize (typically a dict). out_path : str or pathlib.Path Destination file path. indent : int Number of spaces for indenting dictionary entries. """ text = fancy_json_string(data, indent=indent) out_path = Path(out_path) out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(text, encoding="utf-8")
# -- Attributes that cannot be pickled (prepared geometries, threads, callbacks). _UNPICKLABLE_ATTRS = ("_prepared_wall", "_prepared_buffers", "_stop_event", "on_iteration")
[docs] def save_optimiser(optimiser, path): """Save an Optimiser object to a pickle file. A shallow copy of the object is made and any unpicklable attributes (Shapely prepared geometries, threading events, callbacks) are stripped from the copy before serialisation. The original object is never modified. Parameters ---------- optimiser : forge.optimise.Optimiser The optimiser instance to save. path : str or pathlib.Path Destination file path (typically ending in ``.pkl``). See Also -------- load_optimiser : Load an Optimiser object from a pickle file. """ path = Path(path) # Work on a shallow copy so the caller's object is untouched. opt_copy = copy.copy(optimiser) for attr in _UNPICKLABLE_ATTRS: if hasattr(opt_copy, attr): delattr(opt_copy, attr) with open(path, "wb") as f: pickle.dump(opt_copy, f)
[docs] def load_optimiser(path): """Load an Optimiser object from a pickle file. Parameters ---------- path : str or pathlib.Path Path to the pickle file to be read. Returns ------- optimiser : forge.optimise.Optimiser The deserialised optimiser instance. See Also -------- save_optimiser : Save an Optimiser object to a pickle file. """ path = Path(path) with open(path, "rb") as f: return pickle.load(f)
# --------------------------------------------------------------------------- # Strike geometry I/O # ---------------------------------------------------------------------------
[docs] def save_strike_geometry(divertor_data, path, *, buffers=None, xpoint_regions=None): """Save strike geometry definitions to a JSON file. The file uses a region-first layout:: { "lower_outer": { "strike": {"strike_R": [...], "strike_Z": [...]}, "buffers": [{"R": [...], "Z": [...], "distance": d}, ...], "xpoint_regions": {"R": [...], "Z": [...]}, "connection_length_multiplication_factor_zero": 2.0, "weight_connection_length": null, ... }, ... } Parameters ---------- divertor_data : dict Divertor data dict as returned by :meth:`~forge.gui.geometry_tab.GeometryTab.build_divertor_data` or constructed manually in a script. Only the user-facing keys are written; internal keys added by the Optimiser are stripped. path : str or pathlib.Path Destination file path (typically ending in ``.json``). buffers : dict or None Optional buffer definitions keyed by region name. xpoint_regions : dict or None Optional X-point region polygon definitions keyed by region name. """ path = Path(path) _STRIKE_KEYS = {"strike_R", "strike_Z"} _SETTING_KEYS = { "connection_length_multiplication_factor_zero", "weight_connection_length", "weight_strike_point_distance", "weight_xpoint_region", } # Collect every region name mentioned in any of the inputs. all_regions = set(divertor_data) if buffers is not None: all_regions |= set(buffers) if xpoint_regions is not None: all_regions |= set(xpoint_regions) payload: dict[str, Any] = {} for region in sorted(all_regions): entry: dict[str, Any] = {} # Strike sub-dict dd = divertor_data.get(region, {}) strike = {k: _to_serialisable(dd[k]) for k in _STRIKE_KEYS if k in dd} if strike: entry["strike"] = strike # Per-region settings (flat keys) for k in _SETTING_KEYS: if k in dd: entry[k] = _to_serialisable(dd[k]) # Buffers if buffers is not None and region in buffers: entry["buffers"] = [ {bk: _to_serialisable(bv) for bk, bv in buf.items()} for buf in buffers[region] ] # X-point regions if xpoint_regions is not None and region in xpoint_regions: entry["xpoint_regions"] = { pk: _to_serialisable(pv) for pk, pv in xpoint_regions[region].items() } payload[region] = entry save_fancy_json(payload, path)
[docs] def load_strike_geometry(path): """Load strike geometry definitions from a JSON file. Parameters ---------- path : str or pathlib.Path Path to a JSON file previously written by :func:`save_strike_geometry`. Returns ------- divertor_data : dict Divertor data dict suitable for passing to :class:`~forge.optimise.Optimiser`. buffers : dict or None Buffer definitions, or *None* if the file contains none. xpoint_regions : dict or None X-point region polygon definitions, or *None* if the file contains none. """ path = Path(path) with open(path, "r") as f: payload = json.load(f) _SETTING_KEYS = { "connection_length_multiplication_factor_zero", "weight_connection_length", "weight_strike_point_distance", "weight_xpoint_region", } divertor_data = {} buffers = {} xpoint_regions = {} for region, entry in payload.items(): # Strike geometry strike = entry.get("strike", {}) if "strike_R" in strike and "strike_Z" in strike: dd_entry = {"strike_R": strike["strike_R"], "strike_Z": strike["strike_Z"]} for k in _SETTING_KEYS: if k in entry: dd_entry[k] = entry[k] divertor_data[region] = dd_entry # Buffers if "buffers" in entry: buffers[region] = entry["buffers"] # X-point regions if "xpoint_regions" in entry: xpoint_regions[region] = entry["xpoint_regions"] return ( divertor_data if divertor_data else None, buffers if buffers else None, xpoint_regions if xpoint_regions else None, )
def _to_serialisable(obj): """Convert numpy types to native Python for JSON serialisation.""" if isinstance(obj, np.ndarray): return obj.tolist() if isinstance(obj, (np.integer,)): return int(obj) if isinstance(obj, (np.floating,)): return float(obj) return obj # --------------------------------------------------------------------------- # Full optimisation configuration I/O # --------------------------------------------------------------------------- # Keys from divertor_data that belong in the "strike" sub-dict. _STRIKE_KEYS = {"strike_R", "strike_Z"} # Per-region setting keys stored alongside "strike" / "buffers" / "xpoint_regions". _REGION_SETTING_KEYS = { "connection_length_multiplication_factor_zero", "weight_connection_length", "weight_strike_point_distance", "weight_xpoint_region", } # Scalar optimiser parameters that map directly to Optimiser.__init__ kwargs. _OPTIMISER_SCALAR_KEYS = [ "max_evals", "current_step_size_factor", "estimate_initial_currents", "initial_temperature", "min_temperature", "threshold_acceptance_rate_decay", "initial_threshold_acceptance_rate", "n_window", "cost_termination_fraction", "field_line_trace_step_size", "field_line_trace_max_steps", "field_line_trace_psi_tollerance", "buffer_intersection_penalty_factor", "initial_total_connection_length_cost", "initial_total_strike_point_distance_cost", "initial_coil_currents_cost", "initial_xpoint_regions_cost", "use_xpoint_regions", "use_buffers", "max_magnetic_disconnection_factor", "initial_alpha", "alpha_update_factor", "max_cooling_factor", "min_cooling_factor", "detailed_logging", ]
[docs] def save_optimisation_config( path, *, divertor_data=None, buffers=None, xpoint_regions=None, constraints=None, **optimiser_kwargs, ): """Save a complete optimisation configuration to a JSON file. The file captures every setting needed to reproduce an optimisation run (except the equilibrium and machine data, which must be loaded separately from their own files). The JSON layout is:: { "geometry": { "lower_outer": { "strike": {"strike_R": ..., "strike_Z": ...}, "buffers": [...], "xpoint_regions": {"R": [...], "Z": [...]}, <per-region settings> }, ... }, "constraints": { ... }, "optimiser": { "max_evals": 3000, "initial_temperature": 10.0, ... } } Parameters ---------- path : str or pathlib.Path Destination file path (typically ending in ``.json``). divertor_data : dict or None Divertor data dict (strike geometry + per-region settings). buffers : dict or None Buffer definitions keyed by region name. xpoint_regions : dict or None X-point region polygons keyed by region name. constraints : dict or None Constraints dict with ``"annealing"`` and ``"tikhonov"`` sections. **optimiser_kwargs Scalar optimiser parameters (temperatures, step sizes, cost weights, feature toggles, etc.). Only recognised keys from ``Optimiser.__init__`` are written. """ path = Path(path) # --- geometry section (region-first layout) --- geometry = {} all_regions: set[str] = set() if divertor_data is not None: all_regions |= set(divertor_data) if buffers is not None: all_regions |= set(buffers) if xpoint_regions is not None: all_regions |= set(xpoint_regions) for region in sorted(all_regions): entry: dict[str, Any] = {} dd = (divertor_data or {}).get(region, {}) strike = {k: _to_serialisable(dd[k]) for k in _STRIKE_KEYS if k in dd} if strike: entry["strike"] = strike for k in _REGION_SETTING_KEYS: if k in dd: entry[k] = _to_serialisable(dd[k]) if buffers is not None and region in buffers: entry["buffers"] = [ {bk: _to_serialisable(bv) for bk, bv in buf.items()} for buf in buffers[region] ] if xpoint_regions is not None and region in xpoint_regions: entry["xpoint_regions"] = { pk: _to_serialisable(pv) for pk, pv in xpoint_regions[region].items() } geometry[region] = entry # --- optimiser scalars --- opt_section = {} for key in _OPTIMISER_SCALAR_KEYS: if key in optimiser_kwargs: opt_section[key] = _to_serialisable(optimiser_kwargs[key]) # --- assemble --- payload: dict[str, Any] = {} if geometry: payload["geometry"] = geometry if constraints is not None: payload["constraints"] = constraints if opt_section: payload["optimiser"] = opt_section save_fancy_json(payload, path)
[docs] def load_optimisation_config(path): """Load a complete optimisation configuration from a JSON file. Parameters ---------- path : str or pathlib.Path Path to a JSON file previously written by :func:`save_optimisation_config`. Returns ------- config : dict Dictionary with the following keys (any may be *None* if absent from the file): - ``"divertor_data"`` — dict suitable for ``Optimiser(divertor_data=...)``. - ``"buffers"`` — dict suitable for ``Optimiser(buffers=...)``. - ``"xpoint_regions"`` — dict suitable for ``Optimiser(xpoint_regions=...)``. - ``"constraints"`` — dict suitable for ``Optimiser(constraints=...)``. - ``"optimiser"`` — dict of scalar kwargs to unpack into ``Optimiser(**cfg["optimiser"])``. """ path = Path(path) with open(path, "r") as f: payload = json.load(f) # --- geometry --- geometry = payload.get("geometry", {}) divertor_data = {} buffers_out = {} xpt_out = {} for region, entry in geometry.items(): strike = entry.get("strike", {}) if "strike_R" in strike and "strike_Z" in strike: dd_entry = {"strike_R": strike["strike_R"], "strike_Z": strike["strike_Z"]} for k in _REGION_SETTING_KEYS: if k in entry: dd_entry[k] = entry[k] divertor_data[region] = dd_entry if "buffers" in entry: buffers_out[region] = entry["buffers"] if "xpoint_regions" in entry: xpt_out[region] = entry["xpoint_regions"] return { "divertor_data": divertor_data or None, "buffers": buffers_out or None, "xpoint_regions": xpt_out or None, "constraints": payload.get("constraints"), "optimiser": payload.get("optimiser", {}), }