"""
Audio Preprocessing Module for Multilingual Audio Intelligence System

This module standardizes diverse audio inputs into a consistent format
suitable for downstream ML models. It supports various audio formats
(wav, mp3, ogg, flac, m4a, aac), sample rates (8k-48k), bit depths
(4-32 bits), and handles SNR variations as specified in PS-6 requirements.

Key Features:
- Format conversion and standardization
- Intelligent resampling to 16kHz
- Stereo to mono conversion
- Volume normalization for SNR robustness
- Memory-efficient processing
- Robust error handling

Dependencies: pydub, librosa, numpy; scipy is optional (high-pass filter)
System Dependencies: ffmpeg (for format conversion)
"""
| |
|
import logging
import os
import tempfile
import time
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Union

import librosa
import numpy as np
from pydub import AudioSegment
from pydub.utils import which
| |
|
| | |
# Configure the root logger at INFO level when this module is imported and
# create the module-level logger used by everything below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Suppress UserWarning messages raised from librosa modules.
warnings.filterwarnings("ignore", category=UserWarning, module="librosa")
| |
|
| |
|
@dataclass
class AudioInfo:
    """Metadata describing one audio file, as returned by ``analyze_audio_file``.

    Declared once at module level so that every result shares a single type
    that callers can import, use in annotations and ``isinstance``-check.
    """

    file_path: str           # path that was analysed
    duration_seconds: float  # 0 when the duration could not be read
    size_mb: float           # on-disk size: bytes / (1024 * 1024)
    sample_rate: int         # Hz; 0 when it could not be read
    channels: int            # 0 when it could not be read
    format: str              # lower-cased file suffix, including the dot

    @property
    def duration_minutes(self) -> float:
        """Duration expressed in minutes."""
        return self.duration_seconds / 60.0

    @property
    def is_large_file(self) -> bool:
        """True when the file is longer than 30 minutes or larger than 100 MB."""
        return self.duration_minutes > 30 or self.size_mb > 100


class AudioProcessor:
    """
    Enhanced Audio Processor with Smart File Management and Hybrid Translation Support

    Converts audio given as a file path, raw bytes or a numpy array into a
    mono ``float32`` array at ``target_sample_rate``:

    - load (librosa first, pydub as fallback)
    - down-mix to mono
    - resample
    - RMS-normalize
    - remove DC offset and, when scipy is installed, apply an 80 Hz high-pass

    On top of that it offers file analysis, a size-based processing
    recommendation and simple processing statistics.
    """

    # A file counts as "large" once it exceeds this fraction of the
    # configured maximum duration / size (30 min / 100 MB with the defaults).
    _LARGE_FILE_RATIO = 0.5

    # Cutoff frequency of the optional high-pass filter, in Hz.
    _HIGHPASS_CUTOFF_HZ = 80

    def __init__(self, target_sample_rate: int = 16000, model_size: str = "small",
                 enable_translation: bool = True, max_file_duration_minutes: int = 60,
                 max_file_size_mb: int = 200):
        """
        Initialize Enhanced AudioProcessor with both original and new capabilities.

        Args:
            target_sample_rate (int): Target sample rate in Hz (default: 16kHz)
            model_size (str): Whisper model size for transcription
            enable_translation (bool): Enable translation capabilities
            max_file_duration_minutes (int): Duration above which
                ``get_processing_recommendation`` treats a file as very large
            max_file_size_mb (int): Size above which
                ``get_processing_recommendation`` treats a file as very large

        Raises:
            ValueError: If ``target_sample_rate`` is not positive.
        """
        # A non-positive rate can only fail later, deep inside resampling or
        # filter design; reject it here with a clear message instead.
        if target_sample_rate <= 0:
            raise ValueError(f"target_sample_rate must be positive, got {target_sample_rate}")

        self.target_sample_rate = target_sample_rate
        self.supported_formats = ['.wav', '.mp3', '.ogg', '.flac', '.m4a', '.aac']

        self.model_size = model_size
        self.enable_translation = enable_translation
        self.max_file_duration = max_file_duration_minutes
        self.max_file_size = max_file_size_mb

        # Placeholder only: nothing in this class loads or uses a model.
        self.whisper_model = None
        self.processing_stats = self._new_stats()

        if not which("ffmpeg"):
            logger.warning("ffmpeg not found. Some format conversions may fail.")

        logger.info("✅ Enhanced AudioProcessor initialized")
        logger.info(f" Model: {model_size}, Translation: {enable_translation}")
        logger.info(f" Limits: {max_file_duration_minutes}min, {max_file_size_mb}MB")

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a fresh, zeroed statistics dictionary."""
        return {
            'files_processed': 0,
            'total_processing_time': 0.0,
            'chunks_processed': 0,
            'languages_detected': set()
        }

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete ``path``; a failure to delete is deliberately ignored."""
        try:
            os.unlink(path)
        except OSError:
            pass

    @staticmethod
    def _write_temp_file(data: bytes, suffix: str = '') -> str:
        """Write ``data`` to a new temporary file and return its path.

        The caller owns the file and must delete it. If the write itself
        fails the partially written file is removed before re-raising.
        """
        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        try:
            with tmp_file:
                tmp_file.write(data)
        except BaseException:
            AudioProcessor._remove_quietly(tmp_file.name)
            raise
        return tmp_file.name

    @staticmethod
    def _segment_to_array(audio_segment) -> np.ndarray:
        """Convert a pydub ``AudioSegment`` into floats scaled by its full-scale value.

        Returns a 1-D array for single-channel audio and a
        ``(frames, channels)`` array otherwise.
        """
        samples = np.array(audio_segment.get_array_of_samples(), dtype=np.float32)

        # pydub interleaves the channels; de-interleave for any channel count,
        # not only stereo.
        if audio_segment.channels > 1:
            samples = samples.reshape((-1, audio_segment.channels))

        # Scale by the segment's own full-scale amplitude instead of assuming
        # 16-bit samples, so other sample widths are scaled correctly.
        return samples / float(audio_segment.max_possible_amplitude)

    @staticmethod
    def _to_mono(audio_array: np.ndarray) -> np.ndarray:
        """Average all channels of ``audio_array`` into a 1-D signal.

        Accepts both ``(channels, frames)`` and ``(frames, channels)`` layouts.
        A dimension of exactly 2 is taken to be the channel axis (first axis
        wins when both are 2); otherwise the shorter axis is assumed to hold
        the channels, i.e. there are more frames than channels.
        """
        if audio_array.ndim != 2:
            return audio_array.flatten()

        if audio_array.shape[0] == 2:
            channel_axis = 0
        elif audio_array.shape[1] == 2:
            channel_axis = 1
        else:
            channel_axis = 0 if audio_array.shape[0] <= audio_array.shape[1] else 1

        return np.mean(audio_array, axis=channel_axis).flatten()

    def _resample(self, audio_array: np.ndarray, original_sr: int) -> np.ndarray:
        """Resample ``audio_array`` from ``original_sr`` to the target rate."""
        try:
            return librosa.resample(
                audio_array,
                orig_sr=original_sr,
                target_sr=self.target_sample_rate,
                res_type='kaiser_best'
            )
        except ImportError as e:
            # NOTE(review): 'kaiser_best' is backed by the resampy package,
            # which may not be installed alongside librosa. Fall back to
            # librosa's default resampler rather than failing the pipeline.
            logger.warning(f"kaiser_best resampling unavailable ({e}); using librosa default")
            return librosa.resample(
                audio_array,
                orig_sr=original_sr,
                target_sr=self.target_sample_rate
            )

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def process_audio(self, audio_input: Union[str, bytes, np.ndarray],
                      input_sample_rate: Optional[int] = None) -> Tuple[np.ndarray, int]:
        """
        Main processing function that standardizes any audio input.

        Args:
            audio_input: File path (``str`` or ``os.PathLike``), encoded audio
                bytes (``bytes`` or ``bytearray``), or a numpy array of samples
            input_sample_rate: Required if audio_input is numpy array

        Returns:
            Tuple[np.ndarray, int]: (processed_audio_array, sample_rate)

        Raises:
            ValueError: If input format is unsupported or invalid
            FileNotFoundError: If audio file doesn't exist
            Exception: For processing errors
        """
        try:
            if isinstance(audio_input, (str, os.PathLike)):
                audio_array, original_sr = self._load_from_file(os.fspath(audio_input))
            elif isinstance(audio_input, (bytes, bytearray)):
                audio_array, original_sr = self._load_from_bytes(bytes(audio_input))
            elif isinstance(audio_input, np.ndarray):
                if input_sample_rate is None:
                    raise ValueError("input_sample_rate must be provided for numpy array input")
                audio_array = audio_input.astype(np.float32)
                original_sr = input_sample_rate
            else:
                raise ValueError(f"Unsupported input type: {type(audio_input)}")

            logger.info(f"Loaded audio: {audio_array.shape}, {original_sr}Hz")

            processed_audio = self._preprocess_pipeline(audio_array, original_sr)

            logger.info(f"Processed audio: {processed_audio.shape}, {self.target_sample_rate}Hz")

            return processed_audio, self.target_sample_rate

        except Exception as e:
            # Log for visibility, then let the original exception propagate.
            logger.error(f"Audio processing failed: {str(e)}")
            raise

    def _load_from_file(self, file_path: str) -> Tuple[np.ndarray, int]:
        """Load audio from file path.

        Returns the samples at the file's native rate together with that rate.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the extension is not in ``supported_formats``.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Audio file not found: {file_path}")

        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext not in self.supported_formats:
            raise ValueError(f"Unsupported format {file_ext}. Supported: {self.supported_formats}")

        try:
            # sr=None keeps the native rate; mono=False keeps all channels so
            # the down-mix happens in one place (_preprocess_pipeline).
            audio_array, sample_rate = librosa.load(file_path, sr=None, mono=False)
            return audio_array, sample_rate
        except Exception as e:
            logger.warning(f"librosa failed, trying pydub: {e}")
            return self._load_with_pydub(file_path)

    def _load_from_bytes(self, audio_bytes: bytes) -> Tuple[np.ndarray, int]:
        """Load audio from bytes (e.g., uploaded file).

        The bytes are written to a temporary file, decoded with pydub and the
        temporary file is removed again whether or not decoding succeeds.
        """
        tmp_path = self._write_temp_file(audio_bytes, suffix='.audio')
        try:
            return self._load_with_pydub(tmp_path)
        finally:
            self._remove_quietly(tmp_path)

    def _load_with_pydub(self, file_path: str) -> Tuple[np.ndarray, int]:
        """Load audio using pydub with format detection.

        Returns:
            Tuple of (samples scaled by the segment's full-scale amplitude,
            frame rate). Multi-channel audio has shape ``(frames, channels)``.

        Raises:
            Exception: Wrapping any pydub/decoding failure; the original error
                is attached as ``__cause__``.
        """
        try:
            audio_segment = AudioSegment.from_file(file_path)
            return self._segment_to_array(audio_segment), audio_segment.frame_rate
        except Exception as e:
            raise Exception(f"Failed to load audio with pydub: {str(e)}") from e

    # ------------------------------------------------------------------
    # Preprocessing
    # ------------------------------------------------------------------

    def _preprocess_pipeline(self, audio_array: np.ndarray, original_sr: int) -> np.ndarray:
        """
        Apply the complete preprocessing pipeline.

        Pipeline steps:
        1. Convert multi-channel audio to mono
        2. Resample to target sample rate
        3. Normalize amplitude
        4. Remove DC offset and apply the optional high-pass filter

        Returns:
            1-D ``float32`` array at ``self.target_sample_rate``.
        """
        audio_array = self._to_mono(audio_array)
        logger.debug(f"After mono conversion: {audio_array.shape}")

        # Nothing to resample, normalize or filter; also avoids NaN warnings
        # from taking the mean of an empty array.
        if audio_array.size == 0:
            return audio_array.astype(np.float32)

        if original_sr != self.target_sample_rate:
            audio_array = self._resample(audio_array, original_sr)
            logger.debug(f"Resampled from {original_sr}Hz to {self.target_sample_rate}Hz")

        audio_array = self._normalize_audio(audio_array)
        audio_array = self._apply_preprocessing_filters(audio_array)

        return audio_array.astype(np.float32)

    def _normalize_audio(self, audio_array: np.ndarray, target_rms: float = 0.1) -> np.ndarray:
        """
        Normalize audio amplitude to handle varying SNR conditions.

        Uses RMS-based normalization for better handling of varying
        signal-to-noise ratios (-5dB to 20dB as per PS-6 requirements).

        Args:
            audio_array: Samples to normalize.
            target_rms: RMS level to scale the signal to (default 0.1).

        Returns:
            The scaled signal clipped to [-1.0, 1.0]. Empty, silent or
            non-finite-RMS input is returned unchanged.
        """
        if audio_array.size == 0:
            return audio_array

        rms = np.sqrt(np.mean(audio_array**2))

        # `not rms > 0` is also true for NaN, so such input is left untouched.
        if not rms > 0:
            return audio_array

        normalized = np.clip(audio_array * (target_rms / rms), -1.0, 1.0)

        logger.debug(f"RMS normalization: {rms:.4f} -> {target_rms:.4f}")
        return normalized

    def _apply_preprocessing_filters(self, audio_array: np.ndarray) -> np.ndarray:
        """
        Apply basic preprocessing filters for improved robustness.

        Includes:
        - DC offset removal
        - Light high-pass filtering (removes very low frequencies)

        The high-pass step is best-effort: it is skipped when scipy is not
        installed or when the filter cannot be applied to the signal.
        """
        if audio_array.size == 0:
            return audio_array

        audio_array = audio_array - np.mean(audio_array)

        try:
            # Imported here because scipy is an optional dependency.
            from scipy.signal import butter, filtfilt

            nyquist = self.target_sample_rate / 2
            cutoff = self._HIGHPASS_CUTOFF_HZ / nyquist

            # butter() needs a normalized cutoff below 1.0 (the Nyquist rate).
            if cutoff < 1.0:
                b, a = butter(N=1, Wn=cutoff, btype='high')
                audio_array = filtfilt(b, a, audio_array)
                logger.debug("Applied high-pass filter (80Hz cutoff)")

        except ImportError:
            logger.debug("scipy not available, skipping high-pass filter")
        except Exception as e:
            logger.debug(f"High-pass filter failed: {e}")

        return audio_array

    # ------------------------------------------------------------------
    # Inspection
    # ------------------------------------------------------------------

    def get_audio_info(self, audio_input: Union[str, bytes]) -> dict:
        """
        Get detailed information about audio file without full processing.

        Args:
            audio_input: File path (``str`` or ``os.PathLike``) or encoded
                audio bytes.

        Returns:
            dict: Audio metadata including duration, sample rate, channels, etc.
            An empty dict is returned (and the error logged) when the audio
            cannot be found or decoded; this method does not raise.
        """
        try:
            if isinstance(audio_input, (str, os.PathLike)):
                if not os.path.exists(audio_input):
                    raise FileNotFoundError(f"Audio file not found: {audio_input}")
                audio_segment = AudioSegment.from_file(os.fspath(audio_input))
            else:
                tmp_path = self._write_temp_file(audio_input)
                try:
                    audio_segment = AudioSegment.from_file(tmp_path)
                finally:
                    self._remove_quietly(tmp_path)

            return {
                # len() of an AudioSegment is its length in milliseconds.
                'duration_seconds': len(audio_segment) / 1000.0,
                'sample_rate': audio_segment.frame_rate,
                'channels': audio_segment.channels,
                'sample_width': audio_segment.sample_width,
                'frame_count': audio_segment.frame_count(),
                'max_possible_amplitude': audio_segment.max_possible_amplitude
            }

        except Exception as e:
            logger.error(f"Failed to get audio info: {e}")
            return {}

    def analyze_audio_file(self, file_path: str) -> 'AudioInfo':
        """
        NEW: Analyze audio file and return comprehensive information.
        This supports our smart file management for large files.

        Returns:
            AudioInfo: Duration, size, sample rate, channel count and format.
            Duration, sample rate and channels are 0 when the audio metadata
            could not be read.

        Raises:
            OSError: If the file size cannot be determined (for example the
                file does not exist).
        """
        try:
            info = self.get_audio_info(file_path)
            file_size = os.path.getsize(file_path) / (1024 * 1024)

            return AudioInfo(
                file_path=file_path,
                duration_seconds=info.get('duration_seconds', 0),
                size_mb=file_size,
                sample_rate=info.get('sample_rate', 0),
                channels=info.get('channels', 0),
                format=Path(file_path).suffix.lower()
            )

        except Exception as e:
            logger.error(f"Failed to analyze audio file: {e}")
            raise

    def get_processing_recommendation(self, audio_info) -> Dict[str, Any]:
        """
        NEW: Get smart processing recommendation based on file characteristics.
        Helps handle large files efficiently for competition requirements.

        The thresholds come from the limits passed to the constructor: a file
        above ``max_file_duration`` / ``max_file_size`` is "very large", one
        above half of either limit is "large". With the default limits this
        gives 60 min / 200 MB and 30 min / 100 MB.

        Args:
            audio_info: Object exposing ``duration_minutes`` and ``size_mb``.

        Returns:
            Dict with ``strategy``, ``reason``, ``chunk_size`` (fraction of the
            audio to keep) and ``warning`` (``None`` for normal files).
        """
        max_minutes = self.max_file_duration
        max_mb = self.max_file_size

        if audio_info.duration_minutes > max_minutes or audio_info.size_mb > max_mb:
            return {
                'strategy': 'chunk_33_percent',
                'reason': 'Very large file - process 33% to avoid API limits',
                'chunk_size': 0.33,
                'warning': 'File is very large. Processing only 33% to prevent timeouts.'
            }

        large_minutes = max_minutes * self._LARGE_FILE_RATIO
        large_mb = max_mb * self._LARGE_FILE_RATIO

        if audio_info.duration_minutes > large_minutes or audio_info.size_mb > large_mb:
            return {
                'strategy': 'chunk_50_percent',
                'reason': 'Large file - process 50% for efficiency',
                'chunk_size': 0.50,
                'warning': 'File is large. Processing 50% for optimal performance.'
            }

        return {
            'strategy': 'process_full',
            'reason': 'Normal sized file - full processing',
            'chunk_size': 1.0,
            'warning': None
        }

    # ------------------------------------------------------------------
    # High-level entry point and statistics
    # ------------------------------------------------------------------

    def process_audio_file(self, file_path: str, enable_translation: bool = True) -> Dict[str, Any]:
        """
        NEW: Enhanced audio file processing with smart management.
        This integrates all our new features while maintaining compatibility.

        Args:
            file_path: Path of the audio file to process.
            enable_translation: Accepted for interface compatibility; this
                method does not use it.

        Returns:
            On success a dict with ``processed_audio``, ``sample_rate``,
            ``audio_info``, ``recommendation``, ``processing_time`` and
            ``status == 'success'``. On failure a dict with ``error``,
            ``processing_time`` and ``status == 'error'``; no exception is
            raised.
        """
        start_time = time.time()

        try:
            logger.info(f"🎵 Processing audio file: {Path(file_path).name}")

            audio_info = self.analyze_audio_file(file_path)
            recommendation = self.get_processing_recommendation(audio_info)

            logger.info("📊 File Analysis:")
            logger.info(f" Duration: {audio_info.duration_minutes:.1f} minutes")
            logger.info(f" Size: {audio_info.size_mb:.1f} MB")
            logger.info(f" Strategy: {recommendation['strategy']}")

            # NOTE: the whole file is decoded and preprocessed first; the
            # recommendation only truncates the result afterwards.
            processed_audio, sample_rate = self.process_audio(file_path)

            if recommendation['chunk_size'] < 1.0:
                chunk_size = int(len(processed_audio) * recommendation['chunk_size'])
                processed_audio = processed_audio[:chunk_size]
                logger.info(f"📏 Applied {recommendation['strategy']}: using {recommendation['chunk_size']*100}% of audio")

            # Measure once so the statistics and the returned value agree.
            elapsed = time.time() - start_time
            self.processing_stats['files_processed'] += 1
            self.processing_stats['total_processing_time'] += elapsed

            return {
                'processed_audio': processed_audio,
                'sample_rate': sample_rate,
                'audio_info': audio_info,
                'recommendation': recommendation,
                'processing_time': elapsed,
                'status': 'success'
            }

        except Exception as e:
            logger.error(f"❌ Audio processing failed: {e}")
            return {
                'error': str(e),
                'processing_time': time.time() - start_time,
                'status': 'error'
            }

    def get_processing_stats(self) -> Dict[str, Any]:
        """
        NEW: Get comprehensive processing statistics for monitoring.

        ``average_processing_time`` is 0.0 until a file has been processed.
        ``languages_detected`` is returned as a list copy of the internal set.
        """
        stats = self.processing_stats
        total_time = stats['total_processing_time']

        return {
            'files_processed': stats['files_processed'],
            'total_processing_time': total_time,
            # max(1, ...) avoids dividing by zero before the first file.
            'average_processing_time': total_time / max(1, stats['files_processed']),
            'chunks_processed': stats['chunks_processed'],
            'languages_detected': list(stats['languages_detected']),
            'supported_formats': self.supported_formats,
            'model_size': self.model_size,
            'translation_enabled': self.enable_translation
        }

    def clear_cache(self):
        """
        NEW: Clear caches and reset statistics.

        Replaces ``processing_stats`` with a fresh, zeroed dictionary.
        """
        self.processing_stats = self._new_stats()
        logger.info("🧹 AudioProcessor cache cleared")
| |
|
| |
|
| | |
def validate_audio_file(file_path: str) -> bool:
    """
    Cheaply check whether a path points at decodable, non-empty audio.

    Only the file's metadata is read; the audio is not run through the
    preprocessing pipeline.

    Args:
        file_path (str): Path to audio file

    Returns:
        bool: True when a duration greater than zero could be read,
        False otherwise (including on any error)
    """
    try:
        metadata = AudioProcessor().get_audio_info(file_path)
    except Exception:
        return False

    # An empty metadata dict (unreadable file) yields the default of 0.
    return metadata.get('duration_seconds', 0) > 0
| |
|
| |
|
def estimate_processing_time(file_path: str) -> float:
    """
    Estimate processing time based on audio duration.

    The estimate is 20% of the audio duration, with a floor of one second.
    When the duration cannot be determined a flat 10 seconds is returned.

    Args:
        file_path (str): Path to audio file

    Returns:
        float: Estimated processing time in seconds
    """
    realtime_factor = 0.2    # processing time as a fraction of audio length
    minimum_estimate = 1.0   # never promise less than one second
    fallback_estimate = 10.0  # used when the duration is unknown

    try:
        info = AudioProcessor().get_audio_info(file_path)
    except Exception:
        return fallback_estimate

    # get_audio_info reports failure by returning an empty dict instead of
    # raising, so a missing duration has to be detected explicitly; otherwise
    # an unreadable file would be estimated at the one-second minimum.
    if 'duration_seconds' not in info:
        return fallback_estimate

    return max(info['duration_seconds'] * realtime_factor, minimum_estimate)
| |
|
| |
|
if __name__ == "__main__":
    # Manual smoke test: run every sample file that exists in the current
    # working directory through inspection, processing and validation.
    processor = AudioProcessor()

    for test_file in ("sample.wav", "sample.mp3", "test_audio.flac"):
        if not os.path.exists(test_file):
            continue

        try:
            print(f"\nTesting {test_file}:")

            # Metadata only.
            info = processor.get_audio_info(test_file)
            print(f"Info: {info}")

            # Full preprocessing pipeline.
            audio, sr = processor.process_audio(test_file)
            print(f"Processed: shape={audio.shape}, sr={sr}")

            # Quick validity check.
            is_valid = validate_audio_file(test_file)
            print(f"Valid: {is_valid}")

        except Exception as e:
            print(f"Error processing {test_file}: {e}")