# Base imports. The `pymoo` package object itself is needed because its
# `pymoo.core.*` attributes are overwritten below.
import pymoo
import random
import time
import logging as log
# Monkey-patching of pymoo: directly after each OpenSBT class is imported it
# is assigned over the corresponding attribute in `pymoo.core`.
# NOTE(review): presumably these assignments have to run before the remaining
# pymoo imports further down, so that modules loaded afterwards see the
# replaced classes -- confirm before reordering or regrouping this block.
from opensbt.model_ga.individual import IndividualSimulated
pymoo.core.individual.Individual = IndividualSimulated
from opensbt.model_ga.population import PopulationExtended
pymoo.core.population.Population = PopulationExtended
from opensbt.model_ga.result import SimulationResult
pymoo.core.result.Result = SimulationResult
from opensbt.model_ga.problem import SimulationProblem
pymoo.core.problem.Problem = SimulationProblem
# Imports that come after the patches above.
from pymoo.core.algorithm import Algorithm
from opensbt.algorithm.optimizer import Optimizer
# `pymoo.core.problem.Problem` was reassigned above, so the name `Problem`
# bound here is `SimulationProblem`.
from pymoo.core.problem import Problem
from pymoo.operators.sampling.rnd import FloatRandomSampling
from opensbt.experiment.search_configuration import SearchConfiguration
from opensbt.utils.evaluation import evaluate_individuals
from opensbt.utils.sorting import get_nondominated_population
# [docs] -- Sphinx "view source" link residue from HTML extraction; not part of the module.
class PureSampling(Optimizer):
    """
    This class provides the parent class for all sampling based search algorithms.

    A fixed number of samples (``config.population_size``) is drawn with the
    configured sampling operator and evaluated in one go. The evaluated
    population is afterwards partitioned into consecutive buckets which are
    stored as the result's ``history`` (one ``Algorithm`` object per bucket).
    """

    algorithm_name = "RS"

    def __init__(self,
                 problem: Problem,
                 config: SearchConfiguration,
                 sampling_type = FloatRandomSampling,
                 n_splits: int = 10):
        """Initializes pure sampling approaches.

        :param problem: The testing problem to be solved.
        :type problem: Problem
        :param config: The configuration for the search. ``population_size``
            is used as the number of samples; ``seed`` is read in :meth:`run`.
        :type config: SearchConfiguration
        :param sampling_type: Sampling operator class; it is instantiated
            without arguments in :meth:`run`. Defaults to random sampling.
        :type sampling_type: type, optional
        :param n_splits: Number of buckets the evaluated population is divided
            into when the result history is built. Defaults to 10.
        :type n_splits: int, optional
        """
        self.config = config
        self.problem = problem
        self.res = None
        self.sampling_type = sampling_type
        self.sample_size = config.population_size
        # Number of history buckets; splitting the single evaluated batch
        # gives the result an iteration-like structure for further analysis.
        self.n_splits = n_splits
        self.parameters = {
            "sample_size" : self.sample_size
        }

        log.info(f"Initialized algorithm with config: {config.__dict__}")

    def run(self) -> SimulationResult:
        """Draws the samples, evaluates them and builds the result object.

        The evaluated samples are divided into ``n_splits`` buckets for
        further analysis with pymoo (s. n_splits variable).

        :return: A SimulationResult object which holds all information from
            the simulation. It is also stored in ``self.res``.
        :rtype: SimulationResult
        """
        # Function-scope import so this block does not depend on a change to
        # the module's order-sensitive import section.
        import numpy as np

        seed = self.config.seed
        random.seed(seed)
        # pymoo's sampling operators draw from NumPy's global generator (in
        # the pymoo releases checked for this change -- verify for the pinned
        # version), so seeding only `random` would not make the sample
        # reproducible. With no seed configured NumPy's state is left alone.
        if seed is not None:
            np.random.seed(seed)

        problem = self.problem
        sampled = self.sampling_type()(problem, self.sample_size)

        # Only the evaluation of the samples is timed, not the sampling.
        start_time = time.time()
        pop = evaluate_individuals(sampled, problem)
        execution_time = time.time() - start_time

        # create result object
        self.res = self._create_result(problem, pop, execution_time, self.n_splits)
        return self.res

    def _create_result(self, problem, pop, execution_time, n_splits):
        """Wraps an evaluated population into a SimulationResult.

        :param problem: The problem the population was evaluated on.
        :param pop: The evaluated population; must support ``len`` and slicing.
        :param execution_time: Time spent on the evaluation.
        :param n_splits: Requested number of history buckets. At most
            ``len(pop)`` buckets are created so that no bucket is empty; a
            value <= 0 or an empty population yields an empty history.
        :return: The populated result object.
        :rtype: SimulationResult
        """
        res_holder = SimulationResult()

        # Stand-in algorithm object describing the final state of the search.
        res_holder.algorithm = Algorithm()
        res_holder.algorithm.pop = pop
        res_holder.algorithm.archive = pop
        res_holder.algorithm.evaluator.n_eval = len(pop)
        res_holder.problem = problem
        res_holder.algorithm.problem = problem
        res_holder.exec_time = execution_time
        res_holder.opt = get_nondominated_population(pop)
        res_holder.algorithm.opt = res_holder.opt
        res_holder.archive = pop

        # One Algorithm object per bucket, in evaluation order.
        res_holder.history = []

        n_total = len(pop)
        n_buckets = min(n_splits, n_total)
        if n_buckets > 0:
            # Every bucket gets `base` individuals and the first `extra`
            # buckets one more, so the whole population is covered even when
            # it is not a multiple of the bucket count. For evenly divisible
            # sizes this equals a fixed bucket size of n_total // n_splits.
            base, extra = divmod(n_total, n_buckets)
            start = 0
            for i in range(n_buckets):
                end = start + base + (1 if i < extra else 0)
                algo = Algorithm()
                algo.pop = pop[start:end]
                # The archive is cumulative: everything evaluated so far.
                algo.archive = pop[:end]
                algo.evaluator.n_eval = end
                algo.opt = get_nondominated_population(algo.pop)
                res_holder.history.append(algo)
                start = end

        return res_holder