from opensbt.model_ga.individual import Individual
from typing import List
from opensbt.simulation.simulator import Simulator, SimulationOutput
from math import sin, cos, pi, ceil
import numpy as np
from random import random
from opensbt.utils import geometric
import json
from datetime import datetime
import csv
import os
[docs]
class DummySimulator(Simulator):
"""
This class implements a simplified driving simulator for two actors, where the ego and adversary move with linear velocity.
The ego scans for nearby vehicles. If the distance is below some threshold the ego reduces its speed until it is 0.
The ego continues driving when no adversary is visible within the threshold distance.
The position of ego deviates based on some random noise.
"""
time_step = 1
DETECTION_THRESH = 2 # threshold in meters where other actors can be detected
RANDOMNESS_BIAS = 0.1 # noise to be added to positions
## Simulates a set of scenarios and returns the output
archive = {}
now = datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
[docs]
@staticmethod
def ind_in_archive(ind):
""" Checks if individual is in the archive of individuals."""
if tuple(ind) in DummySimulator.archive:
return True
else:
return False
[docs]
@staticmethod
def simulate(list_individuals: List[Individual],
variable_names: List[str],
scenario_path: str,
sim_time: float,
time_step: float = time_step,
do_visualize: bool = False
) -> List[SimulationOutput]:
"""
Simulates a set of scenarios based on a list of individuals and returns the simulation outputs.
Parameters
----------
list_individuals : List[Individual]
A list of individuals representing different scenarios to be simulated.
variable_names : List[str]
A list of variable names used in the simulation.
scenario_path : str
The file path to the scenario configuration.
sim_time : float
Total simulation time.
time_step : float, optional
Time step for the simulation, by default DummySimulator.time_step.
do_visualize : bool, optional
Whether to visualize the simulation, by default False.
Returns
-------
List[SimulationOutput]
A list of simulation outputs for each individual in the input list.
"""
results = []
for ind in list_individuals:
################## duplicate simulation avoidance
if not DummySimulator.ind_in_archive(ind):
# with open(os.getcwd() + os.sep + f'simulated_inds_{DummySimulator.now}.csv', mode = 'a+') as f:
# write_to = csv.writer(f)
# write_to.writerow([ind])
# f.close()
simout = DummySimulator.simulate_single(ind,
variable_names,
filepath=scenario_path,
sim_time=sim_time,
time_step=time_step)
else:
simout = DummySimulator.archive[tuple(ind)]
results.append(simout)
return results
[docs]
@staticmethod
def simulate_single(vars: List[Individual],
variable_names: List[str],
filepath: str,
sim_time: float,
time_step: float,
detection_dist = DETECTION_THRESH,
randomness_bias = RANDOMNESS_BIAS
) -> SimulationOutput:
"""Executes single scenario in DummySimulator
"""
egoInitialVelocity = vars[1]
pedInitialVelocity = vars[3]
egoOrientation = vars[0]
pedOrientation = vars[2]
start_pos_ped = [2, 10] # should be also defined by input parameter
start_pos_ego = [0, 0] # should be also defined by input parameter
# first plan the motion of ego, get for each time step predicted position/velocity
plan_ego_traj= plan_motion(start_pos_ego, egoOrientation, egoInitialVelocity, sim_time, time_step)
plan_adv_traj = plan_motion(start_pos_ped, pedOrientation, pedInitialVelocity, sim_time, time_step)
n_steps = plan_ego_traj.shape[1]
# iterate through each time step checking other actors nearby to avoid collision
real_ego_traj = np.asarray([
[plan_ego_traj[0,0]],
[plan_ego_traj[1,0]],
[plan_ego_traj[2,0]],
[plan_ego_traj[3,0]],
[plan_ego_traj[4,0]]
])
# first position is real
k = 1
for i in range(1,n_steps):
current_size = real_ego_traj[1,:]
if len(current_size) < n_steps:
pos_ego = [real_ego_traj[1,-1],real_ego_traj[2,-1]]
pos_others = [[plan_adv_traj[1,i],plan_adv_traj[2,i]]]
if are_actors_nearby(pos_ego, pos_others, detection_dist=detection_dist):
# plan make a step, ego vehicle trajectory in the form (Time, X, Y, V, Yaw)
# stop for some time steps
# t_stop = 3
# for t in range(0, n_steps - current_size - t_stop)):
new_t = real_ego_traj[0, i - 1]
new_x = real_ego_traj[1, i - 1]
new_y = real_ego_traj[2, i - 1]
new_v = 0
new_yaw = real_ego_traj[4, i - 1]
new_state = np.asarray([[new_t],[new_x],[new_y], [new_v], [new_yaw]])
real_ego_traj = np.append(real_ego_traj, new_state, axis = 1)
# waiting_steps += t_stop - 1
else:
new_t = plan_ego_traj[0, k]
new_x = plan_ego_traj[1, k]
new_y = plan_ego_traj[2, k]
new_v = plan_ego_traj[3, k]
if real_ego_traj[3, k-1] == 0:
new_v = plan_ego_traj[3,k]/2
new_yaw = plan_ego_traj[4,k]
new_state = np.asarray([[new_t],[new_x],[new_y], [new_v], [new_yaw]])
real_ego_traj = np.append(real_ego_traj, new_state, axis = 1)
k += 1
# add some noise to location vector
ego_location = [pos for pos in zip(list(real_ego_traj[1, :]), list(real_ego_traj[2, :]))]
obj_location = [pos for pos in zip(list(plan_adv_traj[1, :]), list(plan_adv_traj[2, :]))]
ego_location = [(pos[0] +random()*randomness_bias,pos[1] +random()*randomness_bias) for pos in ego_location]
result = {
"simTime": 0,
"times": list(real_ego_traj[0, :]),
"location": {"ego": ego_location,
"adversary": obj_location},
"velocity": {"ego": list(real_ego_traj[3, :]),
"adversary": list(plan_adv_traj[3, :]),
},
"speed": {"ego": list(real_ego_traj[3, :]),
"adversary": list(plan_adv_traj[3, :]),
},
"acceleration": {"ego": list(real_ego_traj[3, :]),
"adversary": list(plan_adv_traj[3, :]),
},
"yaw": {"ego": list(real_ego_traj[4, :]),
"adversary": list(plan_adv_traj[4, :]),
},
"collisions": [],
"actors" : {
"ego": "ego",
"adversary": "adversary",
"vehicles" : [],
"pedestrians" : []
},
"otherParams": {
}
}
return SimulationOutput.from_json(json.dumps(result))
[docs]
def are_actors_nearby(pos_ego, pos_others, detection_dist=3):
"""
Checks if the ego actor is within a certain distance of any actor in a list of positions
Parameters
----------
pos_ego : tuple or list
The position of the ego actor
pos_others : list
A list of positions of other actors
detection_dist : float
The distance within which actors are considered nearby
Returns
-------
bool
True if the ego actor is within the specified distance of any other actor, False otherwise
"""
for i in range(0, len(pos_others)):
if geometric.dist(pos_ego,pos_others[i]) < detection_dist:
return True
return False
[docs]
def plan_motion(starting_position, orientation, velocity, sim_time, sampling_time):
"""
Plans the motion of an ego vehicle in a linear trajectory based on initial conditions.
Parameters
----------
starting_position : tuple
The initial (x, y) position of the ego vehicle.
orientation : float
The initial orientation (yaw) of the ego vehicle in degrees.
velocity : float
The constant velocity of the ego vehicle.
sim_time : float
The total simulation time.
sampling_time : float
The time interval between each simulation step.
Returns
-------
np.ndarray
A 2D array representing the trajectory with rows as [Time, X, Y, V, Yaw].
"""
theta = orientation
T = sim_time
v = velocity
t = sampling_time
S = v * T
dist_x = cos(theta * pi / 180) * S
dist_y = sin(theta * pi / 180) * S
n_steps = ceil(T / t)
asize = n_steps + 1
array_time = np.linspace(0, T, asize)
array_x = np.linspace(starting_position[0], starting_position[0] + dist_x, asize)
array_y = np.linspace(starting_position[1], starting_position[1] + dist_y, asize)
array_v = v * np.ones(asize, dtype=np.int64)
array_yaw = theta * np.ones(array_time.size)
return np.concatenate((array_time, array_x, array_y, array_v, array_yaw)).reshape(5, asize)