Source code for opensbt.visualization.visualizer

import csv
import os
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from datetime import datetime
import opensbt.algorithm.classification.decision_tree.decision_tree as decision_tree
import matplotlib.patches as mpatches
from matplotlib.patches import Rectangle
from matplotlib.lines import Line2D
from matplotlib.legend_handler import HandlerPatch
from opensbt.visualization import scenario_plotter
from pymoo.indicators.igd import IGD
from pymoo.indicators.hv import Hypervolume
from pymoo.core.population import Population
from opensbt.visualization.configuration import *
from opensbt.utils.sorting import *
from opensbt.algorithm.classification.classifier import ClassificationType
from opensbt.analysis.quality_indicators.quality import Quality
from opensbt.model_ga.problem import *
from opensbt.model_ga.result import *
from typing import Dict
from opensbt.utils.duplicates import duplicate_free
import logging as log
import uuid
import shutil
from opensbt.visualization.visualization3d import visualize_3d
from opensbt.visualization import scenario_plotter
from opensbt.config import BACKUP_FOLDER, CONSIDER_HIGH_VAL_OS_PLOT,  \
                            PENALTY_MAX, PENALTY_MIN, WRITE_ALL_INDIVIDUALS, \
                                METRIC_PLOTS_FOLDER, LAST_ITERATION_ONLY_DEFAULT, \
                                OUTPUT_PRECISION

"""This module provides functions for the output and presentation of results."""

[docs] def create_save_folder(problem: Problem, results_folder: str, algorithm_name: str, is_experimental=False): problem_name = problem.problem_name # if results folder is already a valid folder, do not create it in parent, use it relative if os.path.isdir(results_folder): save_folder = results_folder elif is_experimental: save_folder = str( os.getcwd()) + results_folder + problem_name + os.sep + algorithm_name + os.sep + "temp" + os.sep else: save_folder = str( os.getcwd()) + results_folder + problem_name + os.sep + algorithm_name + os.sep + datetime.now().strftime( "%d-%m-%Y_%H-%M-%S") + os.sep if Path(save_folder).exists() and Path(save_folder).is_dir(): shutil.rmtree(save_folder) log.info(f"Old save_folder deleted.") Path(save_folder).mkdir(parents=True, exist_ok=True) log.info(f"Save_folder created: {save_folder}") return save_folder
[docs] def write_calculation_properties(res: Result, save_folder: str, algorithm_name: str, algorithm_parameters: Dict, **kwargs): problem = res.problem # algorithm_name = type(res.algorithm).__name__ is_simulation = problem.is_simulation() now = datetime.now() # current date and time date_time = now.strftime("%d-%m-%Y_%H:%M:%S") uid = str(uuid.uuid4()) with open(save_folder + 'calculation_properties.csv', 'w', encoding='UTF8', newline='') as f: write_to = csv.writer(f) header = ['Attribute', 'Value'] write_to.writerow(header) write_to.writerow(['Id', uid]) write_to.writerow(['Timestamp', date_time]) write_to.writerow(['Problem', problem.problem_name]) write_to.writerow(['Algorithm', algorithm_name]) write_to.writerow(['Search variables', problem.design_names]) write_to.writerow(['Search space', [v for v in zip(problem.xl,problem.xu)]]) if is_simulation: write_to.writerow(['Fitness function', str(problem.fitness_function.__class__.__name__)]) else: write_to.writerow(['Fitness function', "<No name available>"]) write_to.writerow(['Critical function', str(problem.critical_function.__class__.__name__)]) # write_to.writerow(['Number of maximal tree generations', str(max_tree_iterations)]) write_to.writerow(['Search time', str("%.2f" % res.exec_time + " sec")]) for item,value in algorithm_parameters.items(): write_to.writerow([item, value]) _additional_description(res, save_folder, algorithm_name, **kwargs) f.close() _calc_properties(res, save_folder, algorithm_name, **kwargs)
[docs] def _calc_properties(res, save_folder, algorithm_name, **kwargs): pass
[docs] def _additional_description(res, save_folder, algorithm_name, **kwargs): pass
[docs] def get_pop_using_mode(res: Result, mode: str): inds = Population() # print(f"mode: {mode}") if mode == "all": inds = res.obtain_archive() elif mode == "opt": inds = res.opt elif mode == "crit": all = res.obtain_archive() inds, _ = all.divide_critical_non_critical() else: print("Mode is not accepted. Accepted modes are: all, opt, crit.") return inds
'''Output of the simulation data for all solutions (for the moment only partial data)'''
[docs] def write_simulation_output(res: Result, save_folder: str, mode = "all", write_max = 100): problem = res.problem if not problem.is_simulation(): return save_folder_simout = save_folder + os.sep + "simout" + os.sep Path(save_folder_simout).mkdir(parents=True, exist_ok=True) inds = get_pop_using_mode(res=res, mode=mode)[:write_max] if len(inds) == 0: log.info("The population is empty.") return write_simout(save_folder_simout, inds)
[docs] def write_simout(path, pop): for i, _ in enumerate(pop): param_values = pop.get("X")[i] param_v_chain = "_".join("%.2f" % a for a in param_values) simout_dumped = pop.get("SO")[i].to_json() with open(path + os.sep + f'simout{f"_S{param_v_chain}" if param_v_chain is not None else ""}.json', 'w') as f: f.write(simout_dumped) simout_pop = pop.get("SO") with open(path + f'simout_criticality.csv','w', encoding='UTF8', newline='') as f: write_to = csv.writer(f) header = ['Index'] other_params = simout_pop[0].otherParams # write header for item, value in other_params.items(): if isinstance(value,float) or isinstance(value,int) or isinstance(value,bool): header.append(item) write_to.writerow(header) # write values for index in range(len(simout_pop)): row = [index] other_params = simout_pop[index].otherParams for item, value in other_params.items(): if isinstance(value,float): row.extend(["%.2f" % value]) if isinstance(value,int) or isinstance(value,bool): row.extend([value]) write_to.writerow(row) f.close()
[docs] class HandlerCircle(HandlerPatch):
[docs] def create_artists(self, legend, orig_handle, xdescent, ydescent, width, height, fontsize, trans): center = 0.5 * width - 0.5 * xdescent, 0.5 * height - 0.5 * ydescent p = mpatches.Ellipse(xy=center, width=min(width, height), height=min(width, height)) self.update_prop(p, orig_handle, legend) p.set_transform(trans) return [p]
[docs] def create_markers(): patch_not_critical_region = mpatches.Patch(color=color_not_critical, label='Not critical regions', alpha=0.05) patch_critical_region = mpatches.Patch(color=color_critical, label='Critical regions', alpha=0.05) circle_critical = mpatches.Circle((0.5, 0.5), radius=2, facecolor='none', edgecolor=color_critical, linewidth=1, label='Critical testcases') circle_not_critical = mpatches.Circle((0.5, 0.5), radius=2, facecolor='none', edgecolor=color_not_critical, linewidth=1, label='Not critical testcases') circle_optimal = mpatches.Circle((0.5, 0.5), radius=2, facecolor=color_optimal, edgecolor='none', linewidth=1, label='Optimal testcases') circle_not_optimal = mpatches.Circle((0.5, 0.5), radius=2, facecolor=color_not_optimal, edgecolor='none', linewidth=1, label='Not optimal testcases') line_pareto = Line2D([0], [0], label='Pareto front', color='blue') marker_list = [patch_not_critical_region, patch_critical_region, circle_critical, circle_not_critical, circle_optimal, circle_not_optimal, line_pareto] return marker_list
[docs] def write_summary_results(res, save_folder): all_population = res.obtain_archive() best_population = res.opt critical_best,_ = best_population.divide_critical_non_critical() critical_all,_ = all_population.divide_critical_non_critical() n_crit_all_dup_free = len(duplicate_free(critical_all)) n_all_dup_free = len(duplicate_free(all_population)) n_crit_best_dup_free = len(duplicate_free(critical_best)) n_best_dup_free = len(duplicate_free(best_population)) # write down when first critical solutions found + which fitness value it has iter_crit, inds_critical = res.get_first_critical() '''Output of summery of the performance''' with open(save_folder + 'summary_results.csv', 'w', encoding='UTF8', newline='') as f: write_to = csv.writer(f) header = ['Attribute', 'Value'] write_to.writerow(header) write_to.writerow(['Number Critical Scenarios', len(critical_all)]) write_to.writerow(['Number Critical Scenarios (duplicate free)', n_crit_all_dup_free]) write_to.writerow(['Number All Scenarios', len(all_population)]) write_to.writerow(['Number All Scenarios (duplicate free)', n_all_dup_free]) write_to.writerow(['Number Best Critical Scenarios', len(critical_best)]) write_to.writerow(['Number Best Critical Scenarios (duplicate free)', n_crit_best_dup_free]) write_to.writerow(['Number Best Scenarios', len(best_population)]) write_to.writerow(['Number Best Scenarios (duplicate free)',n_best_dup_free]) write_to.writerow(['Ratio Critical/All scenarios', '{0:.2f}'.format(len(critical_all) / len(all_population))]) write_to.writerow(['Ratio Critical/All scenarios (duplicate free)', '{0:.2f}'.format(n_crit_all_dup_free/n_all_dup_free)]) write_to.writerow(['Ratio Best Critical/Best Scenarios', '{0:.2f}'.format(len(critical_best) / len(best_population))]) write_to.writerow(['Ratio Best Critical/Best Scenarios (duplicate free)', '{0:.2f}'.format(n_crit_best_dup_free/n_best_dup_free)]) write_to.writerow(['Iteration first critical found', '{}'.format(iter_crit)]) write_to.writerow(['Fitness value of critical (first of population of interest)','{}'.format(str(inds_critical[0].get("F")) if len(inds_critical) > 0 else None) ]) write_to.writerow(['Input value of critical (first of population of interest)','{}'.format(str(inds_critical[0].get("X")) if len(inds_critical) > 0 else None) ]) f.close() log.info(['Number Critical Scenarios (duplicate free)', n_crit_all_dup_free]) log.info(['Number All Scenarios (duplicate free)', n_all_dup_free]) log.info(['Ratio Critical/All scenarios (duplicate free)', '{0:.2f}'.format(n_crit_all_dup_free/n_all_dup_free)])
[docs] def design_space(res, save_folder, classification_type=ClassificationType.DT, iteration=None): save_folder_design = save_folder + "design_space" + os.sep Path(save_folder_design).mkdir(parents=True, exist_ok=True) save_folder_plot = save_folder_design if iteration is not None: save_folder_design_iteration = save_folder_design + 'TI_' + str(iteration) + os.sep Path(save_folder_design_iteration).mkdir(parents=True, exist_ok=True) save_folder_plot = save_folder_design_iteration problem = res.problem design_names = problem.design_names n_var = problem.n_var xl = problem.xl xu = problem.xu all_population = res.obtain_archive() critical_all, _ = all_population.divide_critical_non_critical() if classification_type == ClassificationType.DT: save_folder_classification = save_folder + "classification" + os.sep Path(save_folder_classification).mkdir(parents=True, exist_ok=True) regions = decision_tree.generate_critical_regions(all_population, problem, save_folder=save_folder_classification) f = plt.figure(figsize=(12, 10)) for axis_x in range(n_var - 1): for axis_y in range(axis_x + 1, n_var): if classification_type == ClassificationType.DT: for region in regions: x_rectangle = region.xl[axis_x] y_rectangle = region.xl[axis_y] width_rectangle = region.xu[axis_x] - region.xl[axis_x] height_rectangle = region.xu[axis_y] - region.xl[axis_y] region_color = color_not_critical if region.is_critical: region_color = color_critical plt.gca().add_patch(Rectangle((x_rectangle, y_rectangle), width_rectangle, height_rectangle, edgecolor=region_color, lw=1.5, ls='-', facecolor='none', alpha=0.2)) plt.gca().add_patch(Rectangle((x_rectangle, y_rectangle), width_rectangle, height_rectangle, edgecolor='none', facecolor=region_color, alpha=0.05)) ax = plt.subplot(111) plt.title(f"{res.algorithm.__class__.__name__}\nDesign Space" + " (" + str(len(all_population)) + " testcases, " + str(len(critical_all)) + " of which are critical)") if classification_type == ClassificationType.DT: critical, not_critical = all_population.divide_critical_non_critical() if len(not_critical) != 0: ax.scatter(not_critical.get("X")[:, axis_x], not_critical.get("X")[:, axis_y], s=40, facecolors=color_not_optimal, edgecolors=color_not_critical, marker='o') if len(critical) != 0: ax.scatter(critical.get("X")[:, axis_x], critical.get("X")[:, axis_y], s=40, facecolors=color_not_optimal, edgecolors=color_critical, marker='o') opt = get_nondominated_population(all_population) critical_opt, not_critical_opt = opt.divide_critical_non_critical() if len(critical_opt) != 0: ax.scatter(critical_opt.get("X")[:, axis_x], critical_opt.get("X")[:, axis_y], s=40, facecolors=color_optimal, edgecolors=color_critical, marker='o') if len(not_critical_opt) != 0: ax.scatter(not_critical_opt.get("X")[:, axis_x], not_critical_opt.get("X")[:, axis_y], s=40, facecolors=color_optimal, edgecolors=color_not_critical, marker='o') eta_x = (xu[axis_x] - xl[axis_x]) / 10 eta_y = (xu[axis_y] - xl[axis_y]) / 10 plt.xlim(xl[axis_x] - eta_x, xu[axis_x] + eta_x) plt.ylim(xl[axis_y] - eta_y, xu[axis_y] + eta_y) plt.xlabel(design_names[axis_x]) plt.ylabel(design_names[axis_y]) box = ax.get_position() ax.set_position([box.x0, box.y0, box.width * 0.8, box.height]) ax.set_aspect(1.0 / ax.get_data_ratio(), adjustable='box') marker_list = create_markers() markers = marker_list[:-1] plt.legend(handles=markers, loc='center left', bbox_to_anchor=(1, 0.5), handler_map={mpatches.Circle: HandlerCircle()}) plt.savefig(save_folder_plot + design_names[axis_x] + '_' + design_names[axis_y] + '.png') plt.savefig(save_folder_plot + design_names[axis_x] + '_' + design_names[axis_y] + '.pdf', format="pdf") plt.clf() # output 3d plots if n_var == 3: visualize_3d(all_population, save_folder_design, design_names, mode="critical", markersize=20, do_save=True) plt.close(f)
[docs] def backup_problem(res,save_folder): save_folder_problem = save_folder + BACKUP_FOLDER Path(save_folder_problem).mkdir(parents=True, exist_ok=True) import dill with open(save_folder_problem + os.sep + "problem", "wb") as f: dill.dump(res.problem, f)
[docs] def objective_space(res, save_folder, iteration=None, show=False, last_iteration=LAST_ITERATION_ONLY_DEFAULT): save_folder_objective = save_folder + "objective_space" + os.sep Path(save_folder_objective).mkdir(parents=True, exist_ok=True) save_folder_plot = save_folder_objective if iteration is not None: save_folder_iteration = save_folder_objective + 'TI_' + str(iteration) + os.sep Path(save_folder_iteration).mkdir(parents=True, exist_ok=True) save_folder_plot = save_folder_iteration hist = res.history problem = res.problem pf = problem.pareto_front() n_obj = problem.n_obj objective_names = problem.objective_names if n_obj == 1: plot_single_objective_space(result=res, save_folder_plot=save_folder_plot, objective_names=objective_names, show=show, pf=pf) else: plot_multi_objective_space(res, n_obj, save_folder_objective, objective_names, show, pf, last_iteration) # output 3d plots if n_obj == 3: all_population = res.obtain_archive() visualize_3d(all_population, save_folder_objective, objective_names, mode="critical", markersize=20, do_save=True, dimension="F", angles=[(45,-45),(45,45),(45,135)], show=show) log.info(f"Objective Space: {save_folder_plot}")
[docs] def plot_multi_objective_space(res, n_obj, save_folder_objective, objective_names, show, pf, last_iteration): all_population = Population() # n_evals_all = 0 for i, generation in enumerate(res.history): # TODO first generation has somehow archive size of 0 # all_population = generation.archive #Population.merge(all_population, generation.pop) n_eval = generation.evaluator.n_eval # TODO assure that every algorithm stores in n_eval the number of evaluations SO FAR performed!! # n_evals_all += n_eval # print(f"[visualizer] n_eval: {n_eval}") all_population = res.archive[0:n_eval] #assert len(all_population) == n_eval, f"{len(all_population)} != {n_eval}" # all_population = res.obtain_all_population() # print(f"[visualizer] len(gen archive): {len(all_population)}") critical_all, _ = all_population.divide_critical_non_critical() # plot only last iteration if requested if last_iteration and i < len(res.history) -1 : continue save_folder_iteration = save_folder_objective + f"iteration_{i}" + os.sep Path(save_folder_iteration).mkdir(parents=True, exist_ok=True) save_folder_plot = save_folder_iteration f = plt.figure(figsize=(12, 10)) for axis_x in range(n_obj - 1): for axis_y in range(axis_x + 1, n_obj): ax = plt.subplot(111) plt.title(f"{res.algorithm.__class__.__name__}\nObjective Space" + " (" + str(len(all_population)) + " testcases, " + str(len(critical_all)) + " of which are critical)") if True: #classification_type == ClassificationType.DT: critical, not_critical = all_population.divide_critical_non_critical() critical_clean = duplicate_free(critical) not_critical_clean = duplicate_free(not_critical) if len(not_critical_clean) != 0: ax.scatter(not_critical_clean.get("F")[:, axis_x], not_critical_clean.get("F")[:, axis_y], s=40, facecolors=color_not_optimal, edgecolors=color_not_critical, marker='o') if len(critical_clean) != 0: ax.scatter(critical_clean.get("F")[:, axis_x], critical_clean.get("F")[:, axis_y], s=40, facecolors=color_not_optimal, edgecolors=color_critical, marker='o') if pf is not None: ax.plot(pf[:, axis_x], pf[:, axis_y], color='blue', lw=0.7, zorder=1) if True: #classification_type == ClassificationType.DT: optimal_pop = get_nondominated_population(all_population) critical, not_critical = optimal_pop.divide_critical_non_critical() critical_clean = duplicate_free(critical) not_critical_clean = duplicate_free(not_critical) if len(not_critical_clean) != 0: ax.scatter(not_critical_clean.get("F")[:, axis_x], not_critical_clean.get("F")[:, axis_y], s=40, facecolors=color_optimal, edgecolors=color_not_critical, marker='o') if len(critical_clean) != 0: ax.scatter(critical_clean.get("F")[:, axis_x], critical_clean.get("F")[:, axis_y], s=40, facecolors=color_optimal, edgecolors=color_critical, marker='o') #limit axes bounds, since we do not want to show fitness values as 1000 or int.max, # that assign bad quality to worse scenarios if CONSIDER_HIGH_VAL_OS_PLOT: pop_f_x = all_population.get("F")[:,axis_x] clean_pop_x = np.delete(pop_f_x, np.where(pop_f_x == PENALTY_MAX)) max_x_f_ind = max(clean_pop_x) clean_pop_x = np.delete(pop_f_x, np.where(pop_f_x == PENALTY_MIN)) min_x_f_ind = min(clean_pop_x) pop_f_y = all_population.get("F")[:,axis_y] clean_pop_y = np.delete(pop_f_y, np.where(pop_f_y == PENALTY_MAX)) max_y_f_ind = max(clean_pop_y) clean_pop_y = np.delete(pop_f_y, np.where(pop_f_y == PENALTY_MIN)) min_y_f_ind = min(clean_pop_y) eta_x = abs(max_x_f_ind - min_x_f_ind) / 10 eta_y = abs(max_y_f_ind- min_y_f_ind) / 10 plt.xlim(min_x_f_ind - eta_x, max_x_f_ind + eta_x) plt.ylim(min_y_f_ind - eta_y, max_y_f_ind + eta_y) plt.xlabel(objective_names[axis_x]) plt.ylabel(objective_names[axis_y]) box = ax.get_position() ax.set_position([box.x0, box.y0, box.width * 0.8, box.height]) marker_list = create_markers() if pf is not None: markers = marker_list[2:] else: markers = marker_list[2:-1] plt.legend(handles=markers, loc='center left', bbox_to_anchor=(1, 0.5), handler_map={mpatches.Circle: HandlerCircle()}) if show: plt.show() plt.savefig(save_folder_plot + objective_names[axis_x] + '_' + objective_names[axis_y] + '.png') plt.savefig(save_folder_plot + objective_names[axis_x] + '_' + objective_names[axis_y] + '.pdf', format='pdf') plt.clf() plt.close(f)
[docs] def plot_single_objective_space(result, save_folder_plot, objective_names, show, pf): res = result problem = res.problem x_axis_width = 10 plt.figure(figsize=(x_axis_width,6)) plt.axis('auto') # ax = plt.subplot(111) # ax.axis('auto') fig = plt.gcf() # Set the figure size to stretch the x-axis physically fig.set_size_inches(x_axis_width, fig.get_figheight(), forward=True) all_population = res.obtain_archive() critical, _ = all_population.divide_critical_non_critical() plt.title(f"{res.algorithm.__class__.__name__}\nObjective Space | {problem.problem_name}" + \ " (" + str(len(all_population)) + " testcases, " + \ str(len(critical)) + " of which are critical)") n_evals_all = 0 # we plot the fitness over time as it is only one value for each iteration for i, generation in enumerate(res.history): n_eval = generation.evaluator.n_eval n_evals_all += n_eval # print(f"[visualizer] n_eval: {n_eval}") all_population = res.archive[0:n_evals_all] axis_y = 0 critical, not_critical = all_population.divide_critical_non_critical() critical_clean = duplicate_free(critical) not_critical_clean = duplicate_free(not_critical) if len(not_critical_clean) != 0: plt.scatter([i]*len(not_critical_clean), not_critical_clean.get("F")[:, axis_y], s=40, facecolors=color_not_optimal, edgecolors=color_not_critical, marker='o') if len(critical_clean) != 0: plt.scatter([i]*len(critical_clean), critical_clean.get("F")[:, axis_y], s=40, facecolors=color_not_optimal, edgecolors=color_critical, marker='o') optimal_pop = get_nondominated_population(all_population) critical, not_critical = optimal_pop.divide_critical_non_critical() critical_clean = duplicate_free(critical) not_critical_clean = duplicate_free(not_critical) if len(not_critical_clean) != 0: plt.scatter([i]*len(not_critical_clean), not_critical_clean.get("F")[:, axis_y], s=40, facecolors=color_optimal, edgecolors=color_not_critical, marker='o') if len(critical_clean) != 0: plt.scatter([i]*len(critical_clean), critical_clean.get("F")[:, axis_y], s=40, facecolors=color_optimal, edgecolors=color_critical, marker='o') # box = fig.get_position() # fig.set_position([box.x0, box.y0, box.width * 0.8, box.height]) marker_list = create_markers() if pf is not None: markers = marker_list[2:] else: markers = marker_list[2:-1] plt.xlabel("Iteration") plt.ylabel(problem.objective_names[0]) plt.legend(handles=markers, #loc='center left', loc='best', bbox_to_anchor=(1, 0.5), handler_map={mpatches.Circle: HandlerCircle()}) if show: plt.show() plt.savefig(save_folder_plot + objective_names[0] + '_iterations.png') plt.clf() pass
[docs] def optimal_individuals(res, save_folder): """Output of optimal individuals (duplicate free)""" problem = res.problem design_names = problem.design_names objective_names = problem.objective_names with open(save_folder + 'optimal_testcases.csv', 'w', encoding='UTF8', newline='') as f: write_to = csv.writer(f) header = ['Index'] for i in range(problem.n_var): header.append(design_names[i]) for i in range(problem.n_obj): header.append(f"Fitness_"+ objective_names[i]) # column to indicate wheter individual is critical or not header.append(f"Critical") write_to.writerow(header) clean_pop = duplicate_free(res.opt) for index in range(len(clean_pop)): row = [index] row.extend([f"%.{OUTPUT_PRECISION}f" % X_i for X_i in clean_pop.get("X")[index]]) row.extend([f"%.{OUTPUT_PRECISION}f" % F_i for F_i in clean_pop.get("F")[index]]) row.extend(["%i" % clean_pop.get("CB")[index]]) write_to.writerow(row) f.close()
[docs] def all_individuals(res, save_folder): """Output of all evaluated individuals""" problem = res.problem hist = res.history design_names = problem.design_names objective_names = problem.objective_names with open(save_folder + 'all_testcases.csv', 'w', encoding='UTF8', newline='') as f: write_to = csv.writer(f) header = ['Index'] for i in range(problem.n_var): header.append(design_names[i]) for i in range(problem.n_obj): header.append(f"Fitness_{objective_names[i]}") # column to indicate wheter individual is critical or not header.append(f"Critical") write_to.writerow(header) index = 0 all_individuals = res.obtain_archive() for index, ind in enumerate(all_individuals): row = [index] row.extend([f"%.{OUTPUT_PRECISION}f" % X_i for X_i in ind.get("X")]) row.extend([f"%.{OUTPUT_PRECISION}f" % F_i for F_i in ind.get("F")]) row.extend(["%i" % ind.get("CB")]) write_to.writerow(row) f.close()
[docs] def all_critical_individuals(res, save_folder): """Output of all critical individuals""" problem = res.problem hist = res.history # TODO check why when iterating over the algo in the history set is different design_names = problem.design_names objective_names = problem.objective_names all = res.obtain_archive() critical = all.divide_critical_non_critical()[0] with open(save_folder + 'all_critical_testcases.csv', 'w', encoding='UTF8', newline='') as f: write_to = csv.writer(f) header = ['Index'] for i in range(problem.n_var): header.append(design_names[i]) for i in range(problem.n_obj): header.append(f"Fitness_"+ objective_names[i]) write_to.writerow(header) index = 0 # for algo in hist: for i in range(len(critical)): row = [index] row.extend([f"%.{OUTPUT_PRECISION}f" % X_i for X_i in critical.get("X")[i]]) row.extend([f"%.{OUTPUT_PRECISION}f" % F_i for F_i in critical.get("F")[i]]) write_to.writerow(row) index += 1 f.close()
[docs] def simulations(res, save_folder, mode="all", write_max = 100): '''Visualization of the results of simulations''' problem = res.problem is_simulation = problem.is_simulation() if is_simulation: save_folder_gif = save_folder + "gif" + os.sep Path(save_folder_gif).mkdir(parents=True, exist_ok=True) pop = get_pop_using_mode(res=res, mode=mode)[:write_max] log.info("Writing 2D scenario visualization in .gif ...") for index, simout in enumerate(pop.get("SO")): param_values = pop.get("X")[index] param_v_chain = "_".join("%.2f" % a for a in param_values) file_name = str(param_v_chain) + str("_trajectory") scenario_plotter.plot_scenario_gif(param_values, simout, save_folder_gif, file_name) else: log.info("No simulation visualization available. The experiment is not a simulation.")
''' Write down the population for each generation'''
[docs] def write_generations(res, save_folder): save_folder_history = save_folder + "generations" + os.sep Path(save_folder_history).mkdir(parents=True, exist_ok=True) problem = res.problem hist = res.history design_names = problem.design_names objective_names = problem.objective_names for i, algo in enumerate(hist): with open(save_folder_history + f'gen_{i+1}.csv', 'w', encoding='UTF8', newline='') as f: write_to = csv.writer(f) header = ['Index'] for i in range(problem.n_var): header.append(design_names[i]) for i in range(problem.n_obj): header.append(f"Fitness_{objective_names[i]}") # column to indicate wheter individual is critical or not header.append(f"Critical") write_to.writerow(header) index = 0 for i in range(len(algo.pop)): row = [index] row.extend([f"%.{OUTPUT_PRECISION}f" % X_i for X_i in algo.pop.get("X")[i]]) row.extend([f"%.{OUTPUT_PRECISION}f" % F_i for F_i in algo.pop.get("F")[i]]) row.extend(["%i" % algo.pop.get("CB")[i]]) write_to.writerow(row) index += 1 f.close()
[docs] def write_pf_individuals(save_folder, pf_pop): """Output of pf individuals (duplicate free)""" n_var = len(pf_pop.get("X")[0]) n_obj= len(pf_pop.get("F")[0]) # We dont have the design, objective names design_names = [f"X_{i}" for i in range(n_var)] objective_names = [f"Fitness_{i}" for i in range(n_obj)] with open(save_folder + 'estimated_pf.csv', 'w', encoding='UTF8', newline='') as f: write_to = csv.writer(f) header = ['Index'] for i in range(n_var): header.append(design_names[i]) for i in range(n_obj): header.append(f"Fitness_"+ objective_names[i]) # column to indicate wheter individual is critical or not header.append(f"Critical") write_to.writerow(header) for index in range(len(pf_pop)): row = [index] row.extend([f"%.{OUTPUT_PRECISION}f" % X_i for X_i in pf_pop.get("X")[index]]) row.extend([f"%.{OUTPUT_PRECISION}f" % F_i for F_i in pf_pop.get("F")[index]]) row.extend(["%i" % pf_pop.get("CB")[index]]) write_to.writerow(row) f.close()
[docs] def plot_timeseries_basic(res, save_folder, mode="crit", write_max=100): plot_timeseries(res, save_folder, mode, type="X", max=write_max) plot_timeseries(res, save_folder, mode, type="Y",max=write_max) plot_timeseries(res, save_folder, mode, type="V",max=write_max)
[docs] def plot_timeseries(res, save_folder, mode="crit", type="X", max="100"): problem = res.problem is_simulation = problem.is_simulation() if is_simulation: inds = get_pop_using_mode(res=res, mode=mode) clean_pop = duplicate_free(inds)[:max] if len(clean_pop) == 0: return actors = list((clean_pop.get("SO")[0].location).keys()) save_folder_gif = save_folder + os.sep + "trace_comparison" Path(save_folder_gif).mkdir(parents=True, exist_ok=True) for index in range(len(clean_pop)): param_v_chain = "_".join("%.2f" % a for a in clean_pop.get("X")[index]) f = plt.figure(figsize=(12,10)) cmap = plt.get_cmap('gnuplot') colors = [cmap(i) for i in np.linspace(0, 1, len(actors))] simout = clean_pop.get("SO")[index] times = simout.times param_values = clean_pop.get("X")[index] param_v_chain = "_".join("%.2f" % a for a in param_values) for actor_ind, actor in enumerate(actors): if type.lower() == "x": plt.plot(times, [v[0] for v in simout.location[actor]], label=actor, color=colors[actor_ind]) elif type.lower() == "y": plt.plot(times, [v[1] for v in simout.location[actor]], color=colors[actor_ind], label=actor) elif type.lower() == "v": plt.plot(times, [v for v in simout.speed[actor]], color=colors[actor_ind], label=actor) elif type.lower() in simout.otherParams: plt.plot(times, [v for v in simout.otherParams[type.lower()]], color=colors[actor_ind], label=actor) else: print("Type is unknown") return plt.xlabel('Timestep') plt.ylabel(f'{type.upper()}') plt.title(f'{type.upper()} Traces') plt.legend() plt.savefig(save_folder_gif + os.sep + f"{type.upper()}_trace_{param_v_chain}.png", format="png") plt.clf() plt.close(f) else: print("No simulation visualization available. The experiment is not a simulation.")