Source code for opensbt.problem.adas_problem

from dataclasses import dataclass
from typing import Dict, List
from pymoo.core.problem import Problem
import numpy as np
from opensbt.evaluation.critical import Critical
from opensbt.evaluation.fitness import Fitness
import logging as log

[docs] @dataclass class ADASProblem(Problem): """ Basic problem class for ADAS problems """
[docs] def __init__(self, xl: List[float], xu: List[float], scenario_path: str, fitness_function: Fitness, simulate_function, critical_function: Critical, simulation_variables: List[str], simulation_time: float = 10, sampling_time: float = 100, design_names: List[str] = None, objective_names: List[str] = None, problem_name: str = None, other_parameters: Dict = None, approx_eval_time: float = None, do_visualize: bool = False): """Generates a simulation-based testing problem. :param xl: Lower bound for the search domain. :type xl: List[float] :param xu: Upper bound for the search domain. :type xu: List[float] :param scenario_path: The path to the scenario to be simulated. :type scenario_path: str :param fitness_function: The instance of the fitness function to evaluate simulations. :type fitness_function: Fitness :param simulate_function: The pointer to the simulate function of the simulator. :type simulate_function: _type_ :param critical_function: The instance of the oracle function to assign pass fail verdicts. :type critical_function: Critical :param simulation_variables: The name of the simulation variables to alter for test generation. :type simulation_variables: List[float] :param simulation_time: The simulation time for the test execution, defaults to 10 :type simulation_time: float, optional :param sampling_time: The sampling time for the simulator, defaults to 100 :type sampling_time: float, optional :param design_names: The name for the search variable to use in the output. If none is said, simulation_variables is used, defaults to None :type design_names: List[str], optional :param objective_names: The name of the objectives to use in the output, defaults to None :type objective_names: List[str], optional :param problem_name: The name of the problem, defaults to None :type problem_name: str, optional :param other_parameters: Other necessary parameters, defaults to None :type other_parameters: Dict, optional :param approx_eval_time: The approximate evaluation time for one test execution, defaults to None :type approx_eval_time: float, optional :param do_visualize: Visualize test execution or not, defaults to False :type do_visualize: bool, optional :raises ValueError: Raises ValueError if min_or_max of fitness function is not correctly configured. """ super().__init__(n_var=len(xl), n_obj=len(fitness_function.name), xl=xl, xu=xu) assert xl is not None assert xu is not None assert scenario_path is not None assert fitness_function is not None assert simulate_function is not None assert simulation_time is not None assert sampling_time is not None assert np.equal(len(xl), len(xu)) assert np.less_equal(xl, xu).all() assert len(fitness_function.min_or_max) == len(fitness_function.name) self.fitness_function = fitness_function self.simulate_function = simulate_function self.critical_function = critical_function self.simulation_time = simulation_time self.sampling_time = sampling_time self.simulation_variables = simulation_variables self.do_visualize = do_visualize if design_names is not None: self.design_names = design_names else: self.design_names = simulation_variables if objective_names is not None: self.objective_names = objective_names else: self.objective_names = fitness_function.name self.scenario_path = scenario_path self.problem_name = problem_name if other_parameters is not None: self.other_parameters = other_parameters if approx_eval_time is not None: self.approx_eval_time = approx_eval_time self.signs = [] for value in self.fitness_function.min_or_max: if value == 'max': self.signs.append(-1) elif value == 'min': self.signs.append(1) else: raise ValueError( "Error: The optimization property " + str(value) + " is not supported.") self.counter = 0
[docs] def _evaluate(self, x, out, *args, **kwargs): self.counter = self.counter + 1 log.info(f"Running evaluation number {self.counter}") try: # it is assumed that the i-th simout in the simout list correponds to i-th scenario in the list x simout_list = self.simulate_function(x, self.simulation_variables, self.scenario_path, sim_time=self.simulation_time, time_step=self.sampling_time, do_visualize=self.do_visualize) except Exception as e: log.info("Exception during simulation ocurred: ") # TODO handle exception, terminate, so that results are stored raise e out["SO"] = [] vector_list = [] label_list = [] for simout in simout_list: out["SO"].append(simout) vector_fitness = np.asarray( self.signs) * np.array(self.fitness_function.eval(simout, **kwargs)) vector_list.append(np.array(vector_fitness)) label_list.append(self.critical_function.eval(vector_fitness, simout = simout)) out["F"] = np.vstack(vector_list) out["CB"] = label_list
[docs] def is_simulation(self): return True