Source code for forge.optimise

"""Contains the principal FORGE optimisation procedure.

Copyright 2025-2026 Chris Marsden

This file is part of FORGE.

FORGE is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

FORGE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with FORGE.  If not, see <http://www.gnu.org/licenses/>.
"""

import logging
import math
import threading
from collections import deque
from copy import deepcopy

import matplotlib.pyplot as plt
import numpy as np
from scipy.linalg import null_space
from shapely import LineString, Point
from shapely.prepared import prep

from forge.critical import find_critical
from forge.equilibrium import Equilibrium
from forge.utils import (
    densify_closed_shape,
    estimate_xpoint_location,
    grid_points_inside_linestring,
)

logger = logging.getLogger(__name__)

ONE_2PI = 1.0 / (2.0 * np.pi)

# Shared singleton returned by check_field_line_intersection when there
# is no intersection.  Avoids allocating a new dict on every call (the
# common case — the vast majority of RK4 steps do not hit anything).
_NO_INTERSECTION = {"intersects": False}


def _fast_bilinear(grid, fi, fj, nR_m1, nZ_m1):
    """Bilinear interpolation on a regular grid from pre-computed fractional indices.

    Avoids the ~50-100 us Python overhead of RectBivariateSpline.ev() per scalar
    call by doing the interpolation directly.  On a regular grid this reduces to
    simple index arithmetic plus four multiply-adds.

    Parameters
    ----------
    grid : 2D numpy array (nR, nZ)
        The data field to interpolate.
    fi : float
        Fractional index in the R direction: (R - R_min) / dR.
    fj : float
        Fractional index in the Z direction: (Z - Z_min) / dZ.
    nR_m1 : int
        nR - 1 (maximum valid integer index in the R direction).
    nZ_m1 : int
        nZ - 1 (maximum valid integer index in the Z direction).

    Returns
    -------
    float
        Interpolated value.
    """
    # Clamp integer indices to valid range
    i = int(fi)
    j = int(fj)
    if i < 0:
        i = 0
    elif i > nR_m1 - 1:
        i = nR_m1 - 1
    if j < 0:
        j = 0
    elif j > nZ_m1 - 1:
        j = nZ_m1 - 1

    # Fractional parts
    dr = fi - i
    dz = fj - j

    # Bilinear interpolation (4 grid lookups, 4 multiplies)
    return (grid[i, j]     * (1.0 - dr) * (1.0 - dz) +
            grid[i + 1, j] * dr          * (1.0 - dz) +
            grid[i, j + 1] * (1.0 - dr) * dz +
            grid[i + 1, j + 1] * dr      * dz)


def _fast_bilinear_vec(grid, fi_arr, fj_arr, nR_m1, nZ_m1):
    """Vectorised bilinear interpolation on a regular grid.

    Same as ``_fast_bilinear`` but operates on 1-D arrays of fractional
    indices, returning a 1-D array of interpolated values.  This is used
    for the connection-length calculation where Br/Bz are needed at points
    along a traced field line.
    """
    i = np.clip(fi_arr.astype(np.intp), 0, nR_m1 - 1)
    j = np.clip(fj_arr.astype(np.intp), 0, nZ_m1 - 1)
    dr = fi_arr - i
    dz = fj_arr - j
    return (grid[i, j]         * (1.0 - dr) * (1.0 - dz) +
            grid[i + 1, j]     * dr          * (1.0 - dz) +
            grid[i, j + 1]     * (1.0 - dr) * dz          +
            grid[i + 1, j + 1] * dr          * dz)


[docs] class Optimiser: """Object representing the simulated annealing optimiser. This Optimiser will carry out the simulated annealing-based optimisation of of the magnetic geometry of the divertor(s). Parameters ---------- eq : forge.equilibrium Equilibrium object The equilibrium that will be optimised. divertor_data : dict A dictionary of data pertaining to the divertor region(s) to be optimised. tokamak_initial : forge.machine Machine object Tokamak associated with the initial starting equilibrium. This machine should contain the PF coil set used to make the the initial equilibrium. tokamak_opt : forge.machine Machine object Tokamak that the user wishes to perform the optimisation on. This machine does not need to have the same PF coil set as tokamak_initial. If this is None, then tokamak_initial - the tokamak used to make the initial equilibrium - will be used. If tokamak_opt is not None then an estimation of an initial set of coil currents will be made. max_evals : int The maximum number of cost function evaluations to carry out (not including the initial evaluation). current_step_size_factor : float The step size taken when moving in current space is given by this factor of a typical coil current size taken from the set of initial coil currents. initial_temperature : float The initial temperature of the system. This pseudo-temperature is what is lowered during the annealing process - analogous of a hot metal being cooled. threshold_acceptance_rate_decay : float The annealing acceptance rate, over a specified window, must fall below a threshold value before cooling is allowed. This threshold acceptance rate is temperature dependent, and evolves as R = R0 * exp(-λ*(1-T/T0)) where R0 is the initial threshold value, T is the current temperature, T0 is the initial temperature and λ is this parameter, the threshold_acceptance_rate_decay constant. If λ is larger, the threshold drops more aggresively. If the optimiser should be more explorative, λ should be lower. initial_threshold_acceptance_rate : float The initial threshold value for the acceptance rate, R0. n_window : int The size of the window over which the rolling acceptance rate is calculated. constraints : dict A dictionary of constraints to be used. See the "generate_default_constraints" method. field_line_trace_step_size : float The poloidal step size (in metres) used by the RK4 field-line tracer. field_line_trace_max_steps : int The maximum number of steps the field-line tracer will take before abandoning a trace (e.g. if it never reaches the wall). field_line_trace_psi_tollerance : float Target maximum value of the relative error on the poloidal magnetic flux of the traced field line(s). buffer_intersection_penalty_factor : float A multiplier on the connection length cost term to be applied if the divertor leg intersects a buffer on the tokamak's wall. initial_total_connection_length_cost : float The initial total combined connection length cost across all divertor regions. initial_total_strike_point_distance_cost : float The initial total combined strike point distance cost across all divertor regions. initial_coil_currents_cost : float The initial cost due to the coil currents. self.initial_xpoint_regions_cost : float The initial cost due to regions in which additional X-points are ncouraged to be formed. use_xpoint_regions : bool Flag for using regions in which secondary divertor X-point(s) will be encouraged to form. xpoint_regions : dict A dictionary mapping divertor region names to X-point region definitions. Required when ``use_xpoint_regions`` is True. Each key is a divertor region name (e.g. ``"lower_outer"``) and each value is a dictionary containing the R and Z coordinates of the region boundary: ``{"lower_outer": {"R": [...], "Z": [...]}, ...}``. max_magnetic_disconnection_factor : float Maximum relative difference in poloidal magnetic flux between a secondary divertor X-point and the separatrix, i.e. |ψ_xpt − ψ_lcfs| / |ψ_lcfs|. This sets the scale at which the magnetic disconnection cost saturates when forming an X-point target (XPT) geometry. use_buffers : bool Flag for adding buffer regions around parts of the wall. buffers : dict Buffer region definitions keyed by divertor region name. Required when ``use_buffers`` is True. Each value is a list of dictionaries containing the R and Z coordinates of the wall segment endpoints and the buffer distance:: {"lower_outer": [{"R": [R1, R2], "Z": [Z1, Z2], "distance": d}, ...], ...} initial_alpha : float The initial value of the alpha parameter used in the Metropolis-Hastings acceptance criterion. Alpha controls the sensitivity of the acceptance probability to the energy change. FORGE will automatically tune alpha to meet the first threshold acceptance rate, after which it is frozen. alpha_update_factor : float The factor by which the parameter alpha is updated in order to induce a first cooling event. max_cooling_factor : float The maximum allowed temperature cooling multiplication factor, β_max. min_cooling_factor : float The minimum allowed temperature cooling multiplication factor, β_min. detailed_logging : bool Flag for carrying out a detailed logging of data. cost_termination_fraction : float If the incumbent cost drops below this fraction of the initial cost, the annealing will terminate early. For example, 0.01 means the optimisation stops once the cost reaches 1% of its initial value. min_temperature : float The minimum temperature below which the annealing will stop. """ def __init__( self, eq, tokamak_initial, tokamak_opt = None, divertor_data = None, max_evals = 3000, current_step_size_factor = 0.05, estimate_initial_currents = False, initial_temperature = 10.0, threshold_acceptance_rate_decay = 2.0, initial_threshold_acceptance_rate = 0.75, n_window = 50, constraints = None, field_line_trace_step_size = 0.15, field_line_trace_max_steps = 1000, field_line_trace_psi_tollerance = 0.01, buffer_intersection_penalty_factor = 1.05, initial_total_connection_length_cost = 1.0, initial_total_strike_point_distance_cost = 1.0, initial_coil_currents_cost = 1.0, initial_xpoint_regions_cost = 1.0, use_xpoint_regions = False, xpoint_regions = None, max_magnetic_disconnection_factor = 0.5, use_buffers = False, buffers = None, initial_alpha = 50.0, alpha_update_factor = 1.05, max_cooling_factor = 0.99, min_cooling_factor = 0.9, detailed_logging = False, cost_termination_fraction = 0.01, min_temperature = 1.0, on_iteration = None, stop_event = None, ): self.eq = eq self.divertor_data = divertor_data self.tokamak_initial = deepcopy(tokamak_initial) # Iteration callback: called after each iteration with the optimiser as argument. # If None (default), no callback is invoked, preserving the original scripting behaviour. self.on_iteration = on_iteration # Cancellation mechanism: if a threading.Event is supplied, the optimise loop will # check it every iteration and stop gracefully when set. if stop_event is None: self._stop_event = threading.Event() else: self._stop_event = stop_event # The user can choose to use the same tokamak as was used to # produce the initial equilibrum, or provide a separate one. if tokamak_opt is None: self.tokamak_opt = deepcopy(tokamak_initial) else: self.tokamak_opt = tokamak_opt self.max_evals = max_evals self.current_step_size_factor = current_step_size_factor self.estimate_initial_currents = estimate_initial_currents self.threshold_acceptance_rate_decay = threshold_acceptance_rate_decay self.initial_temperature = initial_temperature self.initial_threshold_acceptance_rate = initial_threshold_acceptance_rate self.n_window = n_window self.field_line_trace_step_size = field_line_trace_step_size self.field_line_trace_max_steps = field_line_trace_max_steps self.field_line_psi_tollerance = field_line_trace_psi_tollerance self.buffer_intersection_penalty_factor = buffer_intersection_penalty_factor self.initial_total_connection_length_cost = initial_total_connection_length_cost self.initial_total_strike_point_distance_cost = initial_total_strike_point_distance_cost self.initial_coil_currents_cost = initial_coil_currents_cost self.initial_xpoint_regions_cost = initial_xpoint_regions_cost self.use_xpoint_regions = use_xpoint_regions self.xpoint_regions = xpoint_regions self.max_magnetic_disconnection_factor = max_magnetic_disconnection_factor self.use_buffers = use_buffers self.buffers_input = buffers self.initial_alpha = initial_alpha self.alpha_update_factor = alpha_update_factor self.max_cooling_factor = max_cooling_factor self.min_cooling_factor = min_cooling_factor self.detailed_logging = detailed_logging self.cost_termination_fraction = cost_termination_fraction self.min_temperature = min_temperature # Pre-compute the Green's functions on the 2D grid for each coil in the coilset. This will # later be used to quickly calculate the new machine flux as the optimiser changes the # coil currents. self.compute_coilset_greens_on_grid() # Pre-compute the background plasma dpsi/dR and dpsi/dZ grids. The plasma # is treated as a fixed background throughout the optimisation, so these # are evaluated once and cached. self._psi_plas_dpsi_dR_grid = self.eq.dpsi_plas_dR_RZ(self.eq.R_2D, self.eq.Z_2D) self._psi_plas_dpsi_dZ_grid = self.eq.dpsi_plas_dZ_RZ(self.eq.R_2D, self.eq.Z_2D) # Cache the grid metadata that the fast tracer / bilinear interpolators need. # These never change (the equilibrium grid is fixed). self._grid_R_min = self.eq.R_min self._grid_Z_min = self.eq.Z_min self._grid_inv_dR = 1.0 / self.eq.dR self._grid_inv_dZ = 1.0 / self.eq.dZ self._grid_nR_m1 = self.eq.nR - 1 self._grid_nZ_m1 = self.eq.nZ - 1 # Build a prepared (spatially indexed) version of the wall and buffer geometries. # This dramatically accelerates the per-step intersection tests inside the field # line tracer, which is the innermost hot loop of the optimisation. self._prepared_wall = prep(self.tokamak_opt.wall) # Set those settings related to the use of constraints in the optimisation. if constraints is None: # The user did not specify the constraints settings. Use default values. self.constraints = self.generate_default_constraints() else: self.constraints = constraints # Generate the constraints using the above settings. self.generate_constraints() # Set the initial coil currents and 2D map of poloidal magnetic flux: if self.estimate_initial_currents: # Estimate initial currents in the PF coils self.estimate_currents() self.initial_coil_currents = self.initial_currents_estimate # Initialise the 2D map of poloidal magnetic flux that will be evolved during the optimisation. self.psi_2D = self.initial_psi_2D_estimate else: # Use the currents that come with the tokamak self.initial_coil_currents = deepcopy(self.tokamak_opt.get_currents()) # Initialise the 2D map of poloidal magnetic flux that will be evolved during the optimisation. self.psi_2D = deepcopy(self.eq.psi_2D) # Extract the divertor regions that the user has selected for optimisation, from the # divertor_data provided. self.divertor_regions = list(self.divertor_data.keys()) # Determine the starting location for field line traces of the divertor region(s) aswell # as the field line trace direction. self.init_divertor_data() # Initialise data for regions in which additional X-points will be encouraged to form, if # such regions are present within the Machine. if self.use_xpoint_regions: self.init_xpoint_regions() else: self.N_xpoint_regions = 0 # Create buffer regions around parts of the wall. if self.use_buffers: self.create_buffers() # Build prepared (spatially indexed) versions of each buffer geometry # for the same reason as the wall: O(log N) pre-filtering in the # field-line tracing hot loop. Keyed by divertor region. self._prepared_buffers = { region: [prep(b) for b in geoms] for region, geoms in self.buffers.items() } else: self.buffers = None self._prepared_buffers = None
[docs] def optimise(self): """Performs the simulated annealing-based optimisation of the divertor(s)' magnetic geometry.""" # Keep track of the number of cost function evaluations self.N_evals = 0 # Get the coil current step size from the initial coil currents. currents = self.initial_coil_currents self.init_current_step_size(currents) # Calculate the initial cost function, which will set the initial value for the cost function # from the previous step, as we are yet to actually start iterating. Note that the Equilibrium object # will not itself be updated until the annealing is complete, hence any new flux maps generated during # the annealing must be passed directly to the cost function. The output of calculate_cost is a dictionary # containing all the key data defining the state and its associated cost function. previous_state_data = self.calculate_cost( currents = currents, flux_map = self.psi_2D, ) # As this is the initial state, these initial currents will also serve as the first set of accepted currents. accepted_currents = deepcopy(currents) # Record the initial cost for use in early termination checks. self.initial_cost = previous_state_data["cost"] # As this is the first cost function evaluation it is by definition also the best state currently explored. # Hence, the incumbent state will be this one, for now. self.incumbent_data = previous_state_data self.incumbent_data["iteration_num"] = 0 # Now we can actually begin the simulated annealing algorithm. To briefly quote Google: # "Simulated annealing is a probabilistic optimisation technique inspired by the physical process of # annealing metals, which gradually cools a material to achieve a stable, low-energy state. Its core principle # is to start at a high "temperature" allowing many "uphill" or worse moves to escape local optima and explore a # broad solution space. As the "temperature" gradually decreases over time (following a cooling schedule), the # algorithm becomes less likely to accept worse solutions, eventually settling into a near-optimal solution, # similar to how a metal crystalizes into its lowest energy, most ordered state". # # Our temperature here is not associated with a real-world counterpart. The measure by which we will assess # the goodness of each state is the cost function. # Initialise the number of cost function evaluations carried out (not including the first one). # The optimisation will hard-stop if this exceeds the defined maximum number of evaluations. self.num_evals = 0 # Initialise list(s) that will be used to track key quantities # List of state data self.tracking_temperature = [] self.tracking_energy_change = [] self.tracking_acceptance = [] self.tracking_cost = [] self.tracking_cost_strike_point_distance = [] self.tracking_cost_connection_length = [] self.tracking_cost_coil_currents = [] self.tracking_cost_xpoint_regions = [] self.tracking_acceptance_rate = [] self.tracking_acceptance_prob = [] self.tracking_alpha = [] # Additional tracking data, which is tied to specific divertor regions. self.tracking_connection_length = {} self.tracking_strike_point_R = {} self.tracking_strike_point_Z = {} self.tracking_field_lines_R = {} self.tracking_field_lines_Z = {} for divertor_region in self.divertor_regions: self.tracking_connection_length[divertor_region] = [] self.tracking_strike_point_R[divertor_region] = [] self.tracking_strike_point_Z[divertor_region] = [] # If more detailed logging is desired, initialise lists that will track other key data if self.detailed_logging: # In the event that detailed logging is desired, the following will be recorded # at every iteration: # # - coil currents # - 2D flux map (can also be recalculated from coil currents, but recorded for convenience) # - incumbent state # - cooling rate factor # - threshold acceptance rate self.tracking_coil_currents = [] self.tracking_psi_2D = [] self.tracking_incumbent_state = [] self.tracking_cooling_factor = [] self.tracking_threshold_acceptance_rate = [] # Seed the tracking lists with the initial (unperturbed) state so that # iteration 0 on the plots corresponds to the true starting point. self.tracking_temperature.append(self.initial_temperature) self.tracking_energy_change.append(0.0) self.tracking_cost.append(previous_state_data["cost"]) self.tracking_cost_strike_point_distance.append(previous_state_data["cost_strike_point_distance"]) self.tracking_cost_connection_length.append(previous_state_data["cost_connection_length"]) self.tracking_cost_coil_currents.append(previous_state_data["cost_coil_currents"]) self.tracking_cost_xpoint_regions.append(previous_state_data["cost_xpoint_regions"]) self.tracking_acceptance_rate.append(np.nan) self.tracking_acceptance_prob.append(np.nan) self.tracking_alpha.append(self.initial_alpha) for divertor_region in self.divertor_regions: self.tracking_connection_length[divertor_region].append( previous_state_data["divertors"][divertor_region]["connection_length"] ) self.tracking_strike_point_R[divertor_region].append( previous_state_data["divertors"][divertor_region]["strike_point_R"] ) self.tracking_strike_point_Z[divertor_region].append( previous_state_data["divertors"][divertor_region]["strike_point_Z"] ) if self.detailed_logging: self.tracking_coil_currents.append(previous_state_data["currents"]) self.tracking_psi_2D.append(previous_state_data["psi_2D"]) self.tracking_incumbent_state.append(previous_state_data) self.tracking_cooling_factor.append(np.nan) self.tracking_threshold_acceptance_rate.append(np.nan) # Set an initial value for alpha - this will be adjusted automatically, in the event that # the resultant acceptance rate is (initially) higher than the (first) value of the # threshold acceptance rate, which is used to signal cooling. It can be challenging to # pick a value of alpha that results in the desired acceptance rate, hence FORGE will # automatically tune it. Note, that this tuning only occurs to help the FIRST threshold # accpetance rate value be met - after this first cooling event, alpha is frozen (if we # kept adjusting it, we would just forcefully quench the system down to the final temperature). self.alpha = self.initial_alpha self.update_alpha = True # The temperature will change based on a dynamic cooling factor. The associated cooling # factor is initialy NaN and will be set at the first cooling event. self.cooling_factor = np.nan # Set the current temperature # (plotting is handled externally, e.g. by the GUI) # Set the current temperature of the system equal to the initial temperature self.temperature = self.initial_temperature while self.num_evals < self.max_evals and self.temperature > self.min_temperature \ and self.incumbent_data["cost"] > self.cost_termination_fraction * self.initial_cost \ and not self._stop_event.is_set(): # Perform simulated annealing # Initialise the number of cost function evaluations (not including the first one) # performed at the current temperature value, excluding those evaluations where # there was a negative energy change - only counting those states that were # accepted/rejected probabalistically. These states, where the cost function increased, # are referred to as having moved "uphill" in the cost function landscape. self.num_evals_at_temp_uphill = 0 # Determine the threshold acceptance rate over the specified window. If the acceptance # rate over the window drops below the threshold value, then it is time to cool the # system in accordance with the cooling schedule. Note that the window will only include # states that result from an uphill move in the cost function landscape. self.threshold_acceptance_rate = self.initial_threshold_acceptance_rate * \ np.exp(-self.threshold_acceptance_rate_decay * (1.0 - (self.temperature / self.initial_temperature))) # Create the buffer of length n_window that tracks the rolling acceptance rate. This is reset at each (new) # temperature. acceptance_buffer = deque([1.0] * self.n_window, maxlen=self.n_window) # Start annealing keep_iterating = True ready_to_cool = False while keep_iterating: # Generate a new neighbouring state. This is done by randomly perturbing the coil # currents used to generate the state, with this perturbation about the most recently # accepted set of coil currents. neighbour = self.generate_new_neighbour_in_current_space( currents = accepted_currents, step_size = self.current_step_size, ) # Evaluate the new neighbour candidate point new_state_data = self.evaluate_neighbour( neighbour = neighbour, previous_accepted_cost = previous_state_data["cost"], ) # Check if the new candidate neighbour point is to be accepted or not if new_state_data["acceptance"]: # Acceptance successful self.psi_2D = new_state_data["psi_2D"] currents = new_state_data["currents"] accepted_currents = currents.copy() # This newly accepted state will become the previously accepted state for the # next state that will be produced in the next iteration. previous_state_data = new_state_data # Update the global acceptance tracker self.tracking_acceptance.append(1.0) # Check if the newly accepted state is the incumbent. # Sometimes we allow worse solutions (particularly at higher temperature) # hence this new accepted state wont necessarily be the incumbent. if previous_state_data["cost"] < self.incumbent_data["cost"]: self.incumbent_data = previous_state_data self.incumbent_data["iteration_num"] = self.num_evals else: # Update the global acceptance tracker self.tracking_acceptance.append(0.0) # Next we check if the state was accepted/rejected probabilistically or not. # If the acceptence probability was eactly equal to 1 - this corresponds to # there having been a downhill movement in the cost function landscape, which # is deterministically accepted. Hence, probabilistic evaluations, which correspond # to uphill movements in the cost function landscape, will have occured when the acceptance # probability was not exactly equal to 1. # We need to record the acceptence rate for probabilistic accpetances only. if new_state_data["acceptance_prob"] != 1.0: # We will also increment the counter that tracks the number of probabilistic # evaluations. self.num_evals_at_temp_uphill += 1 # State was evaluated probabilistically. # The deque has a fixed maxlen so appending automatically # discards the oldest entry — no manual pop(0) needed. if new_state_data["acceptance"]: acceptance_buffer.append(1.0) else: acceptance_buffer.append(0.0) # Increase counters self.num_evals += 1 # Update tracking data self.tracking_temperature.append(self.temperature) self.tracking_energy_change.append(new_state_data["energy_change"]) self.tracking_cost.append(new_state_data["cost"]) self.tracking_cost_strike_point_distance.append(new_state_data["cost_strike_point_distance"]) self.tracking_cost_connection_length.append(new_state_data["cost_connection_length"]) self.tracking_cost_coil_currents.append(new_state_data["cost_coil_currents"]) self.tracking_cost_xpoint_regions.append(new_state_data["cost_xpoint_regions"]) self.tracking_acceptance_prob.append(new_state_data["acceptance_prob"]) self.tracking_alpha.append(self.alpha) # Update tracking data for the divertor regions for divertor_region in self.divertor_regions: # Update connection length self.tracking_connection_length[divertor_region].append( new_state_data["divertors"][divertor_region]["connection_length"] ) # Update strike point R coordinate self.tracking_strike_point_R[divertor_region].append( new_state_data["divertors"][divertor_region]["strike_point_R"] ) # Update strike point Z coodinate self.tracking_strike_point_Z[divertor_region].append( new_state_data["divertors"][divertor_region]["strike_point_Z"] ) # Update traced field line R coordinates self.tracking_field_lines_R[divertor_region] = new_state_data["divertors"][divertor_region]["field_line_R"] # Update traced field line Z coordinates self.tracking_field_lines_Z[divertor_region] = new_state_data["divertors"][divertor_region]["field_line_Z"] if self.detailed_logging: self.tracking_coil_currents.append(new_state_data["currents"]) self.tracking_psi_2D.append(new_state_data["psi_2D"]) self.tracking_incumbent_state.append(self.incumbent_data) self.tracking_cooling_factor.append(self.cooling_factor) self.tracking_threshold_acceptance_rate.append(self.threshold_acceptance_rate) # Check whether or not the system is ready to cool. # We must fill up the buffer before evaluating. if self.num_evals_at_temp_uphill >= self.n_window: # Calculate the rolling acceptance rate over the window acceptance_rate = np.mean(acceptance_buffer) # Record the acceptance rate self.tracking_acceptance_rate.append(acceptance_rate) # Check if this falls below the threshold acceptance rate. if acceptance_rate <= self.threshold_acceptance_rate: ready_to_cool = True # Get the number of iterations required to reach the threshold acceptance rate # not including the initial filling of the buffer self.iterations_to_acceptance = max(self.num_evals_at_temp_uphill - self.n_window, 0) # The first time we match the threshold acceptance rate, we can say that # the value of alpha is appropriate, and hence no longer needs updating. self.update_alpha = False else: # Update the (initial) value of alpha (if it hasn't been done so already), in order # to adjust the accpetance rate (if required). The update to alpha only occurs when the number # # of probabilistic cost evaluations at the current temperature is an integer multiple of the # window size. The reason for this is that there is an inertia to the acceptance rate - it takes # time for the effects of the updated value of alpha to (noticebly) affect the acceptance rate over # the window. Hence we should wait until the another window's worth of probabilistic cost evaluations # have passed, before changing alpha again. If we don't do this then we can quickly over-correct alpha, # leading to a sudden quench of the system. It is also worth highlighting that the optimiser may go on # a streak of downhill (in the cost function landscape) improvements. Hence, we need to ensure that # those downhill iterations are not included in the evaluations counter, as otherwise alpha would just # skyrocket during a later period of improvement, which would lead to a sudden quench once the streak # ends. if self.update_alpha and self.num_evals_at_temp_uphill%self.n_window == 0: self.alpha *= self.alpha_update_factor else: # The system cannot cool unless the acceptance rate of the window drops below some # threshold value. The acceptance rate over the window cannot be evaluated until # the number of iterations has reached the window size. ready_to_cool = False # As the acceptance rate cannot be evaluated, record the result as a NaN. self.tracking_acceptance_rate.append(np.nan) # Fire the iteration callback (if one has been provided) if self.on_iteration is not None: self.on_iteration(self) # Check for external stop request if self._stop_event.is_set(): logger.info('Optimisation stopped by external request.') keep_iterating = False # Check for early termination if the incumbent cost has dropped below the threshold cost_below_threshold = self.incumbent_data["cost"] <= self.cost_termination_fraction * self.initial_cost if (self.num_evals >= self.max_evals) or ready_to_cool or cost_below_threshold: if cost_below_threshold: logger.info( 'Early termination: incumbent cost (%.6e) fell below %.1f%% of initial cost (%.6e).', self.incumbent_data["cost"], self.cost_termination_fraction * 100.0, self.initial_cost, ) keep_iterating = False # Either we have are ready to cool, or we have met our maximum number of iterations if ready_to_cool: # Ready to lower the temperature # Lower the temperature according to the cooling shedule. # Calculate the cooling factor based on how many iterations were # required to reach the threshold acceptance rate. self.update_cooling_factor() self.temperature *= self.cooling_factor # Log the reason the optimisation stopped if self._stop_event.is_set(): logger.info('Optimisation stopped by user request.') elif self.num_evals >= self.max_evals: logger.info('Optimisation stopped: max iterations (%d) reached.', self.max_evals) elif self.temperature <= self.min_temperature: logger.info('Optimisation stopped: min temperature (%.4e) reached.', self.min_temperature) elif self.incumbent_data["cost"] <= self.cost_termination_fraction * self.initial_cost: logger.info('Optimisation stopped: cost target reached (incumbent cost %.6e <= %.1f%% of initial %.6e).', self.incumbent_data["cost"], self.cost_termination_fraction * 100.0, self.initial_cost) # Optimisation completed. Next we generate new Equilibrium and Machine objects # for the optimised geometry and corresponding coil currents. self.generate_optimised_eq_machine()
[docs] def evaluate_neighbour( self, neighbour, previous_accepted_cost, ): """Calculates the cost of a neighbouring state and accepts/rejects it accordingly. Evaluates whether or not a new neighbouring state is going to be kept or not. In doing so, the cost function for this candidate state is evaluated. Parameters ---------- neighbour : dict The new candidate neighbour state to be evaluated. previous_accepted_cost : float The cost of the previous state that was deemed acceptable [dict] Returns ------- state_data : dict Dictionary of data of the neighbouring state. Includes updated data related to the cost of the state. """ state_data = self.calculate_cost( currents = neighbour["currents"], flux_map = neighbour["psi_2D"], ) # Evaluate the change in "energy" between the new candidate neighbour state # and the previous state. This is given by the change in cost function. energy_change = state_data["cost"] - previous_accepted_cost state_data["energy_change"] = energy_change # Evaluate whether or not we will accept this new candidate state using the # Metropolis-Hastings criterion. If the energy change is negative, immediately # accepted the new state, otherwise the probability of acceptance is evaluated. if energy_change < 0: # Always accept states with a lower cost state_data["acceptance"] = True # Record the probability of acceptance state_data["acceptance_prob"] = 1.0 else: # State has a higher cost. Calculate the probability of acceptance probability_of_acceptance = np.exp(-self.alpha * energy_change / self.temperature) # Record the probability of acceptance state_data["acceptance_prob"] = probability_of_acceptance # Check if the acceptance is successful or not if np.random.rand() < probability_of_acceptance: # Accept the new candidate neighbour state state_data["acceptance"] = True else: # Reject the new candidate neighbour state state_data["acceptance"] = False # Return the state data for the evaluated candidate neighbour state return state_data
[docs] def generate_new_neighbour_in_current_space( self, currents, step_size, ): """Generates a new neighbouring state. Generates a new position in coil current space for evaluation. Samples a random unit vector from the null space already generated and multiplies it by a given step size. This new perturbation vector is then added to the current vector to make a new set of coil currents. A new flux map is then generated for this configuration. Parameters ---------- currents : 1D numpy array. The exisiting coil currents (A). step_size : float A factor to scale the step size taken in coil current space. Returns ------- new state dictionary A dictionary of data representing the new state. """ # Start by generating a vector of random numbers between 0-1 of length of the number # of columns of the null space matrix (as all of our constraints constraints are linearly # independent, this will be equivalent to the the number of coils minus the number of # constraints. unit_vector = np.random.rand(self.tokamak_opt.N_coils - self.annealing_N_constraints) # Currents should be able to both increase and decrease - shift [0,1] elements to be [-0.5,0.5] unit_vector = np.subtract(unit_vector,0.5) # Normalise the vector unit_vector /= np.linalg.norm(unit_vector) # Calculate the coil current perturbation current_perturbation = step_size * (unit_vector @ self.annealing_greens_matrix_nullspace_transpose) # Calculate the new resultant coil currents by adding the perturbed currents to the existing currents new_currents = np.add(currents,current_perturbation) # Generate the new equilibrium psi on the 2D grid, for these new coil currents return self.generate_eq_from_currents( new_currents = new_currents )
[docs] def generate_eq_from_currents( self, new_currents, ): """Generates equilibrium data from a set of coil currents. Parameters ---------- new_currents : 1D numpy array The set of coil currents. Returns ------- dictionary of equilibrium data. Contains the 2D grid of poloidal magnetic flux, as well as the machine and plasma poloidal magnetic fluxes, and the coil currents. """ # At this stage we have our new coil currents. As the total poloidal magnetic # flux is simply the flux from the plasma (which we take as a known background flux) # plus the flux from the machine, we now need to calculate the new machine (coilset) flux. # As we have already pre-computed the Green's functions on our 2D grid for every coil # we simply need to multiply the new coil currents by the corresponding Green's grids # and superimpose them ontop of the background plasma flux to calculate the new total flux. # Calculate the new machine flux on the 2D grid by contracting the coil # currents with the pre-computed Green's function matrices. new_psi_mach_2D = np.einsum('i,ijk->jk', new_currents, self.coilset_greens_on_grid) # Add the new flux from the machine to the existing background plasma flux # to get the new total poloidal magnetic flux on the 2D grid. new_psi_2D = self.eq.psi_plas_2D + new_psi_mach_2D return { "currents":new_currents, "psi_2D":new_psi_2D, "psi_mach_2D":new_psi_mach_2D, "psi_plas_2D":self.eq.psi_plas_2D, }
[docs] def calculate_cost( self, currents, flux_map, ): """Evaluates the cost of a state. Calculates the cost of an equilibrium state. This includes all cost components of all divertor regions being optimised, as well as cost terms not tied to specific divertor regions, such as the coil currents cost term. In calculating the cost, field lines are traced in the divertor regions as appropriate. If X-point regions are used, then any secondary divertor X-points in these regions are located. Parameters ---------- currents : 1D numpy array The set of coil currents. flux_map : 2D numpy array 2D array of the state's poloidal magnetic flux. Returns ------- Dictionary of cost data and divertor data A dictionary containing detailed cost data as well as divertor data (such as the strike point, connection length, traced field line etc.) for each divertor region. """ # We have moved in current space, hence the 2D map of poloidal magnetic flux has changed, # and so a new connection length for our new divertor geometry must be calculated. # Compute the dpsi/dR and dpsi/dZ grids analytically from the pre-computed # coil Green's function derivative matrices. This replaces the previous # approach of constructing a RectBivariateSpline and numerically differentiating # it — the einsum is a simple matrix-vector multiply over the coil dimension # and is both faster (~10x) and more accurate (analytical Green's function # derivatives rather than spline numerical derivatives). _dpsi_dR_grid = self._psi_plas_dpsi_dR_grid + np.einsum( 'i,ijk->jk', currents, self.coilset_dpsi_dR_greens_on_grid ) _dpsi_dZ_grid = self._psi_plas_dpsi_dZ_grid + np.einsum( 'i,ijk->jk', currents, self.coilset_dpsi_dZ_greens_on_grid ) # Bundle the grid metadata that the fast tracer needs. The grid # dimensions are cached on self and never change; only the derivative # grids and flux_map vary per iteration. _tracer_grid_data = { "psi_grid": flux_map, "dpsi_dR_grid": _dpsi_dR_grid, "dpsi_dZ_grid": _dpsi_dZ_grid, "R_min": self._grid_R_min, "Z_min": self._grid_Z_min, "inv_dR": self._grid_inv_dR, "inv_dZ": self._grid_inv_dZ, "nR_m1": self._grid_nR_m1, "nZ_m1": self._grid_nZ_m1, } # Create an empty dictionary that will hold pertinent data for each divertor region. divertor_results = {} # Iterate over the divertor region(s) to be optimised. for divertor_region in self.divertor_regions: # Perform the relevant field line trace # Get the starting location of the field line trace and the trace direction trace_starting_position_R = self.divertor_data[divertor_region]["trace_starting_position_R"] trace_starting_position_Z = self.divertor_data[divertor_region]["trace_starting_position_Z"] trace_direction = self.divertor_data[divertor_region]["trace_direction"] # Field line trace starting location trace_starting_position = [trace_starting_position_R,trace_starting_position_Z] # Trace a field line from the starting position field_line_data = self.trace_field_line( starting_position = trace_starting_position, step_size = self.field_line_trace_step_size, max_steps = self.field_line_trace_max_steps, direction = trace_direction, grid_data = _tracer_grid_data, divertor_region = divertor_region, ) field_line_R = field_line_data["field_line_R"] field_line_Z = field_line_data["field_line_Z"] intersection_with_buffer = field_line_data["intersection_with_buffer"] # We have finished the field line tracing. We will now calculate the distance between the end # point and the intended strike geometry. This distance will be used to penalise geometries # where the strike point is far from the intended location. # Extract the end point - the strike point strike_point_R = field_line_R[-1] strike_point_Z = field_line_Z[-1] # Create a Shapely Point object of this strike point strike_point = Point(strike_point_R,strike_point_Z) # Target strike geometry strike_geometry = self.divertor_data[divertor_region]["strike_geometry"] # Get the distance between the strike point and the intended strike geometry strike_point_distance = strike_point.distance(strike_geometry) # Calculate the strike point distance cost # The initial strike point distance cost for this divertor region initial_strike_point_distance_cost = self.divertor_data[divertor_region]["initial_strike_point_distance_cost"] # If this is the first cost function evaluation, we will record the initial strike point distance. if self.N_evals == 0: self.divertor_data[divertor_region]["initial_strike_point_distance"] = strike_point_distance # One issue that may occur is if the strike point is initially on the target strike geometry, and later # ends up also being on the target strike geometry. In this case both the strike point distance # and the initial strike point distance are vanishingly small, but due to floating point precision # limitations can still end up having a large ratio, giving the false impression that the strike point # if far off the target strike geometry. Hence, if the strike point distance is small, the strike # point distance cost is just set to zero. # Calculate the strike point distance multiplication factor for this divertor if strike_point_distance < 1.0e-04: strike_point_distance_multiplication_factor = 0.0 else: initial_strike_point_distance = self.divertor_data[divertor_region]["initial_strike_point_distance"] strike_point_distance_multiplication_factor = strike_point_distance / initial_strike_point_distance # The strike point distance cost multiplication factor is the same as the distance multiplication factor itself # Calculate the strike point distance cost cost_strike_point_distance = initial_strike_point_distance_cost * strike_point_distance_multiplication_factor # We have traced our field line of interest in the poloidal plane, however, connection length here is # the full parallel connection length, not just the poloidal one. We will now calculate the parallel # connection length by integrating along the field line. # field_line_R, field_line_Z # Calculate the cumulative poloidal connection length from the start # to the end of the field line (so 0 at the start). points = np.column_stack((field_line_R, field_line_Z)) s_pol_dists = np.sqrt(np.sum(np.diff(points, axis=0)**2, axis=1)) s_pol_dists = np.insert(np.cumsum(s_pol_dists), 0, 0) # Calculate the parallel connection length # Get Br, Bz and Btor at the points along the field line. # Use the vectorised bilinear interpolator on the analytically-derived # derivative grids — avoids the need for a RectBivariateSpline entirely. _fi_fl = (field_line_R - self._grid_R_min) * self._grid_inv_dR _fj_fl = (field_line_Z - self._grid_Z_min) * self._grid_inv_dZ _dpsi_dZ_fl = _fast_bilinear_vec(_dpsi_dZ_grid, _fi_fl, _fj_fl, self._grid_nR_m1, self._grid_nZ_m1) _dpsi_dR_fl = _fast_bilinear_vec(_dpsi_dR_grid, _fi_fl, _fj_fl, self._grid_nR_m1, self._grid_nZ_m1) Br = -(ONE_2PI / field_line_R) * _dpsi_dZ_fl Bz = (ONE_2PI / field_line_R) * _dpsi_dR_fl Btor = self.eq.fvac / field_line_R # Calculate the connection length integrand integrand = np.sqrt(1.0 + ((Btor * Btor) / (Br * Br + Bz * Bz))) # Perform numerical integration connection_length = np.trapz(integrand,s_pol_dists) # Connection_length is now the full parallel connection length for this configuration # Calculate the connection length cost for this divertor # The initial connection length cost for this divertor region initial_connection_length_cost = self.divertor_data[divertor_region]["initial_connection_length_cost"] # If this is the first cost function evaluation, we will record the initial connection # length. if self.N_evals == 0: self.divertor_data[divertor_region]["initial_connection_length"] = connection_length # Calculate the connection length multiplication factor for this divertor initial_connection_length = self.divertor_data[divertor_region]["initial_connection_length"] connection_length_multiplication_factor = connection_length / initial_connection_length # Calculate the connection length cost multiplication factor for this divertor # The connection length multiplication factor at which the cost multiplication factor will zero-out. # E.g. at 1.5 - if the connection length has increased by 50%, the cost will be zero. connection_length_multiplication_factor_zero = ( self.divertor_data[divertor_region]["connection_length_multiplication_factor_zero"] ) connection_length_cost_multiplication_factor = 1.0 + (-1.0 / (connection_length_multiplication_factor_zero - 1.0))\ * (connection_length_multiplication_factor - 1.0) connection_length_cost_multiplication_factor = max(0.0,connection_length_cost_multiplication_factor) # Calculate the connection length cost cost_connection_length = initial_connection_length_cost * connection_length_cost_multiplication_factor # If the field line trace struck a buffer region, apply a penalty factor to the connection length cost if intersection_with_buffer: cost_connection_length *= self.buffer_intersection_penalty_factor # Next, we will check if any X-point regions associated with this divertor are present and # perform the required cost calculations accordingly. xpoint_region_present = self.divertor_data[divertor_region]["xpoint_region_present"] if xpoint_region_present: # An X-point region associated with this divertor region is present # Check if the separatrix is in the region by testing if the traced field line crosses # the xpoint region boundary. # Retrieve the pre-computed region boundary region_boundary = self.divertor_data[divertor_region]["xpoint_region"]["region_boundary"] # Retrieve the traced field line field_line = field_line_data["field_line"] # The separatrix is in the region if the traced field line intersects the region boundary separatrix_in_region = field_line.intersects(region_boundary) # If the separatrix is within this region, proceed. if separatrix_in_region: # Extract the relevant pre-calculated Green's function matrices for Br and Bz xpoint_region_greens_br_matrix_transpose = ( self.divertor_data[divertor_region]["xpoint_region"]["greens_br_matrix_transpose"] ) xpoint_region_greens_bz_matrix_transpose = ( self.divertor_data[divertor_region]["xpoint_region"]["greens_bz_matrix_transpose"] ) # Extract the background poloidal field from the plasma at points in the region Br_plas = self.divertor_data[divertor_region]["xpoint_region"]["background_Br_points"] Bz_plas = self.divertor_data[divertor_region]["xpoint_region"]["background_Bz_points"] # Calculate Br and Bz from the coils at points in the region Br_coils = currents @ xpoint_region_greens_br_matrix_transpose Bz_coils = currents @ xpoint_region_greens_bz_matrix_transpose # Total poloidal field Br = Br_coils + Br_plas Bz = Bz_coils + Bz_plas # Calculate the square of the poloidal field at points in the region Bp2 = Br * Br + Bz * Bz # Extract the minimumn value Bp2_min = np.amin(Bp2) # Could this be done more elegantly? try: if self.divertor_data[divertor_region]["xpoint_region"]["first_pass"]: initial_Bp2_min = self.divertor_data[divertor_region]["xpoint_region"]["initial_Bp2_min"] except KeyError: initial_Bp2_min = Bp2_min self.divertor_data[divertor_region]["xpoint_region"]["first_pass"] = True self.divertor_data[divertor_region]["xpoint_region"]["initial_Bp2_min"] = initial_Bp2_min # Now we will calculate the part of the cost associated with this poloidal field. # The cost multiplier from the poloidal field will be the ratio of the achieved square # of the poloidal field to the initial square of the poloidal field. The other parts of the # X-point region cost will come from whether or not there exists an X-point within # the X-point region. We will also extract the initial X-point region cost for this divertor region. # The initial X-point region cost for this divertor region initial_xpoint_region_cost = self.divertor_data[divertor_region]["initial_xpoint_region_cost"] Bp2_cost_multplication_factor = Bp2_min / initial_Bp2_min cost_Bp2 = (1. / 3.) * initial_xpoint_region_cost * Bp2_cost_multplication_factor # Next we will check to see whether or not an X-point is present within the region. # First, we will extract data for theis region that will be used. # Extract the matrix of 2x2 Jacobian matrices for the control poloidal field from the coils xpoint_region_coils_Bp_jacobians_matrix_transpose = ( self.divertor_data[divertor_region]["xpoint_region"]["coils_Bp_jacobians_matrix_transpose"] ) # Extract the (row) vector of 2x2 Jacobian matrices for the background poloidal field from the coils xpoint_region_background_Bp_jacobian_points = ( self.divertor_data[divertor_region]["xpoint_region"]["xpoint_region_background_Bp_jacobian_points"] ) # Calculate the (row) vector of 2x2 Jacobian matrices for the full poloidal field from the coils xpoint_region_coils_Bp_jacobian_points = np.einsum( 'i,ijkl->jkl', currents, xpoint_region_coils_Bp_jacobians_matrix_transpose ) # Calculate the (row) vector of 2x2 Jacobian matrices for the full poloidal field (background + coils) xpoint_region_Bp_jacobian_points = xpoint_region_background_Bp_jacobian_points + \ xpoint_region_coils_Bp_jacobian_points # Extract the R,Z locations of these points inside the X-point region R_xpoint_region = self.divertor_data[divertor_region]["xpoint_region"]["R_points"] Z_xpoint_region = self.divertor_data[divertor_region]["xpoint_region"]["Z_points"] self.R_xpoint_estimate, self.Z_xpoint_estimate, xpoint_present = estimate_xpoint_location( R_xpoint_region, Z_xpoint_region, Br, Bz, xpoint_region_Bp_jacobian_points, self.eq.dR, self.eq.dZ ) # If an X-point is present, we don't need to include the Bp^2 cost term, as this is really only # there to reward lowering the poloidal field on the way to making an X-point. As the poloidal field # is only evaluated on a finite number of grid points, the exact X-point position, if one is present, will # not be on a grid point, hence the lowest Bp value will not strictly be zero. As such, we will, if an # X-point if present, just set this cost component to zero, otherwise this remains as is. # The final component of the total X-point cost for this region will be a cost associated with the level of # magnetic disconnection between the flux surface that the X-point sits on (if one is present) and the # separatrix. The idea here is to reward having the secondary X-point on/close to the separatrix. If no # secondary X-point is present the associated multiplication factor for this part of the X-point cost is # 1.0 . This cost multiplication factor is dependent on the degree of magentic disconnection, with the cost # scaled about a set maximium level of disconnection. The idea here is that even a small level of magnetic # disconnection is unnaceptable, hence the cost multiplication factor needs to be sensitive to the level of # disconnetion. if xpoint_present: # At least one X-point is present inside the region. Part of the total X-point region cost comes # from whether or not an X-point is present within the region. If an X-point is present, this # part of the total cost has a multiplication factor of zero, otherwise the factor is 1 if # no X-point is present inside the region. xpoint_present_cost_multplication_factor = 0.0 # Calculate the cost multiplication factor associated with the level of magnetic disconnection of # the secondary X-point. # Get the value of flux at the secondary X-point _fi_xpt = (self.R_xpoint_estimate - self._grid_R_min) * self._grid_inv_dR _fj_xpt = (self.Z_xpoint_estimate - self._grid_Z_min) * self._grid_inv_dZ psi_secondary_divertor_xpoint = _fast_bilinear( flux_map, _fi_xpt, _fj_xpt, self._grid_nR_m1, self._grid_nZ_m1 ) magnetic_disconnection_factor = np.abs( (psi_secondary_divertor_xpoint - self.eq.psi_lcfs) \ / self.eq.psi_lcfs ) xpoint_magnetic_connection_cost_multiplication_factor = (1.0 / self.max_magnetic_disconnection_factor)\ * magnetic_disconnection_factor xpoint_magnetic_connection_cost_multiplication_factor = min( 1.0, xpoint_magnetic_connection_cost_multiplication_factor ) else: # No X-points are present inside the region xpoint_present_cost_multplication_factor = 1.0 # As no X-point is present within the region, the cost multiplication factor associated with the # magnetic disconnection of the secondary X-point is maximal - set to an upper limit of 1. xpoint_magnetic_connection_cost_multiplication_factor = 1.0 # Calculate the part of the X-point region cost from this binary check for the presence of an X-point cost_xpoint_present = (1. / 3.) * initial_xpoint_region_cost * xpoint_present_cost_multplication_factor # Adjust the part of the X-point cost coming from Bp^2 cost_Bp2 *= xpoint_present_cost_multplication_factor # Calculate the part of the X-point region cost from the magnetic disconnection of the secondary X-point cost_xpoint_magnetic_connection = (1. / 3.) * initial_xpoint_region_cost \ * xpoint_magnetic_connection_cost_multiplication_factor # Total cost for the X-point region cost_xpoint_region = cost_Bp2 + cost_xpoint_present + cost_xpoint_magnetic_connection else: # The separatrix is not in the divertor region # Is this consistent with the stated method in the paper? # The initial X-point region cost for this divertor region initial_xpoint_region_cost = self.divertor_data[divertor_region]["initial_xpoint_region_cost"] cost_xpoint_region = initial_xpoint_region_cost self.R_xpoint_estimate = np.nan self.Z_xpoint_estimate = np.nan else: # No X-point region associated with this divertor region is present cost_xpoint_region = 0.0 # Caclulate the cost associated with this divertor. # Record the connection length, strike point distance and location for this divertor, along with the # associated costs. The R,Z of the field line trace is also included. divertor_results[divertor_region] = { "connection_length": connection_length, "strike_point_distance": strike_point_distance, "cost_connection_length": cost_connection_length, "cost_strike_point_distance": cost_strike_point_distance, "cost_xpoint_region": cost_xpoint_region, "field_line_R": field_line_R, "field_line_Z": field_line_Z, "strike_point_R": strike_point_R, "strike_point_Z": strike_point_Z, } # All divertor regions have now had their connection length and strike point distances # calculated, along with relevant checks on a potential X-point region, with all associated # costs now calculated. # Calculate the total connection length, strike point distance and X-point region costs across all divertor regions. cost_connection_length = 0 cost_strike_point_distance = 0 cost_xpoint_regions = 0 for divertor_region in self.divertor_regions: cost_connection_length += divertor_results[divertor_region]["cost_connection_length"] cost_strike_point_distance += divertor_results[divertor_region]["cost_strike_point_distance"] cost_xpoint_regions += divertor_results[divertor_region]["cost_xpoint_region"] # Calculate the sum square of the coil currents - this will contribute to the cost sum_square_coil_currents = np.dot(currents,currents) # The cost from the coil currents is as follows. The cost is nominally the initial cost value multiplied # by a cost multiplication factor, given by the sum square of the coil currents normalised to the initial # sum square of the coil currents. However, we establish a null-zone, wherein if the sum square lies between # the initial sum square value and sum square value representing the coils all being "reasonably energised", # the cost multiplication factor remains at a value of one. If the sum square is below the initial sum square, # those data are used in the normalisation, else if the sum square is below the energised sum square value, those # data are used in the normalisation. if self.initial_sum_square_coil_currents <= sum_square_coil_currents <= self.energised_sum_square_coil_currents: coil_currents_multiplication_factor = 1.0 elif sum_square_coil_currents < self.initial_sum_square_coil_currents: coil_currents_multiplication_factor = sum_square_coil_currents / self.initial_sum_square_coil_currents else: coil_currents_multiplication_factor = sum_square_coil_currents / self.energised_sum_square_coil_currents #if self.N_evals % self.n_window == 0: # print('sum square currents achieved: ',sum_square_coil_currents) # Contribution from the coil currents - these are regularised so as to penalise states # with prohibitively large coil currents. The sum square of the coil currents is used # as the square of the currents is more physicaly meaningful (e.g. forces between coils # goes as the square of the currents). cost_coil_currents = self.initial_coil_currents_cost * coil_currents_multiplication_factor # Finally, we will compute the total cost for this configuration. # Calculate the total cost cost = cost_strike_point_distance cost += cost_connection_length cost += cost_coil_currents cost += cost_xpoint_regions # Update the tracker for number of cost function evaluations self.N_evals += 1 # Return a dictionary of key information about the state and its cost # This is later reffered to as the state_data dictionary. return { "cost": cost, "cost_strike_point_distance": cost_strike_point_distance, "cost_connection_length": cost_connection_length, "cost_coil_currents": cost_coil_currents, "cost_xpoint_regions": cost_xpoint_regions, "divertors": divertor_results, "psi_2D": flux_map, "currents": currents, }
[docs] def trace_field_line( self, starting_position, step_size, max_steps = 1000, direction = 1.0, grid_data = None, divertor_region = None, ): """Trace a magnetic field line until it hits a wall segment. Trace a field line from the specified starting location. Stop tracing either when the field line intersects the machine's wall, or a maximum number of steps along the field line has been taken. Uses RK4 integration with a **fast bilinear interpolator** on pre-computed derivative grids and a **gradient-projection correction** after every step to keep the trace pinned to the correct flux surface. Performance notes ----------------- The original bottleneck was ``psi_func.ev()`` called as a scalar inside a Python loop (~30-100 us overhead per call). By pre-computing dpsi/dR and dpsi/dZ on the regular grid once (in ``calculate_cost``) and interpolating them bilinearly here (~1 us per call), the per-step cost drops by **30-50x**. Combined with RK4's larger feasible step size this gives an overall speed-up of roughly **10-20x** on the tracer. The gradient-projection step ensures that the accumulated psi drift after each RK4 step is corrected to machine precision, so the trace stays on the target flux surface regardless of step size. Parameters ---------- starting_position : list [R,Z] of the starting position for the trace. step_size : float The size of each poloidal step along the field line. max_steps : int Maximum number of steps to take along the field line. direction : float Direction to trace the field line. +1/-1 corresponds to the poloidal helicity of the trace. grid_data : dict Pre-computed grids and metadata from ``calculate_cost``. Keys: ``psi_grid``, ``dpsi_dR_grid``, ``dpsi_dZ_grid``, ``R_min``, ``Z_min``, ``inv_dR``, ``inv_dZ``, ``nR_m1``, ``nZ_m1``. divertor_region : str, optional Name of the divertor region being traced (e.g. ``"lower_outer"``). When buffers are keyed per region, only the buffers for this region are checked for intersections. """ # Unpack grid data for the fast bilinear evaluator. psi_grid = grid_data["psi_grid"] dpsi_dR_grid = grid_data["dpsi_dR_grid"] dpsi_dZ_grid = grid_data["dpsi_dZ_grid"] gR_min = grid_data["R_min"] gZ_min = grid_data["Z_min"] g_inv_dR = grid_data["inv_dR"] g_inv_dZ = grid_data["inv_dZ"] g_nR_m1 = grid_data["nR_m1"] g_nZ_m1 = grid_data["nZ_m1"] # Local references to the fast interpolator (avoid repeated global/dict lookups) _interp = _fast_bilinear # Pre-allocate output arrays. locations_R = np.empty(max_steps + 1) locations_Z = np.empty(max_steps + 1) locations_R[0] = starting_position[0] locations_Z[0] = starting_position[1] n_points = 1 intersection_with_buffer = False # Target psi value — the flux surface the trace must stay on. fi0 = (starting_position[0] - gR_min) * g_inv_dR fj0 = (starting_position[1] - gZ_min) * g_inv_dZ psi_target = _interp(psi_grid, fi0, fj0, g_nR_m1, g_nZ_m1) for i in range(max_steps): R0 = locations_R[n_points - 1] Z0 = locations_Z[n_points - 1] # Fractional grid indices for the starting point (reused by stage 1) fi = (R0 - gR_min) * g_inv_dR fj = (Z0 - gZ_min) * g_inv_dZ # ---- RK4 Stage 1 ---- dR_val = _interp(dpsi_dR_grid, fi, fj, g_nR_m1, g_nZ_m1) dZ_val = _interp(dpsi_dZ_grid, fi, fj, g_nR_m1, g_nZ_m1) Br1 = -(ONE_2PI / R0) * dZ_val Bz1 = (ONE_2PI / R0) * dR_val Bp1_sq = Br1 * Br1 + Bz1 * Bz1 if Bp1_sq < 1e-30: break inv_Bp1 = direction / math.sqrt(Bp1_sq) k1R = step_size * Br1 * inv_Bp1 k1Z = step_size * Bz1 * inv_Bp1 # ---- RK4 Stage 2 ---- Rm = R0 + 0.5 * k1R Zm = Z0 + 0.5 * k1Z fi = (Rm - gR_min) * g_inv_dR fj = (Zm - gZ_min) * g_inv_dZ dR_val = _interp(dpsi_dR_grid, fi, fj, g_nR_m1, g_nZ_m1) dZ_val = _interp(dpsi_dZ_grid, fi, fj, g_nR_m1, g_nZ_m1) Br2 = -(ONE_2PI / Rm) * dZ_val Bz2 = (ONE_2PI / Rm) * dR_val Bp2_sq = Br2 * Br2 + Bz2 * Bz2 if Bp2_sq < 1e-30: break inv_Bp2 = direction / math.sqrt(Bp2_sq) k2R = step_size * Br2 * inv_Bp2 k2Z = step_size * Bz2 * inv_Bp2 # ---- RK4 Stage 3 ---- Rm = R0 + 0.5 * k2R Zm = Z0 + 0.5 * k2Z fi = (Rm - gR_min) * g_inv_dR fj = (Zm - gZ_min) * g_inv_dZ dR_val = _interp(dpsi_dR_grid, fi, fj, g_nR_m1, g_nZ_m1) dZ_val = _interp(dpsi_dZ_grid, fi, fj, g_nR_m1, g_nZ_m1) Br3 = -(ONE_2PI / Rm) * dZ_val Bz3 = (ONE_2PI / Rm) * dR_val Bp3_sq = Br3 * Br3 + Bz3 * Bz3 if Bp3_sq < 1e-30: break inv_Bp3 = direction / math.sqrt(Bp3_sq) k3R = step_size * Br3 * inv_Bp3 k3Z = step_size * Bz3 * inv_Bp3 # ---- RK4 Stage 4 ---- Rm = R0 + k3R Zm = Z0 + k3Z fi = (Rm - gR_min) * g_inv_dR fj = (Zm - gZ_min) * g_inv_dZ dR_val = _interp(dpsi_dR_grid, fi, fj, g_nR_m1, g_nZ_m1) dZ_val = _interp(dpsi_dZ_grid, fi, fj, g_nR_m1, g_nZ_m1) Br4 = -(ONE_2PI / Rm) * dZ_val Bz4 = (ONE_2PI / Rm) * dR_val Bp4_sq = Br4 * Br4 + Bz4 * Bz4 if Bp4_sq < 1e-30: break inv_Bp4 = direction / math.sqrt(Bp4_sq) k4R = step_size * Br4 * inv_Bp4 k4Z = step_size * Bz4 * inv_Bp4 # Combine the four stages (standard RK4 weights) R_end = R0 + (k1R + 2.0 * k2R + 2.0 * k3R + k4R) / 6.0 Z_end = Z0 + (k1Z + 2.0 * k2Z + 2.0 * k3Z + k4Z) / 6.0 # ---- Gradient-projection correction ---- # Project the point back onto the target flux surface. # This is a single Newton step: # x_corrected = x - (psi(x) - psi_target) / |grad(psi)|^2 * grad(psi) # It eliminates accumulated psi drift to first order, keeping # the trace on its flux surface to machine precision. fi_end = (R_end - gR_min) * g_inv_dR fj_end = (Z_end - gZ_min) * g_inv_dZ psi_here = _interp(psi_grid, fi_end, fj_end, g_nR_m1, g_nZ_m1) psi_err = psi_here - psi_target grad_R = _interp(dpsi_dR_grid, fi_end, fj_end, g_nR_m1, g_nZ_m1) grad_Z = _interp(dpsi_dZ_grid, fi_end, fj_end, g_nR_m1, g_nZ_m1) grad_sq = grad_R * grad_R + grad_Z * grad_Z if grad_sq > 1e-30: correction = psi_err / grad_sq R_end -= correction * grad_R Z_end -= correction * grad_Z # Check for wall / buffer intersection intersection_data = self.check_field_line_intersection( field_line_R_start = R0, field_line_Z_start = Z0, field_line_R_end = R_end, field_line_Z_end = Z_end, divertor_region = divertor_region, ) if intersection_data["intersects"]: locations_R[n_points] = intersection_data["strike_point_R"] locations_Z[n_points] = intersection_data["strike_point_Z"] n_points += 1 intersection_with_buffer = intersection_data.get("intersection_with_buffer", False) break else: locations_R[n_points] = R_end locations_Z[n_points] = Z_end n_points += 1 # Trim the pre-allocated arrays to the actual number of traced points. field_line_R = locations_R[:n_points].copy() field_line_Z = locations_Z[:n_points].copy() field_line_data = { "field_line_R": field_line_R, "field_line_Z": field_line_Z, "field_line": LineString(np.column_stack((field_line_R, field_line_Z))), "intersection_with_buffer": intersection_with_buffer, } return field_line_data
def _extract_intersection_point(self, intersection): """Extracts (R, Z) from a Shapely intersection result (Point or MultiPoint).""" if intersection.geom_type == "Point": return intersection.x, intersection.y elif intersection.geom_type == "MultiPoint": return intersection.geoms[0].x, intersection.geoms[0].y return None
[docs] def check_field_line_intersection( self, field_line_R_start, field_line_Z_start, field_line_R_end, field_line_Z_end, divertor_region=None, ): """Locates intersections between a traced field line segment and wall/buffer structures. Checks for an intersection between the provided field line segment and the tokamak's wall. If buffer regions are present, intersections with these are also examined. If multiple intersections are present, the point closest to the field line's starting point is returned. Note that this is for only a single segment of a field line, i.e. between two traced points. Uses the prepared (spatially-indexed) wall geometry created during __init__ for a fast boolean pre-check before computing the full intersection geometry. This avoids the expensive intersection computation on steps that are entirely inside the domain (the vast majority). Parameters ---------- field_line_R_start : float R coordinate of the starting location of the field line segment. field_line_Z_start : float Z coordinate of the starting location of the field line segment. field_line_R_end : float R coordinate of the end location of the field line segment. field_line_Z_end : float Z coordinate of the end location of the field line segment. divertor_region : str, optional Name of the divertor region being traced. When provided and buffers are keyed per region, only the buffers for this region are checked. Returns ------- intersection_data : dict Dictionary of intersection data containing a flag for whether or not there was an intersection, the strike point (R,Z) coordinates (if there is an intersection) and an additional flag for if an intersection occurs with a buffer. """ # Build the segment LineString once field_line_segment = LineString([(field_line_R_start, field_line_Z_start), (field_line_R_end, field_line_Z_end)]) # Fast boolean pre-check using the prepared (spatially indexed) wall. # For the majority of steps, where the segment is well inside the # domain, this returns False in O(log N) time without computing the # full intersection geometry. wall_may_intersect = self._prepared_wall.intersects(field_line_segment) # Determine the buffer geometries relevant to this trace. # Buffers are keyed per divertor region for efficiency. region_buffers = None region_prep_buffers = None if self.buffers is not None and divertor_region is not None: region_buffers = self.buffers.get(divertor_region) if region_buffers is not None: region_prep_buffers = self._prepared_buffers.get(divertor_region) buffers_present = region_buffers is not None and len(region_buffers) > 0 if not wall_may_intersect and not buffers_present: return _NO_INTERSECTION # --- Collect candidate intersection points --- intersection_points_R = [] intersection_points_Z = [] intersection_with_buffer = [] # Wall intersection (only computed when the fast check said yes) if wall_may_intersect: intersection = field_line_segment.intersection(self.tokamak_opt.wall) if not intersection.is_empty: pt = self._extract_intersection_point(intersection) if pt is not None: intersection_points_R.append(pt[0]) intersection_points_Z.append(pt[1]) intersection_with_buffer.append(False) # Buffer intersections (only for the active divertor region) if buffers_present: for prep_buf, buffer in zip(region_prep_buffers, region_buffers): if not prep_buf.intersects(field_line_segment): continue intersection = field_line_segment.intersection(buffer) if not intersection.is_empty: pt = self._extract_intersection_point(intersection) if pt is not None: intersection_points_R.append(pt[0]) intersection_points_Z.append(pt[1]) intersection_with_buffer.append(True) if len(intersection_points_R) == 0: return _NO_INTERSECTION # Find the intersection point closest to the segment start if len(intersection_points_R) == 1: # Only one candidate — skip the distance calculation return { "intersects": True, "strike_point_R": intersection_points_R[0], "strike_point_Z": intersection_points_Z[0], "intersection_with_buffer": intersection_with_buffer[0], } d2 = [(R - field_line_R_start)**2 + (Z - field_line_Z_start)**2 for R, Z in zip(intersection_points_R, intersection_points_Z)] idx = min(range(len(d2)), key=lambda i: d2[i]) return { "intersects": True, "strike_point_R": intersection_points_R[idx], "strike_point_Z": intersection_points_Z[idx], "intersection_with_buffer": intersection_with_buffer[idx], }
[docs] def compute_coilset_greens_on_grid(self): """Computes the coils' Green's functions on the 2D equilibrium grid. Pre-calculates, for each coil in the coilset, the Green's function matrices for: * Poloidal magnetic flux, psi — via ``coil.control_psi``. * The R-derivative of psi, dpsi/dR — derived from the coil's analytical ``control_Bz`` response using the identity dpsi/dR = 2 pi R Bz. * The Z-derivative of psi, dpsi/dZ — derived from the coil's analytical ``control_Br`` response using the identity dpsi/dZ = -2 pi R Br. These derivative matrices are used inside ``calculate_cost`` to construct the full dpsi/dR and dpsi/dZ grids via a single ``np.einsum`` call (a currents-vector times the precomputed matrices), which is *much* faster than fitting a ``RectBivariateSpline`` and numerically differentiating it every iteration. The analytical Green's function derivatives (via ``greens.Greens_dpsi_dR`` / ``greens.Greens_dpsi_dZ``, which underpin ``control_Bz`` / ``control_Br``) are also more accurate than spline-based numerical differentiation. """ R_2D = self.eq.R_2D Z_2D = self.eq.Z_2D TWO_PI = 2.0 * np.pi TWO_PI_R = TWO_PI * R_2D coilset_greens_on_grid = [] coilset_dpsi_dR_greens_on_grid = [] coilset_dpsi_dZ_greens_on_grid = [] for coil in self.tokamak_opt.coilset.values(): # Flux Green's function (same as before) coil_greens_on_grid = coil.control_psi(R_2D, Z_2D) coilset_greens_on_grid.append(coil_greens_on_grid) # Derivative Green's functions from the analytical field responses. # dpsi/dR = 2*pi*R * Bz and dpsi/dZ = -2*pi*R * Br coil_Bz_on_grid = coil.control_Bz(R_2D, Z_2D) coil_Br_on_grid = coil.control_Br(R_2D, Z_2D) coilset_dpsi_dR_greens_on_grid.append(TWO_PI_R * coil_Bz_on_grid) coilset_dpsi_dZ_greens_on_grid.append(-TWO_PI_R * coil_Br_on_grid) self.coilset_greens_on_grid = np.asarray(coilset_greens_on_grid) self.coilset_dpsi_dR_greens_on_grid = np.asarray(coilset_dpsi_dR_greens_on_grid) self.coilset_dpsi_dZ_greens_on_grid = np.asarray(coilset_dpsi_dZ_greens_on_grid)
[docs] def generate_constraints(self): """Generates constraint matrices/vector for simulated annealing and Tikhonov regularisation (if used). Generates various constraint matrices/vectors. In general, there will exist a matrix of Green's functions related to the various constraints, as well as a corresponding vector of the constraints themselves. These constraints will be things such as the value of magnetic field or flux from the coils at a point. There will exist up to two sets of these constraints. One set of constraints are used during the simulated annealing. The other set is used in the determination of the initial coil currents. Determining the initial coil currents is itself optional, but is particularly needed if the user decides to perform an optimisation using a set of PF coils that were different to those that were used to make the initial equilibrium. If the user is using the same PF set as was used to make the initial equilibrium, then this determination of the initial coil currents is not required. In such a case the user can still choose to estimate the initial coil currents. They may want to do this in such a way as to incroporate new divertor constraints, such as those describing the position of the divertor legs, as such a resultant equilibrium may provide a more useful initial starting point for their particular optimisation problem. During the annealing a null-space approach is utilised, wherein there is a requirement that number of coils - number of constraints >= 1. It may not be possible for the user to include lots of additional divetor geometry constraints in the annealing, if the number of coils they have supplied is insufficient. However, if the user chooses to estimate the initial coil currents using the Tikhonov regulariastion approach, then during this process they are not subject to the strict requirement on the number of constraints, and can try to include more divertor constraints. In such a case, a larger set of cosntraints can be used, with a smaller set of constraints then later used during the annealing process. In this way, the user may be able to bias the initial equilibrium into having favourable features in the divertor, which whilst not constrained during the annealing, may be more likely to be produced naturally, having started from an initial equilibrium with such features. The set of constraints used during the annealing are reffered to as the annealing constraints, with the set used during the Tikhonov regularisation reffered to as the Tikhonov constraints. """ # Annealing constraints ####################### annealing_greens_matrix = [] annealing_constraints = [] annealing_constraint_points_R = [] annealing_constraint_points_Z = [] # X-point constraint - determine which X-point to use. # By default ("primary"), the primary X-point is used. In DND configurations, # the user may instead specify "lower" or "upper" to select that X-point. annealing_xpt_choice = self.constraints["annealing"].get("xpoint_constraint", "primary") R_xpt_annealing, Z_xpt_annealing = self._resolve_xpoint_constraint(annealing_xpt_choice, "annealing") # Machine total Br at X-point annealing_constraints.append(self.eq.Br_machRZ(R_xpt_annealing, Z_xpt_annealing)) # Machine total Bz at X-point annealing_constraints.append(self.eq.Bz_machRZ(R_xpt_annealing, Z_xpt_annealing)) # Machine flux at the X-point annealing_constraints.append(self.eq.psi_machRZ(R_xpt_annealing, Z_xpt_annealing)) # Control Br at X-point annealing_greens_matrix.append(self.tokamak_opt.control_Br(R_xpt_annealing, Z_xpt_annealing)) # Control Bz at X-point annealing_greens_matrix.append(self.tokamak_opt.control_Bz(R_xpt_annealing, Z_xpt_annealing)) # Control psi at the X-point annealing_greens_matrix.append(self.tokamak_opt.control_psi(R_xpt_annealing, Z_xpt_annealing)) annealing_constraint_points_R.append(R_xpt_annealing) annealing_constraint_points_Z.append(Z_xpt_annealing) # Secondary X-point - removed pending further consideration # Upper right quadrant if self.constraints["annealing"]["constrain_upper_right_quadrant"]: # Extract the number of constraint points to use N_points = self.constraints["annealing"]["N_constraints_upper_right_quadrant"] # Calculate the normalised distance along the separatrix of these points norm_distances_points = np.linspace(0.0,self.eq.separatrix_dist_norm_vertical_upper_lcfs,N_points + 2)[1:-1] # Extract the R,Z location of these points points = self.eq.separatrix_interpolator(norm_distances_points) R_points = [r for (r,z) in points] Z_points = [z for (r,z) in points] # Iterate over the constraint points for R_point, Z_point in list(zip(R_points,Z_points)): # Machine flux at the constraint point annealing_constraints.append(self.eq.psi_machRZ(R_point,Z_point)) # Control psi at the constraint point annealing_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point)) annealing_constraint_points_R.append(R_point) annealing_constraint_points_Z.append(Z_point) # Upper left quadrant if self.constraints["annealing"]["constrain_upper_left_quadrant"]: # Extract the number of constraint points to use N_points = self.constraints["annealing"]["N_constraints_upper_left_quadrant"] # Calculate the normalised distance along the separatrix of these points norm_distances_points = np.linspace( self.eq.separatrix_dist_norm_vertical_upper_lcfs, self.eq.separatrix_dist_norm_imp_lcfs,N_points + 2 )[1:-1] # Extract the R,Z location of these points points = self.eq.separatrix_interpolator(norm_distances_points) R_points = [r for (r,z) in points] Z_points = [z for (r,z) in points] # Iterate over the constraint points for R_point, Z_point in list(zip(R_points,Z_points)): # Machine flux at the constraint point annealing_constraints.append(self.eq.psi_machRZ(R_point,Z_point)) # Control psi at the constraint point annealing_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point)) annealing_constraint_points_R.append(R_point) annealing_constraint_points_Z.append(Z_point) # Lower left quadrant if self.constraints["annealing"]["constrain_lower_left_quadrant"]: # Extract the number of constraint points to use N_points = self.constraints["annealing"]["N_constraints_lower_left_quadrant"] # Calculate the normalised distance along the separatrix of these points norm_distances_points = np.linspace( self.eq.separatrix_dist_norm_imp_lcfs, self.eq.separatrix_dist_norm_vertical_lower_lcfs, N_points + 2 )[1:-1] # Extract the R,Z location of these points points = self.eq.separatrix_interpolator(norm_distances_points) R_points = [r for (r,z) in points] Z_points = [z for (r,z) in points] # Iterate over the constraint points for R_point, Z_point in list(zip(R_points,Z_points)): # Machine flux at the constraint point annealing_constraints.append(self.eq.psi_machRZ(R_point,Z_point)) # Control psi at the constraint point annealing_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point)) annealing_constraint_points_R.append(R_point) annealing_constraint_points_Z.append(Z_point) # Lower right quadrant if self.constraints["annealing"]["constrain_lower_right_quadrant"]: # Extract the number of constraint points to use N_points = self.constraints["annealing"]["N_constraints_lower_right_quadrant"] # Calculate the normalised distance along the separatrix of these points norm_distances_points = np.linspace(self.eq.separatrix_dist_norm_vertical_lower_lcfs,1,N_points + 2)[1:-1] # Extract the R,Z location of these points points = self.eq.separatrix_interpolator(norm_distances_points) R_points = [r for (r,z) in points] Z_points = [z for (r,z) in points] # Iterate over the constraint points for R_point, Z_point in list(zip(R_points,Z_points)): # Machine flux at the constraint point annealing_constraints.append(self.eq.psi_machRZ(R_point,Z_point)) # Control psi at the constraint point annealing_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point)) annealing_constraint_points_R.append(R_point) annealing_constraint_points_Z.append(Z_point) # Outer midplane - OMP if self.constraints["annealing"]["constrain_omp"]: # Machine flux at the OMP annealing_constraints.append(self.eq.psi_machRZ(self.eq.R_OMP,self.eq.Z_OMP)) # Control psi at the OMP annealing_greens_matrix.append(self.tokamak_opt.control_psi(self.eq.R_OMP,self.eq.Z_OMP)) annealing_constraint_points_R.append(self.eq.R_OMP) annealing_constraint_points_Z.append(self.eq.Z_OMP) # Inner midplane - IMP if self.constraints["annealing"]["constrain_imp"]: # Machine flux at the IMP annealing_constraints.append(self.eq.psi_machRZ(self.eq.R_IMP,self.eq.Z_IMP)) # Control psi at the IMP annealing_greens_matrix.append(self.tokamak_opt.control_psi(self.eq.R_IMP,self.eq.Z_IMP)) annealing_constraint_points_R.append(self.eq.R_IMP) annealing_constraint_points_Z.append(self.eq.Z_IMP) # Upper-most point if self.constraints["annealing"]["constrain_upper_point"]: # Machine flux at the upper-most point annealing_constraints.append(self.eq.psi_machRZ(self.eq.R_vertical_upper,self.eq.Z_vertical_upper)) # Control psi at the upper-most point annealing_greens_matrix.append(self.tokamak_opt.control_psi(self.eq.R_vertical_upper,self.eq.Z_vertical_upper)) annealing_constraint_points_R.append(self.eq.R_vertical_upper) annealing_constraint_points_Z.append(self.eq.Z_vertical_upper) # Lower-most point if self.constraints["annealing"]["constrain_lower_point"]: # Machine flux at the upper-most point annealing_constraints.append(self.eq.psi_machRZ(self.eq.R_vertical_lower,self.eq.Z_vertical_lower)) # Control psi at the upper-most point annealing_greens_matrix.append(self.tokamak_opt.control_psi(self.eq.R_vertical_lower,self.eq.Z_vertical_lower)) annealing_constraint_points_R.append(self.eq.R_vertical_lower) annealing_constraint_points_Z.append(self.eq.Z_vertical_lower) # Divertor constraint points - these are points that should lie on the divertor # legs, i.e. the separatrix. The points may or may not currently be on the separatrix. if self.constraints["annealing"]["additional_divertor_constraint_points"] is not None: # Iterate over the points for point in self.constraints["annealing"]["additional_divertor_constraint_points"]: # Extract the R,Z of the point R_point = point[0] Z_point = point[1] # The coils must produce a flux equal to the difference between the desired # total flux (that of the separatrix) and the background flux from the plasma, # at this point required_machine_flux = self.eq.psi_lcfs - self.eq.psi_plasRZ(R_point,Z_point) # Machine flux at the constraint point annealing_constraints.append(required_machine_flux) # Control psi at the constraint point annealing_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point)) annealing_constraint_points_R.append(R_point) annealing_constraint_points_Z.append(Z_point) # Additional divertor X-points - these are X-points in addition to the primary X-point(s). # These may be used in advanced divertor configurations such as the X-point target, # snowflake, X and super-X divertor configurations. if self.constraints["annealing"]["additional_divertor_xpoints"] is not None: # Iterate over the points for point in self.constraints["annealing"]["additional_divertor_xpoints"]: # Extract the R,Z of the point R_point = point[0] Z_point = point[1] # To make a new X-point, the machine must provide both a vertical and radial field # that exactly cancels out the vertical and radial field from the background plasma. background_Br = self.eq.Br_plasRZ(R_point,Z_point) background_Bz = self.eq.Bz_plasRZ(R_point,Z_point) # Machine total Br at X-point annealing_constraints.append(-background_Br) # Machine total Bz at X-point annealing_constraints.append(-background_Bz) # Control Br at X-point annealing_greens_matrix.append(self.tokamak_opt.control_Br(R_point,Z_point)) # Control Bz at X-point annealing_greens_matrix.append(self.tokamak_opt.control_Bz(R_point,Z_point)) annealing_constraint_points_R.append(R_point) annealing_constraint_points_Z.append(Z_point) self.annealing_constraint_points_R = annealing_constraint_points_R self.annealing_constraint_points_Z = annealing_constraint_points_Z # Cast to numpy arrays self.annealing_constraints = np.asarray(annealing_constraints) self.annealing_greens_matrix = np.asarray(annealing_greens_matrix) # Calculate the null space of the greens matrix self.annealing_greens_matrix_nullspace = null_space(self.annealing_greens_matrix) # Calculate the transpose of the nullspace matrix self.annealing_greens_matrix_nullspace_transpose = np.transpose(self.annealing_greens_matrix_nullspace) # Record the number of constraints self.annealing_N_constraints = len(self.annealing_constraints) logger.info('N_coils: %d', self.tokamak_opt.N_coils) logger.info('N_constraints (annealing): %d', self.annealing_N_constraints) # Tikhonov constraints - these will only exist if an initial estimate of the coil # currents is required to be calculated. ####################### if self.estimate_initial_currents: tikhonov_greens_matrix = [] tikhonov_constraints = [] tikhonov_constraint_points_R = [] tikhonov_constraint_points_Z = [] # The user can elect to exclude/turn off certain coils during this estimation # of the initial coil currents. To enable this, the Green's functions for such coils # are set to 0, which will lead to their currents being returned as 0. # We now create a "mask" array of 1's, of length N_coils, wherein enetries corresponding # to excluded coils are set to 0. self.tikhonov_mask = np.ones(self.tokamak_opt.N_coils) if self.constraints["tikhonov"]["exclude_coils"] is not None: # Get the names of the coils in the coilset coil_names = self.tokamak_opt.get_coil_names() # Get a list of indeces of the excluded coils as they appear in the above list name_indeces = [coil_names.index(name) for name in self.constraints["tikhonov"]["exclude_coils"]] # Adjust the mask, setting the excluded coils entries' to 0 for index in name_indeces: self.tikhonov_mask[index] = 0 # X-point constraint - determine which X-point to use. tikhonov_xpt_choice = self.constraints["tikhonov"].get("xpoint_constraint", "primary") R_xpt_tikhonov, Z_xpt_tikhonov = self._resolve_xpoint_constraint(tikhonov_xpt_choice, "tikhonov") # Machine total Br at X-point tikhonov_constraints.append(self.eq.Br_machRZ(R_xpt_tikhonov, Z_xpt_tikhonov)) # Machine total Bz at X-point tikhonov_constraints.append(self.eq.Bz_machRZ(R_xpt_tikhonov, Z_xpt_tikhonov)) # Machine flux at the X-point tikhonov_constraints.append(self.eq.psi_machRZ(R_xpt_tikhonov, Z_xpt_tikhonov)) # Control Br at X-point tikhonov_greens_matrix.append( self.tokamak_opt.control_Br(R_xpt_tikhonov, Z_xpt_tikhonov) * self.tikhonov_mask ) # Control Bz at X-point tikhonov_greens_matrix.append( self.tokamak_opt.control_Bz(R_xpt_tikhonov, Z_xpt_tikhonov) * self.tikhonov_mask ) # Control psi at the X-point tikhonov_greens_matrix.append( self.tokamak_opt.control_psi(R_xpt_tikhonov, Z_xpt_tikhonov) * self.tikhonov_mask ) tikhonov_constraint_points_R.append(R_xpt_tikhonov) tikhonov_constraint_points_Z.append(Z_xpt_tikhonov) # Secondary X-point - removed pending further consideration # Upper right quadrant if self.constraints["tikhonov"]["constrain_upper_right_quadrant"]: # Extract the number of constraint points to use N_points = self.constraints["tikhonov"]["N_constraints_upper_right_quadrant"] # Calculate the normalised distance along the separatrix of these points norm_distances_points = np.linspace(0.0,self.eq.separatrix_dist_norm_vertical_upper_lcfs,N_points + 2)[1:-1] # Extract the R,Z location of these points points = self.eq.separatrix_interpolator(norm_distances_points) R_points = [r for (r,z) in points] Z_points = [z for (r,z) in points] # Iterate over the constraint points for R_point, Z_point in list(zip(R_points,Z_points)): # Machine flux at the constraint point tikhonov_constraints.append(self.eq.psi_machRZ(R_point,Z_point)) # Control psi at the constraint point tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point) * self.tikhonov_mask) tikhonov_constraint_points_R.append(R_point) tikhonov_constraint_points_Z.append(Z_point) # Upper left quadrant if self.constraints["tikhonov"]["constrain_upper_left_quadrant"]: # Extract the number of constraint points to use N_points = self.constraints["tikhonov"]["N_constraints_upper_left_quadrant"] # Calculate the normalised distance along the separatrix of these points norm_distances_points = np.linspace( self.eq.separatrix_dist_norm_vertical_upper_lcfs, self.eq.separatrix_dist_norm_imp_lcfs, N_points + 2 )[1:-1] # Extract the R,Z location of these points points = self.eq.separatrix_interpolator(norm_distances_points) R_points = [r for (r,z) in points] Z_points = [z for (r,z) in points] # Iterate over the constraint points for R_point, Z_point in list(zip(R_points,Z_points)): # Machine flux at the constraint point tikhonov_constraints.append(self.eq.psi_machRZ(R_point,Z_point)) # Control psi at the constraint point tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point) * self.tikhonov_mask) tikhonov_constraint_points_R.append(R_point) tikhonov_constraint_points_Z.append(Z_point) # Lower left quadrant if self.constraints["tikhonov"]["constrain_lower_left_quadrant"]: # Extract the number of constraint points to use N_points = self.constraints["tikhonov"]["N_constraints_lower_left_quadrant"] # Calculate the normalised distance along the separatrix of these points norm_distances_points = np.linspace( self.eq.separatrix_dist_norm_imp_lcfs, self.eq.separatrix_dist_norm_vertical_lower_lcfs, N_points + 2 )[1:-1] # Extract the R,Z location of these points points = self.eq.separatrix_interpolator(norm_distances_points) R_points = [r for (r,z) in points] Z_points = [z for (r,z) in points] # Iterate over the constraint points for R_point, Z_point in list(zip(R_points,Z_points)): # Machine flux at the constraint point tikhonov_constraints.append(self.eq.psi_machRZ(R_point,Z_point)) # Control psi at the constraint point tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point) * self.tikhonov_mask) tikhonov_constraint_points_R.append(R_point) tikhonov_constraint_points_Z.append(Z_point) # Lower right quadrant if self.constraints["tikhonov"]["constrain_lower_right_quadrant"]: # Extract the number of constraint points to use N_points = self.constraints["tikhonov"]["N_constraints_lower_right_quadrant"] # Calculate the normalised distance along the separatrix of these points norm_distances_points = np.linspace(self.eq.separatrix_dist_norm_vertical_lower_lcfs,1,N_points + 2)[1:-1] # Extract the R,Z location of these points points = self.eq.separatrix_interpolator(norm_distances_points) R_points = [r for (r,z) in points] Z_points = [z for (r,z) in points] # Iterate over the constraint points for R_point, Z_point in list(zip(R_points,Z_points)): # Machine flux at the constraint point tikhonov_constraints.append(self.eq.psi_machRZ(R_point,Z_point)) # Control psi at the constraint point tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point) * self.tikhonov_mask) tikhonov_constraint_points_R.append(R_point) tikhonov_constraint_points_Z.append(Z_point) # Outer midplane - OMP if self.constraints["tikhonov"]["constrain_omp"]: # Machine flux at the OMP tikhonov_constraints.append(self.eq.psi_machRZ(self.eq.R_OMP,self.eq.Z_OMP)) # Control psi at the OMP tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(self.eq.R_OMP,self.eq.Z_OMP) * self.tikhonov_mask) tikhonov_constraint_points_R.append(self.eq.R_OMP) tikhonov_constraint_points_Z.append(self.eq.Z_OMP) # Inner midplane - IMP if self.constraints["tikhonov"]["constrain_imp"]: # Machine flux at the IMP tikhonov_constraints.append(self.eq.psi_machRZ(self.eq.R_IMP,self.eq.Z_IMP)) # Control psi at the IMP tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(self.eq.R_IMP,self.eq.Z_IMP) * self.tikhonov_mask) tikhonov_constraint_points_R.append(self.eq.R_IMP) tikhonov_constraint_points_Z.append(self.eq.Z_IMP) # Upper-most point if self.constraints["tikhonov"]["constrain_upper_point"]: # Machine flux at the upper-most point tikhonov_constraints.append(self.eq.psi_machRZ(self.eq.R_vertical_upper,self.eq.Z_vertical_upper)) # Control psi at the upper-most point tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(self.eq.R_vertical_upper,self.eq.Z_vertical_upper)) tikhonov_constraint_points_R.append(self.eq.R_vertical_upper) tikhonov_constraint_points_Z.append(self.eq.Z_vertical_upper) # Lower-most point if self.constraints["tikhonov"]["constrain_lower_point"]: # Machine flux at the upper-most point tikhonov_constraints.append(self.eq.psi_machRZ(self.eq.R_vertical_lower,self.eq.Z_vertical_lower)) # Control psi at the upper-most point tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(self.eq.R_vertical_lower,self.eq.Z_vertical_lower)) tikhonov_constraint_points_R.append(self.eq.R_vertical_lower) tikhonov_constraint_points_Z.append(self.eq.Z_vertical_lower) # Divertor constraint points - these are points that should lie on the divertor # legs, i.e. the separatrix. The points may or may not currently be on the separatrix. if self.constraints["tikhonov"]["additional_divertor_constraint_points"] is not None: # Iterate over the points for point in self.constraints["tikhonov"]["additional_divertor_constraint_points"]: # Extract the R,Z of the point R_point = point[0] Z_point = point[1] # The coils must produce a flux equal to the difference between the desired # total flux (that of the separatrix) and the background flux from the plasma, # at this point required_machine_flux = self.eq.psi_lcfs - self.eq.psi_plasRZ(R_point,Z_point) # Machine flux at the constraint point tikhonov_constraints.append(required_machine_flux) # Control psi at the constraint point tikhonov_greens_matrix.append(self.tokamak_opt.control_psi(R_point,Z_point) * self.tikhonov_mask) tikhonov_constraint_points_R.append(R_point) tikhonov_constraint_points_Z.append(Z_point) # Additional divertor X-points - these are X-points in addition to the primary X-point(s). # These may be used in advanced divertor configurations such as the X-point target, # snowflake, X and super-X divertor configurations. if self.constraints["tikhonov"]["additional_divertor_xpoints"] is not None: # Iterate over the points for point in self.constraints["tikhonov"]["additional_divertor_xpoints"]: # Extract the R,Z of the point R_point = point[0] Z_point = point[1] # To make a new X-point, the machine must provide both a vertical and radial field # that exactly cancels out the vertical and radial field from the background plasma. background_Br = self.eq.Br_plasRZ(R_point,Z_point) background_Bz = self.eq.Bz_plasRZ(R_point,Z_point) # Machine total Br at X-point tikhonov_constraints.append(-background_Br) # Machine total Bz at X-point tikhonov_constraints.append(-background_Bz) # Control Br at X-point tikhonov_greens_matrix.append(self.tokamak_opt.control_Br(R_point,Z_point) * self.tikhonov_mask) # Control Bz at X-point tikhonov_greens_matrix.append(self.tokamak_opt.control_Bz(R_point,Z_point) * self.tikhonov_mask) tikhonov_constraint_points_R.append(R_point) tikhonov_constraint_points_Z.append(Z_point) self.tikhonov_constraint_points_R = tikhonov_constraint_points_R self.tikhonov_constraint_points_Z = tikhonov_constraint_points_Z # Cast to numpy arrays self.tikhonov_constraints = np.asarray(tikhonov_constraints) self.tikhonov_greens_matrix = np.asarray(tikhonov_greens_matrix) # Record the number of constraints self.tikhonov_N_constraints = len(self.tikhonov_constraints)
[docs] def create_buffers(self): """Creates buffer regions around the wall of the machine. The user can choose to create buffer regions around parts of the wall. These are useful if the user wants the divertor leg(s) to not come within a certain distance of parts of the wall. An example of this might be regions such as the divertor nose in tightly baffled geometries, which may not be equipped to handle heat fluxes as high as the main divertor targets. Buffer definitions are keyed by divertor region name so that, at runtime, only the buffers belonging to the region currently being traced are checked for intersections. This improves efficiency and avoids false-positive buffer hits from unrelated regions. Raises ------ ValueError If ``use_buffers`` is True but no buffer data has been supplied. TypeError If ``buffers`` is not a dict keyed by region name. """ if self.buffers_input is None: raise ValueError( "use_buffers=True but no buffer data was supplied. " "Pass buffer definitions via the 'buffers' parameter, e.g. " "buffers={'lower_outer': [{'R': [R1, R2], 'Z': [Z1, Z2], 'distance': d}, ...], ...}, " "or use the FORGE GUI to define buffers interactively." ) if not isinstance(self.buffers_input, dict): raise TypeError( "The 'buffers' parameter must be a dict keyed by divertor region " "name, e.g. {'lower_outer': [{'R': [R1, R2], 'Z': [Z1, Z2], 'distance': d}, ...]}. " f"Got {type(self.buffers_input).__name__}." ) # Build per-region Shapely geometries and definition records. # self.buffers_input = {"region": [{"R":[R1,R2],"Z":[Z1,Z2],"distance":d}, ...], ...} self.buffers = {} # {region: [boundary_geom, ...]} self.buffers_data = {} # {region: [{"R":..., "Z":..., "distance":...}, ...]} for region, defs_list in self.buffers_input.items(): region_geoms = [] region_data = [] for buffer_def in defs_list: R = buffer_def["R"] Z = buffer_def["Z"] distance = buffer_def["distance"] # Reconstruct the Shapely buffer geometry line_geom = LineString(list(zip(R, Z))) buffer_geom = line_geom.buffer(distance) region_geoms.append(buffer_geom.boundary) # Record the definition data for potential saving region_data.append({ "R": list(R), "Z": list(Z), "distance": float(distance), }) if region_geoms: self.buffers[region] = region_geoms self.buffers_data[region] = region_data
[docs] def create_xpoint_regions(self): """Defines regions for additional divertor X-points. The user can choose to define regions within which X-points are encouraged to be formed under optimisation. The regions are recorded as a dictionary mapping divertor region names to Shapely LineString objects. Raises ------ ValueError If ``use_xpoint_regions`` is True but no region data has been supplied. TypeError If ``xpoint_regions`` is not a dictionary keyed by divertor region name. """ if self.xpoint_regions is None: raise ValueError( "use_xpoint_regions=True but no X-point region data was supplied. " "Pass region definitions via the 'xpoint_regions' parameter as a " "dictionary mapping divertor region names to {R, Z} definitions, e.g. " 'xpoint_regions={"lower_outer": {"R": [...], "Z": [...]}}' ) if not isinstance(self.xpoint_regions, dict): raise TypeError( "xpoint_regions must be a dictionary mapping divertor region names " "to {R, Z} definitions, e.g. " 'xpoint_regions={"lower_outer": {"R": [...], "Z": [...]}}. ' "Received type: " + type(self.xpoint_regions).__name__ ) # The user has supplied the X-point region locations keyed by divertor region. # self.xpoint_regions = {"lower_outer": {"R":[], "Z":[]}, ...} xpoint_regions = {} for region_name, region_data in self.xpoint_regions.items(): if region_name not in self.divertor_regions: logger.warning( "X-point region defined for '%s' but that divertor region is not " "being optimised — skipping.", region_name ) continue # Get the region R_region = region_data["R"] Z_region = region_data["Z"] # Densify the shape. Here, we insert more points along the # edges of the shape. R_region, Z_region = densify_closed_shape(R_region, Z_region, 0.01) # Make the region into a Shapely LineString xpoint_region = LineString(list(zip(R_region, Z_region))) xpoint_regions[region_name] = xpoint_region self.xpoint_regions = xpoint_regions
[docs] def init_xpoint_regions(self): """Initialises secondary divertor X-point regions. Initialises regions in which the optimiser will encourage additional X-points to form. This is particularly useful if X-points target or snowflake divertor configurations are desired. A list of regions in the form of Shapely LineStrings will have been pre-defined as an attribute of the tokamak being used in the optimisation. For each of these regions, those points on the 2D R,Z equilibrium grid that lie within the region(s) are identified and recorded. There will be N_points number of points that lie within the X-points region(s). For each of these points a set of 2x2 Jacobian matrices for the poloidal field from each of the coils in the coilset is created, along with a corresponding 2x2 Jacobian matrix for the background poloidal field from the plasma. The coil Jacobian matrices are themselves stored in a N_points x N_coils matrix, with the background field Jacobian matrices stored in an N_points vector. These data will later be used to identify the location of additional secondary divertor X-points that may form within these regions. We also record the background poloidal field components (Br,Bz) themselves, along with coil Br/Bz control responses, as a means of later quickly calculating the poloidal field strength inside the regions. """ # The user can define the X-point region(s) interactively or by supplying the points defining the region(s) # First, the user defines the X-point regions self.create_xpoint_regions() # Record the number of X-point regions self.N_xpoint_regions = len(self.xpoint_regions) for divertor_region, xpoint_region in self.xpoint_regions.items(): # Get the R,Z of the grid points inside this region R_points, Z_points = grid_points_inside_linestring( self.eq.R_2D, self.eq.Z_2D, xpoint_region, include_boundary=True ) # Get the control Br/Bz at these points. This is an N_points x N_coils matrix xpoint_region_greens_br_matrix = [] xpoint_region_greens_bz_matrix = [] # Get the Br/Bz background field from the plasma at these points. These are list of length N_points. xpoint_region_background_Br_points = [] xpoint_region_background_Bz_points = [] # Get the coils poloidal field Jacobians matrices. This is an N_points x N_coils matrix of matrices. xpoint_region_coils_Bp_jacobians_matrix = [] # Get the background poloidal field Jacobian from the plasma at these points. This is a list of length N_points. xpoint_region_background_Bp_jacobian_points = [] for R_point,Z_point in zip(R_points,Z_points): # Get the control Br/Bz from the coils xpoint_region_greens_br_matrix.append(self.tokamak_opt.control_Br(R_point,Z_point)) xpoint_region_greens_bz_matrix.append(self.tokamak_opt.control_Bz(R_point,Z_point)) # Get the background Br/Bz from the plasma xpoint_region_background_Br_points.append(self.eq.Br_plasRZ(R_point,Z_point)) xpoint_region_background_Bz_points.append(self.eq.Bz_plasRZ(R_point,Z_point)) # Get the control poloidal field Jacobian matrices xpoint_region_coils_Bp_jacobians_matrix.append(self.tokamak_opt.control_Bp_jacobians(R_point,Z_point)) # Get the backhround poloidal field Jacobian matrix xpoint_region_background_Bp_jacobian_points.append(self.eq.Bpol_jacobian_plasRZ(R_point,Z_point)) # Convert to numpy arrays R_points = np.asarray(R_points) Z_points = np.asarray(Z_points) xpoint_region_greens_br_matrix = np.asarray(xpoint_region_greens_br_matrix) xpoint_region_greens_bz_matrix = np.asarray(xpoint_region_greens_bz_matrix) xpoint_region_background_Br_points = np.asarray(xpoint_region_background_Br_points) xpoint_region_background_Bz_points = np.asarray(xpoint_region_background_Bz_points) xpoint_region_coils_Bp_jacobians_matrix = np.asarray(xpoint_region_coils_Bp_jacobians_matrix) xpoint_region_background_Bp_jacobian_points = np.asarray(xpoint_region_background_Bp_jacobian_points) # Calculate the transpose of these matrices. We do this because it is more # convenient to work with row vectors to represent 1D quantities. xpoint_region_greens_br_matrix_transpose = np.transpose(xpoint_region_greens_br_matrix) xpoint_region_greens_bz_matrix_transpose = np.transpose(xpoint_region_greens_bz_matrix) xpoint_region_coils_Bp_jacobians_matrix_transpose = np.swapaxes(xpoint_region_coils_Bp_jacobians_matrix,0,1) # The divertor region for this X-point region is explicitly specified # by the user via the xpoint_regions dictionary key. # Using these matrices, calculate the initial poloidal field square in the region Br_coils_initial = self.initial_coil_currents @ xpoint_region_greens_br_matrix_transpose Bz_coils_initial = self.initial_coil_currents @ xpoint_region_greens_bz_matrix_transpose Br_initial = Br_coils_initial + xpoint_region_background_Br_points Bz_initial = Bz_coils_initial + xpoint_region_background_Bz_points Bp2_initial = Br_initial * Br_initial + Bz_initial * Bz_initial # Extract the minimum value and record initial_Bp2_min = np.amin(Bp2_initial) # Extract the R,Z of the X-point region boundary, as it will be convenient to store this data # in the divertor region subdict R_region, Z_region = xpoint_region.xy # Note that an X-point region exists within this divertor region and initialise the subdict self.divertor_data[divertor_region]["xpoint_region_present"] = True self.divertor_data[divertor_region]["xpoint_region"] = { "R_points": R_points, "Z_points": Z_points, "R_region": R_region, "Z_region": Z_region, "region_boundary": xpoint_region, "greens_br_matrix": xpoint_region_greens_br_matrix, # N_points x N_coils "greens_bz_matrix": xpoint_region_greens_bz_matrix, # N_points x N_coils "greens_br_matrix_transpose": xpoint_region_greens_br_matrix_transpose, "greens_bz_matrix_transpose": xpoint_region_greens_bz_matrix_transpose, "background_Br_points": xpoint_region_background_Br_points, "background_Bz_points": xpoint_region_background_Bz_points, "coils_Bp_jacobians_matrix": xpoint_region_coils_Bp_jacobians_matrix, # N_points x N_coils "coils_Bp_jacobians_matrix_transpose": xpoint_region_coils_Bp_jacobians_matrix_transpose, # N_points x N_coils "xpoint_region_background_Bp_jacobian_points": xpoint_region_background_Bp_jacobian_points, "initial_Bp2_min": initial_Bp2_min, } # Set any divertor regions without an X-point region to have an X-point region cost of 0. for divertor_region in self.divertor_regions: if not self.divertor_data[divertor_region]["xpoint_region_present"]: self.divertor_data[divertor_region]["initial_xpoint_region_cost"] = 0.0
[docs] def init_current_step_size(self,currents): """Sets the typical step size for perturbing the coil currents. Parameters ---------- currents : 1D numpy array. The coil currents (A). """ # Filter the currents to remove small currents eps = 0.1 * 1.0e03 currents_filtered = [current for current in currents if abs(current) >= eps] # Get the mean current from the filtered set self.typical_current = np.mean(np.abs(currents_filtered)) # Set the typical current step size self.current_step_size = self.current_step_size_factor * self.typical_current # Whilst here, we will calculate the initial sum square of the coil currents, which will # later be used in the optimisation's cost function. self.initial_sum_square_coil_currents = np.dot(currents,currents) # We will also need to consider what would happen if coils that are initially off (or have a realtively # low current) find themselves being made use of as a result of the optimisation, i.e. they obtain currents # comparable to those in the other PF coils that are initally energised. We can imagine a similar # sum square of the coil currents in such an instance. # We will now create an equivalent list of coil currents, with all coils reasonably energised. energised_currents = np.abs(deepcopy(currents)) energised_currents[energised_currents < 0.05 * (self.typical_current)] = 0.5 * self.typical_current # Calculate the corresponding sum square of the coil currents. self.energised_sum_square_coil_currents = np.dot(energised_currents,energised_currents) logger.info('typical current (kA): %.3f', self.typical_current * 1.0e-03) logger.info('initial currents (kA): %s', currents * 1.0e-03) logger.info('energised currents (kA): %s', energised_currents * 1.0e-03) logger.info('sum square: %.6e', self.initial_sum_square_coil_currents) logger.info('energised sum square: %.6e', self.energised_sum_square_coil_currents)
[docs] def estimate_currents(self): """Estimates the initial currents in the coils using Tikhonov regularisation. Performs Tikhonov reguralisation to estimate initial coil currents, automatically tuning the regularisation parameter, alpha, to meet the equilibrium constraints within a specified threshold. This is optionally used, and is particularly useful when using a coil set that differs from the coil set used to produce the initial equilibrium. The simulated annealing will move us around in the subspace of coil currents that satisfy our constraints - this only works by perturbing an initial set of coil currents that themselves satisfy the constraints. Hence, if you have a new coil set, and don't know what the initial currents are, you can use this to estimate such currents. """ self.initial_currents_estimate = self.tikhonov_min_residual( G = self.tikhonov_greens_matrix, b = self.tikhonov_constraints, threshold = 0.01, alpha_start = 1e-10, alpha_min = 1e-40, decay_factor = 0.8, max_iter = 1000, ) np.set_printoptions(suppress=True) logger.info('initial_currents_estimate (kA): %s', self.initial_currents_estimate * 1.0e-03) logger.info('initial_currents_estimate (MA): %s', self.initial_currents_estimate * 1.0e-06) # Generate the new 2D grid of poloidal magnetic flux from these coil currents data = self.generate_eq_from_currents( new_currents=self.initial_currents_estimate ) # Record the 2D map of poloidal magnetic flux from this estimate of the # initial coil currents self.initial_psi_2D_estimate = data["psi_2D"] # Visualise the resultant equilibrium # Locate the X-point _, xpt = find_critical(self.eq.R_2D,self.eq.Z_2D,data["psi_2D"]) # Plot - total flux fig, ax = plt.subplots() ax.set_aspect('equal') ax.set_xlabel('R (m)') ax.set_ylabel('Z (m)') ax.contour(self.eq.R_2D,self.eq.Z_2D,data["psi_2D"],levels=100,alpha=0.4,colors='k') ax.contour(self.eq.R_2D,self.eq.Z_2D,self.eq.psi_2D,levels=[self.eq.psi_lcfs],colors='r') ax.contour(self.eq.R_2D,self.eq.Z_2D,data["psi_2D"],levels=[xpt[0][2]],colors='tab:orange') ax.plot(self.tokamak_opt.wall_R,self.tokamak_opt.wall_Z,color='k') ax.scatter(self.tikhonov_constraint_points_R,self.tikhonov_constraint_points_Z,color='r',marker='s',zorder=2) ax = self.tokamak_opt.plot(ax=ax,show=False) ax.title.set_text('Pseudo-inverse') plt.show()
[docs] def tikhonov_min_residual( self, G = None, b = None, threshold = 0.01, alpha_start = 1e-10, alpha_min = 1e-35, decay_factor = 0.8, max_iter = 1000, ): """Performs Tikhonov regularisation on the coil curents. Performs Tikhonov regularisation to solve G I = b for I, where I is a vector of coil currents, G is a matrix of Green's functions related to a set of equilibrium constraints on field/flux, denoted by the vector b. The regularisation parameter, alpha, is automatically tuned (becoming smaller each iteration), such that the residuals are minimised until the maximum error value drops below some threshold value set by the user. Parameters ---------- G : 2D numpy array 2D matrix of Green's functions/coil responses. b : 1D numpy array Vector of constraints. threshold : float Value that the maximum relative error must drop below. alpha_start : float The initial value of alpha. alpha_min : float The minimum allowed value of alpha. decay_factor : float Value by which alpha is multiplied during each tuning. max_iter : int Maximum number of interations/tunings of alpha to take. Returns ------- I : 1D numpy array Vector of coil currents. """ alpha = alpha_start # These quantities depend only on G and b, which are constant # across iterations — compute once outside the loop. GTG = G.T @ G GTb = G.T @ b identity = np.eye(G.shape[1]) for _ in range(max_iter): regularised = GTG + alpha * identity I = np.linalg.solve(regularised, GTb) b_achieved = G @ I relative_errors = np.abs((b_achieved - b) / b) max_relative_error = np.amax(relative_errors) if max_relative_error < threshold: return I alpha *= decay_factor if alpha < alpha_min: break
[docs] def update_cooling_factor(self): """Updates the cooling factor, β. Sets the cooling factor, β, for use in the cooling schedule (T_new = β * T_old). If many iterations at the current temperature were required to reach the threshold acceptance rate, then this indicates that the optimiser is still acting in an exploratory manner, hence β will remain high (the temperature will not decrease much). This aims to prevent premature cooling. Likewise, if not that many iterations were required, this indicates the optimiser is not being as explorative, hence β will remain low (the temperature will decrease more rapidly). β is bounded between β_min and β_max. """ baseline_iterations = int(0.5 * self.n_window) # Factor is capped at 1 to prevent the cooling becoming too slow. factor = min(self.iterations_to_acceptance / baseline_iterations,1.0) self.cooling_factor = self.max_cooling_factor - \ (self.max_cooling_factor - self.min_cooling_factor) * (1.0 - factor)
def _resolve_xpoint_constraint(self, choice, label): """Resolves the X-point (R, Z) coordinates to use for constraints. Parameters ---------- choice : str One of ``"primary"``, ``"lower"``, or ``"upper"``. label : str A label for logging purposes (e.g. ``"annealing"`` or ``"tikhonov"``). Returns ------- R_xpt : float Major radius of the selected X-point. Z_xpt : float Vertical position of the selected X-point. """ if choice == "primary": return self.eq.R_xpt_lcfs, self.eq.Z_xpt_lcfs if choice not in ("lower", "upper"): raise ValueError( f"xpoint_constraint must be 'primary', 'lower', or 'upper'. Got '{choice}'." ) if not self.eq.DND: logger.warning( "xpoint_constraint='%s' was specified for %s constraints, but the equilibrium " "is not a double null. Falling back to the primary X-point.", choice, label, ) return self.eq.R_xpt_lcfs, self.eq.Z_xpt_lcfs if choice == "lower": logger.info( "Using the lower X-point for %s constraints (R=%.4f, Z=%.4f).", label, self.eq.R_xpt_lower, self.eq.Z_xpt_lower, ) return self.eq.R_xpt_lower, self.eq.Z_xpt_lower # choice == "upper" logger.info( "Using the upper X-point for %s constraints (R=%.4f, Z=%.4f).", label, self.eq.R_xpt_upper, self.eq.Z_xpt_upper, ) return self.eq.R_xpt_upper, self.eq.Z_xpt_upper
[docs] def generate_default_constraints(self): """If the user does not specify any constraints, a dictionary of default constraints will be generated and returned.""" constraints = { "annealing":{ "constrain_omp": True, "constrain_imp": True, "constrain_upper_point": False, "constrain_lower_point": False, "constrain_upper_right_quadrant": True, "N_constraints_upper_right_quadrant": 1, "constrain_upper_left_quadrant": True, "N_constraints_upper_left_quadrant": 1, "constrain_lower_left_quadrant": True, "N_constraints_lower_left_quadrant": 1, "constrain_lower_right_quadrant": True, "N_constraints_lower_right_quadrant": 1, "additional_divertor_constraint_points": None, "additional_divertor_xpoints": None, "xpoint_constraint": "primary", }, "tikhonov":{ "constrain_omp": True, "constrain_imp": True, "constrain_upper_right_quadrant": True, "N_constraints_upper_right_quadrant": 1, "constrain_upper_left_quadrant": True, "N_constraints_upper_left_quadrant": 1, "constrain_lower_left_quadrant": True, "N_constraints_lower_left_quadrant": 1, "constrain_lower_right_quadrant": True, "N_constraints_lower_right_quadrant": 1, "additional_divertor_constraint_points": None, "additional_divertor_xpoints": None, "xpoint_constraint": "primary", "exclude_coils": None, } } return constraints
[docs] def init_divertor_data(self): """Initialises key pieces of data related to the divertor regions. The starting position of the field line trace for each divertor region is determined, as is the direction of the field line trace. Divertor-related cost terms are also initialised. """ # We will now identify the (R,Z) starting location for the field line trace along # the divertor leg, which will start near the relevant X-point. In addition, the direction of the # field line trace from X-point->downstream is determined, along with additional data # for plotting routines, such as short-hand labels and colours. Cost terms associated with # each divertor region are also initialised. # We will also determine the min/max Z plotting limits whilst examining each region. # Set default plotting values, which may be modified by the presence of divertor regions. self.plotting_R_max = self.tokamak_opt.wall_R_max self.plotting_R_min = self.tokamak_opt.wall_R_min self.plotting_Z_max = self.eq.Z_mag self.plotting_Z_min = self.eq.Z_mag for divertor_region in self.divertor_regions: if divertor_region == "lower_outer": # R,Z starting position of the field line trace trace_starting_position_R = self.eq.R_xpt_lower + 2.0 * self.eq.dR trace_starting_position_Z = self.eq.Z_xpt_lower # Short-hand label for the region short_label = "LO" # Plotting colour colour = "#1F77B4" # Z limit for plotting self.plotting_Z_min = self.tokamak_opt.wall_Z_min elif divertor_region == "lower_inner": # R,Z starting position of the field line trace trace_starting_position_R = self.eq.R_xpt_lower - 2.0 * self.eq.dR trace_starting_position_Z = self.eq.Z_xpt_lower # Short-hand label for the region short_label = "LI" # Plotting colour colour = "#FF7F0E" # Z limit for plotting self.plotting_Z_min = self.tokamak_opt.wall_Z_min elif divertor_region == "upper_outer": # R,Z starting position of the field line trace trace_starting_position_R = self.eq.R_xpt_upper + 2.0 * self.eq.dR trace_starting_position_Z = self.eq.Z_xpt_upper # Short-hand label for the region short_label = "UO" # Plotting colour colour = "#D62728" # Z limit for plotting self.plotting_Z_max = self.tokamak_opt.wall_Z_max elif divertor_region == "upper_inner": # R,Z starting position of the field line trace trace_starting_position_R = self.eq.R_xpt_upper - 2.0 * self.eq.dR trace_starting_position_Z = self.eq.Z_xpt_upper # Short-hand label for the region short_label = "UI" # Plotting colour colour = "#2CA02C" # Z limit for plotting self.plotting_Z_max = self.tokamak_opt.wall_Z_max else: raise ValueError(str(divertor_region) + " is not a valid divertor region.") # Determine the trace direction from the local poloidal field. # The trace must go from near the X-point downstream toward the # divertor target: downward (Z decreasing) for lower divertors, # upward (Z increasing) for upper divertors. The Z-component of # the RK4 step is proportional to direction * Bz, so we pick the # sign of direction that gives the correct vertical step. Bz_start = float(self.eq.BzRZ(trace_starting_position_R, trace_starting_position_Z)) if "lower" in divertor_region: # Need the trace to move downward (dZ < 0) → direction * Bz < 0 trace_direction = -1.0 if Bz_start > 0 else 1.0 else: # Need the trace to move upward (dZ > 0) → direction * Bz > 0 trace_direction = 1.0 if Bz_start > 0 else -1.0 # Check if the asociated cost function weights for this divertor region have been initialised. # If a weight has not been initialised, set it to (1/N_divertors) N_divertors = len(self.divertor_regions) backup_weight = 1.0 / N_divertors # Connection length try: weight_connection_length = self.divertor_data[divertor_region]["weight_connection_length"] except KeyError: weight_connection_length = backup_weight self.divertor_data[divertor_region]["weight_connection_length"] = weight_connection_length # Strike point distance try: weight_strike_point_distance = self.divertor_data[divertor_region]["weight_strike_point_distance"] except KeyError: weight_strike_point_distance = backup_weight self.divertor_data[divertor_region]["weight_strike_point_distance"] = weight_strike_point_distance # Xpoint region try: weight_xpoint_region = self.divertor_data[divertor_region]["weight_xpoint_region"] except KeyError: weight_xpoint_region = backup_weight self.divertor_data[divertor_region]["weight_xpoint_region"] = weight_xpoint_region # Calculate the initial costs for this divertor region # Connection length initial_connection_length_cost = self.initial_total_connection_length_cost * weight_connection_length # Strike point distance initial_strike_point_distance_cost = self.initial_total_strike_point_distance_cost * weight_strike_point_distance # X-point region - at this stage we have not checked if an X-point region associated with this divertor is present. # If it later transpires that one is not present, this cost will be altered to zero. For now, we assume there may # be one. initial_xpoint_region_cost = self.initial_xpoint_regions_cost * weight_xpoint_region # Check if a connection length multiplication factor zero point has been defined for this divertor region try: connection_length_multiplication_factor_zero = ( self.divertor_data[divertor_region]["connection_length_multiplication_factor_zero"] ) except KeyError: self.divertor_data[divertor_region]["connection_length_multiplication_factor_zero"] = 1.1 # Lastly, the relevant Shapely object for the intended strike geometry is created. If a single # R,Z point pair is provided, then a strike point is assumed (Shapely Point created), and .if more # than one point pair is provided, a strike surface is assumed (Shapely LineString created) strike_R = self.divertor_data[divertor_region]["strike_R"] strike_Z = self.divertor_data[divertor_region]["strike_Z"] if ( (isinstance(strike_R,int) or isinstance(strike_R,float)) and (isinstance(strike_Z,int) or isinstance(strike_Z,float)) ): # Strike point strike_object = Point(strike_R,strike_Z) elif (isinstance(strike_R,list)) and (isinstance(strike_Z,list)): # Strike surface strike_object = LineString(list(zip(strike_R,strike_Z))) else: logger.warning('Unable to determine provided strike geometry for the divertor region: %s', divertor_region) # Record these data self.divertor_data[divertor_region]["trace_starting_position_R"] = trace_starting_position_R self.divertor_data[divertor_region]["trace_starting_position_Z"] = trace_starting_position_Z self.divertor_data[divertor_region]["trace_direction"] = trace_direction self.divertor_data[divertor_region]["short_label"] = short_label self.divertor_data[divertor_region]["colour"] = colour self.divertor_data[divertor_region]["initial_connection_length_cost"] = initial_connection_length_cost self.divertor_data[divertor_region]["initial_strike_point_distance_cost"] = initial_strike_point_distance_cost self.divertor_data[divertor_region]["initial_xpoint_region_cost"] = initial_xpoint_region_cost self.divertor_data[divertor_region]["strike_geometry"] = strike_object # Adjust the min/max R/Z plotting limits to have a small offset dR = self.plotting_R_max - self.plotting_R_min dZ = self.plotting_Z_max - self.plotting_Z_min offset = 0.025 self.plotting_R_min -= offset * dR self.plotting_R_max += offset * dR self.plotting_Z_min -= offset * dZ self.plotting_Z_max += offset * dZ # Each divetor region may or may not also feature a region in which an secondary divertor X-point is encouraged # to form. If such regions exists, they will be initialised later. self.divertor_data[divertor_region]["xpoint_region_present"] = False
[docs] def request_stop(self): """Request the optimisation loop to stop gracefully. The loop will terminate at the start of the next iteration. This is safe to call from any thread. """ self._stop_event.set()
[docs] def get_tracking_snapshot(self): """Return a snapshot dict of all tracking data for external consumers (e.g. GUI). Returns ------- dict A dictionary containing copies of the current tracking lists and key state needed by :func:`forge.plotting.update_tracking_plots`. """ return { "temperature": list(self.tracking_temperature), "energy_change": list(self.tracking_energy_change), "cost": list(self.tracking_cost), "cost_strike_point_distance": list(self.tracking_cost_strike_point_distance), "cost_connection_length": list(self.tracking_cost_connection_length), "cost_coil_currents": list(self.tracking_cost_coil_currents), "cost_xpoint_regions": list(self.tracking_cost_xpoint_regions), "acceptance_rate": list(self.tracking_acceptance_rate), "acceptance_prob": list(self.tracking_acceptance_prob), "alpha": list(self.tracking_alpha), "connection_length": {r: list(v) for r, v in self.tracking_connection_length.items()}, "field_lines_R": dict(self.tracking_field_lines_R), "field_lines_Z": dict(self.tracking_field_lines_Z), "incumbent_data": self.incumbent_data, "flux_map": self.psi_2D, "num_evals": self.num_evals, "constraint_points_R": self.annealing_constraint_points_R, "constraint_points_Z": self.annealing_constraint_points_Z, "buffers": self.buffers, # dict keyed by region, or None "threshold_acceptance_rate": getattr(self, "threshold_acceptance_rate", 0.0), "plotting_lims": { "R_min": self.plotting_R_min, "R_max": self.plotting_R_max, "Z_min": self.plotting_Z_min, "Z_max": self.plotting_Z_max, }, }
[docs] def generate_optimised_eq_machine( self, ): """Creates new Equilibrium and Machine objects of the final state. Creates a new forge.equilibrium.Equilibrium object for the optimised equilibrium. Also creates a new forge.machine.Machine object for the tokamak with the updated coil currents. """ # Create a copy of the exisiting machine self.optimised_tokamak = deepcopy(self.tokamak_opt) # Update the coil currents updated_coil_currents = self.incumbent_data["currents"] self.optimised_tokamak.update_currents( new_currents = updated_coil_currents, ) # Update the equilibrium data # First, get the data used to initialise the starting equilibrium updated_eq_data = deepcopy(self.eq.eq_data) # Modify the (total) poloidal magnetic flux updated_eq_data["psi_2D"] = self.incumbent_data["psi_2D"] # Create the new Equilibrium object self.optimised_eq = Equilibrium( eq_data = updated_eq_data, tokamak = self.optimised_tokamak, calculate_flux_from_coils = True, )
[docs] def separatrix_coil_flux_change(self): """Quantify the change in coil flux along the separatrix after optimisation. Evaluates the poloidal magnetic flux from the coils (psi_mach) along the initial separatrix for both the initial and optimised equilibria. If the coil flux is unchanged on the separatrix, the separatrix position is preserved and the background plasma equilibrium remains self-consistent. Returns ------- dict A dictionary with the following keys: - ``R_lcfs`` : ndarray — R coordinates of the separatrix evaluation points. - ``Z_lcfs`` : ndarray — Z coordinates of the separatrix evaluation points. - ``psi_mach_initial`` : ndarray — Coil flux along the separatrix (initial). - ``psi_mach_optimised`` : ndarray — Coil flux along the separatrix (optimised). - ``delta_psi_mach`` : ndarray — Absolute change (optimised - initial). - ``delta_psi_mach_rel`` : ndarray — Relative change (optimised - initial) / initial. - ``max_abs_change`` : float — Maximum absolute change along the separatrix. - ``max_rel_change`` : float — Maximum relative change (dimensionless). - ``mean_abs_change`` : float — Mean absolute change along the separatrix. - ``mean_rel_change`` : float — Mean relative change (dimensionless). """ R_sep = self.eq.R_lcfs Z_sep = self.eq.Z_lcfs # Evaluate coil flux along the initial separatrix psi_mach_init = self.eq.psi_machRZ(R_sep, Z_sep) psi_mach_opt = self.optimised_eq.psi_machRZ(R_sep, Z_sep) delta = psi_mach_opt - psi_mach_init # Relative change normalised by the initial coil flux at each point with np.errstate(divide="ignore", invalid="ignore"): delta_rel = np.where(psi_mach_init != 0, delta / psi_mach_init, 0.0) result = { "R_lcfs": R_sep, "Z_lcfs": Z_sep, "psi_mach_initial": psi_mach_init, "psi_mach_optimised": psi_mach_opt, "delta_psi_mach": delta, "delta_psi_mach_rel": delta_rel, "max_abs_change": float(np.max(np.abs(delta))), "max_rel_change": float(np.max(np.abs(delta_rel))), "mean_abs_change": float(np.mean(np.abs(delta))), "mean_rel_change": float(np.mean(np.abs(delta_rel))), } logger.info( "Separatrix coil flux change: " "max |Δψ_mach/ψ_mach| = %.4f%%, mean |Δψ_mach/ψ_mach| = %.4f%%.", result["max_rel_change"] * 100, result["mean_rel_change"] * 100, ) return result