| """
|
| Vehicle Speed Estimation Module
|
| ================================
|
|
|
| Implements speed calculation for tracked vehicles using perspective transformation
|
| and temporal position tracking with smoothing and outlier detection.
|
|
|
| Authors:
|
| - Abhay Gupta (0205CC221005)
|
| - Aditi Lakhera (0205CC221011)
|
| - Balraj Patel (0205CC221049)
|
| - Bhumika Patel (0205CC221050)
|
|
|
| Technical Approach:
|
| - Tracks vehicle positions across frames in transformed coordinate space
|
| - Calculates displacement over time windows
|
| - Applies smoothing to reduce noise
|
| - Converts to desired speed units
|
| """
|
|
|
| import numpy as np
|
| import supervision as sv
|
| from collections import defaultdict, deque
|
| from typing import Dict, Optional
|
| import logging
|
|
|
| from .view_transformer import PerspectiveTransformer
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| class VehicleSpeedEstimator:
|
| """
|
| Estimates vehicle speeds using perspective-corrected position tracking.
|
|
|
| This class maintains a history of vehicle positions in real-world coordinates
|
| and calculates speeds based on displacement over time.
|
| """
|
|
|
| def __init__(
|
| self,
|
| fps: int,
|
| transformer: PerspectiveTransformer,
|
| history_duration: int = 1,
|
| speed_unit: str = "km/h",
|
| min_frames_for_speed: Optional[int] = None
|
| ):
|
| """
|
| Initialize the speed estimator.
|
|
|
| Args:
|
| fps: Video frames per second
|
| transformer: Perspective transformation instance
|
| history_duration: Time window for speed calculation (seconds)
|
| speed_unit: Output speed unit ("km/h", "mph", or "m/s")
|
| min_frames_for_speed: Minimum frames needed for speed calculation
|
| (defaults to fps/2)
|
| """
|
| self.fps = fps
|
| self.transformer = transformer
|
| self.history_duration = history_duration
|
| self.speed_unit = speed_unit
|
|
|
|
|
| self.min_frames = min_frames_for_speed or max(int(fps / 2), 5)
|
|
|
|
|
| max_history_frames = int(fps * history_duration)
|
|
|
|
|
|
|
| self.position_history: Dict[int, deque] = defaultdict(
|
| lambda: deque(maxlen=max_history_frames)
|
| )
|
|
|
|
|
| self.unit_conversions = {
|
| "km/h": 3.6,
|
| "mph": 2.23694,
|
| "m/s": 1.0
|
| }
|
|
|
| if speed_unit not in self.unit_conversions:
|
| raise ValueError(f"Invalid speed unit: {speed_unit}")
|
|
|
| self.conversion_factor = self.unit_conversions[speed_unit]
|
|
|
| logger.info(f"Speed estimator initialized: {fps}fps, {history_duration}s history, "
|
| f"unit={speed_unit}, min_frames={self.min_frames}")
|
|
|
| def _calculate_speed_for_vehicle(self, tracker_id: int) -> Optional[float]:
|
| """
|
| Calculate speed for a specific tracked vehicle.
|
|
|
| Args:
|
| tracker_id: Unique tracker ID
|
|
|
| Returns:
|
| Speed in configured units, or None if insufficient data
|
| """
|
| positions = self.position_history[tracker_id]
|
|
|
|
|
| if len(positions) < self.min_frames:
|
| return None
|
|
|
| try:
|
|
|
| start_pos = positions[0]
|
| end_pos = positions[-1]
|
|
|
|
|
| displacement = np.linalg.norm(end_pos - start_pos)
|
|
|
|
|
| time_elapsed = len(positions) / self.fps
|
|
|
|
|
| if time_elapsed == 0:
|
| return None
|
|
|
|
|
| speed_ms = displacement / time_elapsed
|
|
|
|
|
| speed = speed_ms * self.conversion_factor
|
|
|
|
|
| max_speed = 300 * self.unit_conversions["km/h"] / self.conversion_factor
|
| if speed < 0 or speed > max_speed:
|
| logger.debug(f"Outlier speed detected for vehicle {tracker_id}: {speed:.1f}")
|
| return None
|
|
|
| return speed
|
|
|
| except Exception as e:
|
| logger.warning(f"Error calculating speed for vehicle {tracker_id}: {e}")
|
| return None
|
|
|
| def _apply_smoothing(self, speeds: np.ndarray, window_size: int = 3) -> np.ndarray:
|
| """
|
| Apply moving average smoothing to speed values.
|
|
|
| Args:
|
| speeds: Array of speed values
|
| window_size: Smoothing window size
|
|
|
| Returns:
|
| Smoothed speed array
|
| """
|
| if len(speeds) < window_size:
|
| return speeds
|
|
|
|
|
| smoothed = np.convolve(speeds, np.ones(window_size)/window_size, mode='same')
|
| return smoothed
|
|
|
| def estimate(self, detections: sv.Detections) -> sv.Detections:
|
| """
|
| Estimate speeds for all detected vehicles in current frame.
|
|
|
| Args:
|
| detections: Detection results from current frame
|
|
|
| Returns:
|
| Updated detections with 'speed' field added to data
|
| """
|
|
|
| speeds = []
|
|
|
|
|
| if not hasattr(detections, 'tracker_id') or detections.tracker_id is None:
|
| logger.warning("No tracker IDs found in detections")
|
| detections.data["speed"] = np.zeros(len(detections))
|
| return detections
|
|
|
|
|
| anchor_points = detections.get_anchors_coordinates(
|
| anchor=sv.Position.BOTTOM_CENTER
|
| )
|
|
|
|
|
| try:
|
| transformed_points = self.transformer.apply_transformation(anchor_points)
|
| except Exception as e:
|
| logger.error(f"Error transforming points: {e}")
|
| detections.data["speed"] = np.zeros(len(detections))
|
| return detections
|
|
|
|
|
| for tracker_id, point in zip(detections.tracker_id, transformed_points):
|
|
|
| self.position_history[tracker_id].append(point)
|
|
|
|
|
| speed = self._calculate_speed_for_vehicle(tracker_id)
|
|
|
|
|
| speeds.append(speed if speed is not None else 0.0)
|
|
|
|
|
| speeds = np.array(speeds)
|
| speeds = np.round(speeds).astype(int)
|
|
|
|
|
| detections.data["speed"] = speeds
|
|
|
| return detections
|
|
|
| def reset(self) -> None:
|
| """Clear all position history."""
|
| self.position_history.clear()
|
| logger.info("Speed estimator history cleared")
|
|
|
| def get_tracked_vehicle_count(self) -> int:
|
| """
|
| Get number of currently tracked vehicles.
|
|
|
| Returns:
|
| Number of vehicles with position history
|
| """
|
| return len(self.position_history)
|
|
|
| def cleanup_old_tracks(self, active_ids: set) -> None:
|
| """
|
| Remove position history for vehicles no longer being tracked.
|
|
|
| Args:
|
| active_ids: Set of currently active tracker IDs
|
| """
|
| inactive_ids = set(self.position_history.keys()) - active_ids
|
| for tracker_id in inactive_ids:
|
| del self.position_history[tracker_id]
|
|
|
| if inactive_ids:
|
| logger.debug(f"Cleaned up {len(inactive_ids)} inactive tracks")
|
|
|