Initial commit: Audio transcription app with Qwen-Omni-7B and speaker diarization

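"""Transcribe an audio file with speaker labels and timestamps.

The script runs pyannote speaker diarization over the input file, then feeds
each speaker turn to Qwen-Omni-7B (loaded with 4-bit quantization) for
transcription, and writes one "[start - end] SPEAKER: text" line per segment
to <audio_basename>.txt next to the input.

Usage (the script name is illustrative; pyannote requires an HF_TOKEN):
    HF_TOKEN=<token> python transcribe.py path/to/recording.wav

Illustrative output line:
    [00:00:01.20 - 00:00:04.75] SPEAKER_00: Hello and welcome.
"""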
import argparse
import torch
from transformers import AutoModelForCausalLM, AutoProcessor, BitsAndBytesConfig
from pyannote.audio import Pipeline
import librosa
import os
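
# Format a time offset in seconds as HH:MM:SS.ss for the transcript lines.
def format_timestamp(seconds: float) -> str:
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{int(hours):02d}:{int(minutes):02d}:{secs:05.2f}"
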
def main():
    parser = argparse.ArgumentParser(
        description='Transcribe audio with speakers and timestamps using Qwen-Omni-7B'
    )
    parser.add_argument('audio_file', help='Path to the audio file')
    args = parser.parse_args()
    audio_file = args.audio_file

    # Check that the input file exists
    if not os.path.exists(audio_file):
        print(f"Error: Audio file '{audio_file}' not found.")
        return
    # Load Qwen-Omni-7B model with 4-bit quantization
    quantization_config = BitsAndBytesConfig(load_in_4bit=True)
    model = AutoModelForCausalLM.from_pretrained(
        "Qwen/Qwen-Omni-7B",
        quantization_config=quantization_config,
        device_map="auto",
    )
    processor = AutoProcessor.from_pretrained("Qwen/Qwen-Omni-7B")

    # Load the diarization pipeline
    # Note: requires a Hugging Face token set in the HF_TOKEN environment variable
    hf_token = os.getenv("HF_TOKEN")
    if not hf_token:
        print("Error: Please set the HF_TOKEN environment variable for pyannote authentication.")
        return
    diarization_pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization", use_auth_token=hf_token
    )
    # Load audio at 16 kHz (the rate used for chunk extraction below)
    audio, sr = librosa.load(audio_file, sr=16000)

    # Diarize first to get per-speaker segments
    diarization = diarization_pipeline(audio_file)

    # Transcribe each speaker segment
    output_lines = []
    for turn, _, speaker_id in diarization.itertracks(yield_label=True):
        start = turn.start
        end = turn.end

        # Extract the audio chunk for this segment
        start_sample = int(start * sr)
        end_sample = int(end * sr)
        audio_chunk = audio[start_sample:end_sample]
        if audio_chunk.size == 0:
            continue  # skip zero-length segments

        # Prepare inputs for Qwen-Omni
        conversation = [
            {"role": "user", "content": [
                {"type": "audio", "audio": {"waveform": audio_chunk, "sample_rate": sr}},
                {"type": "text", "text": "Transcribe this audio segment exactly as spoken."}
            ]}
        ]
        inputs = processor(conversation=conversation, return_tensors="pt").to(model.device)

        # Generate the transcription (greedy decoding), then decode only the newly
        # generated tokens so the prompt is not echoed into the transcript
        with torch.no_grad():
            generated_ids = model.generate(**inputs, max_new_tokens=200, do_sample=False)
        new_tokens = generated_ids[:, inputs["input_ids"].shape[1]:]
        text = processor.batch_decode(new_tokens, skip_special_tokens=True)[0].strip()
        # Format timestamps and assemble the output line
        start_str = format_timestamp(start)
        end_str = format_timestamp(end)
        line = f"[{start_str} - {end_str}] {speaker_id}: {text}"
        output_lines.append(line)
    # Write the transcript next to the input file
    base_name = os.path.splitext(audio_file)[0]
    output_file = base_name + '.txt'
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write('\n'.join(output_lines))
    print(f"Transcription saved to {output_file}")


if __name__ == "__main__":
    main()