Source code for opensbt.model_ga.problem


import numpy as np
import pymoo.gradient.toolbox as anp
from pymoo.util.misc import at_least_2d_array
from opensbt.simulation.simulator import SimulationOutput

from pymoo.core.problem import Problem
from pymoo.core.problem import ElementwiseEvaluationFunction
from pymoo.core.problem import LoopedElementwiseEvaluation

[docs] class SimulationProblem(Problem): """ This class extends the pymoo's Problem class to integrate the handling of simulation output-related data. """
[docs] def __init__(self, n_var, n_obj=1, n_ieq_constr=0, n_eq_constr=0, xl=None, xu=None, vtype=None, vars=None, elementwise=False, elementwise_func=ElementwiseEvaluationFunction, elementwise_runner=LoopedElementwiseEvaluation(), replace_nan_values_by=None, exclude_from_serialization=None, callback=None, strict=True, **kwargs): """ Parameters ---------- n_var : int Number of Variables n_obj : int Number of Objectives n_ieq_constr : int Number of Inequality Constraints n_eq_constr : int Number of Equality Constraints xl : np.array, float, int Lower bounds for the variables. if integer all lower bounds are equal. xu : np.array, float, int Upper bounds for the variable. if integer all upper bounds are equal. vtype : type The variable type. So far, just used as a type hint. """ # if variables are provided directly if vars is not None: n_var = len(vars) # number of variable self.n_var = n_var # number of objectives self.n_obj = n_obj # number of inequality constraints self.n_ieq_constr = n_ieq_constr if "n_constr" not in kwargs else max(n_ieq_constr, kwargs["n_constr"]) # number of equality constraints self.n_eq_constr = n_eq_constr # type of the variable to be evaluated self.data = dict(**kwargs) # the lower bounds, make sure it is a numpy array with the length of n_var self.xl, self.xu = xl, xu # a callback function to be called after every evaluation self.callback = callback # if the variables are provided in their explicit form self.vars = vars # the variable type (only as a type hint at this point) self.vtype = vtype # the functions used if elementwise is enabled self.elementwise = elementwise self.elementwise_func = elementwise_func self.elementwise_runner = elementwise_runner # whether the shapes are checked strictly self.strict = strict # if it is a problem with an actual number of variables - make sure xl and xu are numpy arrays if n_var > 0: if self.xl is not None: if not isinstance(self.xl, np.ndarray): self.xl = np.ones(n_var) * xl self.xl = self.xl.astype(float) if self.xu is not None: if not isinstance(self.xu, np.ndarray): self.xu = np.ones(n_var) * xu self.xu = self.xu.astype(float) # this defines if NaN values should be replaced or not self.replace_nan_values_by = replace_nan_values_by # attribute which are excluded from being serialized self.exclude_from_serialization = exclude_from_serialization
[docs] def _evaluate_elementwise(self, X, out, *args, **kwargs): # create the function that evaluates a single individual f = self.elementwise_func(self, args, kwargs) # execute the runner elems = self.elementwise_runner(f, X) # for each evaluation call for elem in elems: # for each key stored for this evaluation for k, v in elem.items(): # if the element does not exist in out yet -> create it if out.get(k, None) is None: out[k] = [] out[k].append(v) # convert to arrays (the none check is important because otherwise an empty array is initialized) for k in out: if out[k] is not None: if not isinstance(out[k][-1], SimulationOutput): out[k] = anp.array(out[k])
[docs] def evaluate(self, X, *args, return_values_of=None, return_as_dictionary=False, **kwargs): if return_values_of is None: return_values_of = ["F"] if self.n_ieq_constr > 0: return_values_of.append("G") if self.n_eq_constr > 0: return_values_of.append("H") # make sure the array is at least 2d. store if reshaping was necessary if isinstance(X, np.ndarray) and X.dtype != object: X, only_single_value = at_least_2d_array(X, extend_as="row", return_if_reshaped=True) assert X.shape[1] == self.n_var, f'Input dimension {X.shape[1]} are not equal to n_var {self.n_var}!' else: only_single_value = not (isinstance(X, list) or isinstance(X, np.ndarray)) # this is where the actual evaluation takes place _out = self.do(X, return_values_of, *args, **kwargs) out = {} for k, v in _out.items(): # copy it to a numpy array (it might be one of jax at this point) v = np.array(v) # in case the input had only one dimension, then remove always the first dimension from each output if only_single_value: v = v[0] # if the NaN values should be replaced if self.replace_nan_values_by is not None: v[np.isnan(v)] = self.replace_nan_values_by out[k] = v # try: # out[k] = v.astype(np.float64) # except: # out[k] = v if self.callback is not None: self.callback(X, out) # now depending on what should be returned prepare the output if return_as_dictionary: return out if len(return_values_of) == 1: return out[return_values_of[0]] else: return tuple([out[e] for e in return_values_of])
[docs] def pareto_front_n_points(self, n_points=1000): if self.pareto_front is not None: pf = self.pareto_front(n_points) else: pf = None print("No analytical solution provided for the given problem.") return pf