Replace pyannote with resemblyzer for token-free diarization

parent 460eaa23
@@ -27,10 +27,7 @@ This Python application transcribes audio files with speaker diarization and timestamps
 ```bash
 pip install -r requirements.txt
 ```
-5. Optional: Set Hugging Face token for pyannote models (required for first download):
-```bash
-export HF_TOKEN=your_huggingface_token
-```
+5. No additional setup required - models download automatically on first run.
 ## Usage
 ```bash
......
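With the Hugging Face token step gone, the only setup left is `pip install -r requirements.txt`. The new imports imply the requirements file must now carry the replacement packages; a sketch of the entries this change would add, using the PyPI names and assuming they are not already listed:

```
resemblyzer
webrtcvad
scikit-learn
numpy
```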
 import argparse
 import torch
 from transformers import AutoModelForCausalLM, AutoProcessor, BitsAndBytesConfig
-from pyannote.audio import Pipeline
+from resemblyzer import VoiceEncoder
+from sklearn.cluster import AgglomerativeClustering
+import webrtcvad
 import librosa
+import numpy as np
 import os
+
+
+def get_diarization(audio, sr):
+    encoder = VoiceEncoder()
+    vad = webrtcvad.Vad(3)  # mode 3 = most aggressive filtering of non-speech
+    frame_duration = 0.01  # 10 ms frames, one of the sizes webrtcvad accepts
+    frame_length = int(sr * frame_duration)
+    # Voice activity detection: collect contiguous speech regions as (start, end) in seconds
+    segments = []
+    start = None
+    for i in range(0, len(audio) - frame_length, frame_length):
+        frame = audio[i:i+frame_length]
+        # webrtcvad expects 16-bit PCM bytes
+        is_speech = vad.is_speech((frame * 32767).astype(np.int16).tobytes(), sr)
+        if is_speech and start is None:
+            start = i / sr
+        elif not is_speech and start is not None:
+            end = i / sr
+            segments.append((start, end))
+            start = None
+    if start is not None:
+        segments.append((start, len(audio)/sr))
+    if not segments:
+        return []
+    # One speaker embedding per detected speech segment
+    embeddings = []
+    for start, end in segments:
+        start_sample = int(start * sr)
+        end_sample = int(end * sr)
+        chunk = audio[start_sample:end_sample]
+        if len(chunk) > 0:
+            emb = encoder.embed_utterance(chunk)
+            embeddings.append(emb)
+    # Cluster embeddings to assign a speaker label to each segment
+    if len(embeddings) <= 1:
+        labels = [0] * len(segments)
+    else:
+        clustering = AgglomerativeClustering(n_clusters=None, distance_threshold=0.5).fit(embeddings)
+        labels = clustering.labels_
+        n_speakers = len(set(labels))
+        if n_speakers > 1:
+            clustering = AgglomerativeClustering(n_clusters=n_speakers).fit(embeddings)
+            labels = clustering.labels_
+    diarization = []
+    for (start, end), label in zip(segments, labels):
+        diarization.append((start, end, f"SPEAKER_{label:02d}"))
+    # Merge consecutive segments from the same speaker
+    merged = []
+    if diarization:
+        current_start, current_end, current_speaker = diarization[0]
+        for start, end, speaker in diarization[1:]:
+            if speaker == current_speaker and abs(start - current_end) < 0.1:  # small gap
+                current_end = end
+            else:
+                merged.append((current_start, current_end, current_speaker))
+                current_start, current_end, current_speaker = start, end, speaker
+        merged.append((current_start, current_end, current_speaker))
+    return merged
+
+
 def main():
     parser = argparse.ArgumentParser(description='Transcribe audio with speakers and timestamps using Qwen-Omni-7B')
     parser.add_argument('audio_file', help='Path to the audio file')
@@ -26,27 +82,23 @@ def main():
     )
     processor = AutoProcessor.from_pretrained("Qwen/Qwen-Omni-7B")
-    # Load diarization pipeline
-    # Optional: Set HF_TOKEN environment variable for pyannote authentication (required for first download)
-    diarization_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=os.getenv("HF_TOKEN"))
     # Load audio
     audio, sr = librosa.load(audio_file, sr=16000)
-    # Diarize first
-    diarization = diarization_pipeline(audio_file)
+    # Diarize
+    diarization = get_diarization(audio, sr)
     # Transcribe each speaker segment
     output_lines = []
-    for turn, _, speaker_id in diarization.itertracks(yield_label=True):
-        start = turn.start
-        end = turn.end
+    for start, end, speaker_id in diarization:
         # Extract audio chunk
         start_sample = int(start * sr)
         end_sample = int(end * sr)
         audio_chunk = audio[start_sample:end_sample]
+        if len(audio_chunk) == 0:
+            continue
         # Prepare inputs for Qwen-Omni
         conversation = [
             {"role": "user", "content": [
......
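For reference, a minimal sketch of exercising the new `get_diarization` helper on its own. The script's filename is not shown in this diff, so the `transcribe` module name and the `sample.wav` path are placeholders; resemblyzer, webrtcvad, and scikit-learn are assumed to be installed:

```python
import librosa

from transcribe import get_diarization  # placeholder module name for the script above

# webrtcvad only accepts 8/16/32/48 kHz input; the script itself loads audio at 16 kHz
audio, sr = librosa.load("sample.wav", sr=16000)

# get_diarization returns a list of (start_seconds, end_seconds, speaker_label) tuples
for start, end, speaker in get_diarization(audio, sr):
    print(f"[{start:8.2f}s - {end:8.2f}s] {speaker}")
```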