Files
neuroevolution/mathema/envs/openai_car_racing_sac.py
2025-12-13 14:12:35 +01:00

791 lines
27 KiB
Python

# TODO: this will be one env for both systems
import math
import numpy as np
import logging
import Box2D
from Box2D import (b2FixtureDef, b2PolygonShape, b2ContactListener)
log = logging.getLogger(__name__)
# Optional gym import for spaces only; code runs without strict gym registration
try:
import gymnasium as gym
from gymnasium import spaces
from gymnasium.utils import seeding
GYM_AVAILABLE = True
except Exception:
GYM_AVAILABLE = False
spaces = None
seeding = None
# Car dynamics from classic gym (Box2D)
from gymnasium.envs.box2d.car_dynamics import Car
# --- pygame renderer ---
import pygame
# Module-level tuning constants for track generation, rendering and episode
# limits. Length-like values are divided by SCALE.

# Not referenced in the visible part of this file.
DEBUG_DRAWING = False
# Number of upcoming angle deltas placed in the feature vector.
LOOK_AHEAD = 10
# STATE_* / VIDEO_* are not referenced in the visible part of this file.
STATE_W = 96
STATE_H = 96
VIDEO_W = 1200
VIDEO_H = 800
# Size of the pygame window / offscreen surface.
WINDOW_W = 1350
WINDOW_H = 950
SCALE = 6.0
# Upper bound for the random checkpoint radius used by the track generator.
TRACK_RAD = 900 / SCALE
# Leaving |x| or |y| > PLAYFIELD terminates the episode.
PLAYFIELD = 2000 / SCALE
# Physics step is 1 / FPS; also used as the render tick rate.
FPS = 60
# Camera zoom factor reached once the episode time passes one second.
ZOOM = 2.7
# Not referenced in the visible part of this file.
ZOOM_FOLLOW = True
# Distance advanced per generated track point.
TRACK_DETAIL_STEP = 21 / SCALE
# Maximum heading change per generated track point.
TRACK_TURN_RATE = 0.31
# Offset from the track centre line to each road edge.
TRACK_WIDTH = 40 / SCALE
# Width of the coloured border strip drawn on sharp turns.
BORDER = 8 / SCALE
# Consecutive same-direction sharp segments required before a border is drawn.
BORDER_MIN_COUNT = 4
ROAD_COLOR = [0.4, 0.4, 0.4]
# limits & timeouts
# NOTE(review): MAX_TIME_SEC / MAX_STEPS, STALL_* and FUEL_LIMIT are not
# referenced in the visible part of this file; only NO_PROGRESS_STEPS is
# checked (in CarRacing.step).
MAX_TIME_SEC = 90.0
MAX_STEPS = int(FPS * MAX_TIME_SEC)
NO_PROGRESS_SEC = 8.0
NO_PROGRESS_STEPS = int(FPS * NO_PROGRESS_SEC)
STALL_MIN_SPEED = 4.0
STALL_SEC = 4.0
STALL_STEPS = int(FPS * STALL_SEC)
FUEL_LIMIT = 120.0
# ----------------------------
# Utilities
# ----------------------------
def standardize_angle(theta: float) -> float:
    """Wrap an angle in radians into the half-open interval [-pi, pi)."""
    full_turn = 2 * np.pi
    shifted = theta + np.pi
    return np.mod(shifted, full_turn) - np.pi
def f2c(rgb_float):
    """Convert float colour components (nominally 0..1) to a 0..255 int tuple.

    Each component is scaled by 255, truncated to int and clamped to the
    valid byte range.
    """
    channels = []
    for component in rgb_float:
        scaled = int(255 * component)
        channels.append(min(max(scaled, 0), 255))
    return tuple(channels)
# ----------------------------
# MyState: feature container
# ----------------------------
class MyState:
    """Mutable holder for the per-step features of the racing environment.

    All fields start as ``None`` and are filled in by the environment.
    """

    def __init__(self):
        for field_name in (
            "angle_deltas",
            "reward",
            "on_road",
            "laps",
            "wheel_angle",
            "car_angle",
            "angular_vel",
            "true_speed",
            "off_center",
            "vel_angle",
        ):
            setattr(self, field_name, None)

    def as_array(self, n: int):
        """Return the first ``n`` angle deltas followed by six scalars, as float32.

        Scalar order: wheel_angle, car_angle, angular_vel, true_speed,
        off_center, vel_angle.
        """
        scalars = [
            self.wheel_angle,
            self.car_angle,
            self.angular_vel,
            self.true_speed,
            self.off_center,
            self.vel_angle,
        ]
        head = np.asanyarray(self.angle_deltas[:n])
        flat = np.concatenate((head.ravel(), np.ravel(scalars)))
        return flat.astype(np.float32)

    def as_feature_vector(self, lookahead: int = LOOK_AHEAD):
        """Alias of :meth:`as_array` that defaults to ``LOOK_AHEAD`` deltas."""
        return self.as_array(lookahead)
# ----------------------------
# Contact listener: counts tiles, progress & reward
# ----------------------------
class FrictionDetector(b2ContactListener):
    """Contact listener that tracks tile contacts, progress and reward.

    A contact is relevant when one body's ``userData`` carries a
    ``road_friction`` attribute (a road tile) and the other carries a
    ``tiles`` set.
    """

    def __init__(self, env):
        super().__init__()
        self.env = env

    def BeginContact(self, contact):
        self._contact(contact, True)

    def EndContact(self, contact):
        self._contact(contact, False)

    def _contact(self, contact, begin):
        """Update tile membership, progress counters and ``env.on_road``."""
        data_a = contact.fixtureA.body.userData
        data_b = contact.fixtureB.body.userData

        tile = None
        obj = None
        # Checked in A-then-B order; if both sides look like tiles, B wins.
        for candidate, other in ((data_a, data_b), (data_b, data_a)):
            if candidate and "road_friction" in candidate.__dict__:
                tile, obj = candidate, other
        if not tile:
            return

        # Any contact repaints the tile with the plain road colour (in place).
        for channel in range(3):
            tile.color[channel] = ROAD_COLOR[channel]

        if not obj or "tiles" not in obj.__dict__:
            return

        env = self.env
        if begin:
            obj.tiles.add(tile)
            # Only the tile that is next in track order counts as progress.
            if tile.index_on_track == env.next_road_tile:
                env.reward += 1000.0 / len(env.track)
                env.tile_visited_count += 1
                env.next_road_tile += 1
                if env.next_road_tile >= len(env.road):
                    # Wrapped around the track: one lap completed.
                    env.next_road_tile = 0
                    env.laps += 1
        else:
            obj.tiles.discard(tile)
        # NOTE(review): run for both begin and end contacts so on_road can
        # become True again — confirm this matches the intended nesting.
        env.on_road = len(obj.tiles) > 0
# ----------------------------
# CarRacing with pygame rendering and Gym(nasium) 0.26+ compatible step/reset
# ----------------------------
class CarRacing(gym.Env):
    """Box2D car-racing environment with a feature-vector observation.

    Observations are ``LOOK_AHEAD + 6`` float32 features taken from
    :class:`MyState`; actions are ``[steer, gas, brake]``. ``reset`` returns
    ``(obs, info)`` and ``step`` returns
    ``(obs, reward, terminated, truncated, info)``. Rendering uses pygame,
    either in a window (``render_mode="human"``) or on an offscreen surface
    whose pixels are returned as an array.
    """

    metadata = {
        "render_modes": ["human", "rgb_array", None],
        "render_fps": FPS,
    }

    def __init__(self, seed_value: int = 5, render_mode: str | None = "human"):
        """Create the physics world, spaces and the first track and car.

        Args:
            seed_value: Seed for the track-generation RNG.
            render_mode: ``"human"`` opens a window on first render; any other
                value renders onto an offscreen surface.
        """
        # Off-road bookkeeping must exist before the first episode is built.
        self.offroad_frames = 0
        self.offroad_grace_frames = int(0.2 * FPS)
        self.offroad_penalty_per_frame = 2.0

        # RNG
        if seeding is not None:
            self.np_random, _ = seeding.np_random(seed_value)
        else:
            self.np_random = np.random.RandomState(seed_value)

        # Physics world; keep a reference to the listener on the instance.
        self.contactListener_keepref = FrictionDetector(self)
        self.world = Box2D.b2World((0, 0), contactListener=self.contactListener_keepref)

        # Gym-style spaces (optional)
        if GYM_AVAILABLE:
            self.action_space = spaces.Box(
                np.array([-1, 0, 0], dtype=np.float32),
                np.array([+1, +1, +1], dtype=np.float32),
                dtype=np.float32,
            )
            # Feature vector length = LOOK_AHEAD + 6
            # (wheel, car, angular_vel, true_speed, off_center, vel_angle)
            feat_len = LOOK_AHEAD + 6
            self.observation_space = spaces.Box(
                low=-np.inf, high=np.inf, shape=(feat_len,), dtype=np.float32
            )

        # State
        self.viewer = None  # unused (pyglet placeholder)
        self.road = None
        self.car = None
        self.reward = 0.0
        self.prev_reward = 0.0
        self.laps = 0
        self.on_road = True
        self.ctrl_pts = None
        self.outward_vectors = None
        self.angles = None
        self.angle_deltas = None
        self.original_road_poly = None
        self.indices = None
        self.my_state = MyState()
        self.next_road_tile = 0

        # Rendering
        self.render_mode = render_mode
        self._pg = None  # pygame objects container

        # Episode control
        self.tile_visited_count = 0
        self.t = 0.0
        self.human_render = False
        self.steps = 0
        self._last_progress_count = 0
        self._no_progress_steps = 0
        self._stall_steps = 0

        # Build initial track + car
        self._build_new_episode()

    # ------------------------
    # Helpers: pygame
    # ------------------------
    class _PygameCtx:
        """Holder for the lazily created pygame objects."""

        def __init__(self):
            self.initialized = False
            self.screen = None
            self.clock = None
            self.font = None
            self.rgb_surface = None  # offscreen for rgb_array

    def _ensure_pygame(self):
        """Initialise pygame, the draw surface, clock and font on first use."""
        if self._pg is None:
            self._pg = self._PygameCtx()
        if self._pg.initialized:
            return
        if not pygame.get_init():
            pygame.init()
        if self.render_mode == "human":
            self._pg.screen = pygame.display.set_mode((WINDOW_W, WINDOW_H))
        else:
            # offscreen surface; we can still blit/draw onto it
            self._pg.screen = pygame.Surface((WINDOW_W, WINDOW_H))
        self._pg.clock = pygame.time.Clock()
        try:
            pygame.font.init()
            self._pg.font = pygame.font.SysFont("Arial", 20)
        except Exception:
            # The HUD text is optional; render() skips it when font is None.
            self._pg.font = None
        self._pg.initialized = True

    def _world_to_screen(self, x, y, zoom, angle, scroll_x, scroll_y):
        """Map a world point to integer screen pixels for the given camera."""
        ca, sa = math.cos(angle), math.sin(angle)
        # rotate around (scroll_x, scroll_y)
        rx = (x - scroll_x) * ca + (y - scroll_y) * sa
        ry = -(x - scroll_x) * sa + (y - scroll_y) * ca
        # scale & translate (match original camera placement)
        sx = int(WINDOW_W / 2 + rx * zoom)
        sy = int(WINDOW_H / 4 + ry * zoom)
        return sx, sy

    def get_feature_vector(self, lookahead: int = LOOK_AHEAD) -> list[float]:
        """Return the current feature vector as a plain Python list."""
        my_s: MyState = self.my_state
        return my_s.as_feature_vector(lookahead).tolist()

    def _draw_polygon_world(self, poly, color, zoom, angle, scroll_x, scroll_y):
        """Draw a filled polygon given in world coordinates."""
        pts = [self._world_to_screen(px, py, zoom, angle, scroll_x, scroll_y) for (px, py) in poly]
        pygame.draw.polygon(self._pg.screen, f2c(color), pts)

    def _draw_body(self, body, color=(0.7, 0.7, 0.7), zoom=1.0, angle=0.0, scroll_x=0.0, scroll_y=0.0):
        """Draw every polygon fixture of a Box2D body in a single colour."""
        col = f2c(color)
        for fixture in body.fixtures:
            shape = fixture.shape
            if isinstance(shape, b2PolygonShape):
                verts = [body.transform * v for v in shape.vertices]
                pts = [self._world_to_screen(v[0], v[1], zoom, angle, scroll_x, scroll_y) for v in verts]
                pygame.draw.polygon(self._pg.screen, col, pts, width=0)

    # ------------------------
    # Track & episode setup
    # ------------------------
    def _destroy_car(self):
        """Remove the current car from the world, if there is one."""
        if self.car is None:
            return
        try:
            self.car.destroy()
        except Exception:
            # Best effort: a failed teardown must not block a reset.
            log.debug("car teardown failed", exc_info=True)
        self.car = None

    def _destroy(self):
        """Remove all road tiles and the car from the physics world."""
        if self.road:
            # Detach userData first, then destroy the bodies.
            for tile in self.road:
                try:
                    tile.userData = None
                except Exception:
                    log.debug("could not detach tile userData", exc_info=True)
                try:
                    self.world.DestroyBody(tile)
                except Exception:
                    log.debug("could not destroy road tile", exc_info=True)
            self.road = []
        # The car is torn down even when there is no road to remove.
        self._destroy_car()

    def _create_track(self):
        """Generate a closed random track and its road tiles.

        Returns:
            ``True`` on success; ``False`` when no usable closed loop was
            found, in which case the caller is expected to retry.
        """
        CHECKPOINTS = 12

        # Random checkpoints around the origin; first and last are pinned.
        checkpoints = []
        for c in range(CHECKPOINTS):
            alpha = 2 * math.pi * c / CHECKPOINTS + self.np_random.uniform(0, 2 * math.pi * 1 / CHECKPOINTS)
            rad = self.np_random.uniform(TRACK_RAD / 3, TRACK_RAD)
            if c == 0:
                alpha = 0
                rad = 1.5 * TRACK_RAD
            if c == CHECKPOINTS - 1:
                alpha = 2 * math.pi * c / CHECKPOINTS
                self.start_alpha = 2 * math.pi * (-0.5) / CHECKPOINTS
                rad = 1.5 * TRACK_RAD
            checkpoints.append((alpha, rad * math.cos(alpha), rad * math.sin(alpha)))
        self.road = []

        # Walk from checkpoint to checkpoint, steering towards the next one.
        x, y, beta = 1.5 * TRACK_RAD, 0, 0
        dest_i = 0
        laps = 0
        track = []
        no_freeze = 2500  # hard cap on iterations
        visited_other_side = False
        while True:
            alpha = math.atan2(y, x)
            if visited_other_side and alpha > 0:
                laps += 1
                visited_other_side = False
            if alpha < 0:
                visited_other_side = True
                alpha += 2 * math.pi

            # Find the next checkpoint ahead of the current polar angle.
            while True:
                failed = True
                while True:
                    dest_alpha, dest_x, dest_y = checkpoints[dest_i % len(checkpoints)]
                    if alpha <= dest_alpha:
                        failed = False
                        break
                    dest_i += 1
                    if dest_i % len(checkpoints) == 0:
                        break
                if not failed:
                    break
                alpha -= 2 * math.pi
                continue

            r1x = math.cos(beta)
            r1y = math.sin(beta)
            p1x = -r1y
            p1y = r1x
            dest_dx = dest_x - x
            dest_dy = dest_y - y
            # Destination vector projected onto the current radial direction.
            proj = r1x * dest_dx + r1y * dest_dy
            while beta - alpha > 1.5 * math.pi:
                beta -= 2 * math.pi
            while beta - alpha < -1.5 * math.pi:
                beta += 2 * math.pi
            prev_beta = beta
            proj *= SCALE
            if proj > 0.3:
                beta -= min(TRACK_TURN_RATE, abs(0.001 * proj))
            if proj < -0.3:
                beta += min(TRACK_TURN_RATE, abs(0.001 * proj))
            x += p1x * TRACK_DETAIL_STEP
            y += p1y * TRACK_DETAIL_STEP
            track.append((alpha, prev_beta * 0.5 + beta * 0.5, x, y))
            if laps > 4:
                break
            no_freeze -= 1
            if no_freeze == 0:
                break

        # Scan backwards for two crossings of start_alpha: they delimit one loop.
        i1, i2 = -1, -1
        i = len(track)
        while True:
            i -= 1
            if i == 0:
                return False
            pass_through_start = track[i][0] > self.start_alpha >= track[i - 1][0]
            if pass_through_start and i2 == -1:
                i2 = i
            elif pass_through_start and i1 == -1:
                i1 = i
                break
        print(f"Track generation: {i1}..{i2} -> {i2 - i1}-tiles track")
        assert i1 != -1 and i2 != -1
        track = track[i1: i2 - 1]

        # Reject loops whose head and tail do not line up closely enough.
        first_beta = track[0][1]
        first_perp_x = math.cos(first_beta)
        first_perp_y = math.sin(first_beta)
        well_glued_together = np.sqrt(
            np.square(first_perp_x * (track[0][2] - track[-1][2]))
            + np.square(first_perp_y * (track[0][3] - track[-1][3]))
        )
        if well_glued_together > TRACK_DETAIL_STEP:
            return False

        # Mark runs of consecutive sharp same-direction turns for a border.
        border = [False] * len(track)
        for i in range(len(track)):
            good = True
            oneside = 0
            for neg in range(BORDER_MIN_COUNT):
                beta1 = track[i - neg - 0][1]
                beta2 = track[i - neg - 1][1]
                good &= abs(beta1 - beta2) > TRACK_TURN_RATE * 0.2
                oneside += np.sign(beta1 - beta2)
            good &= abs(oneside) == BORDER_MIN_COUNT
            border[i] = bool(good)
        for i in range(len(track)):
            for neg in range(BORDER_MIN_COUNT):
                border[i - neg] |= border[i]

        # Create one static sensor body per track segment.
        self.road_poly = []
        for i in range(len(track)):
            alpha1, beta1, x1, y1 = track[i]
            alpha2, beta2, x2, y2 = track[i - 1]
            road1_l = (x1 - TRACK_WIDTH * math.cos(beta1), y1 - TRACK_WIDTH * math.sin(beta1))
            road1_r = (x1 + TRACK_WIDTH * math.cos(beta1), y1 + TRACK_WIDTH * math.sin(beta1))
            road2_l = (x2 - TRACK_WIDTH * math.cos(beta2), y2 - TRACK_WIDTH * math.sin(beta2))
            road2_r = (x2 + TRACK_WIDTH * math.cos(beta2), y2 + TRACK_WIDTH * math.sin(beta2))
            t = self.world.CreateStaticBody(
                fixtures=b2FixtureDef(shape=b2PolygonShape(vertices=[road1_l, road1_r, road2_r, road2_l]))
            )
            t.userData = t
            t.index_on_track = i
            c = 0.01 * (i % 3)
            t.color = [ROAD_COLOR[0] + c, ROAD_COLOR[1] + c, ROAD_COLOR[2] + c]
            t.road_visited = False
            t.road_friction = 1.0
            t.fixtures[0].sensor = True
            # road_poly shares the tile's colour list, so in-place colour
            # changes on the tile show up when rendering.
            self.road_poly.append(([road1_l, road1_r, road2_r, road2_l], t.color))
            self.road.append(t)
            if border[i]:
                side = np.sign(beta2 - beta1)
                b1_l = (x1 + side * TRACK_WIDTH * math.cos(beta1), y1 + side * TRACK_WIDTH * math.sin(beta1))
                b1_r = (
                    x1 + side * (TRACK_WIDTH + BORDER) * math.cos(beta1),
                    y1 + side * (TRACK_WIDTH + BORDER) * math.sin(beta1),
                )
                b2_l = (x2 + side * TRACK_WIDTH * math.cos(beta2), y2 + side * TRACK_WIDTH * math.sin(beta2))
                b2_r = (
                    x2 + side * (TRACK_WIDTH + BORDER) * math.cos(beta2),
                    y2 + side * (TRACK_WIDTH + BORDER) * math.sin(beta2),
                )
                self.road_poly.append(([b1_l, b1_r, b2_r, b2_l], (1, 1, 1) if i % 2 == 0 else (1, 0, 0)))

        self.track = track
        # Snapshot used by fast_reset() to restore the drawn polygons.
        self.original_road_poly = [((list(poly)), list(color)) for (poly, color) in self.road_poly]

        # Per-tile arrays consumed by _update_features().
        self.ctrl_pts = np.array([point[2:] for point in self.track])
        self.angles = np.array([point[1] for point in self.track])
        self.outward_vectors = [np.array([np.cos(theta), np.sin(theta)]) for theta in self.angles]
        angle_deltas = self.angles - np.roll(self.angles, 1)
        self.angle_deltas = np.array(list(map(standardize_angle, angle_deltas)))
        self.indices = np.arange(len(self.ctrl_pts))
        return True

    def _reset_episode_counters(self):
        """Zero the per-episode step, progress and off-road counters."""
        self.steps = 0
        self._last_progress_count = 0
        self._no_progress_steps = 0
        self._stall_steps = 0
        # Without this an off-road streak would leak into the next episode.
        self.offroad_frames = 0

    def _build_new_episode(self):
        """Tear down the old track and car, then generate new ones."""
        self._destroy()
        self.reward = 0.0
        self.prev_reward = 0.0
        self.tile_visited_count = 0
        self.t = 0.0
        self.road_poly = []
        self.human_render = False
        self.laps = 0
        self.on_road = True
        self.next_road_tile = 0
        # build track (may retry)
        while True:
            success = self._create_track()
            if success:
                break
            print("retry to generate track (normal if there are not many of this messages)")
        self.car = Car(self.world, *self.track[0][1:4])
        # attach tiles set to car for contact tracking
        self.car.tiles = set()
        self._reset_episode_counters()

    # ------------------------
    # Public API (Gym 0.26+/Gymnasium style)
    # ------------------------
    def _update_features(self):
        """Recompute ``self.my_state`` from the car and the upcoming track."""
        v1 = self.outward_vectors[self.next_road_tile - 2]
        v2 = np.array(self.car.hull.position) - self.ctrl_pts[self.next_road_tile - 1]
        off_center = float(np.dot(v1, v2))
        angular_vel = float(self.car.hull.angularVelocity)
        vel = self.car.hull.linearVelocity
        true_speed = float(np.linalg.norm(vel))
        car_angle = float(self.car.hull.angle - self.angles[self.next_road_tile])
        wheel_angle = float(self.car.wheels[0].joint.angle)
        if true_speed < 0.2:
            # Direction of a near-zero velocity is meaningless; report 0.
            vel_angle = 0.0
        else:
            vel_angle = float(math.atan2(vel[1], vel[0]) - (self.angles[self.next_road_tile] + np.pi / 2))
        wheel_angle = standardize_angle(wheel_angle)
        car_angle = standardize_angle(car_angle)
        vel_angle = standardize_angle(vel_angle)

        # Fractional position between the two previous control points, used
        # to interpolate the angle deltas.
        tip = np.array((self.car.wheels[0].position + self.car.wheels[1].position) / 2)
        p1 = self.ctrl_pts[self.next_road_tile - 1]
        p2 = self.ctrl_pts[self.next_road_tile - 2]
        u = (p1 - p2) / TRACK_DETAIL_STEP
        v = (tip - p2) / TRACK_DETAIL_STEP
        interp = float(np.dot(v, u))
        interp_angle_deltas = np.interp(self.indices + interp, self.indices, self.angle_deltas)
        # Rotate so that the entry for the next tile comes first.
        self.my_state.angle_deltas = np.roll(interp_angle_deltas, -self.next_road_tile)

        self.my_state.reward = self.reward
        self.my_state.on_road = self.on_road
        self.my_state.laps = self.laps
        self.my_state.true_speed = true_speed
        self.my_state.off_center = off_center
        self.my_state.wheel_angle = wheel_angle
        self.my_state.car_angle = car_angle
        self.my_state.angular_vel = angular_vel
        self.my_state.vel_angle = vel_angle

        # Normalization
        self.my_state.angle_deltas *= 2.3
        self.my_state.true_speed /= 100.0
        self.my_state.off_center /= TRACK_WIDTH
        self.my_state.wheel_angle *= 2.1
        self.my_state.car_angle *= 1.5
        self.my_state.vel_angle *= 1.5
        self.my_state.angular_vel /= 3.74

    def reset(self, *, seed: int | None = None, options: dict | None = None):
        """Generate a new track and car and return ``(obs, info)``.

        Args:
            seed: When given, re-seeds the track-generation RNG first.
            options: Accepted for API compatibility; not used.
        """
        if seed is not None:
            if seeding is not None:
                self.np_random, _ = seeding.np_random(seed)
            else:
                self.np_random = np.random.RandomState(seed)
        self._build_new_episode()
        # Important: populate the initial features before building obs.
        self._update_features()
        obs = self.my_state.as_feature_vector(LOOK_AHEAD).astype(np.float32)
        info = {}
        return obs, info

    def fast_reset(self):
        """Respawn the car on the existing track.

        Returns:
            The 5-tuple of one ``step`` taken with a zero action.
        """
        # Destroy the old car while the reference is still valid; otherwise
        # its bodies stay in the world at the spawn point. Done first because
        # the teardown may fire end-contact callbacks that touch on_road.
        self._destroy_car()

        self.laps = 0
        self.on_road = True
        self.next_road_tile = 0
        self.reward = 0.0
        self.prev_reward = 0.0
        self.tile_visited_count = 0
        self.t = 0.0
        self.human_render = False
        for tile in self.road:
            tile.road_visited = False
        self.road_poly = [((list(poly)), list(color)) for (poly, color) in self.original_road_poly]

        self.car = Car(self.world, *self.track[0][1:4])
        self.car.tiles = set()
        self._reset_episode_counters()
        return self.step(np.array([0.0, 0.0, 0.0], dtype=np.float32))

    def step(self, action):
        """Advance the simulation by one frame.

        Args:
            action: ``[steer (-1..1), gas (0..1), brake (0..1)]`` or ``None``.
                With ``None`` the physics still advances, but no reward,
                termination or truncation is evaluated.

        Returns:
            ``(obs, step_reward, terminated, truncated, info)``; ``info`` is
            always an empty dict.
        """
        if action is not None:
            # TODO: this was changed! minus in steer was removed
            self.car.steer(float(action[0]))
            self.car.gas(float(action[1]))
            self.car.brake(float(action[2]))
        self.car.step(1.0 / FPS)
        self.world.Step(1.0 / FPS, 6 * 30, 2 * 30)
        self.t += 1.0 / FPS
        self.steps += 1

        terminated = False
        truncated = False
        # NOTE(review): MAX_STEPS and the _stall_steps counter are never
        # checked here; only the no-progress timeout truncates. Confirm
        # whether that is intended.
        if action is not None:
            # (1) Apply ALL reward changes first.
            self.reward -= 5.0 / FPS  # time penalty

            # Track completed?
            if self.tile_visited_count == len(self.track):
                terminated = True

            # Out of bounds: the penalty goes into the cumulative reward.
            x, y = self.car.hull.position
            if abs(x) > PLAYFIELD or abs(y) > PLAYFIELD:
                self.reward -= 100.0
                terminated = True

            # Off-road: per-frame penalty inside a grace window, then an
            # extra penalty and termination once the window is exceeded.
            if not self.on_road:
                self.offroad_frames += 1
                self.reward -= self.offroad_penalty_per_frame / FPS
                if self.offroad_frames > self.offroad_grace_frames:
                    self.reward -= 20.0
                    terminated = True
            else:
                self.offroad_frames = 0

            # Truncate when no new tile has been reached for too long.
            if self.tile_visited_count > self._last_progress_count:
                self._last_progress_count = self.tile_visited_count
                self._no_progress_steps = 0
            else:
                self._no_progress_steps += 1
                if self._no_progress_steps >= NO_PROGRESS_STEPS:
                    truncated = True

            # (2) Take the reward delta exactly once.
            step_reward = self.reward - self.prev_reward
            self.prev_reward = self.reward
        else:
            step_reward = 0.0

        # Feature computation
        self._update_features()
        obs = self.my_state.as_feature_vector(LOOK_AHEAD).astype(np.float32)
        info = {}
        return obs, step_reward, terminated, truncated, info

    def _get_observation(self):
        # This env is feature-first; return None unless user asks for rgb_array via render()
        return None

    # ------------------------
    # Rendering (pygame)
    # ------------------------
    def render(self):
        """Draw the current frame.

        Returns:
            ``None`` in ``"human"`` mode (the window is updated, or closed
            when a quit event is seen); otherwise the frame as an
            ``(H, W, 3)`` array.
        """
        self._ensure_pygame()
        # Handle window events only in human mode
        if self.render_mode == "human":
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    self.close()
                    return None

        # Camera math (match original): zoom ramps up during the first second.
        zoom = 0.1 * SCALE * max(1 - self.t, 0) + ZOOM * SCALE * min(self.t, 1)
        scroll_x = self.car.hull.position[0]
        scroll_y = self.car.hull.position[1]
        angle = -self.car.hull.angle
        vel = self.car.hull.linearVelocity
        if np.linalg.norm(vel) > 0.5:
            # Once moving, orient the camera along the velocity.
            angle = math.atan2(vel[0], vel[1])

        # Draw grass background
        self._pg.screen.fill((102, 230, 102))
        # simple grid for texture
        k = PLAYFIELD / 20.0
        grid_color = (110, 240, 110)
        for x in range(-20, 20, 2):
            for y in range(-20, 20, 2):
                x0, y0 = k * x + 0, k * y + 0
                x1, y1 = k * x + k, k * y + k
                p0 = self._world_to_screen(x0, y0, zoom, angle, scroll_x, scroll_y)
                p1 = self._world_to_screen(x1, y0, zoom, angle, scroll_x, scroll_y)
                p2 = self._world_to_screen(x1, y1, zoom, angle, scroll_x, scroll_y)
                p3 = self._world_to_screen(x0, y1, zoom, angle, scroll_x, scroll_y)
                pygame.draw.polygon(self._pg.screen, grid_color, [p0, p1, p2, p3])

        # Road polygons
        for poly, color in self.road_poly:
            self._draw_polygon_world(poly, color, zoom, angle, scroll_x, scroll_y)

        # Draw car hull + wheels (approx)
        car_col = (0.25, 0.25, 0.25)
        self._draw_body(self.car.hull, car_col, zoom, angle, scroll_x, scroll_y)
        for w in self.car.wheels:
            self._draw_body(w, (0.15, 0.15, 0.15), zoom, angle, scroll_x, scroll_y)

        # simple HUD text
        if self._pg.font is not None:
            txt = f"reward={self.reward:0.1f} laps={self.laps}"
            surf = self._pg.font.render(txt, True, (255, 255, 255))
            self._pg.screen.blit(surf, (10, 10))

        # Output
        if self.render_mode == "human":
            pygame.display.flip()
            self._pg.clock.tick(FPS)
            return None
        # Offscreen: return RGB array like gym does
        arr = pygame.surfarray.array3d(self._pg.screen)  # (W,H,3)
        arr = np.transpose(arr, (1, 0, 2))  # -> (H,W,3)
        return arr

    def close(self):
        """Shut down pygame if this environment initialised it."""
        try:
            if self._pg and self._pg.initialized:
                pygame.display.quit()
                pygame.quit()
        except Exception:
            # Best effort: closing must never raise.
            log.debug("pygame shutdown failed", exc_info=True)
        self._pg = None
# ----------------------------
# Keyboard demo (pygame)
# ----------------------------
if __name__ == "__main__":
    # Keyboard demo: arrow keys drive, RETURN respawns the car.
    import pygame

    pygame.init()
    env = CarRacing(render_mode="human")
    action = np.array([0.0, 0.0, 0.0], dtype=np.float32)
    running = True

    def handle_keys(a):
        """Write [steer, gas, brake] from the pressed arrow keys into ``a``."""
        keys = pygame.key.get_pressed()
        steer = 0.0
        if keys[pygame.K_LEFT]:
            steer -= 1.0
        if keys[pygame.K_RIGHT]:
            steer += 1.0
        gas = 1.0 if keys[pygame.K_UP] else 0.0
        brake = 0.5 if keys[pygame.K_DOWN] else 0.0
        a[0], a[1], a[2] = steer, gas, brake

    # initial reset
    env.reset()
    try:
        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                if event.type == pygame.KEYDOWN and event.key == pygame.K_RETURN:
                    env.fast_reset()
            handle_keys(action)
            obs, r, terminated, truncated, info = env.step(action)
            # Print every 200 steps. step() returns an empty info dict, so
            # the features are read from the environment's state object.
            if env.steps % 200 == 0:
                ms: MyState = env.my_state
                print(
                    f"speed={ms.true_speed:5.2f} off_center={ms.off_center:+.2f} car_ang={ms.car_angle:+.2f} "
                    f"step_reward={r:+.2f} "
                    f"total_reward={ms.reward:+.2f}"
                )
            env.render()
            if not pygame.get_init():
                # render() saw the window-close event and shut pygame down;
                # leave before the next pygame call raises.
                break
            if terminated or truncated:
                env.fast_reset()
    finally:
        env.close()