Source code for src.CARLA.Classes.CarlaClient

# Copyright (c) 2026 Kai Braun, Ozan Miguel Gündogdu, Yeri Jikong, Sven Winkelmann
#
# SPDX-License-Identifier: MIT
#
# Licensed under the MIT License. 
# See LICENSE file in the project root for full license information.
# Also consult our README to comply with Third-Party Licenses.

import carla
import time
import math
import keyboard

from Classes.CollisionSensor import *

class CarlaClient:
    """Communication interface for the CARLA simulator.

    This class handles the connection to the CARLA server, identifies the
    player vehicle (tagged as 'hero'), and extracts real-time simulation data
    required for the sound engine and logic processing.

    Attributes:
        client (carla.Client): The official CARLA client instance.
            :carla_docs:`carla.Client <python_api/#carlaclient>`
        world (carla.World): The current simulation world instance.
            :carla_docs:`carla.World <core_world/#the-world>`
        vehicle (carla.Vehicle): The identified player vehicle, or None while
            no 'hero' vehicle is bound.
            :carla_docs:`carla.Vehicle <python_api/#carla.Vehicle>`
        vehicle_found (bool): True while ``vehicle`` refers to a bound vehicle.
        collision_sensor (CollisionSensor): Sensor for detecting impact events.
            :carla_docs:`Collision detector <ref_sensors/#collision-detector>`
        crash_counter (int): Last collision count already processed.
        crash_impulse (bool): One-shot flag, set for the packet in which a new
            collision above the intensity threshold is reported.
        honk_trigger (bool): State tracker for the horn input logic (True
            while the horn key is held).
        old_handbrake_state (bool): Handbrake state seen on the previous poll.
        handbrake_trigger (bool): One-shot flag, set for the packet in which
            the handbrake goes from released to engaged.
    """

    _MIN_COLLISION_INTENSITY = 100
    _MS_IN_KMH = 3.6

    def __init__(self, ip, port, timeout):
        """Create the CARLA client and fetch the simulation world.

        Args:
            ip: Host of the CARLA server, passed to ``carla.Client``.
            port: Port of the CARLA server, passed to ``carla.Client``.
            timeout: Value passed to ``carla.Client.set_timeout``.

        Raises:
            Exception: Whatever the CARLA client raised while connecting. A
                hint is printed first and the original error is re-raised, so
                the caller never receives a half-initialised object.
        """
        # Initialise every attribute up front so the instance is always in a
        # consistent state, even when the connection attempt fails.
        self.client = None
        self.world = None
        self.vehicle_found = False
        self.vehicle = None
        self.collision_sensor = None
        self.crash_counter = 0
        self.crash_impulse = False
        self.honk_trigger = False
        self.old_handbrake_state = False
        self.handbrake_trigger = False

        try:
            self.client = carla.Client(ip, port)
            self.client.set_timeout(timeout)
            # Fetching the world is the first call that needs the server, so
            # it belongs inside the guarded section as well.
            self.__connect()
        except Exception:
            print("Carla_client.py konnte sich nicht mit Carla Server verbinden.\nSicherstellen, dass Carla Simulator läuft.")
            raise

    def __connect(self):
        """Retrieve the simulation world from the client.

        Sets ``self.world`` to the world instance returned by
        ``self.client.get_world()``.

        Returns:
            None
        """
        self.world = self.client.get_world()

    def __get_vehicle(self):
        """Find the vehicle whose ``role_name`` attribute is 'hero'.

        Iterates over all actors matching ``vehicle.*`` in the current world.

        Returns:
            carla.Vehicle: The first vehicle with role_name 'hero', or None if
            there are no vehicles or none of them carries that role.
        """
        vehicles = self.world.get_actors().filter('vehicle.*')
        if not vehicles:
            print("Es wurden keine Autos gefunden!")
            return None

        for vehicle in vehicles:
            if vehicle.attributes.get('role_name') == "hero":
                print(f"Verbunden mit vorhandenem Fahrzeug: {vehicle.type_id}")
                return vehicle
        return None

    def _bind_vehicle(self):
        """Look up the hero vehicle and attach a new collision sensor to it.

        Returns:
            bool: True if a vehicle was found and bound, False otherwise.
        """
        self.vehicle = self.__get_vehicle()
        if self.vehicle is None:
            self.vehicle_found = False
            return False

        self.collision_sensor = CollisionSensor(self.vehicle)
        # Align the processed-collision baseline with the new sensor.
        # NOTE(review): presumably a fresh sensor restarts its count; without
        # this, a stale higher count would mask its first events - confirm
        # against CollisionSensor.
        self.crash_counter = self.collision_sensor.collision_counter
        self.vehicle_found = True
        return True

    def _poll_honk(self):
        """Return True only on the poll where the 'h' key becomes pressed."""
        pressed = keyboard.is_pressed('h')
        honk = pressed and not self.honk_trigger
        self.honk_trigger = pressed
        return honk

    def _update_handbrake_trigger(self, handbrake):
        """Set ``handbrake_trigger`` when the handbrake changes to engaged.

        Args:
            handbrake: Current handbrake state read from the vehicle control.
        """
        engaged = bool(handbrake)
        if engaged and not self.old_handbrake_state:
            self.handbrake_trigger = True
        # Remember the actual handbrake state (not the one-shot flag), so a
        # held handbrake produces exactly one event per engagement.
        self.old_handbrake_state = engaged

    @classmethod
    def _speed_kmh(cls, velocity):
        """Return the magnitude of ``velocity`` scaled by ``_MS_IN_KMH``."""
        magnitude = math.sqrt(velocity.x**2 + velocity.y**2 + velocity.z**2)
        return cls._MS_IN_KMH * magnitude

    def retrieve_data(self):
        """Extract and aggregate simulation state data into a telemetry packet.

        Polls the world weather, the horn key and the bound vehicle. If no
        vehicle is bound yet, or the bound one is no longer alive, the hero
        vehicle is looked up again and a new collision sensor is attached.

        Returns:
            dict or None: The telemetry packet, or None if no hero vehicle is
            available or the vehicle reference was lost while reading.

            Telemetry dictionary keys:
                - speed (float): Speed in km/h, rounded to 2 decimals.
                - throttle (float): Throttle value, rounded to 2 decimals.
                - brake (float): Brake value, rounded to 2 decimals.
                - speed_limit: Value of ``vehicle.get_speed_limit()``.
                - message (str): Always "GREEN".
                - gear: Gear reported by the vehicle control.
                - collision_event (bool): True if a new collision with an
                  intensity above ``_MIN_COLLISION_INTENSITY`` was detected.
                - rain_intensity: ``precipitation`` of the current weather.
                - wind_intensity: ``wind_intensity`` of the current weather.
                - acceleration: Y component of ``vehicle.get_acceleration()``.
                - honk (bool): True only on the poll where 'h' gets pressed.
                - handbrake (bool): True only on the poll where the handbrake
                  gets engaged.

        Note:
            ``AttributeError`` and ``RuntimeError`` raised while reading the
            vehicle are handled internally: the vehicle binding is dropped and
            None is returned, so the next call searches for the vehicle again.
        """
        # Weather data
        weather = self.world.get_weather()
        rain_intensity = weather.precipitation
        wind_intensity = weather.wind_intensity

        # Horn
        honk = self._poll_honk()

        # Vehicle data: bind the hero vehicle first if that has not happened.
        if not self.vehicle_found and not self._bind_vehicle():
            return None

        # The bound actor was destroyed (e.g. respawn): try to bind again.
        if not self.vehicle.is_alive and not self._bind_vehicle():
            return None

        try:
            acceleration = self.vehicle.get_acceleration()
            speed_limit = self.vehicle.get_speed_limit()
            kmh = self._speed_kmh(self.vehicle.get_velocity())
            control = self.vehicle.get_control()

            self._update_handbrake_trigger(control.hand_brake)

            sensor = self.collision_sensor
            if (sensor.collision_counter > self.crash_counter
                    and sensor.intensity > self._MIN_COLLISION_INTENSITY):
                self.crash_impulse = True
                self.crash_counter = sensor.collision_counter

            # Assemble the telemetry packet.
            data_packet = {
                "speed": round(kmh, 2),
                "throttle": round(control.throttle, 2),
                "brake": round(control.brake, 2),
                "speed_limit": speed_limit,
                "message": "GREEN",
                "gear": control.gear,
                "collision_event": self.crash_impulse,
                "rain_intensity": rain_intensity,
                "wind_intensity": wind_intensity,
                "acceleration": acceleration.y,
                "honk": honk,
                "handbrake": self.handbrake_trigger,
            }

            # One-shot flags are consumed by the packet that reports them.
            self.crash_impulse = False
            self.handbrake_trigger = False
            return data_packet

        except (AttributeError, RuntimeError) as e:
            # Catching RuntimeError as well, as CARLA often throws this if
            # the actor is destroyed.
            print(f"Vehicle reference lost or switching: {e}")
            self.vehicle_found = False
            self.vehicle = None
            return None  # Tell the caller that data is currently unavailable

    def set_rain(self, in_rain_intensity):
        """Set precipitation and wetness of the simulation weather.

        Both weather parameters are set to the same value and applied in a
        single ``set_weather`` call.

        Args:
            in_rain_intensity (float/int): The intensity of the rain; it is
                converted with ``float()``.

        Returns:
            None
        """
        weather = self.world.get_weather()
        weather.precipitation = float(in_rain_intensity)
        weather.wetness = float(in_rain_intensity)
        self.world.set_weather(weather)

    def set_wind(self, in_wind_intensity=0):
        """Set the wind intensity of the simulation weather.

        Args:
            in_wind_intensity (float/int, optional): The wind strength; it is
                converted with ``float()``. Defaults to 0.

        Returns:
            None
        """
        weather = self.world.get_weather()
        weather.wind_intensity = float(in_wind_intensity)
        self.world.set_weather(weather)