| import torch |
| import nemo.collections.asr as nemo_asr |
| import gc |
| import numpy as np |
| import torchaudio |
| import gradio as gr |
|
|
# Checkpoint restored below (a local .nemo file).
pretrained_model_path = "./stt_fa_fastconformer_hybrid_large_finetuned.nemo"

# Drop cached CUDA allocations and collect garbage before loading the model.
torch.cuda.empty_cache()
gc.collect()

# Prefer the GPU when one is visible to torch, otherwise stay on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Restore the hybrid RNNT/CTC model, move it to the chosen device and freeze
# it, since this script only runs inference.
model = nemo_asr.models.EncDecHybridRNNTCTCModel.restore_from(
    pretrained_model_path
).to(device)
model.freeze()
|
|
# Sample rate every buffered chunk is converted to before transcription.
_TARGET_SAMPLE_RATE = 16000
# Length, in seconds, of each slice handed to the model.
_CHUNK_SECONDS = 5


def _to_mono_float(data):
    """Convert a raw audio array into a 1-D float32 tensor.

    Integer PCM is rescaled by the dtype's full-scale value so samples land
    in [-1.0, 1.0); floating-point input is passed through unchanged.  A 2-D
    array is averaged over its second axis.

    Raises:
        ValueError: if *data* is not a numpy array.
    """
    if not isinstance(data, np.ndarray):
        raise ValueError("Audio data must be a numpy array")

    if np.issubdtype(data.dtype, np.integer):
        # e.g. int16 -> divide by 32768. Without this the model would see
        # amplitudes tens of thousands of times larger than float audio.
        full_scale = np.iinfo(data.dtype).max + 1
        samples = data.astype(np.float32) / full_scale
    else:
        samples = data.astype(np.float32)

    if samples.ndim == 2:
        # NOTE(review): assumes a (samples, channels) layout, which is what
        # Gradio documents for multi-channel numpy audio -- confirm.
        samples = samples.mean(axis=1)

    return torch.tensor(samples, dtype=torch.float32)


def _first_text(result):
    """Return the first transcription text from a ``model.transcribe`` result.

    Accepts a ``(best_hypotheses, all_hypotheses)`` pair as well as a flat
    list, whose items may be plain strings or objects exposing ``.text``.
    NOTE(review): which of these shapes is returned depends on the installed
    NeMo release -- confirm against the version in use.
    """
    first = result[0]
    if isinstance(first, (list, tuple)):
        first = first[0]
    return getattr(first, "text", first)


def transcribe(stream, new_chunk):
    """Accumulate streamed audio and transcribe it in fixed-size slices.

    Args:
        stream: state from the previous call, either ``None`` or a dict with
            ``"audio"`` (1-D float tensor of not-yet-transcribed samples) and
            ``"text"`` (transcript accumulated so far).
        new_chunk: ``None`` or a ``(sample_rate, data)`` pair where *data* is
            a numpy array of samples.

    Returns:
        A ``(stream, text)`` tuple: the updated state and the stripped
        transcript so far.  ``(None, "")`` when *new_chunk* is ``None``.

    Raises:
        ValueError: if the audio payload is not a numpy array.
    """
    if new_chunk is None:
        return None, ""

    sample_rate, data = new_chunk
    audio_tensor = _to_mono_float(data)

    if sample_rate != _TARGET_SAMPLE_RATE:
        resampler = torchaudio.transforms.Resample(
            orig_freq=sample_rate, new_freq=_TARGET_SAMPLE_RATE
        )
        audio_tensor = resampler(audio_tensor)

    if stream is None:
        stream = {"text": "", "audio": audio_tensor}
    else:
        stream["audio"] = torch.cat([stream["audio"], audio_tensor], dim=-1)

    max_length = _CHUNK_SECONDS * _TARGET_SAMPLE_RATE
    pieces = []

    # Only full slices are transcribed; the remainder stays buffered in the
    # state until later chunks extend it past the slice length.
    while stream["audio"].shape[-1] > max_length:
        audio_chunk = stream["audio"][..., :max_length]
        with torch.no_grad():
            transcript = model.transcribe(audio_chunk)
        pieces.append(_first_text(transcript).strip())
        stream["audio"] = stream["audio"][..., max_length:]

    stream["text"] += "".join(" " + piece for piece in pieces)
    return stream, stream["text"].strip()
|
|
|
|
# Wire `transcribe` into a live Gradio UI: the first input/output slot carries
# the per-session state, the second carries the streamed microphone audio in
# and the transcript text out.
interface = gr.Interface(
    fn=transcribe,
    inputs=[
        "state",
        gr.Audio(sources="microphone", streaming=True, type="numpy"),
    ],
    outputs=["state", "text"],
    live=True,
)

# Start serving the app.
interface.launch()