# Listing metadata from the original paste: 834 lines, 29 KiB, Python.
# TODO: this will be one env for both systems
|
|
|
|
import math
|
|
import numpy as np
|
|
import logging
|
|
|
|
import Box2D
|
|
from Box2D import (b2FixtureDef, b2PolygonShape, b2ContactListener)
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Optional gym import for spaces only; code runs without strict gym registration
|
|
try:
|
|
import gymnasium as gym
|
|
from gymnasium import spaces
|
|
from gymnasium.utils import seeding
|
|
|
|
GYM_AVAILABLE = True
|
|
except Exception:
|
|
GYM_AVAILABLE = False
|
|
spaces = None
|
|
seeding = None
|
|
|
|
# Car dynamics from classic gym (Box2D)
|
|
from gymnasium.envs.box2d.car_dynamics import Car
|
|
|
|
# --- pygame renderer ---
|
|
import pygame
|
|
|
|
# Not referenced in the visible source.
DEBUG_DRAWING = False
# Number of upcoming track angle deltas included in the feature vector
# (the observation has LOOK_AHEAD + 6 entries).
LOOK_AHEAD = 10

# NOTE(review): STATE_* and VIDEO_* are not referenced in the visible source;
# presumably left over from the image-based environment -- confirm before removing.
STATE_W = 96
STATE_H = 96
VIDEO_W = 1200
VIDEO_H = 800
# Size in pixels of the pygame window / offscreen surface.
WINDOW_W = 1350
WINDOW_H = 950

# Common divisor used to derive the world-space sizes below; also a camera zoom factor.
SCALE = 6.0
# Maximum checkpoint radius used by the track generator.
TRACK_RAD = 900 / SCALE
# The episode terminates when |x| or |y| of the car exceeds this bound.
PLAYFIELD = 2000 / SCALE
# Physics steps per second; also the render frame rate.
FPS = 60
# Camera zoom factor reached once the episode time is >= 1 second.
ZOOM = 2.7
# Not referenced in the visible source.
ZOOM_FOLLOW = True

# Distance between two consecutive track points.
TRACK_DETAIL_STEP = 21 / SCALE
# Upper bound on the heading change per generated track point.
TRACK_TURN_RATE = 0.31
# Half-width of the road: tiles extend TRACK_WIDTH to each side of the centre line.
TRACK_WIDTH = 40 / SCALE
# Width of the red/white border strip drawn on sharp turns.
BORDER = 8 / SCALE
# Number of consecutive same-direction sharp segments required to draw a border.
BORDER_MIN_COUNT = 4

# Base road colour as float RGB in [0, 1].
ROAD_COLOR = [0.4, 0.4, 0.4]

# limits & timeouts
# NOTE(review): MAX_TIME_SEC / MAX_STEPS and FUEL_LIMIT are not enforced anywhere
# in the visible source.
MAX_TIME_SEC = 90.0
MAX_STEPS = int(FPS * MAX_TIME_SEC)
# Steps without a newly visited tile before the episode is truncated.
NO_PROGRESS_SEC = 8.0
NO_PROGRESS_STEPS = int(FPS * NO_PROGRESS_SEC)
# The episode terminates after STALL_STEPS consecutive steps below STALL_MIN_SPEED.
STALL_MIN_SPEED = 4.0
STALL_SEC = 4.0
STALL_STEPS = int(FPS * STALL_SEC)
FUEL_LIMIT = 120.0
|
|
|
|
|
|
# ----------------------------
|
|
# Utilities
|
|
# ----------------------------
|
|
def standardize_angle(theta: float) -> float:
    """Wrap an angle given in radians into the half-open interval [-pi, pi)."""
    full_turn = 2 * np.pi
    shifted = theta + np.pi
    return np.remainder(shifted, full_turn) - np.pi
|
|
|
|
|
|
def f2c(rgb_float):
    """Convert float colour channels in [0, 1] to a tuple of ints clamped to [0, 255]."""
    channels = []
    for level in rgb_float:
        scaled = int(255 * level)
        # Clamp to the valid 8-bit range on both ends.
        channels.append(min(255, scaled) if scaled > 0 else 0)
    return tuple(channels)
|
|
|
|
|
|
# ----------------------------
|
|
# MyState: feature container
|
|
# ----------------------------
|
|
class MyState:
    """Plain container for the features the environment exposes each step.

    All attributes start as ``None`` and are filled in by the environment.
    """

    def __init__(self):
        # Track look-ahead and episode bookkeeping.
        self.angle_deltas = self.reward = self.on_road = self.laps = None
        # Scalar car features, in the order they appear in the feature vector.
        self.wheel_angle = self.car_angle = self.angular_vel = None
        self.true_speed = self.off_center = self.vel_angle = None

    def as_array(self, n: int):
        """Return the first ``n`` angle deltas followed by the six scalar features.

        The result is a float32 numpy array of length ``n + 6`` (shorter if
        fewer than ``n`` angle deltas are stored).
        """
        scalar_features = [
            self.wheel_angle,
            self.car_angle,
            self.angular_vel,
            self.true_speed,
            self.off_center,
            self.vel_angle,
        ]
        look_ahead = self.angle_deltas[:n]
        combined = np.append(look_ahead, scalar_features)
        return combined.astype(np.float32)

    def as_feature_vector(self, lookahead: int = LOOK_AHEAD):
        """Alias of :meth:`as_array` with the module-wide look-ahead as default."""
        return self.as_array(lookahead)
|
|
|
|
|
|
# ----------------------------
|
|
# Contact listener: counts tiles, progress & reward
|
|
# ----------------------------
|
|
class FrictionDetector(b2ContactListener):
    """Box2D contact listener that tracks tile contacts, progress and tile reward.

    The listener mutates the owning environment: ``reward``,
    ``tile_visited_count``, ``next_road_tile``, ``laps`` and ``on_road``.
    """

    def __init__(self, env):
        super().__init__()
        self.env = env

    def BeginContact(self, contact):
        self._contact(contact, True)

    def EndContact(self, contact):
        self._contact(contact, False)

    def _contact(self, contact, begin):
        """Handle a begin (``begin=True``) or end contact between two bodies."""
        first = contact.fixtureA.body.userData
        second = contact.fixtureB.body.userData

        # A road tile is recognised by its ``road_friction`` attribute. If both
        # sides qualify, the second one wins (same as checking them in order).
        tile, obj = None, None
        for candidate, other in ((first, second), (second, first)):
            if candidate and "road_friction" in candidate.__dict__:
                tile, obj = candidate, other
        if not tile:
            return

        # Recolour the touched tile in place with the base road colour.
        for channel in range(3):
            tile.color[channel] = ROAD_COLOR[channel]

        # Only objects that carry a ``tiles`` collection take part in tracking.
        if not obj or "tiles" not in obj.__dict__:
            return

        env = self.env
        if not begin:
            if tile in obj.tiles:
                obj.tiles.remove(tile)
            # on_road reflects the tile set of the object that just lost contact.
            env.on_road = len(obj.tiles) > 0
            return

        obj.tiles.add(tile)
        if tile.index_on_track != env.next_road_tile:
            return

        # The expected next tile was reached: pay out and advance the pointer.
        env.reward += 1000.0 / len(env.track)
        env.tile_visited_count += 1
        env.next_road_tile += 1
        if env.next_road_tile >= len(env.road):
            env.next_road_tile = 0
            env.laps += 1
|
|
|
|
|
|
# ----------------------------
|
|
# CarRacing with pygame rendering and Gym(nasium) 0.26+ compatible step/reset
|
|
# ----------------------------
|
|
class CarRacing(gym.Env if GYM_AVAILABLE else object):
|
|
metadata = {
|
|
"render_modes": ["human", "rgb_array", None],
|
|
"render_fps": FPS,
|
|
}
|
|
|
|
def __init__(self, seed_value: int = 5, render_mode: str | None = "human"):
    """Create the physics world, the spaces, and the first track and car.

    Args:
        seed_value: Seed for the random generator used by track generation.
        render_mode: ``"human"`` opens a pygame window; any other value
            renders to an offscreen surface.
    """
    # Progress coordinate bookkeeping used by the dense progress reward.
    self._prev_s = 0.0
    self._interp = 0.0

    # RNG
    self.offroad_frames = None
    if seeding is None:
        self.np_random = np.random.RandomState(seed_value)
    else:
        self.np_random, _ = seeding.np_random(seed_value)

    # Physics world; the listener is kept on self so a reference stays alive.
    self.contactListener_keepref = FrictionDetector(self)
    self.world = Box2D.b2World((0, 0), contactListener=self.contactListener_keepref)

    # Gym-style spaces (optional)
    if GYM_AVAILABLE:
        action_low = np.array([-1, 0, 0], dtype=np.float32)
        action_high = np.array([+1, +1, +1], dtype=np.float32)
        self.action_space = spaces.Box(action_low, action_high, dtype=np.float32)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(LOOK_AHEAD + 6,), dtype=np.float32
        )

    # State
    self.viewer = None  # unused (pyglet placeholder)
    self.road = None
    self.car = None
    self.reward = 0.0
    self.prev_reward = 0.0

    self.laps = 0
    self.on_road = True
    self.ctrl_pts = None
    self.outward_vectors = None
    self.angles = None
    self.angle_deltas = None
    self.original_road_poly = None
    self.indices = None
    self.my_state = MyState()
    self.next_road_tile = 0

    # Rendering
    self.render_mode = render_mode
    self._pg = None  # pygame objects container

    # Episode control
    self.tile_visited_count = 0
    self.t = 0.0
    self.human_render = False

    # Build initial track + car
    self._build_new_episode()

    # Off-road handling: grace window in frames and per-second penalty rate.
    self.offroad_frames = 0
    self.offroad_grace_frames = int(0.7 * FPS)
    self.offroad_penalty_per_frame = 2.0

    # Step counters for the stall and no-progress limits.
    self.steps = 0
    self._last_progress_count = 0
    self._no_progress_steps = 0
    self._stall_steps = 0
|
|
|
|
# ------------------------
|
|
# Helpers: pygame
|
|
# ------------------------
|
|
class _PygameCtx:
    """Holder for the pygame objects created lazily by the renderer."""

    def __init__(self):
        self.initialized = False
        # Filled in once pygame has been set up.
        self.screen = self.clock = self.font = None
        self.rgb_surface = None  # offscreen for rgb_array
|
|
|
|
def _ensure_pygame(self):
    """Create the pygame context on first use; do nothing once it is initialised."""
    if self._pg is None:
        self._pg = self._PygameCtx()
    ctx = self._pg
    if ctx.initialized:
        return

    if not pygame.get_init():
        pygame.init()

    surface_size = (WINDOW_W, WINDOW_H)
    if self.render_mode == "human":
        ctx.screen = pygame.display.set_mode(surface_size)
    else:
        # offscreen surface; we can still blit/draw onto it
        ctx.screen = pygame.Surface(surface_size)
    ctx.clock = pygame.time.Clock()

    # The HUD font is optional: rendering continues without text if it fails.
    try:
        pygame.font.init()
        ctx.font = pygame.font.SysFont("Arial", 20)
    except Exception:
        ctx.font = None
    ctx.initialized = True
|
|
|
|
def _world_to_screen(self, x, y, zoom, angle, scroll_x, scroll_y):
    """Map a world-space point to integer screen pixel coordinates.

    The point is rotated by ``angle`` around ``(scroll_x, scroll_y)``, scaled
    by ``zoom`` and placed relative to the horizontal centre and the upper
    quarter of the window.
    """
    cos_a = math.cos(angle)
    sin_a = math.sin(angle)
    dx = x - scroll_x
    dy = y - scroll_y
    # rotate around (scroll_x, scroll_y)
    rot_x = dx * cos_a + dy * sin_a
    rot_y = -dx * sin_a + dy * cos_a
    # scale & translate (match original camera placement)
    return int(WINDOW_W / 2 + rot_x * zoom), int(WINDOW_H / 4 + rot_y * zoom)
|
|
|
|
def get_feature_vector(self, lookahead: int = LOOK_AHEAD) -> list[float]:
    """Return the current feature vector as a plain Python list."""
    return self.my_state.as_feature_vector(lookahead).tolist()
|
|
|
|
def _draw_polygon_world(self, poly, color, zoom, angle, scroll_x, scroll_y):
    """Draw a filled world-space polygon with a float RGB colour."""
    screen_points = []
    for world_x, world_y in poly:
        screen_points.append(
            self._world_to_screen(world_x, world_y, zoom, angle, scroll_x, scroll_y)
        )
    pygame.draw.polygon(self._pg.screen, f2c(color), screen_points)
|
|
|
|
def _draw_body(self, body, color=(0.7, 0.7, 0.7), zoom=1.0, angle=0.0, scroll_x=0.0, scroll_y=0.0):
    """Draw every polygon fixture of a Box2D body; other shape types are skipped."""
    fill = f2c(color)
    for fixture in body.fixtures:
        shape = fixture.shape
        if not isinstance(shape, b2PolygonShape):
            continue
        screen_points = []
        for local_vertex in shape.vertices:
            # Transform from body-local to world coordinates first.
            world_vertex = body.transform * local_vertex
            screen_points.append(
                self._world_to_screen(
                    world_vertex[0], world_vertex[1], zoom, angle, scroll_x, scroll_y
                )
            )
        pygame.draw.polygon(self._pg.screen, fill, screen_points, width=0)
|
|
|
|
# ------------------------
|
|
# Track & episode setup
|
|
# ------------------------
|
|
def _destroy(self):
    """Remove all road tiles and the car from the physics world.

    Cleanup is best-effort: a failure on one body is logged at debug level
    and does not stop the remaining bodies from being destroyed. The car is
    destroyed even when there are no road tiles, so it can never be left
    behind in the world.
    """
    # Detach userData first, then destroy the bodies.
    for tile in self.road or ():
        try:
            tile.userData = None
        except Exception:
            log.debug("could not clear userData on road tile", exc_info=True)
        try:
            self.world.DestroyBody(tile)
        except Exception:
            log.debug("could not destroy road tile body", exc_info=True)
    self.road = []

    if self.car is not None:
        try:
            self.car.destroy()
        except Exception:
            log.debug("could not destroy car", exc_info=True)
        self.car = None
|
|
|
|
def _create_track(self) -> bool:
    """Generate a random closed track and create one Box2D sensor tile per segment.

    On success this sets ``self.road`` (tile bodies), ``self.road_poly``
    (polygons and colours for rendering), ``self.track`` (tuples of
    ``(alpha, beta, x, y)``) and the derived arrays ``ctrl_pts``, ``angles``,
    ``outward_vectors``, ``angle_deltas`` and ``indices``.

    Returns:
        True when a closed track was built, False when generation failed and
        the caller should try again.
    """
    CHECKPOINTS = 12
    # Each checkpoint is (angle, x, y) on a circle of random radius. The first
    # and the last checkpoint use a fixed angle and radius.
    checkpoints = []
    for c in range(CHECKPOINTS):
        alpha = 2 * math.pi * c / CHECKPOINTS + self.np_random.uniform(0, 2 * math.pi * 1 / CHECKPOINTS)
        rad = self.np_random.uniform(TRACK_RAD / 3, TRACK_RAD)
        if c == 0:
            alpha = 0
            rad = 1.5 * TRACK_RAD
        if c == CHECKPOINTS - 1:
            alpha = 2 * math.pi * c / CHECKPOINTS
            self.start_alpha = 2 * math.pi * (-0.5) / CHECKPOINTS
            rad = 1.5 * TRACK_RAD
        checkpoints.append((alpha, rad * math.cos(alpha), rad * math.sin(alpha)))

    self.road = []

    # Walk from checkpoint to checkpoint in steps of TRACK_DETAIL_STEP,
    # turning the heading ``beta`` towards the current destination.
    x, y, beta = 1.5 * TRACK_RAD, 0, 0
    dest_i = 0
    laps = 0
    track = []
    no_freeze = 2500  # hard cap on the number of generated points
    visited_other_side = False
    while True:
        alpha = math.atan2(y, x)
        # A lap is counted when the polar angle becomes positive again after
        # having been negative.
        if visited_other_side and alpha > 0:
            laps += 1
            visited_other_side = False
        if alpha < 0:
            visited_other_side = True
            alpha += 2 * math.pi
        # Find the next checkpoint whose angle is not behind the current one;
        # if the scan wraps around, shift alpha back by a full turn and retry.
        while True:
            failed = True
            while True:
                dest_alpha, dest_x, dest_y = checkpoints[dest_i % len(checkpoints)]
                if alpha <= dest_alpha:
                    failed = False
                    break
                dest_i += 1
                if dest_i % len(checkpoints) == 0:
                    break
            if not failed:
                break
            alpha -= 2 * math.pi
            continue
        # (r1x, r1y) is the unit vector at angle beta, (p1x, p1y) the
        # perpendicular along which the track advances.
        r1x = math.cos(beta)
        r1y = math.sin(beta)
        p1x = -r1y
        p1y = r1x
        dest_dx = dest_x - x
        dest_dy = dest_y - y
        # Component of the vector towards the destination along (r1x, r1y).
        proj = r1x * dest_dx + r1y * dest_dy
        # Keep beta within 1.5*pi of alpha.
        while beta - alpha > 1.5 * math.pi:
            beta -= 2 * math.pi
        while beta - alpha < -1.5 * math.pi:
            beta += 2 * math.pi
        prev_beta = beta
        proj *= SCALE
        # Turn towards the destination, limited by TRACK_TURN_RATE per point.
        if proj > 0.3:
            beta -= min(TRACK_TURN_RATE, abs(0.001 * proj))
        if proj < -0.3:
            beta += min(TRACK_TURN_RATE, abs(0.001 * proj))
        x += p1x * TRACK_DETAIL_STEP
        y += p1y * TRACK_DETAIL_STEP
        track.append((alpha, prev_beta * 0.5 + beta * 0.5, x, y))
        if laps > 4:
            break
        no_freeze -= 1
        if no_freeze == 0:
            break

    # Scan backwards for the two most recent crossings of start_alpha; the
    # points between them (i1..i2) form the closed loop that is kept.
    i1, i2 = -1, -1
    i = len(track)
    while True:
        i -= 1
        if i == 0:
            return False  # fewer than two crossings found
        pass_through_start = track[i][0] > self.start_alpha >= track[i - 1][0]
        if pass_through_start and i2 == -1:
            i2 = i
        elif pass_through_start and i1 == -1:
            i1 = i
            break
    print(f"Track generation: {i1}..{i2} -> {i2 - i1}-tiles track")
    assert i1 != -1 and i2 != -1

    track = track[i1: i2 - 1]

    # Reject the track when head and tail do not meet closely enough.
    first_beta = track[0][1]
    first_perp_x = math.cos(first_beta)
    first_perp_y = math.sin(first_beta)
    well_glued_together = np.sqrt(
        np.square(first_perp_x * (track[0][2] - track[-1][2]))
        + np.square(first_perp_y * (track[0][3] - track[-1][3]))
    )
    if well_glued_together > TRACK_DETAIL_STEP:
        return False

    # Mark segments that end a run of BORDER_MIN_COUNT consecutive sharp turns
    # in the same direction, then extend the mark back over that run.
    border = [False] * len(track)
    for i in range(len(track)):
        good = True
        oneside = 0
        for neg in range(BORDER_MIN_COUNT):
            beta1 = track[i - neg - 0][1]
            beta2 = track[i - neg - 1][1]
            good &= abs(beta1 - beta2) > TRACK_TURN_RATE * 0.2
            oneside += np.sign(beta1 - beta2)
        good &= abs(oneside) == BORDER_MIN_COUNT
        border[i] = bool(good)
    for i in range(len(track)):
        for neg in range(BORDER_MIN_COUNT):
            border[i - neg] |= border[i]

    # Create one static sensor body per segment, spanning this point and the
    # previous one (index -1 wraps to the last point for i == 0).
    self.road_poly = []
    for i in range(len(track)):
        alpha1, beta1, x1, y1 = track[i]
        alpha2, beta2, x2, y2 = track[i - 1]
        road1_l = (x1 - TRACK_WIDTH * math.cos(beta1), y1 - TRACK_WIDTH * math.sin(beta1))
        road1_r = (x1 + TRACK_WIDTH * math.cos(beta1), y1 + TRACK_WIDTH * math.sin(beta1))
        road2_l = (x2 - TRACK_WIDTH * math.cos(beta2), y2 - TRACK_WIDTH * math.sin(beta2))
        road2_r = (x2 + TRACK_WIDTH * math.cos(beta2), y2 + TRACK_WIDTH * math.sin(beta2))
        t = self.world.CreateStaticBody(
            fixtures=b2FixtureDef(shape=b2PolygonShape(vertices=[road1_l, road1_r, road2_r, road2_l]))
        )
        # The body is its own userData so the contact listener can read the
        # attributes attached below.
        t.userData = t
        t.index_on_track = i
        c = 0.01 * (i % 3)
        t.color = [ROAD_COLOR[0] + c, ROAD_COLOR[1] + c, ROAD_COLOR[2] + c]
        t.road_visited = False
        t.road_friction = 1.0
        t.fixtures[0].sensor = True
        # road_poly shares the tile's colour list, so in-place recolouring of
        # the tile is visible to the renderer.
        self.road_poly.append(([road1_l, road1_r, road2_r, road2_l], t.color))
        self.road.append(t)
        if border[i]:
            # Border strip with alternating white/red colour, placed on the
            # side given by the sign of the heading change.
            side = np.sign(beta2 - beta1)
            b1_l = (x1 + side * TRACK_WIDTH * math.cos(beta1), y1 + side * TRACK_WIDTH * math.sin(beta1))
            b1_r = (
                x1 + side * (TRACK_WIDTH + BORDER) * math.cos(beta1),
                y1 + side * (TRACK_WIDTH + BORDER) * math.sin(beta1),
            )
            b2_l = (x2 + side * TRACK_WIDTH * math.cos(beta2), y2 + side * TRACK_WIDTH * math.sin(beta2))
            b2_r = (
                x2 + side * (TRACK_WIDTH + BORDER) * math.cos(beta2),
                y2 + side * (TRACK_WIDTH + BORDER) * math.sin(beta2),
            )
            self.road_poly.append(([b1_l, b1_r, b2_r, b2_l], (1, 1, 1) if i % 2 == 0 else (1, 0, 0)))
    self.track = track

    # Copy of the polygons/colours used to restore road_poly on a fast reset.
    self.original_road_poly = [((list(poly)), list(color)) for (poly, color) in self.road_poly]
    # Derived arrays used by the feature computation.
    self.ctrl_pts = np.array(list(map(lambda x: x[2:], self.track)))
    self.angles = np.array(list(map(lambda x: x[1], self.track)))
    self.outward_vectors = [np.array([np.cos(theta), np.sin(theta)]) for theta in self.angles]
    # Heading change from the previous point to each point, wrapped by standardize_angle.
    angle_deltas = self.angles - np.roll(self.angles, 1)
    self.angle_deltas = np.array(list(map(standardize_angle, angle_deltas)))
    self.indices = np.array(range(len(self.ctrl_pts)))
    return True
|
|
|
|
def _build_new_episode(self):
    """Destroy the old world contents, then build a fresh track and car.

    All per-episode state is reset here, including the off-road counter and
    the progress reference used by the dense progress reward, so nothing
    from a previous episode leaks into the new one.
    """
    self._destroy()
    self.reward = 0.0
    self.prev_reward = 0.0
    self.tile_visited_count = 0
    self.t = 0.0
    self.road_poly = []
    self.human_render = False
    self.laps = 0
    self.on_road = True
    self.next_road_tile = 0
    # A stale off-road count could end the new episode almost immediately.
    self.offroad_frames = 0

    # build track (may retry)
    while True:
        success = self._create_track()
        if success:
            break
        print("retry to generate track (normal if there are not many of this messages)")

    self.car = Car(self.world, *self.track[0][1:4])

    # attach tiles set to car for contact tracking
    self.car.tiles = set()

    self.steps = 0
    self._last_progress_count = 0
    self._no_progress_steps = 0
    self._stall_steps = 0

    # Start the progress coordinate where step() will compute it for the
    # initial tile, so the first step does not see a spurious jump.
    self._interp = 0.0
    self._prev_s = float((self.next_road_tile - 1) % len(self.ctrl_pts)) + self._interp
|
|
|
|
# ------------------------
|
|
# Public API (Gym 0.26+/Gymnasium style)
|
|
# ------------------------
|
|
def _update_features(self):
    """Recompute ``self.my_state`` from the current car and track state.

    Also stores the fractional position within the current track segment in
    ``self._interp`` for the progress reward in ``step()``. The values
    written to ``my_state`` are scaled by fixed normalisation constants.
    """
    tile = self.next_road_tile
    hull = self.car.hull

    # Offset of the car from a control point, projected on an outward vector.
    v1 = self.outward_vectors[tile - 2]
    v2 = np.array(hull.position) - self.ctrl_pts[tile - 1]
    off_center = float(np.dot(v1, v2))

    angular_vel = float(hull.angularVelocity)
    vel = hull.linearVelocity
    true_speed = float(np.linalg.norm(vel))
    car_angle = float(hull.angle - self.angles[tile])
    wheel_angle = float(self.car.wheels[0].joint.angle)
    if true_speed < 0.2:
        # Direction of travel is not meaningful when (almost) standing still.
        vel_angle = 0.0
    else:
        vel_angle = float(math.atan2(vel[1], vel[0]) - (self.angles[tile] + np.pi / 2))

    wheel_angle = standardize_angle(wheel_angle)
    car_angle = standardize_angle(car_angle)
    vel_angle = standardize_angle(vel_angle)

    # Fractional progress of the midpoint of the first two wheels along the
    # segment between the two control points behind the next tile.
    tip = np.array((self.car.wheels[0].position + self.car.wheels[1].position) / 2)
    p1 = self.ctrl_pts[tile - 1]
    p2 = self.ctrl_pts[tile - 2]
    u = (p1 - p2) / TRACK_DETAIL_STEP
    v = (tip - p2) / TRACK_DETAIL_STEP
    interp = float(np.dot(v, u))
    interp_angle_deltas = np.interp(self.indices + interp, self.indices, self.angle_deltas)

    state = self.my_state
    state.reward = self.reward
    state.on_road = self.on_road
    state.laps = self.laps

    # Normalization
    # Rolling puts the delta at the next tile first.
    state.angle_deltas = np.roll(interp_angle_deltas, -tile) * 2.3
    state.true_speed = true_speed / 100.0
    state.off_center = off_center / TRACK_WIDTH
    state.wheel_angle = wheel_angle * 2.1
    state.car_angle = car_angle * 1.5
    state.vel_angle = vel_angle * 1.5
    state.angular_vel = angular_vel / 3.74

    self._interp = interp  # used by the progress reward in step()
|
|
|
|
def reset(self, *, seed: int | None = None, options: dict | None = None):
    """Start a new episode on a newly generated track.

    Args:
        seed: When given, re-seeds the random generator before the track is
            generated.
        options: Accepted for API compatibility; not used.

    Returns:
        ``(obs, info)`` where ``obs`` is the float32 feature vector and
        ``info`` is an empty dict.
    """
    if seed is not None:
        if seeding is not None:
            self.np_random, _ = seeding.np_random(seed)
        else:
            self.np_random = np.random.RandomState(seed)
    self._build_new_episode()
    # The off-road counter is per-episode state; do not carry it over.
    self.offroad_frames = 0
    # Important: populate the initial features.
    self._update_features()
    # Align the progress reference with the starting position (same formula
    # as in step()), so the first step is not rewarded for a stale offset.
    self._prev_s = float((self.next_road_tile - 1) % len(self.ctrl_pts)) + float(self._interp)
    obs = self.my_state.as_feature_vector(LOOK_AHEAD).astype(np.float32)
    info = {}
    return obs, info
|
|
|
|
def fast_reset(self):
    """Restart the episode on the same track with a freshly spawned car.

    Returns:
        The 5-tuple returned by ``step()`` for a zero action.
    """
    # Destroy the old car BEFORE dropping the reference; otherwise its bodies
    # stay in the physics world and keep producing contacts.
    if self.car is not None:
        try:
            self.car.destroy()
        except Exception:
            log.debug("could not destroy car during fast_reset", exc_info=True)
        self.car = None

    # Reset episode state after the destroy, since destroying bodies may
    # still trigger contact callbacks that touch this state.
    self.laps = 0
    self.on_road = True
    self.next_road_tile = 0

    self.reward = 0.0
    self.prev_reward = 0.0
    self.tile_visited_count = 0
    self.t = 0.0
    self.human_render = False
    self.offroad_frames = 0
    for tile in self.road:
        tile.road_visited = False
    self.road_poly = [((list(poly)), list(color)) for (poly, color) in self.original_road_poly]

    # keep the same track, respawn car
    self.car = Car(self.world, *self.track[0][1:4])
    self.car.tiles = set()

    self.steps = 0
    self._last_progress_count = 0
    self._no_progress_steps = 0
    self._stall_steps = 0

    # Start the progress coordinate where step() computes it for the first
    # tile, so the first step does not see a jump from the previous episode.
    self._interp = 0.0
    self._prev_s = float((self.next_road_tile - 1) % len(self.ctrl_pts)) + self._interp

    return self.step(np.array([0.0, 0.0, 0.0], dtype=np.float32))
|
|
|
|
def step(self, action):
    """Advance the simulation by one frame and compute the step reward.

    Args:
        action: ``[steer (-1..1), gas (0..1), brake (0..1)]`` or ``None``.
            With ``None`` the controls are left unchanged and the returned
            step reward is 0.0.

    Returns:
        ``(obs, step_reward, terminated, truncated, info)``; ``obs`` is the
        float32 feature vector and ``info`` is an empty dict.
    """
    if action is not None:
        # TODO: this was changed from -float(action[0])
        self.car.steer(float(action[0]))
        self.car.gas(float(action[1]))
        self.car.brake(float(action[2]))

    self.car.step(1.0 / FPS)
    self.world.Step(1.0 / FPS, 6 * 30, 2 * 30)
    self.t += 1.0 / FPS

    self.steps += 1
    # NOTE(review): MAX_STEPS and FUEL_LIMIT are defined at module level but
    # are not enforced here -- confirm whether a time limit is intended.

    terminated = False
    truncated = False

    # -- stall logic ---
    speed = float(np.linalg.norm(self.car.hull.linearVelocity))
    if speed < STALL_MIN_SPEED:
        self._stall_steps += 1
    else:
        self._stall_steps = 0
    if self._stall_steps >= STALL_STEPS:
        self.reward -= 15.0
        terminated = True

    if action is not None:
        # (1) Apply ALL reward changes first.
        self.reward -= 1.0 / FPS  # time penalty

        # Lap completed: restart the per-lap tile counter. The progress
        # watermark is reset with it; otherwise the no-progress timeout can
        # never be satisfied during the next lap. A termination decided
        # above (stall) is deliberately left untouched.
        if self.tile_visited_count == len(self.track):
            self.tile_visited_count = 0
            self._last_progress_count = 0
            self._no_progress_steps = 0

        # Out of bounds: add the penalty to the running reward.
        x, y = self.car.hull.position
        if abs(x) > PLAYFIELD or abs(y) > PLAYFIELD:
            self.reward -= 100.0
            terminated = True

        # Off-road: continuous penalty with a grace window; extra penalty
        # and termination once the window is exceeded.
        if not self.on_road:
            self.offroad_frames += 1
            self.reward -= self.offroad_penalty_per_frame / FPS
            if self.offroad_frames > self.offroad_grace_frames:
                self.reward -= 20.0
                terminated = True
        else:
            self.offroad_frames = 0

        # --- Dense signal: velocity along the track tangent ---
        theta = float(self.angles[self.next_road_tile])  # local heading angle
        t_hat = np.array([-math.sin(theta), math.cos(theta)], dtype=np.float32)
        vel = np.array(self.car.hull.linearVelocity, dtype=np.float32)
        forward = float(np.dot(vel, t_hat))  # forward component (may be < 0)

        if forward > 0.0:
            self.reward += 0.2 * forward / FPS

        # --- Continuous track progress (s coordinate) ---
        # s = index of the last passed control point + fraction within the segment
        n = float(len(self.ctrl_pts))
        s_now = ((self.next_road_tile - 1) % len(self.ctrl_pts)) + float(self._interp)
        ds = s_now - self._prev_s

        # Cyclic correction in both directions: a wrap at the start line must
        # read as a small step, never as a jump of almost a full lap.
        if ds < -0.5 * n:
            ds += n
        elif ds > 0.5 * n:
            ds -= n

        if ds > 0.0:
            self.reward += 4.0 * ds / FPS  # progress in tiles per second (keep small)
        self._prev_s = s_now

        # No-progress timeout: count steps since the last newly visited tile.
        if self.tile_visited_count > self._last_progress_count:
            self._last_progress_count = self.tile_visited_count
            self._no_progress_steps = 0
        else:
            self._no_progress_steps += 1
            if self._no_progress_steps >= NO_PROGRESS_STEPS:
                truncated = True

        # (2) Form the delta exactly once.
        step_reward = self.reward - self.prev_reward
        self.prev_reward = self.reward
    else:
        step_reward = 0.0

    # --- Feature computation ---
    self._update_features()

    obs = self.my_state.as_feature_vector(LOOK_AHEAD).astype(np.float32)
    info = {}
    return obs, step_reward, terminated, truncated, info
|
|
|
|
def _get_observation(self):
    """Return ``None``: observations are feature vectors, images come from ``render()``."""
    observation = None
    return observation
|
|
|
|
# ------------------------
|
|
# Rendering (pygame)
|
|
# ------------------------
|
|
def render(self):
    """Draw the current frame.

    Returns:
        ``None`` in human mode (the window is updated and the frame rate is
        capped at FPS), otherwise the frame as an array in (H, W, 3) layout.
        Also returns ``None`` after closing on a window QUIT event.
    """
    self._ensure_pygame()
    human_mode = self.render_mode == "human"

    # Handle window events only in human mode
    if human_mode:
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.close()
                return None

    # Camera math (match original)
    hull = self.car.hull
    zoom = 0.1 * SCALE * max(1 - self.t, 0) + ZOOM * SCALE * min(self.t, 1)
    scroll_x = hull.position[0]
    scroll_y = hull.position[1]
    vel = hull.linearVelocity
    if np.linalg.norm(vel) > 0.5:
        # When moving, orient the camera along the velocity.
        angle = math.atan2(vel[0], vel[1])
    else:
        angle = -hull.angle

    screen = self._pg.screen

    def to_screen(world_x, world_y):
        return self._world_to_screen(world_x, world_y, zoom, angle, scroll_x, scroll_y)

    # Draw grass background
    screen.fill((102, 230, 102))
    # simple grid for texture
    k = PLAYFIELD / 20.0
    grid_color = (110, 240, 110)
    for gx in range(-20, 20, 2):
        left = k * gx + 0
        right = k * gx + k
        for gy in range(-20, 20, 2):
            bottom = k * gy + 0
            top = k * gy + k
            corners = [
                to_screen(left, bottom),
                to_screen(right, bottom),
                to_screen(right, top),
                to_screen(left, top),
            ]
            pygame.draw.polygon(screen, grid_color, corners)

    # Road polygons
    for poly, color in self.road_poly:
        self._draw_polygon_world(poly, color, zoom, angle, scroll_x, scroll_y)

    # Draw car hull + wheels (approx)
    self._draw_body(hull, (0.25, 0.25, 0.25), zoom, angle, scroll_x, scroll_y)
    wheel_color = (0.15, 0.15, 0.15)
    for wheel in self.car.wheels:
        self._draw_body(wheel, wheel_color, zoom, angle, scroll_x, scroll_y)

    # simple HUD text
    font = self._pg.font
    if font is not None:
        hud = font.render(f"reward={self.reward:0.1f} laps={self.laps}", True, (255, 255, 255))
        screen.blit(hud, (10, 10))

    # Output
    if human_mode:
        pygame.display.flip()
        self._pg.clock.tick(FPS)
        return None

    # Offscreen: return RGB array like gym does
    frame = pygame.surfarray.array3d(screen)  # (W,H,3)
    return np.transpose(frame, (1, 0, 2))  # -> (H,W,3)
|
|
|
|
def close(self):
    """Shut down pygame if it was initialised and drop the render context."""
    ctx = self._pg
    try:
        if ctx and ctx.initialized:
            pygame.display.quit()
            pygame.quit()
    except Exception:
        # Best-effort shutdown: the context is dropped regardless.
        pass
    self._pg = None
|
|
|
|
|
|
# ----------------------------
|
|
# Keyboard demo (pygame)
|
|
# ----------------------------
|
|
if __name__ == "__main__":
    # Keyboard demo: arrow keys drive the car, RETURN respawns it on the same track.
    import pygame

    pygame.init()
    env = CarRacing(render_mode="human")

    action = np.array([0.0, 0.0, 0.0], dtype=np.float32)
    running = True

    def handle_keys(a):
        """Write steer/gas/brake from the currently pressed arrow keys into ``a``."""
        keys = pygame.key.get_pressed()
        steer = 0.0
        if keys[pygame.K_LEFT]:
            steer -= 1.0
        if keys[pygame.K_RIGHT]:
            steer += 1.0
        gas = 1.0 if keys[pygame.K_UP] else 0.0
        brake = 0.5 if keys[pygame.K_DOWN] else 0.0
        a[0], a[1], a[2] = steer, gas, brake

    # initial reset
    env.reset()
    try:
        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False
                if event.type == pygame.KEYDOWN and event.key == pygame.K_RETURN:
                    env.fast_reset()
            if not running:
                break

            handle_keys(action)
            obs, r, terminated, truncated, info = env.step(action)

            # Print a status line every 200 steps. step() returns an empty
            # info dict, so the features are read from the environment.
            if env.steps % 200 == 0:
                ms: MyState = env.my_state
                if ms is not None and ms.true_speed is not None:
                    # The MyState values are the normalised features.
                    print(
                        f"speed={ms.true_speed:5.2f} off_center={ms.off_center:+.2f} car_ang={ms.car_angle:+.2f} "
                        f"step_reward={r:+.2f} "
                        f"total_reward={ms.reward:+.2f}"
                    )

            env.render()
            if terminated or truncated:
                env.fast_reset()

    finally:
        env.close()
|