commit a94d8c4fd3c3e3b728c85ab97f049ab888c5e25a Author: Morten Rohgalf Date: Sat Apr 18 08:57:24 2026 +0200 initial data commit. diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/car_racing_env.py b/car_racing_env.py new file mode 100644 index 0000000..3a9bfd1 --- /dev/null +++ b/car_racing_env.py @@ -0,0 +1,833 @@ +# TODO: this will be one env for both systems + +import math +import numpy as np +import logging + +import Box2D +from Box2D import (b2FixtureDef, b2PolygonShape, b2ContactListener) + +log = logging.getLogger(__name__) + +# Optional gym import for spaces only; code runs without strict gym registration +try: + import gymnasium as gym + from gymnasium import spaces + from gymnasium.utils import seeding + + GYM_AVAILABLE = True +except Exception: + GYM_AVAILABLE = False + spaces = None + seeding = None + +# Car dynamics from classic gym (Box2D) +from gymnasium.envs.box2d.car_dynamics import Car + +# --- pygame renderer --- +import pygame + +DEBUG_DRAWING = False +LOOK_AHEAD = 10 + +STATE_W = 96 +STATE_H = 96 +VIDEO_W = 1200 +VIDEO_H = 800 +WINDOW_W = 1350 +WINDOW_H = 950 + +SCALE = 6.0 +TRACK_RAD = 900 / SCALE +PLAYFIELD = 2000 / SCALE +FPS = 60 +ZOOM = 2.7 +ZOOM_FOLLOW = True + +TRACK_DETAIL_STEP = 21 / SCALE +TRACK_TURN_RATE = 0.31 +TRACK_WIDTH = 40 / SCALE +BORDER = 8 / SCALE +BORDER_MIN_COUNT = 4 + +ROAD_COLOR = [0.4, 0.4, 0.4] + +# limits & timeouts +MAX_TIME_SEC = 90.0 +MAX_STEPS = int(FPS * MAX_TIME_SEC) +NO_PROGRESS_SEC = 8.0 +NO_PROGRESS_STEPS = int(FPS * NO_PROGRESS_SEC) +STALL_MIN_SPEED = 4.0 +STALL_SEC = 4.0 +STALL_STEPS = int(FPS * STALL_SEC) +FUEL_LIMIT = 120.0 + + +# ---------------------------- +# Utilities +# ---------------------------- +def standardize_angle(theta: float) -> float: + return np.remainder(theta + np.pi, 2 * np.pi) - np.pi + + +def f2c(rgb_float): + """float [0..1] -> int [0..255] color tuple""" + return tuple(max(0, min(255, int(255 * x))) for x in rgb_float) + + +# 
# ----------------------------
# MyState: feature container
# ----------------------------
class MyState:
    """Container for the per-step features of the environment.

    Every attribute starts as ``None`` and is assigned from outside; this class
    only stores the values and packs them into a flat ``float32`` vector.
    """

    def __init__(self):
        # Track curvature look-ahead plus bookkeeping values. Only
        # ``angle_deltas`` ends up in the packed vector.
        self.angle_deltas = None
        self.reward = None
        self.on_road = None
        self.laps = None

        # Scalar features; ``as_array`` appends them in exactly this order.
        self.wheel_angle = None
        self.car_angle = None
        self.angular_vel = None
        self.true_speed = None
        self.off_center = None
        self.vel_angle = None

    def as_array(self, n: int):
        """Return the first ``n`` angle deltas followed by the six scalars.

        The result is a 1-D ``float32`` array. If fewer than ``n`` angle deltas
        are stored, the slice (and therefore the result) is shorter.

        Raises:
            RuntimeError: if any packed feature is still ``None``.
        """
        scalars = [
            self.wheel_angle,
            self.car_angle,
            self.angular_vel,
            self.true_speed,
            self.off_center,
            self.vel_angle,
        ]
        # Fail with a clear message instead of an opaque TypeError from
        # slicing ``None`` or casting ``None`` to float32.
        if self.angle_deltas is None or any(value is None for value in scalars):
            raise RuntimeError("MyState features have not been populated yet")
        return np.append(self.angle_deltas[:n], scalars).astype(np.float32)

    def as_feature_vector(self, lookahead: int = LOOK_AHEAD):
        """Return ``as_array(lookahead)``; the default look-ahead is ``LOOK_AHEAD``."""
        return self.as_array(lookahead)


# ----------------------------
# Contact listener: counts tiles, progress & reward
# ----------------------------
class FrictionDetector(b2ContactListener):
    """Contact listener that tracks tile contacts, lap progress and tile reward."""

    def __init__(self, env):
        super().__init__()
        self.env = env

    def BeginContact(self, contact):
        self._contact(contact, True)

    def EndContact(self, contact):
        self._contact(contact, False)

    def _contact(self, contact, begin):
        """Update tile sets, progress counters and ``env.on_road`` for one contact.

        ``begin`` is ``True`` for a starting contact and ``False`` for an ending
        one. Contacts that do not involve a road tile are ignored.
        """
        u1 = contact.fixtureA.body.userData
        u2 = contact.fixtureB.body.userData

        # A road tile is recognised by its ``road_friction`` attribute; the
        # other participant is the candidate tile-tracking object.
        tile = None
        obj = None
        if u1 and "road_friction" in u1.__dict__:
            tile, obj = u1, u2
        if u2 and "road_friction" in u2.__dict__:
            tile, obj = u2, u1
        if not tile:
            return

        # Mutate the colour list in place: other holders of the same list
        # object see the change.
        tile.color[0:3] = ROAD_COLOR[0:3]

        # Only objects that carry a ``tiles`` set take part in progress tracking.
        if not obj or "tiles" not in obj.__dict__:
            return

        env = self.env
        if begin:
            obj.tiles.add(tile)
            # Progress (and reward) is only granted for the tile that is next
            # in track order.
            if tile.index_on_track == env.next_road_tile:
                env.reward += 1000.0 / len(env.track)
                env.tile_visited_count += 1
                env.next_road_tile += 1
                if env.next_road_tile >= len(env.road):
                    # Wrapped around the last tile: a lap is complete.
                    env.next_road_tile = 0
                    env.laps += 1
        else:
            obj.tiles.discard(tile)

        # NOTE(review): this reflects only the object involved in *this*
        # contact, not every tile-tracking object — confirm that is intended.
        env.on_road = len(obj.tiles) > 0


# ----------------------------
# CarRacing with pygame rendering and Gym(nasium)
# 0.26+ compatible step/reset
# ----------------------------
class CarRacing(gym.Env if GYM_AVAILABLE else object):
    """Feature-based car-racing environment (Box2D physics, pygame rendering).

    When gymnasium is importable the class derives from ``gym.Env`` and exposes
    an action space ``[steer, gas, brake]`` and an observation space of
    ``LOOK_AHEAD + 6`` floats; otherwise it is a plain object with the same
    methods.
    """

    metadata = {
        "render_modes": ["human", "rgb_array", None],
        "render_fps": FPS,
    }

    def __init__(self, seed_value: int = 5, render_mode: str | None = "human"):
        """Create the physics world, the spaces and the first track and car.

        Args:
            seed_value: seed for the random generator used by this environment.
            render_mode: ``"human"`` opens a pygame window when rendering; any
                other value renders to an offscreen surface.
        """
        # Progress bookkeeping used by the dense progress reward.
        self._prev_s = 0.0
        self._interp = 0.0

        # Off-road handling and per-episode counters. They are initialised
        # before the first episode is built so that nothing the build sets
        # is overwritten afterwards.
        self.offroad_frames = 0
        self.offroad_grace_frames = int(0.7 * FPS)
        self.offroad_penalty_per_frame = 2.0
        self.steps = 0
        self._last_progress_count = 0
        self._no_progress_steps = 0
        self._stall_steps = 0

        # RNG
        if seeding is not None:
            self.np_random, _ = seeding.np_random(seed_value)
        else:
            self.np_random = np.random.RandomState(seed_value)

        # Physics world; the listener is kept as an attribute so that a
        # reference to it stays alive.
        self.contactListener_keepref = FrictionDetector(self)
        self.world = Box2D.b2World((0, 0), contactListener=self.contactListener_keepref)

        # Gym-style spaces (optional)
        if GYM_AVAILABLE:
            self.action_space = spaces.Box(
                np.array([-1, 0, 0], dtype=np.float32),
                np.array([+1, +1, +1], dtype=np.float32),
                dtype=np.float32,
            )
            feat_dim = LOOK_AHEAD + 6
            self.observation_space = spaces.Box(
                low=-np.inf, high=np.inf, shape=(feat_dim,), dtype=np.float32
            )

        # State
        self.viewer = None  # unused (pyglet placeholder)
        self.road = None
        self.car = None
        self.reward = 0.0
        self.prev_reward = 0.0

        self.laps = 0
        self.on_road = True
        self.ctrl_pts = None
        self.outward_vectors = None
        self.angles = None
        self.angle_deltas = None
        self.original_road_poly = None
        self.indices = None
        self.my_state = MyState()
        self.next_road_tile = 0

        # Rendering
        self.render_mode = render_mode
        self._pg = None  # pygame objects container, created lazily

        # Episode control
        self.tile_visited_count = 0
        self.t = 0.0
        self.human_render = False

        # Build initial track + car
        self._build_new_episode()

    # ------------------------
    # Helpers: pygame
    # ------------------------
    class _PygameCtx:
        """Holder for the lazily created pygame objects."""

        def __init__(self):
            self.initialized = False
            self.screen = None
            self.clock = None
            self.font = None
            self.rgb_surface = None  # offscreen for rgb_array

    def _ensure_pygame(self):
        """Create the pygame context (screen, clock, font) on first use."""
        if self._pg is None:
            self._pg = self._PygameCtx()
        if self._pg.initialized:
            return

        if not pygame.get_init():
            pygame.init()
        if self.render_mode == "human":
            self._pg.screen = pygame.display.set_mode((WINDOW_W, WINDOW_H))
        else:
            # offscreen surface; we can still blit/draw onto it
            self._pg.screen = pygame.Surface((WINDOW_W, WINDOW_H))
        self._pg.clock = pygame.time.Clock()
        try:
            pygame.font.init()
            self._pg.font = pygame.font.SysFont("Arial", 20)
        except Exception:
            # Best effort: without a font the HUD text is simply skipped.
            self._pg.font = None
        self._pg.initialized = True

    def _world_to_screen(self, x, y, zoom, angle, scroll_x, scroll_y):
        """Map world coordinates to integer screen pixels.

        The point is rotated by ``angle`` around ``(scroll_x, scroll_y)``,
        scaled by ``zoom`` and offset to ``(WINDOW_W / 2, WINDOW_H / 4)``.
        """
        ca, sa = math.cos(angle), math.sin(angle)
        dx = x - scroll_x
        dy = y - scroll_y
        # rotate around (scroll_x, scroll_y)
        rx = dx * ca + dy * sa
        ry = -dx * sa + dy * ca
        # scale & translate (match original camera placement)
        sx = int(WINDOW_W / 2 + rx * zoom)
        sy = int(WINDOW_H / 4 + ry * zoom)
        return sx, sy

    def get_feature_vector(self, lookahead: int = LOOK_AHEAD) -> list[float]:
        """Return the current feature vector as a plain Python list."""
        my_s: MyState = self.my_state
        return my_s.as_feature_vector(lookahead).tolist()

    def _draw_polygon_world(self, poly, color, zoom, angle, scroll_x, scroll_y):
        """Draw a world-space polygon; ``color`` holds floats in ``[0, 1]``."""
        pts = [self._world_to_screen(px, py, zoom, angle, scroll_x, scroll_y) for (px, py) in poly]
        pygame.draw.polygon(self._pg.screen, f2c(color), pts)

    def _draw_body(self, body, color=(0.7, 0.7, 0.7), zoom=1.0, angle=0.0, scroll_x=0.0, scroll_y=0.0):
        """Draw every polygon fixture of ``body`` as a filled polygon."""
        col = f2c(color)
        for fixture in body.fixtures:
            shape = fixture.shape
            if not isinstance(shape, b2PolygonShape):
                continue
            # Fixture vertices are body-local; transform them to world space.
            verts = [body.transform * v for v in shape.vertices]
            pts = [self._world_to_screen(v[0], v[1], zoom, angle, scroll_x, scroll_y) for v in verts]
            pygame.draw.polygon(self._pg.screen, col, pts, width=0)

    # ------------------------
    # Track &
episode setup + # ------------------------ + def _destroy(self): + if not self.road: + return + # userData lösen, dann Bodies zerstören + for t in self.road: + try: + t.userData = None + except Exception: + pass + try: + self.world.DestroyBody(t) + except Exception: + pass + self.road = [] + + if self.car is not None: + try: + self.car.destroy() + except Exception: + pass + self.car = None + + def _create_track(self): + CHECKPOINTS = 12 + checkpoints = [] + for c in range(CHECKPOINTS): + alpha = 2 * math.pi * c / CHECKPOINTS + self.np_random.uniform(0, 2 * math.pi * 1 / CHECKPOINTS) + rad = self.np_random.uniform(TRACK_RAD / 3, TRACK_RAD) + if c == 0: + alpha = 0 + rad = 1.5 * TRACK_RAD + if c == CHECKPOINTS - 1: + alpha = 2 * math.pi * c / CHECKPOINTS + self.start_alpha = 2 * math.pi * (-0.5) / CHECKPOINTS + rad = 1.5 * TRACK_RAD + checkpoints.append((alpha, rad * math.cos(alpha), rad * math.sin(alpha))) + + self.road = [] + + x, y, beta = 1.5 * TRACK_RAD, 0, 0 + dest_i = 0 + laps = 0 + track = [] + no_freeze = 2500 + visited_other_side = False + while True: + alpha = math.atan2(y, x) + if visited_other_side and alpha > 0: + laps += 1 + visited_other_side = False + if alpha < 0: + visited_other_side = True + alpha += 2 * math.pi + while True: + failed = True + while True: + dest_alpha, dest_x, dest_y = checkpoints[dest_i % len(checkpoints)] + if alpha <= dest_alpha: + failed = False + break + dest_i += 1 + if dest_i % len(checkpoints) == 0: + break + if not failed: + break + alpha -= 2 * math.pi + continue + r1x = math.cos(beta) + r1y = math.sin(beta) + p1x = -r1y + p1y = r1x + dest_dx = dest_x - x + dest_dy = dest_y - y + proj = r1x * dest_dx + r1y * dest_dy + while beta - alpha > 1.5 * math.pi: + beta -= 2 * math.pi + while beta - alpha < -1.5 * math.pi: + beta += 2 * math.pi + prev_beta = beta + proj *= SCALE + if proj > 0.3: + beta -= min(TRACK_TURN_RATE, abs(0.001 * proj)) + if proj < -0.3: + beta += min(TRACK_TURN_RATE, abs(0.001 * proj)) + x += p1x * 
TRACK_DETAIL_STEP + y += p1y * TRACK_DETAIL_STEP + track.append((alpha, prev_beta * 0.5 + beta * 0.5, x, y)) + if laps > 4: + break + no_freeze -= 1 + if no_freeze == 0: + break + + i1, i2 = -1, -1 + i = len(track) + while True: + i -= 1 + if i == 0: + return False + pass_through_start = track[i][0] > self.start_alpha >= track[i - 1][0] + if pass_through_start and i2 == -1: + i2 = i + elif pass_through_start and i1 == -1: + i1 = i + break + print(f"Track generation: {i1}..{i2} -> {i2 - i1}-tiles track") + assert i1 != -1 and i2 != -1 + + track = track[i1: i2 - 1] + + first_beta = track[0][1] + first_perp_x = math.cos(first_beta) + first_perp_y = math.sin(first_beta) + well_glued_together = np.sqrt( + np.square(first_perp_x * (track[0][2] - track[-1][2])) + + np.square(first_perp_y * (track[0][3] - track[-1][3])) + ) + if well_glued_together > TRACK_DETAIL_STEP: + return False + + border = [False] * len(track) + for i in range(len(track)): + good = True + oneside = 0 + for neg in range(BORDER_MIN_COUNT): + beta1 = track[i - neg - 0][1] + beta2 = track[i - neg - 1][1] + good &= abs(beta1 - beta2) > TRACK_TURN_RATE * 0.2 + oneside += np.sign(beta1 - beta2) + good &= abs(oneside) == BORDER_MIN_COUNT + border[i] = bool(good) + for i in range(len(track)): + for neg in range(BORDER_MIN_COUNT): + border[i - neg] |= border[i] + + self.road_poly = [] + for i in range(len(track)): + alpha1, beta1, x1, y1 = track[i] + alpha2, beta2, x2, y2 = track[i - 1] + road1_l = (x1 - TRACK_WIDTH * math.cos(beta1), y1 - TRACK_WIDTH * math.sin(beta1)) + road1_r = (x1 + TRACK_WIDTH * math.cos(beta1), y1 + TRACK_WIDTH * math.sin(beta1)) + road2_l = (x2 - TRACK_WIDTH * math.cos(beta2), y2 - TRACK_WIDTH * math.sin(beta2)) + road2_r = (x2 + TRACK_WIDTH * math.cos(beta2), y2 + TRACK_WIDTH * math.sin(beta2)) + t = self.world.CreateStaticBody( + fixtures=b2FixtureDef(shape=b2PolygonShape(vertices=[road1_l, road1_r, road2_r, road2_l])) + ) + t.userData = t + t.index_on_track = i + c = 0.01 * (i % 3) 
+ t.color = [ROAD_COLOR[0] + c, ROAD_COLOR[1] + c, ROAD_COLOR[2] + c] + t.road_visited = False + t.road_friction = 1.0 + t.fixtures[0].sensor = True + self.road_poly.append(([road1_l, road1_r, road2_r, road2_l], t.color)) + self.road.append(t) + if border[i]: + side = np.sign(beta2 - beta1) + b1_l = (x1 + side * TRACK_WIDTH * math.cos(beta1), y1 + side * TRACK_WIDTH * math.sin(beta1)) + b1_r = ( + x1 + side * (TRACK_WIDTH + BORDER) * math.cos(beta1), + y1 + side * (TRACK_WIDTH + BORDER) * math.sin(beta1), + ) + b2_l = (x2 + side * TRACK_WIDTH * math.cos(beta2), y2 + side * TRACK_WIDTH * math.sin(beta2)) + b2_r = ( + x2 + side * (TRACK_WIDTH + BORDER) * math.cos(beta2), + y2 + side * (TRACK_WIDTH + BORDER) * math.sin(beta2), + ) + self.road_poly.append(([b1_l, b1_r, b2_r, b2_l], (1, 1, 1) if i % 2 == 0 else (1, 0, 0))) + self.track = track + + self.original_road_poly = [((list(poly)), list(color)) for (poly, color) in self.road_poly] + self.ctrl_pts = np.array(list(map(lambda x: x[2:], self.track))) + self.angles = np.array(list(map(lambda x: x[1], self.track))) + self.outward_vectors = [np.array([np.cos(theta), np.sin(theta)]) for theta in self.angles] + angle_deltas = self.angles - np.roll(self.angles, 1) + self.angle_deltas = np.array(list(map(standardize_angle, angle_deltas))) + self.indices = np.array(range(len(self.ctrl_pts))) + return True + + def _build_new_episode(self): + # build track (may retry) + self._destroy() + self.reward = 0.0 + self.prev_reward = 0.0 + self.tile_visited_count = 0 + self.t = 0.0 + self.road_poly = [] + self.human_render = False + self.laps = 0 + self.on_road = True + self.next_road_tile = 0 + + while True: + success = self._create_track() + if success: + break + print("retry to generate track (normal if there are not many of this messages)") + + self.car = Car(self.world, *self.track[0][1:4]) + + # attach tiles set to car for contact tracking + self.car.tiles = set() + + self.steps = 0 + self._last_progress_count = 0 + 
self._no_progress_steps = 0 + self._stall_steps = 0 + + # ------------------------ + # Public API (Gym 0.26+/Gymnasium style) + # ------------------------ + def _update_features(self): + + v1 = self.outward_vectors[self.next_road_tile - 2] + v2 = np.array(self.car.hull.position) - self.ctrl_pts[self.next_road_tile - 1] + off_center = float(np.dot(v1, v2)) + angular_vel = float(self.car.hull.angularVelocity) + vel = self.car.hull.linearVelocity + true_speed = float(np.linalg.norm(vel)) + car_angle = float(self.car.hull.angle - self.angles[self.next_road_tile]) + wheel_angle = float(self.car.wheels[0].joint.angle) + if true_speed < 0.2: + vel_angle = 0.0 + else: + vel_angle = float(math.atan2(vel[1], vel[0]) - (self.angles[self.next_road_tile] + np.pi / 2)) + + wheel_angle = standardize_angle(wheel_angle) + car_angle = standardize_angle(car_angle) + vel_angle = standardize_angle(vel_angle) + + tip = np.array((self.car.wheels[0].position + self.car.wheels[1].position) / 2) + p1 = self.ctrl_pts[self.next_road_tile - 1] + p2 = self.ctrl_pts[self.next_road_tile - 2] + u = (p1 - p2) / TRACK_DETAIL_STEP + v = (tip - p2) / TRACK_DETAIL_STEP + interp = float(np.dot(v, u)) + interp_angle_deltas = np.interp(self.indices + interp, self.indices, self.angle_deltas) + + self.my_state.angle_deltas = np.roll(interp_angle_deltas, -self.next_road_tile) + self.my_state.reward = self.reward + self.my_state.on_road = self.on_road + self.my_state.laps = self.laps + self.my_state.true_speed = true_speed + self.my_state.off_center = off_center + self.my_state.wheel_angle = wheel_angle + self.my_state.car_angle = car_angle + self.my_state.angular_vel = angular_vel + self.my_state.vel_angle = vel_angle + + # Normalization + self.my_state.angle_deltas *= 2.3 + self.my_state.true_speed /= 100.0 + self.my_state.off_center /= TRACK_WIDTH + self.my_state.wheel_angle *= 2.1 + self.my_state.car_angle *= 1.5 + self.my_state.vel_angle *= 1.5 + self.my_state.angular_vel /= 3.74 + + interp_angle_deltas 
= np.interp(self.indices + interp, self.indices, self.angle_deltas) + self._interp = interp # <- für Fortschritts-Reward im step() + + def reset(self, *, seed: int | None = None, options: dict | None = None): + if seed is not None: + if seeding is not None: + self.np_random, _ = seeding.np_random(seed) + else: + self.np_random = np.random.RandomState(seed) + self._build_new_episode() + # Wichtig: initiale Features befüllen + self._update_features() + obs = self.my_state.as_feature_vector(LOOK_AHEAD).astype(np.float32) + info = {} + return obs, info + + def fast_reset(self): + # keep the same track, respawn car + self.car = None + self.laps = 0 + self.on_road = True + self.next_road_tile = 0 + + self.reward = 0.0 + self.prev_reward = 0.0 + self.tile_visited_count = 0 + self.t = 0.0 + self.human_render = False + for tile in self.road: + tile.road_visited = False + self.road_poly = [((list(poly)), list(color)) for (poly, color) in self.original_road_poly] + try: + self.car.destroy() + except Exception: + pass + self.car = Car(self.world, *self.track[0][1:4]) + self.car.tiles = set() + + self.steps = 0 + self._last_progress_count = 0 + self._no_progress_steps = 0 + self._stall_steps = 0 + + return self.step(np.array([0.0, 0.0, 0.0], dtype=np.float32)) + + def step(self, action): + # log.info("got action: {}".format(action)) + # Expect action: [steer (-1..1), gas (0..1), brake (0..1)] + if action is not None: + # TODO: this was changed from -float(action[0]) + self.car.steer(float(action[0])) + self.car.gas(float(action[1])) + self.car.brake(float(action[2])) + + self.car.step(1.0 / FPS) + self.world.Step(1.0 / FPS, 6 * 30, 2 * 30) + self.t += 1.0 / FPS + + self.steps += 1 + + terminated = False + truncated = False + + # -- stall logic --- + speed = float(np.linalg.norm(self.car.hull.linearVelocity)) + if speed < STALL_MIN_SPEED: + self._stall_steps += 1 + else: + self._stall_steps = 0 + if self._stall_steps >= STALL_STEPS: + self.reward -= 15.0 + terminated = True + + 
if action is not None: + # (1) ALLE Reward-Änderungen zuerst einarbeiten + self.reward -= 1.0 / FPS # Zeitstrafe + + # Ziel erreicht? + if self.tile_visited_count == len(self.track): + terminated = False + self.tile_visited_count = 0 + + # Out-of-bounds: Strafe IN reward addieren (nicht step_reward überschreiben) + x, y = self.car.hull.position + if abs(x) > PLAYFIELD or abs(y) > PLAYFIELD: + self.reward -= 100.0 + terminated = True + + # Offroad: kontinuierliche Strafe + Grace-Fenster; bei Timeout Zusatzstrafe + if not self.on_road: + self.offroad_frames += 1 + self.reward -= self.offroad_penalty_per_frame / FPS + if self.offroad_frames > self.offroad_grace_frames: + self.reward -= 20.0 + terminated = True + else: + self.offroad_frames = 0 + # self.reward += 0.02 * speed / FPS + + # --- DICHTES SIGNAL: Vortrieb entlang der Streckentangente --- + # Tangentialrichtung der Strecke am aktuellen Referenz-Index: + theta = float(self.angles[self.next_road_tile]) # lokale Fahrtrichtung + t_hat = np.array([-math.sin(theta), math.cos(theta)], dtype=np.float32) + vel = np.array(self.car.hull.linearVelocity, dtype=np.float32) + forward = float(np.dot(vel, t_hat)) # Vorwärtskomponente (kann <0 sein) + + if forward > 0.0: + # self.reward += 0.03 * forward / FPS # kleiner, dichter Bonus + self.reward += 0.2 * forward / FPS + + # --- KONTINUIERLICHER STRECKENFORTSCHRITT (s-Koordinate) --- + # s = (Index des zuletzt passierten Kontrollpunkts) + Interp innerhalb des Segments + n = float(len(self.ctrl_pts)) + s_now = ((self.next_road_tile - 1) % len(self.ctrl_pts)) + float(self._interp) + ds = s_now - self._prev_s + # zyklische Korrektur (Lap-Übergang) + + if ds < -0.5 * n: + ds += n + + if ds > 0.0: + self.reward += 4.0 * ds / FPS # Fortschritt in "Tiles pro Sekunde" (klein halten!) 
+ self._prev_s = s_now + + if self.tile_visited_count > self._last_progress_count: + self._last_progress_count = self.tile_visited_count + self._no_progress_steps = 0 + else: + self._no_progress_steps += 1 + if self._no_progress_steps >= NO_PROGRESS_STEPS: + truncated = True + + # (2) JETZT genau einmal das Delta bilden + step_reward = self.reward - self.prev_reward + self.prev_reward = self.reward + else: + step_reward = 0.0 + + # --- Feature computation (unverändert) --- + self._update_features() + + obs = self.my_state.as_feature_vector(LOOK_AHEAD).astype(np.float32) + info = {} # features nicht mehr nötig + return obs, step_reward, terminated, truncated, info + + def _get_observation(self): + # This env is feature-first; return None unless user asks for rgb_array via render() + return None + + # ------------------------ + # Rendering (pygame) + # ------------------------ + def render(self): + self._ensure_pygame() + + # Handle window events only in human mode + if self.render_mode == "human": + for event in pygame.event.get(): + if event.type == pygame.QUIT: + self.close() + return None + + # Camera math (match original) + zoom = 0.1 * SCALE * max(1 - self.t, 0) + ZOOM * SCALE * min(self.t, 1) + scroll_x = self.car.hull.position[0] + scroll_y = self.car.hull.position[1] + angle = -self.car.hull.angle + vel = self.car.hull.linearVelocity + if np.linalg.norm(vel) > 0.5: + angle = math.atan2(vel[0], vel[1]) + + # Draw grass background + self._pg.screen.fill((102, 230, 102)) + # simple grid for texture + k = PLAYFIELD / 20.0 + grid_color = (110, 240, 110) + for x in range(-20, 20, 2): + for y in range(-20, 20, 2): + x0, y0 = k * x + 0, k * y + 0 + x1, y1 = k * x + k, k * y + k + p0 = self._world_to_screen(x0, y0, zoom, angle, scroll_x, scroll_y) + p1 = self._world_to_screen(x1, y0, zoom, angle, scroll_x, scroll_y) + p2 = self._world_to_screen(x1, y1, zoom, angle, scroll_x, scroll_y) + p3 = self._world_to_screen(x0, y1, zoom, angle, scroll_x, scroll_y) + 
pygame.draw.polygon(self._pg.screen, grid_color, [p0, p1, p2, p3]) + + # Road polygons + for poly, color in self.road_poly: + self._draw_polygon_world(poly, color, zoom, angle, scroll_x, scroll_y) + + # Draw car hull + wheels (approx) + car_col = (0.25, 0.25, 0.25) + self._draw_body(self.car.hull, car_col, zoom, angle, scroll_x, scroll_y) + for w in self.car.wheels: + self._draw_body(w, (0.15, 0.15, 0.15), zoom, angle, scroll_x, scroll_y) + + # Indicators (speed, wheel, gyro) + if self._pg.font is not None: + # simple HUD text + txt = f"reward={self.reward:0.1f} laps={self.laps}" + surf = self._pg.font.render(txt, True, (255, 255, 255)) + self._pg.screen.blit(surf, (10, 10)) + + # Output + if self.render_mode == "human": + pygame.display.flip() + self._pg.clock.tick(FPS) + return None + else: + # Offscreen: return RGB array like gym does + arr = pygame.surfarray.array3d(self._pg.screen) # (W,H,3) + arr = np.transpose(arr, (1, 0, 2)) # -> (H,W,3) + return arr + + def close(self): + try: + if self._pg and self._pg.initialized: + pygame.display.quit() + pygame.quit() + except Exception: + pass + self._pg = None + + +# ---------------------------- +# Keyboard demo (pygame) +# ---------------------------- +if __name__ == "__main__": + import pygame + + pygame.init() + env = CarRacing(render_mode="human") + + action = np.array([0.0, 0.0, 0.0], dtype=np.float32) + running = True + + + def handle_keys(a): + keys = pygame.key.get_pressed() + steer = 0.0 + if keys[pygame.K_LEFT]: + steer -= 1.0 + if keys[pygame.K_RIGHT]: + steer += 1.0 + gas = 1.0 if keys[pygame.K_UP] else 0.0 + brake = 0.5 if keys[pygame.K_DOWN] else 0.0 + a[0], a[1], a[2] = steer, gas, brake + + + # initial reset + env.reset() + try: + while running: + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + if event.type == pygame.KEYDOWN and event.key == pygame.K_RETURN: + env.fast_reset() + + handle_keys(action) + obs, r, terminated, truncated, info = env.step(action) + + # 
print every ~200 frames + if int(env.t * FPS) % 200 == 0: + ms: MyState = info.get("features") + if ms is not None: + print( + f"speed={ms.true_speed:5.2f} off_center={ms.off_center:+.2f} car_ang={ms.car_angle:+.2f} " + f"reward={r:+.2f}" + f"reward={ms.reward:+.2f}" + ) + + env.render() + if terminated or truncated: + env.fast_reset() + + finally: + env.close() diff --git a/replay.py b/replay.py new file mode 100644 index 0000000..59cdd39 --- /dev/null +++ b/replay.py @@ -0,0 +1,12 @@ +from stable_baselines3 import TD3 +from car_racing_env import CarRacing + +test_env = CarRacing(render_mode="human") +best = TD3.load("./td3_run_2_best/best_model.zip", env=test_env) +obs, info = test_env.reset() +done = trunc = False +while not (done or trunc): + action, _ = best.predict(obs, deterministic=True) + obs, r, done, trunc, info = test_env.step(action) + test_env.render() +test_env.close() diff --git a/sac_main.py b/sac_main.py new file mode 100644 index 0000000..7361bb0 --- /dev/null +++ b/sac_main.py @@ -0,0 +1,52 @@ +import gymnasium as gym +import numpy as np +from stable_baselines3 import SAC +from stable_baselines3.common.env_util import make_vec_env +from stable_baselines3.common.monitor import Monitor +from stable_baselines3.common.vec_env import VecMonitor + +from car_racing_env import CarRacing + +SEED = 5 + + +def make_env(): + env = CarRacing(render_mode=None) + env.reset(seed=SEED) + return Monitor(env) + + +venv = make_vec_env(make_env, n_envs=1) +venv = VecMonitor(venv) + +model = SAC( + "MlpPolicy", + venv, + seed=SEED, + learning_rate=3e-4, + buffer_size=300_000, + batch_size=256, + tau=0.01, + gamma=0.99, + train_freq=(1, "step"), + gradient_steps=1, + ent_coef="auto", + target_entropy=-3, + verbose=1, + device="auto", +) + +model.learn(total_timesteps=500_000) +model.save("sac_carracing_features") + +# Testen (mit Rendern) +test_env = CarRacing(render_mode="human") +obs, _ = test_env.reset() +done = False +trunc = False +while True: + action, _ = 
model.predict(obs, deterministic=True) + obs, r, done, trunc, _ = test_env.step(action) + test_env.render() + if done or trunc: + obs, _ = test_env.reset() diff --git a/tb_td3/TD3_1/events.out.tfevents.1762163920.Mortens-MacBook-Air-2.local.5035.0 b/tb_td3/TD3_1/events.out.tfevents.1762163920.Mortens-MacBook-Air-2.local.5035.0 new file mode 100644 index 0000000..f562964 Binary files /dev/null and b/tb_td3/TD3_1/events.out.tfevents.1762163920.Mortens-MacBook-Air-2.local.5035.0 differ diff --git a/tb_td3/TD3_10/events.out.tfevents.1762177521.Mortens-MacBook-Air-2.local.10993.0 b/tb_td3/TD3_10/events.out.tfevents.1762177521.Mortens-MacBook-Air-2.local.10993.0 new file mode 100644 index 0000000..d93e2bb Binary files /dev/null and b/tb_td3/TD3_10/events.out.tfevents.1762177521.Mortens-MacBook-Air-2.local.10993.0 differ diff --git a/tb_td3/TD3_11/events.out.tfevents.1762180920.Mortens-MacBook-Air-2.local.12488.0 b/tb_td3/TD3_11/events.out.tfevents.1762180920.Mortens-MacBook-Air-2.local.12488.0 new file mode 100644 index 0000000..3d50376 Binary files /dev/null and b/tb_td3/TD3_11/events.out.tfevents.1762180920.Mortens-MacBook-Air-2.local.12488.0 differ diff --git a/tb_td3/TD3_12/events.out.tfevents.1762181284.Mortens-MacBook-Air-2.local.12598.0 b/tb_td3/TD3_12/events.out.tfevents.1762181284.Mortens-MacBook-Air-2.local.12598.0 new file mode 100644 index 0000000..f553e6c Binary files /dev/null and b/tb_td3/TD3_12/events.out.tfevents.1762181284.Mortens-MacBook-Air-2.local.12598.0 differ diff --git a/tb_td3/TD3_2/events.out.tfevents.1762166040.Mortens-MacBook-Air-2.local.5880.0 b/tb_td3/TD3_2/events.out.tfevents.1762166040.Mortens-MacBook-Air-2.local.5880.0 new file mode 100644 index 0000000..4738b9a Binary files /dev/null and b/tb_td3/TD3_2/events.out.tfevents.1762166040.Mortens-MacBook-Air-2.local.5880.0 differ diff --git a/tb_td3/TD3_3/events.out.tfevents.1762168022.Mortens-MacBook-Air-2.local.6688.0 
b/tb_td3/TD3_3/events.out.tfevents.1762168022.Mortens-MacBook-Air-2.local.6688.0 new file mode 100644 index 0000000..7703cf1 Binary files /dev/null and b/tb_td3/TD3_3/events.out.tfevents.1762168022.Mortens-MacBook-Air-2.local.6688.0 differ diff --git a/tb_td3/TD3_4/events.out.tfevents.1762170315.Mortens-MacBook-Air-2.local.7732.0 b/tb_td3/TD3_4/events.out.tfevents.1762170315.Mortens-MacBook-Air-2.local.7732.0 new file mode 100644 index 0000000..43a5755 Binary files /dev/null and b/tb_td3/TD3_4/events.out.tfevents.1762170315.Mortens-MacBook-Air-2.local.7732.0 differ diff --git a/tb_td3/TD3_5/events.out.tfevents.1762172118.Mortens-MacBook-Air-2.local.8393.0 b/tb_td3/TD3_5/events.out.tfevents.1762172118.Mortens-MacBook-Air-2.local.8393.0 new file mode 100644 index 0000000..83bc36b Binary files /dev/null and b/tb_td3/TD3_5/events.out.tfevents.1762172118.Mortens-MacBook-Air-2.local.8393.0 differ diff --git a/tb_td3/TD3_6/events.out.tfevents.1762172808.Mortens-MacBook-Air-2.local.8755.0 b/tb_td3/TD3_6/events.out.tfevents.1762172808.Mortens-MacBook-Air-2.local.8755.0 new file mode 100644 index 0000000..27b69a7 Binary files /dev/null and b/tb_td3/TD3_6/events.out.tfevents.1762172808.Mortens-MacBook-Air-2.local.8755.0 differ diff --git a/tb_td3/TD3_7/events.out.tfevents.1762174244.Mortens-MacBook-Air-2.local.9444.0 b/tb_td3/TD3_7/events.out.tfevents.1762174244.Mortens-MacBook-Air-2.local.9444.0 new file mode 100644 index 0000000..99bc5fa Binary files /dev/null and b/tb_td3/TD3_7/events.out.tfevents.1762174244.Mortens-MacBook-Air-2.local.9444.0 differ diff --git a/tb_td3/TD3_8/events.out.tfevents.1762174986.Mortens-MacBook-Air-2.local.9771.0 b/tb_td3/TD3_8/events.out.tfevents.1762174986.Mortens-MacBook-Air-2.local.9771.0 new file mode 100644 index 0000000..772c6db Binary files /dev/null and b/tb_td3/TD3_8/events.out.tfevents.1762174986.Mortens-MacBook-Air-2.local.9771.0 differ diff --git a/tb_td3/TD3_9/events.out.tfevents.1762175874.Mortens-MacBook-Air-2.local.10222.0 
b/tb_td3/TD3_9/events.out.tfevents.1762175874.Mortens-MacBook-Air-2.local.10222.0 new file mode 100644 index 0000000..3b6d02e Binary files /dev/null and b/tb_td3/TD3_9/events.out.tfevents.1762175874.Mortens-MacBook-Air-2.local.10222.0 differ diff --git a/tb_td3_run_2/TD3_1/events.out.tfevents.1762184080.Mortens-MacBook-Air-2.local.13399.0 b/tb_td3_run_2/TD3_1/events.out.tfevents.1762184080.Mortens-MacBook-Air-2.local.13399.0 new file mode 100644 index 0000000..bd7b66f Binary files /dev/null and b/tb_td3_run_2/TD3_1/events.out.tfevents.1762184080.Mortens-MacBook-Air-2.local.13399.0 differ diff --git a/tb_td3_run_2/TD3_2/events.out.tfevents.1762184383.Mortens-MacBook-Air-2.local.13549.0 b/tb_td3_run_2/TD3_2/events.out.tfevents.1762184383.Mortens-MacBook-Air-2.local.13549.0 new file mode 100644 index 0000000..6dca29b Binary files /dev/null and b/tb_td3_run_2/TD3_2/events.out.tfevents.1762184383.Mortens-MacBook-Air-2.local.13549.0 differ diff --git a/tb_td3_run_2/TD3_3/events.out.tfevents.1762184618.Mortens-MacBook-Air-2.local.13648.0 b/tb_td3_run_2/TD3_3/events.out.tfevents.1762184618.Mortens-MacBook-Air-2.local.13648.0 new file mode 100644 index 0000000..95a73a6 Binary files /dev/null and b/tb_td3_run_2/TD3_3/events.out.tfevents.1762184618.Mortens-MacBook-Air-2.local.13648.0 differ diff --git a/tb_td3_run_2/TD3_4/events.out.tfevents.1762187653.Mortens-MacBook-Air-2.local.15306.0 b/tb_td3_run_2/TD3_4/events.out.tfevents.1762187653.Mortens-MacBook-Air-2.local.15306.0 new file mode 100644 index 0000000..239d691 Binary files /dev/null and b/tb_td3_run_2/TD3_4/events.out.tfevents.1762187653.Mortens-MacBook-Air-2.local.15306.0 differ diff --git a/tb_td3_run_2/TD3_5/events.out.tfevents.1762190221.Mortens-MacBook-Air-2.local.16164.0 b/tb_td3_run_2/TD3_5/events.out.tfevents.1762190221.Mortens-MacBook-Air-2.local.16164.0 new file mode 100644 index 0000000..6ec376d Binary files /dev/null and b/tb_td3_run_2/TD3_5/events.out.tfevents.1762190221.Mortens-MacBook-Air-2.local.16164.0 
differ diff --git a/td3_best/best_model.zip b/td3_best/best_model.zip new file mode 100644 index 0000000..f45312d Binary files /dev/null and b/td3_best/best_model.zip differ diff --git a/td3_eval/evaluations.npz b/td3_eval/evaluations.npz new file mode 100644 index 0000000..df2efba Binary files /dev/null and b/td3_eval/evaluations.npz differ diff --git a/td3_main.py b/td3_main.py new file mode 100644 index 0000000..61453db --- /dev/null +++ b/td3_main.py @@ -0,0 +1,61 @@ +import os +import numpy as np +from stable_baselines3 import TD3 +from stable_baselines3.common.monitor import Monitor +from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnNoModelImprovement +from stable_baselines3.common.env_util import make_vec_env +import torch as th + +from car_racing_env import CarRacing # <-- Pfad zu deiner Datei + + +if __name__ == "__main__": + # Optional: reproducibility + np.random.seed(0) + th.manual_seed(0) + + run_name = "td3_run_2" # oder datetime.now().strftime("%Y%m%d_%H%M") + + tensorboard_log = f"./tb_{run_name}/" + best_model_path = f"./{run_name}_best/" + eval_log_path = f"./{run_name}_eval/" + model_save_path = f"./{run_name}_models/" + + os.makedirs(model_save_path, exist_ok=True) + + train_env = Monitor(CarRacing(seed_value=0, render_mode=None)) + model = TD3( + policy="MlpPolicy", + env=train_env, + verbose=1, + tensorboard_log=tensorboard_log, + learning_starts=20_000, + ) + + eval_env = Monitor(CarRacing(seed_value=1, render_mode=None)) + stop_cb = StopTrainingOnNoModelImprovement( + max_no_improvement_evals=20, min_evals=5, verbose=1 + ) + eval_cb = EvalCallback( + eval_env, + best_model_save_path=best_model_path, + log_path=eval_log_path, + eval_freq=5_000, + deterministic=True, + render=False, + callback_after_eval=stop_cb, + ) + + model.learn(total_timesteps=400_000, callback=eval_cb, progress_bar=True) + model.save(f"{model_save_path}/td3_carracing_features") + + # Kurzer Testlauf mit Rendering (optional) + test_env = 
CarRacing(seed_value=0, render_mode="human") + obs, info = test_env.reset() + done = False + trunc = False + while not (done or trunc): + action, _ = model.predict(obs, deterministic=True) + obs, reward, done, trunc, info = test_env.step(action) + test_env.render() + test_env.close() diff --git a/td3_models/td3_carracing_features.zip b/td3_models/td3_carracing_features.zip new file mode 100644 index 0000000..198701d Binary files /dev/null and b/td3_models/td3_carracing_features.zip differ diff --git a/td3_run_2_best/best_model.zip b/td3_run_2_best/best_model.zip new file mode 100644 index 0000000..d1dc841 Binary files /dev/null and b/td3_run_2_best/best_model.zip differ diff --git a/td3_run_2_eval/evaluations.npz b/td3_run_2_eval/evaluations.npz new file mode 100644 index 0000000..4b3a9d7 Binary files /dev/null and b/td3_run_2_eval/evaluations.npz differ diff --git a/td3_run_2_models/td3_carracing_features.zip b/td3_run_2_models/td3_carracing_features.zip new file mode 100644 index 0000000..efada5c Binary files /dev/null and b/td3_run_2_models/td3_carracing_features.zip differ