Source code for opensbt.visualization.combined

import traceback
import numpy as np
import os
import csv
from matplotlib import pyplot as plt
import pandas as pd
from opensbt.model_ga.individual import Individual
from matplotlib.ticker import (AutoMinorLocator)
from opensbt.model_ga.population import Population
from opensbt.utils.sorting import get_nondominated_population
from opensbt.visualization.visualizer import *
from opensbt.analysis.quality_indicators.quality import EvaluationResult
import scipy.interpolate
import matplotlib
from opensbt.utils.duplicates import duplicate_free
import logging as log
from opensbt.analysis.statistics import wilcoxon
from opensbt.analysis.quality_indicators.metrics import ncrit

from opensbt.config import BACKUP_FOLDER, N_CELLS

"""This module provides functions for the analysis of completed runs with different search approaches.
"""
def calculate_combined_crit_pop(run_paths):
    """Takes the union of the critical solutions from all runs to approximate the "real" critical design space."""
    log.info(f"run_paths: {run_paths}")
    crit_pop = Population()
    for run_path in run_paths:
        crit_run = read_pf_single(
            run_path + os.sep + "all_critical_testcases.csv")
        crit_pop = Population.merge(crit_pop, crit_run)
    # assign the critical label for the visualization
    for i in range(0, len(crit_pop)):
        crit_pop[i].set("CB", True)
    return crit_pop
def calculate_combined_pf(run_paths, critical_only=False):
    """Merges the Pareto fronts of all runs and returns the combined non-dominated front and population."""
    # TODO add information on the deviation of the values w.r.t. different runs in the plots
    pf_pop = Population()
    for run_path in run_paths:
        pf_run = read_pf_single(run_path + os.sep + "optimal_testcases.csv")
        pf_pop = Population.merge(pf_pop, pf_run)
        # log.info(f"len: {len(pf_pop)}")
    pf_pop = get_nondominated_population(pf_pop)
    if critical_only:
        crit_inds = np.where(pf_pop.get("CB"))[0]
        pf_pop = pf_pop[crit_inds]
    inds = [ind.get("F").tolist() for ind in pf_pop]
    pf = np.array(inds, dtype=float)
    # log.info(f"pf: {pf}")
    return pf, pf_pop
def plot_combined_analysis_last_min_max(metric_name, run_paths_array, save_folder):
    """Outputs mean/std/min/max for the final metric value only, instead of over several numbers of
    evaluations as in plot_combined_analysis."""
    plot_array = []
    for key, (algo, run_paths) in enumerate(run_paths_array.items()):
        values_all = []
        y_stacked = np.zeros((len(run_paths), 1))
        f = plt.figure(figsize=(7, 5))
        for key_run, run_path in enumerate(run_paths):
            eval_res = EvaluationResult.load(
                run_path + os.sep + BACKUP_FOLDER, metric_name)
            n_evals, hv = eval_res.steps, eval_res.values
            values_all.append(hv[-1])
            plt.plot(n_evals, hv, marker='.', linestyle='--',
                     label='run ' + str(key_run + 1))
        max_value = max(values_all)
        min_value = min(values_all)
        max_value_runs = [max_value]
        min_value_runs = [min_value]

        def std_dev(y_values):
            y_mean = np.sum(y_values, axis=0) / np.count_nonzero(y_values, axis=0)
            y_error = []
            for i in range(len(y_mean)):
                square_sum = 0
                n = 0
                for j in range(len(run_paths)):
                    if np.all(y_values[j, i]):
                        n += 1
                        deviation = y_values[j, i] - y_mean[i]
                        square_sum += deviation ** 2
                if n <= 1:
                    variance = 0
                else:
                    variance = square_sum / (n - 1)
                standard_deviation = np.sqrt(variance)
                y_error.append(standard_deviation / np.sqrt(n))
            return y_error, y_mean

        y_stacked[:, 0] = values_all
        y_error, y_mean = std_dev(y_stacked)

        x_plot = n_evals
        # TODO make labels RS and NSGA-II
        plt.plot(x_plot, y_mean[0:len(x_plot)], color='black',
                 marker='o', lw=2, label='combined')
        plt.errorbar(x_plot, y_mean[0:len(x_plot)], y_error[0:len(x_plot)],
                     fmt='.k', capsize=5)
        plt.legend(loc='best')
        Path(save_folder).mkdir(parents=True, exist_ok=True)
        plt.title(f"{metric_name.upper()} Analysis ({algo})")
        plt.xlabel("Evaluations")
        plt.ylabel(f"{metric_name.upper()}")
        plt.savefig(
            save_folder + f'{metric_name}_combined_single' + str(algo) + '.png')
        plt.clf()
        plt.close(f)
        plot_array.append([x_plot, y_mean[0:len(x_plot)], y_error[0:len(x_plot)],
                           min_value_runs, max_value_runs])
    return plot_array
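# The inner std_dev helper above computes, per column, the standard error of the mean
# over the runs with a non-zero entry: SE = s / sqrt(n), where s is the sample standard
# deviation (ddof=1). A minimal numpy sketch of the same statistic (illustrative only,
# not part of the original module; numbers made up):
#
#     import numpy as np
#
#     def standard_error(values: np.ndarray) -> float:
#         """Standard error of the mean for a 1-D array of per-run metric values."""
#         n = len(values)
#         if n <= 1:
#             return 0.0
#         return np.std(values, ddof=1) / np.sqrt(n)
#
#     standard_error(np.array([0.71, 0.69, 0.74]))  # approx. 0.0145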
def write_last_metric_values(metric_name_load, run_paths_array, save_folder, metric_name_label=None):
    values_algo = {}
    algos = list(run_paths_array.keys())
    for key, (algo, run_paths) in enumerate(run_paths_array.items()):
        values_algo[algo] = []
        for run_path in run_paths:
            eval_res = EvaluationResult.load(
                run_path + os.sep + BACKUP_FOLDER, metric_name_load)
            _, v = eval_res.steps, eval_res.values
            values_algo[algo].append(v[-1])
    Path(save_folder).mkdir(parents=True, exist_ok=True)
    with open(save_folder + f'overview_{metric_name_label}.csv', 'w', encoding='UTF8', newline='') as f:
        write_to = csv.writer(f)
        header = ['run']
        for algo in algos:
            header.append(algo)
        write_to.writerow(header)
        for i in range(0, len(values_algo[algo])):
            line = [f'{i+1}']
            for algo in algos:
                line.append(values_algo[algo][i])
            write_to.writerow(line)
        f.close()
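# write_last_metric_values produces the overview CSV that
# statistical_analysis_from_overview (below) consumes: one "run" column followed by one
# column per algorithm, each row holding the final metric value of that run.
# Hypothetical content of overview_hv.csv (algorithm names and values made up):
#
#     run,NSGA-II,RS
#     1,0.71,0.62
#     2,0.69,0.65
#     3,0.74,0.60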
def plot_combined_analysis(metric_name_load, run_paths_array, save_folder, n_func_evals_lim,
                           n_fitting_points, metric_name_label=None, step_chkp=None, error_mean=False):
    """Plots the metric value averaged over all runs against the number of evaluations;
    each run is interpolated onto a common evaluation grid. Returns one [x, mean, error] entry per algorithm."""
    plot_array = []
    for key, (algo, run_paths) in enumerate(run_paths_array.items()):
        num_evals_limit = []
        for run_path in run_paths:
            eval_res = EvaluationResult.load(
                run_path + os.sep + BACKUP_FOLDER, metric_name_load)
            n_evals, hv = eval_res.steps, eval_res.values
            num_evals_limit.append(n_evals[-1])
        min_num_evals = n_func_evals_lim + 1
        step = min_num_evals // n_fitting_points
        if step_chkp is not None:
            step = step_chkp
        x = np.arange(step, min_num_evals, step=step)
        y_stacked = np.zeros((len(run_paths), len(x)))
        f = plt.figure(figsize=(7, 5))
        for key_run, run_path in enumerate(run_paths):
            eval_res = EvaluationResult.load(
                run_path + os.sep + BACKUP_FOLDER, metric_name_load)
            n_evals, hv = eval_res.steps, eval_res.values
            spl = scipy.interpolate.interp1d(
                np.array(n_evals), np.array(hv), fill_value="extrapolate")
            x_run = np.arange(step, min_num_evals, step)
            y = spl(x_run)
            y_stacked[key_run, 0:len(y)] = y
            plt.plot(n_evals, hv, marker='.', linestyle='--',
                     label='run ' + str(key_run + 1))
        y_mean = np.sum(y_stacked, axis=0) / np.count_nonzero(y_stacked, axis=0)
        y_error = []
        for i in range(len(y_mean)):
            square_sum = 0
            n = 0
            for j in range(len(run_paths)):
                if np.all(y_stacked[j, i]):
                    n += 1
                    deviation = y_stacked[j, i] - y_mean[i]
                    square_sum += deviation ** 2
            if n <= 1:
                variance = 0
            else:
                variance = square_sum / (n - 1)
            standard_deviation = np.sqrt(variance)
            if error_mean:
                y_error.append(standard_deviation / np.sqrt(n))
            else:
                y_error.append(standard_deviation)
        x_plot = np.arange(step, min_num_evals, step=step)
        plt.plot(x_plot, y_mean[0:len(x_plot)], color='black',
                 marker='o', lw=2, label='combined')
        plt.errorbar(x_plot, y_mean[0:len(x_plot)], y_error[0:len(x_plot)],
                     fmt='.k', capsize=5)
        plt.legend(loc='best')
        Path(save_folder).mkdir(parents=True, exist_ok=True)
        if metric_name_label is None:
            metric_name_label = metric_name_load.upper()
        plt.title(f"{metric_name_label} Analysis ({algo})")
        plt.xlabel("Evaluations")
        plt.ylabel(f"{metric_name_label}")
        plt.savefig(
            save_folder + f'{metric_name_label}_combined_' + str(algo) + '.png')
        plt.clf()
        plt.close(f)
        plot_array.append(
            [x_plot, y_mean[0:len(x_plot)], y_error[0:len(x_plot)]])
    return plot_array
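# plot_combined_analysis aligns runs that were check-pointed at different evaluation
# counts by interpolating every run onto a common evaluation grid before averaging.
# A minimal sketch of that step with made-up numbers (illustrative only):
#
#     import numpy as np
#     import scipy.interpolate
#
#     n_evals = [100, 250, 400]          # evaluations at which the metric was recorded
#     hv = [0.2, 0.55, 0.6]              # metric values of one run
#     grid = np.arange(100, 401, 100)    # common grid: 100, 200, 300, 400
#     spl = scipy.interpolate.interp1d(np.array(n_evals), np.array(hv),
#                                      fill_value="extrapolate")
#     spl(grid)                          # array([0.2, 0.4333..., 0.5666..., 0.6])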
def plot_combined_hypervolume_lin_analysis(run_paths_array, save_folder):
    """Plots the hypervolume of each run together with the linearly interpolated average over all runs."""
    for key, (algo, run_paths) in enumerate(run_paths_array.items()):
        if len(run_paths) == 0:
            log.info("Path list is empty")
            return
        f = plt.figure(figsize=(7, 5))
        plt.title(f"Performance Analysis ({algo})")
        plt.xlabel("Evaluations")
        plt.ylabel("Hypervolume")
        n_runs = len(run_paths)
        hv_run = []
        evals_run = []
        cmap = plt.get_cmap('gnuplot')
        colors = [cmap(i) for i in np.linspace(0, 1, n_runs)]
        for ind, run_path in enumerate(run_paths):
            eval_res = EvaluationResult.load(
                run_path + os.sep + BACKUP_FOLDER, "hv")
            n_evals, hv = eval_res.steps, eval_res.values
            plt.plot(n_evals, hv, marker='o', linestyle=":", linewidth=0.5,
                     markersize=3, color=colors[ind], label="run: " + str(ind + 1))
            hv_run.append(hv)
            evals_run.append(n_evals)

        def get_interpol_value(pos, n_evals, hv):
            for ind, _ in enumerate(n_evals):
                if n_evals[ind] > pos:
                    if ind == 0:
                        value = (hv[ind]) / 2
                    else:
                        diff = pos - n_evals[ind - 1]
                        grad = (hv[ind] - hv[ind - 1]) / \
                            (n_evals[ind] - n_evals[ind - 1])
                        value = hv[ind - 1] + grad * diff
                    return value

        step = 10
        last_n_evals = [n_evals[-1] for n_evals in evals_run]
        # n_steps = floor(min(last_n_evals)/step)
        last_n_eval = min(last_n_evals)
        n_evals_comb = np.arange(0, last_n_eval, step)
        for ind in range(0, n_runs):
            n_evals = evals_run[ind]
            hv = hv_run[ind]
            hv_run[ind] = [get_interpol_value(val, n_evals, hv)
                           for val in n_evals_comb]
        hv_comb_all = np.sum(hv_run, axis=0)
        hv_comb = np.asarray(hv_comb_all) / n_runs
        plt.plot(n_evals_comb, hv_comb, marker='o', linewidth=1,
                 markersize=3, color='black', label="combined")
        plt.legend(loc='best')
        # plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
        plt.savefig(save_folder + 'hypervolume_combined_' + str(algo) + '.png')
        plt.clf()
        plt.close(f)
def write_analysis_results(result_runs_all, save_folder, nadir, ideal):
    """Writes per-algorithm and combined analysis results (number of evaluations, execution time,
    number of critical and distinct critical scenarios) to CSV files."""
    evaluations_all = {}
    evaluations_all_no_duplicates = {}
    critical_all = {}
    critical_all_no_duplicates = {}
    n_critical_all = {}
    n_critical_all_no_duplicates = {}
    mean_critical_all = {}
    mean_critical_all_no_duplicates = {}
    mean_evaluations_all = {}
    mean_evaluations_all_no_duplicates = {}
    ratio_critical_single_run = {}
    ratio_critical_single_run_no_duplicates = {}

    # temporary
    # algo1 = list(result_runs_all.keys())[0]
    # algo2 = list(result_runs_all.keys())[1]

    n_crit_no_dup_best = {}
    n_dominated_best_crit = {}
    grid_crit_distinct = {}
    n_crit_distinct_avg = {}
    n_crit_distinct_all = {}
    grid_crit_distinct_X = {}
    n_crit_distinct_avg_X = {}
    n_crit_distinct_all_X = {}
    base_algo = None

    for algo, result_runs in result_runs_all.items():
        if len(result_runs) == 0:
            log.info("Result list is empty")
            return
        n_runs = len(result_runs)

        # number-of-evaluations analysis
        n_evals = [res.algorithm.evaluator.n_eval for res in result_runs]
        min_n_evals = np.min(n_evals)
        max_n_evals = np.max(n_evals)
        mean_n_evals = np.sum(n_evals) / len(result_runs)

        # time analysis
        exec_time = [res.exec_time for res in result_runs]
        min_exec_time = np.min(exec_time)
        max_exec_time = np.max(exec_time)
        mean_exec_time = np.sum(exec_time) / len(result_runs)

        # criticality analysis
        evaluations_all[algo] = [res.obtain_all_population()
                                 for res in result_runs]
        evaluations_all_no_duplicates[algo] = [duplicate_free(res.obtain_all_population())
                                               for res in result_runs]
        critical_all[algo] = [evals.divide_critical_non_critical()[0]
                              for evals in evaluations_all[algo]]
        critical_all_no_duplicates[algo] = [evals.divide_critical_non_critical()[0]
                                            for evals in evaluations_all_no_duplicates[algo]]
        n_critical_all[algo] = np.asarray(
            [len(evals) for evals in critical_all[algo]], dtype=object)
        n_critical_all_no_duplicates[algo] = np.asarray(
            [len(evals) for evals in critical_all_no_duplicates[algo]], dtype=object)

        # mean_critical_all[algo] = np.sum(n_critical_all[algo])  # /len(result_runs)
        # mean_critical_all_no_duplicates[algo] = np.sum(n_critical_all_no_duplicates[algo])  # /len(result_runs)
        # mean_evaluations_all[algo] = np.sum(len(evals) for evals in evaluations_all[algo])  # /len(result_runs)
        # mean_evaluations_all_no_duplicates[algo] = np.sum(len(evals) for evals in evaluations_all_no_duplicates[algo])  # /len(result_runs)
        # ratio_critical_single_run[algo] = [n_critical_all[algo][i] / len(evaluations_all[algo][i]) for i in range(0, n_runs)]
        # ratio_critical_single_run_no_duplicates[algo] = [n_critical_all_no_duplicates[algo][i] / len(evaluations_all_no_duplicates[algo][i]) for i in range(0, n_runs)]

        n_crit_no_dup_best[algo] = len(get_nondominated_population(
            evaluations_all_no_duplicates[algo][-1].divide_critical_non_critical()[0]))

        # criticality analysis using grid-based distinctness in fitness space
        grid_crit_distinct[algo] = [ncrit.get_n_crit_grid(evals.get("F"),
                                                          b_min=ideal,
                                                          b_max=nadir,
                                                          n_cells=N_CELLS)[1]
                                    for evals in critical_all_no_duplicates[algo]]
        n_crit_distinct_avg[algo] = sum(
            np.asarray([np.count_nonzero(grid) for grid in grid_crit_distinct[algo]])
        ) / n_runs
        # disjunction (logical OR) over all grids
        n_crit_distinct_all[algo] = np.count_nonzero(
            np.logical_or.reduce(grid_crit_distinct[algo]))

        # criticality analysis using grid-based distinctness in input space
        grid_crit_distinct_X[algo] = [ncrit.get_n_crit_grid(evals.get("X"),
                                                            b_min=result_runs[-1].problem.xl,
                                                            b_max=result_runs[-1].problem.xu,
                                                            n_cells=N_CELLS)[1]
                                      for evals in critical_all_no_duplicates[algo]]
        n_crit_distinct_avg_X[algo] = sum(
            np.asarray([np.count_nonzero(grid) for grid in grid_crit_distinct_X[algo]])
        ) / n_runs
        # disjunction (logical OR) over all grids
        n_crit_distinct_all_X[algo] = np.count_nonzero(
            np.logical_or.reduce(grid_crit_distinct_X[algo]))

        # grid_crit_distinct_X[algo] = [ncrit.get_n_crit_grid(evals.get("X"),
        #                                                     b_min=result_runs[-1].problem.xl,
        #                                                     b_max=result_runs[-1].problem.xu,
        #                                                     n_cells=N_CELLS)[1]
        #                               for evals in critical_all_no_duplicates[algo]]

        if base_algo is None:
            base_algo = algo

        # output of the summary of the performance per algorithm
        with open(save_folder + f'analysis_results_{algo}.csv', 'w', encoding='UTF8', newline='') as f:
            write_to = csv.writer(f)
            header = ['Attribute', 'Value']
            write_to.writerow(header)
            write_to.writerow(['min_n_evals', min_n_evals])
            write_to.writerow(['max_n_evals', max_n_evals])
            write_to.writerow(['mean_n_evals', mean_n_evals])
            write_to.writerow(['min_exec_time [s]', min_exec_time])
            write_to.writerow(['max_exec_time [s]', max_exec_time])
            write_to.writerow(['mean_exec_time [s]', mean_exec_time])
            write_to.writerow([f'num_critical_distinct_sum_F ({algo})', n_crit_distinct_all[algo]])
            write_to.writerow([f'num_critical_distinct_avg_F ({algo})', n_crit_distinct_avg[algo]])
            write_to.writerow([f'num_critical_distinct_sum_X ({algo})', n_crit_distinct_all_X[algo]])
            write_to.writerow([f'num_critical_distinct_avg_X ({algo})', n_crit_distinct_avg_X[algo]])
            # write_to.writerow([f'num_opt_critical_distinct_sum_F ({algo})', int(sum(n_crit_distinct_opt[algo]))])
            # write_to.writerow([f'num_opt_critical_distinct_avg_F ({algo})', int(sum(n_crit_distinct_opt[algo])/n_runs)])
            # write_to.writerow([f'num_critical_distinct_sum_X ({algo})', sum(n_crit_distinct_X[algo])])
            # write_to.writerow([f'num_critical_distinct_avg_X ({algo})', int(sum(n_crit_distinct_X[algo])/n_runs)])
            # write_to.writerow([f'num_opt_critical_distinct_sum_X ({algo})', int(sum(n_crit_distinct_opt_X[algo]))])
            # write_to.writerow([f'num_opt_critical_distinct_avg_X ({algo})', int(sum(n_crit_distinct_opt_X[algo])/n_runs)])
            # write_to.writerow(['Mean number critical Scenarios', len(mean_critical)])
            # write_to.writerow(['Mean evaluations all scenarios', len(mean_evaluations_all)])
            # write_to.writerow(['Mean ratio critical/all scenarios', '{0:.2f}'.format(len(critical_all) / len(all_population))])
            # write_to.writerow(['Mean ratio best Critical/best Scenarios', '{0:.2f}'.format(len(critical_best) / len(best_population))])
            f.close()

    # ratio_critical_both = np.sum(n_critical_all[algo]) if np.sum(n_critical_all[algo]) == 0 \
    #     else np.sum(n_critical_all[algo]) / np.sum(n_critical_all[algo])
    # ratio_critical_both_no_duplicates = np.sum(n_critical_all_no_duplicates[algo]) if np.sum(n_critical_all_no_duplicates[algo]) == 0 \
    #     else np.sum(n_critical_all_no_duplicates[algo]) / np.sum(n_critical_all_no_duplicates[algo])
    # ratio_critical_both_average = np.sum(ratio_critical_single_run[algo]) / np.sum(ratio_critical_single_run[algo])
    # ratio_critical_both_average_no_duplicates = np.sum(ratio_critical_single_run_no_duplicates[algo]) / np.sum(ratio_critical_single_run_no_duplicates[algo])

    # output of the summary of the combined performance
    with open(save_folder + 'analysis_combined.csv', 'w', encoding='UTF8', newline='') as f:
        write_to = csv.writer(f)
        algo_names = list(result_runs_all.keys())
        for algo in algo_names:
            write_to.writerow([f'Critical Scenarios {algo}', np.sum(n_critical_all[algo])])
            write_to.writerow([f'Critical Scenarios {algo} (duplicate free)',
                               np.sum(n_critical_all_no_duplicates[algo])])
            write_to.writerow([f'Non-dominated Critical Scenarios {algo} (duplicate free)',
                               n_crit_no_dup_best[algo]])
            # write_to.writerow([f'Best Critical Scenarios {algo} Dominated By {base_algo} (duplicate free)', n_dominated_best_crit[algo]])
            # write_to.writerow([f'Critical Scenarios {algo}', np.sum(n_critical_all[algo])])
            # write_to.writerow([f'Critical Scenarios {algo} (duplicate free)', np.sum(n_critical_all_no_duplicates[algo])])
            # write_to.writerow([f'Ratio Critical Scenarios {algo}/{algo} (union)', '{0:.2f}'.format(ratio_critical_both)])
            # write_to.writerow([f'Ratio Critical Scenarios {algo}/{algo} (union, duplicate free)', '{0:.2f}'.format(ratio_critical_both_no_duplicates)])
            # write_to.writerow([f'Ratio Critical Scenarios {algo}/{algo} (average)', '{0:.2f}'.format(ratio_critical_both_average)])
            # write_to.writerow([f'Ratio Critical Scenarios {algo}/{algo} (average, duplicate free)', '{0:.2f}'.format(ratio_critical_both_average_no_duplicates)])
            # write_to.writerow([f'Mean evaluations all scenarios {algo}', mean_evaluations_all[algo]])
            # write_to.writerow([f'Mean evaluations all scenarios {algo2}', mean_evaluations_all[algo2]])
            # write_to.writerow([f'Mean critical all {algo}', '{0:.2f}'.format(mean_critical_all[algo])])
            # write_to.writerow([f'Mean critical all {algo2}', '{0:.2f}'.format(mean_critical_all[algo2])])
            # write_to.writerow(['Mean ratio best Critical/best Scenarios', '{0:.2f}'.format(len(critical_best) / len(best_population))])
        f.close()
def read_metric_single(filename, metric_name):
    table = pd.read_csv(filename, names=["n_eval", metric_name])
    n_evals = np.array(table["n_eval"][1:].values.tolist(), dtype=float)
    hv = np.array(table[metric_name][1:].values.tolist(), dtype=float)
    return n_evals, hv
def read_pf_single(filename):
    """Reads a test-case CSV file (e.g., optimal_testcases.csv) into a Population."""
    individuals = []
    table = pd.read_csv(filename)
    n_var = -1
    k = 0
    # identify the number of design variables (columns before the first "Fitness_" column)
    for col in table.columns[1:]:
        if col.startswith("Fitness_"):
            n_var = k
            break
        k = k + 1
    for i in range(len(table)):
        X = table.iloc[i, 1:n_var + 1].to_numpy()
        F = table.iloc[i, n_var + 1:-1].to_numpy()
        CB = table.iloc[i, -1]
        ind = Individual()
        ind.set("X", X)
        ind.set("F", F)
        ind.set("CB", CB)
        individuals.append(ind)
    return Population(individuals=individuals)
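# read_pf_single infers the column layout from the header: column 0 is an index, the
# columns up to the first one named "Fitness_..." hold the design variables (X), the
# "Fitness_..." columns hold the objective values (F), and the last column holds the
# criticality label (CB). A hypothetical file matching that layout (variable names and
# values made up, only the "Fitness_" prefix is taken from the code):
#
#     ,Var_0,Var_1,Fitness_0,Fitness_1,CB
#     0,1.25,-0.4,0.8,-1.1,True
#     1,0.30,2.1,0.5,-0.7,False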
def make_comparison_single(max_evaluations, save_folder, subplot_metrics, subplot_names,
                           algo_names, suffix=""):
    font = {'family': 'sans-serif', 'size': 17, 'weight': 'bold'}
    matplotlib.rcParams['font.family'] = "sans-serif"
    matplotlib.rcParams['font.weight'] = "bold"
    matplotlib.rcParams['font.size'] = 35
    offset_x_ax = 0.1 * max_evaluations
    stop = max_evaluations + offset_x_ax
    step = 200
    algos = algo_names
    colors = ['#9a226a', '#1347ac']
    n_subplots = len(subplot_metrics)
    fig, ax = plt.subplots(n_subplots, 1, figsize=(9, 9))
    for i in range(len(algos)):
        for key, metric in enumerate(subplot_metrics):
            # plt.subplot(n_subplots, 1, key + 1)
            ax[key].plot(metric[i][0], metric[i][1],
                         color=colors[i], marker=None, lw=1)
            ax[key].errorbar(metric[i][0], metric[i][1], metric[i][2],
                             ecolor=colors[i], fmt='.k', capsize=5)
            ax[key].set_xticks(np.arange(0, stop + step, step))
            ax[key].set_xlim([0, stop])
            ax[key].locator_params(tight=True, axis='y', nbins=4)
            if key != 2:
                ax[key].xaxis.set_ticklabels([])
    for i in range(n_subplots):
        ax[i].set_ylabel(subplot_names[i], **font)
        ax[i].xaxis.set_minor_locator(AutoMinorLocator())
        ax[i].tick_params(which='major', axis='y', length=7)
        ax[i].tick_params(which='minor', axis='y', length=4)
        ax[i].tick_params(which='minor', axis='x', length=0)
    ax[n_subplots - 1].set_xlabel("Number of Evaluations", **font)
    ax[n_subplots - 1].legend(algos, loc='best')
    ax[n_subplots - 1].set_xticks(np.arange(0, stop + step, step))
    ax[n_subplots - 1].set_xlim([0, stop])
    plt.subplots_adjust(wspace=0.05, hspace=0.05)
    plt.savefig(save_folder + f'subplots_combined{suffix}.png')
    plt.clf()
    plt.close(fig)
def make_comparison_plot(max_evaluations, save_folder, subplot_metrics, subplot_names, algo_names,
                         distance_tick, shift_error=False, suffix="", colors=None, cmap=None,
                         figsize=(7, 5), linestyles=None, alpha=None):
    """Creates a combined comparison plot of the given metrics over the number of evaluations
    for all algorithms (one subplot per metric)."""
    font = {'family': 'sans-serif', 'size': 16, 'weight': 'bold'}
    matplotlib.rcParams['font.family'] = "sans-serif"
    matplotlib.rcParams['font.weight'] = "bold"
    matplotlib.rcParams['font.size'] = 15
    size_title = 12
    size_ticks = 10

    if shift_error:
        offset_algo_x = 0.010 * max_evaluations
        offset_algo_y = 0.012
    else:
        offset_algo_x = 0
        offset_algo_y = 0

    marker_error = ''
    line_width = 2
    error_lw = 2
    error_ls = ''

    offset_x_ax = 0.1 * max_evaluations
    stop = max_evaluations + offset_x_ax
    if distance_tick is None:
        step = 0.1 * max_evaluations
    else:
        step = distance_tick
    algos = algo_names
    if colors is None:
        colors = ['#9a226a', '#1347ac', '#ffbb00', '#a0052d', '#666666']
    n_subplots = len(subplot_metrics)

    if n_subplots == 1:
        # only one metric to plot
        fig, ax = plt.subplots(n_subplots, figsize=figsize)
        for i in range(len(algos)):
            for key, metric in enumerate(subplot_metrics):
                ax.plot(metric[i][0],
                        metric[i][1],
                        color=cmap[algos[i]] if cmap is not None else colors[i],
                        marker=None,
                        linestyle=linestyles[i] if linestyles is not None else None,
                        lw=1)
                ax.errorbar(np.asarray(metric[i][0]) + offset_algo_x * i,
                            np.asarray(metric[i][1]) + offset_algo_y * i,
                            metric[i][2],
                            ecolor=cmap[algos[i]] if cmap is not None else colors[i],
                            marker=marker_error,
                            capsize=5,
                            linestyle=error_ls,
                            linewidth=error_lw)
                ax.set_xticks(np.arange(0, max_evaluations, step))
                ax.set_xlim([0, stop])
                ax.locator_params(tight=True, axis='y', nbins=4)
        ax.set_ylabel(subplot_names[0], **font)
        ax.xaxis.set_minor_locator(AutoMinorLocator())
        ax.tick_params(which='major', axis='y', length=7, labelsize=size_ticks)
        ax.tick_params(which='minor', axis='y', length=4, labelsize=size_ticks)
        ax.tick_params(which='minor', axis='x', length=0, labelsize=size_ticks)
        ax.set_xlabel("Number of Evaluations", **font)
        ax.title.set_size(size_title)
        ax.legend(algos, loc='best', framealpha=0.2)
        ax.set_xticks(np.arange(0, stop, step))
        ax.set_xlim([0, stop])
    else:
        # several metrics to plot
        fig, ax = plt.subplots(n_subplots, 1, figsize=figsize)
        for i in range(len(algos)):
            for key, metric in enumerate(subplot_metrics):
                # plt.subplot(n_subplots, 1, key + 1)
                # log.info(f"metric: {metric}")
                ax[key].plot(np.asarray(metric[i][0]),
                             np.asarray(metric[i][1]),
                             color=cmap[algos[i]] if cmap is not None else colors[i],
                             marker=None,
                             linestyle=linestyles[i] if linestyles is not None else None,
                             lw=line_width)
                ax[key].errorbar(np.asarray(metric[i][0]) + offset_algo_x * i,
                                 np.asarray(metric[i][1]) + offset_algo_y * i,
                                 metric[i][2],
                                 marker=marker_error,
                                 ecolor=cmap[algos[i]] if cmap is not None else colors[i],
                                 capsize=5,
                                 linestyle=error_ls,
                                 linewidth=error_lw)
                ax[key].set_xticks(np.arange(0, stop + step - 1, step))
                ax[key].set_xlim([0, stop])
                ax[key].locator_params(tight=True, axis='y', nbins=4)
                if key != 2:
                    ax[key].xaxis.set_ticklabels([])
        for i in range(n_subplots):
            ax[i].set_ylabel(subplot_names[i], **font)
            ax[i].xaxis.set_minor_locator(AutoMinorLocator())
            ax[i].tick_params(which='major', axis='y', length=7)
            ax[i].tick_params(which='minor', axis='y', length=4)
            ax[i].tick_params(which='minor', axis='x', length=0)
        ax[n_subplots - 1].set_xlabel("Number of Evaluations", **font)
        ax[n_subplots - 1].legend(algos, loc='best', framealpha=alpha)
        ax[n_subplots - 1].set_xticks(np.arange(0, stop, step))
        ax[n_subplots - 1].set_xlim([0, stop])

    plt.subplots_adjust(wspace=0.05, hspace=0.05)
    Path(save_folder).mkdir(parents=True, exist_ok=True)
    plt.savefig(save_folder + f'subplots_combined{suffix}.png')
    plt.savefig(save_folder + f'subplots_combined{suffix}.pdf', format="pdf")
    plt.clf()
    plt.close(fig)
def make_subplots(max_evaluations, save_folder, subplot_metrics, subplot_names,
                  algo_names, distance_tick, suffix=""):
    font = {'family': 'sans-serif', 'size': 17, 'weight': 'bold'}
    matplotlib.rcParams['font.family'] = "sans-serif"
    matplotlib.rcParams['font.weight'] = "bold"
    matplotlib.rcParams['font.size'] = 35
    offset_x_ax = 0.1 * max_evaluations
    stop = max_evaluations + offset_x_ax
    if distance_tick is not None:
        step = distance_tick
    else:
        step = 0.1 * max_evaluations
    algos = algo_names
    colors = ['#9a226a', '#1347ac']
    n_subplots = len(subplot_metrics)
    fig, ax = plt.subplots(n_subplots, 1, figsize=(9, 9))
    for i in range(len(algos)):
        for key, metric in enumerate(subplot_metrics):
            # plt.subplot(n_subplots, 1, key + 1)
            ax[key].plot(metric[i][0], metric[i][1],
                         color=colors[i], marker=None, lw=1)
            ax[key].errorbar(metric[i][0], metric[i][1], metric[i][2],
                             ecolor=colors[i], fmt='.k', capsize=5)
            ax[key].set_xticks(np.arange(0, stop + step, step))
            ax[key].set_xlim([0, stop])
            ax[key].locator_params(tight=True, axis='y', nbins=4)
            if key != 2:
                ax[key].xaxis.set_ticklabels([])
    for i in range(n_subplots):
        ax[i].set_ylabel(subplot_names[i], **font)
        ax[i].xaxis.set_minor_locator(AutoMinorLocator())
        ax[i].tick_params(which='major', axis='y', length=7)
        ax[i].tick_params(which='minor', axis='y', length=4)
        ax[i].tick_params(which='minor', axis='x', length=0)
    ax[n_subplots - 1].set_xlabel("Number of Evaluations", **font)
    ax[n_subplots - 1].legend(algos, loc='best')
    ax[n_subplots - 1].set_xticks(np.arange(0, stop + step, step))
    ax[n_subplots - 1].set_xlim([0, stop])
    plt.subplots_adjust(wspace=0.05, hspace=0.05)
    plt.savefig(save_folder + f'subplots_combined{suffix}.png')
    plt.clf()
    plt.close(fig)
def write_metric_data_to_csv_cid(save_folder, metric_name, algo_names, plot_array, suffix=""):
    """Writes the plot data of a single metric (e.g., CID) in CSV form.

    TODO: make more generic, i.e., independent of the number of metrics.
    """
    algo_1 = algo_names[0]
    algo_2 = algo_names[1]
    m = metric_name
    header_igd = [f'{algo_1}_n_evals',
                  f'{algo_1}_{m}',
                  f'{algo_1}_{m}_sigma',
                  # f'{algo_1}__{m}_min',
                  # f'{algo_1}__{m}_max',
                  f'{algo_2}__n_evals',
                  f'{algo_2}__{m}',
                  f'{algo_2}__{m}_sigma',
                  # f'{algo_2}__{m}_min',
                  # f'{algo_2}__{m}_max'
                  ]
    # metric_names = ['hv', 'gd', 'sp']
    paths = []
    metric = plot_array
    filename = save_folder + metric_name + os.sep + \
        'combined_' + metric_name + suffix + '.csv'
    with open(filename, 'w', encoding='UTF8', newline='') as f:
        write_to = csv.writer(f)
        write_to.writerow(header_igd)
        for i in range(len(metric[0][0])):
            write_to.writerow([metric[0][0][i],  # algorithm 1
                               metric[0][1][i],
                               metric[0][2][i],
                               # metric[0][3][i],
                               # metric[0][4][i],
                               metric[1][0][i],  # algorithm 2
                               metric[1][1][i],
                               metric[1][2][i],
                               # metric[1][3][i],
                               # metric[1][4][i]
                               ])
        f.close()
    paths.append(filename)
    return paths
def write_metric_data_to_csv(save_folder, metric_names, algo_names, plot_array_hv,
                             plot_array_igd, plot_array_sp, suffix=""):
    headers = []
    for m in metric_names:
        header = []
        for algo in algo_names:
            header += [f'{algo}_n_evals', f'{algo}_{m}', f'{algo}_{m}_sigma']
        headers.append(header)
    paths = []
    for key, metric in enumerate([plot_array_hv, plot_array_igd, plot_array_sp]):
        filename = save_folder + \
            metric_names[key] + os.sep + 'combined_' + \
            metric_names[key] + suffix + '.csv'
        with open(filename, 'w', encoding='UTF8', newline='') as f:
            write_to = csv.writer(f)
            write_to.writerow(headers[key])
            for i, value in enumerate(metric[0][0]):
                res_algo = []
                for j, _ in enumerate(algo_names):
                    res_algo += [metric[j][0][i],
                                 metric[j][1][i],
                                 metric[j][2][i]]
                write_to.writerow(res_algo)
            f.close()
        paths.append(filename)
    return paths
def retrieve_metric_data_from_csv(paths, n_algos):
    storing_arrays = []
    for key, path in enumerate(paths):
        table = pd.read_csv(path)
        by_metric = []
        for i in range(n_algos):
            metric_algo = [table.iloc[:, 3 * i].values.tolist(),
                           table.iloc[:, 3 * i + 1].values.tolist(),
                           table.iloc[:, 3 * i + 2].values.tolist()]
            by_metric.append(metric_algo)
        storing_arrays.append(by_metric)
    return storing_arrays  # plot_array_hv, plot_array_igd, plot_array_sp
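# write_metric_data_to_csv and retrieve_metric_data_from_csv form a round trip: each
# plot_array entry is a per-algorithm triple [n_evals, mean_values, errors], written as
# three columns per algorithm and read back in the same order. Illustrative sketch with
# made-up data; the folder and algorithm names are hypothetical and the per-metric
# subfolders (e.g. results/analysis/hv/) are assumed to exist:
#
#     plot_array_hv = [
#         [[100, 200], [0.50, 0.60], [0.02, 0.01]],   # algorithm 1: [n_evals, mean, error]
#         [[100, 200], [0.40, 0.55], [0.03, 0.02]],   # algorithm 2
#     ]
#     plot_array_igd = plot_array_sp = plot_array_hv  # placeholders, just for the sketch
#     paths = write_metric_data_to_csv("results/analysis/", ["hv", "igd", "sp"],
#                                      ["NSGA-II", "RS"], plot_array_hv,
#                                      plot_array_igd, plot_array_sp)
#     hv_back, igd_back, sp_back = retrieve_metric_data_from_csv(paths, n_algos=2)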
def statistical_analysis(metric_name_load, runs_bases, runs_test, algo_test, save_folder,
                         metric_name_label=None):
    """Runs the Wilcoxon test and effect-size analysis of the final metric values of the algorithm
    under test against each baseline algorithm and writes the results to a CSV file."""
    def get_last_metric_value(path, name):
        eval_res = EvaluationResult.load(path + os.sep + BACKUP_FOLDER, name)
        _, hv = eval_res.steps, eval_res.values
        return hv[-1]

    m_test = []
    m_bases = {}
    results = {}
    for run_path in runs_test:
        m_test.append(get_last_metric_value(run_path, metric_name_load))
    log.info(f"metric values test: {m_test}")

    algos = runs_bases.keys()
    for algo in algos:
        # the algorithm under test is allowed to appear in the list; skip it here
        if algo == algo_test:
            continue
        m_bases[algo] = []
        for run_path in runs_bases[algo]:
            m_bases[algo].append(get_last_metric_value(run_path, metric_name_load))
        assert len(m_bases[algo]) == len(m_test)

    for i, algo in enumerate(m_bases):
        log.info(f"metric values base_{i}: {m_bases[algo]}")
        p_val, effect = wilcoxon.run_wilcoxon_and_delaney(m_test, m_bases[algo])
        results[algo] = [p_val, effect]
    log.info(m_bases)

    # write to file
    Path(save_folder).mkdir(parents=True, exist_ok=True)
    with open(save_folder + f'{metric_name_label}_significance.csv', 'w', encoding='UTF8', newline='') as f:
        write_to = csv.writer(f)
        header = [f'algo (subject: {algo_test})', 'p_value', 'effect']
        write_to.writerow(header)
        for algo in algos:
            if algo == algo_test:
                continue
            write_to.writerow([f'{algo}', results[algo][0], results[algo][1]])
        f.close()
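# The statistical helper wilcoxon.run_wilcoxon_and_delaney is used as a black box here.
# Assuming it pairs a Wilcoxon signed-rank test with the Vargha-Delaney A measure, a
# minimal scipy-based sketch of those two statistics looks like this (illustrative only,
# not the project's implementation; numbers made up):
#
#     import numpy as np
#     from scipy.stats import wilcoxon as wilcoxon_test
#
#     def a12(x, y):
#         """Vargha-Delaney A measure: P(X > Y) + 0.5 * P(X == Y)."""
#         x, y = np.asarray(x), np.asarray(y)
#         greater = sum(xi > yi for xi in x for yi in y)
#         equal = sum(xi == yi for xi in x for yi in y)
#         return (greater + 0.5 * equal) / (len(x) * len(y))
#
#     x = [0.70, 0.72, 0.68, 0.75]            # final metric values, algorithm under test
#     y = [0.61, 0.66, 0.65, 0.64]            # final metric values, baseline
#     p_value = wilcoxon_test(x, y).pvalue    # paired, two-sided by default
#     effect = a12(x, y)                      # 1.0 -> x is always larger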
def statistical_analysis_from_overview(metric_name, input_folder, save_folder):
    """Same analysis as statistical_analysis, but reading the final metric values from an overview CSV
    file; the first algorithm column is taken as the subject."""
    try:
        file_name = input_folder + f"overview_{metric_name}.csv"
        table = pd.read_csv(file_name)
        m_test = []
        m_bases = {}
        results = {}
        algos = list(table.columns.values)[1:]
        log.info(algos)
        algo = algos[0]
        m_test = table[algos[0]].to_list()  # table.iloc[:, 1].to_numpy()
        for algo in algos[1:]:
            m_bases[algo] = table[algo].to_list()  # table.iloc[:, i + 1].to_numpy()
            # assert(len(m_bases[col]) == len(m_test))
            d1 = m_test
            d2 = m_bases[algo]
            log.info(f"test: {d1}")
            log.info(f"base: {d2}")
            p_val, effect = wilcoxon.run_wilcoxon_and_delaney(d1, d2)
            results[algo] = [p_val, effect]
        log.info(m_bases)

        # write to file
        Path(save_folder).mkdir(parents=True, exist_ok=True)
        with open(save_folder + f'{metric_name}_significance.csv', 'w', encoding='UTF8', newline='') as f:
            write_to = csv.writer(f)
            header = [f'algo (subject: {algos[0]})', 'p_value', 'effect']
            write_to.writerow(header)
            for algo in algos[1:]:
                write_to.writerow([f'{algo}', results[algo][0], results[algo][1]])
            f.close()
    except ValueError:
        # catches this exception from wilcoxon:
        # ValueError("zero_method 'wilcox' and 'pratt' do not "
        #            "work if x - y is zero for all elements.")
        traceback.print_exc()
        print("No statistical analysis is possible. No files generated.")