File size: 4,313 Bytes
bd97be7
9fe4dba
b40af2a
d07525d
e88a1f3
 
82b4010
010e9c1
82b4010
e4afaf8
82b4010
e4afaf8
82b4010
bd97be7
e88a1f3
8057378
 
e88a1f3
8057378
e88a1f3
 
8057378
82b4010
8057378
190e895
82b4010
 
190e895
 
 
875dc71
 
 
190e895
875dc71
190e895
 
db3a36a
190e895
8057378
82b4010
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8057378
525ee37
 
5f36451
d88ec40
525ee37
 
d88ec40
525ee37
 
 
e88a1f3
525ee37
e4afaf8
 
 
 
 
 
 
 
525ee37
82b4010
 
 
 
e4afaf8
525ee37
db3a36a
82b4010
781e9f1
525ee37
 
 
e4afaf8
 
525ee37
 
 
 
82b4010
b40af2a
525ee37
 
 
82b4010
781e9f1
 
55aeb9b
6befe57
525ee37
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import os
import tempfile
import uuid

import gradio as gr
import librosa
import soundfile as sf
import spaces  # Ensure spaces is imported
import torch
from transformers import pipeline

# Processed audio files live in a "processed_audio_files" folder located under
# HF_HOME when that variable is set, otherwise under the current directory.
OUTPUT_DIR = os.path.join(os.getenv("HF_HOME", "."), "processed_audio_files")
os.makedirs(OUTPUT_DIR, exist_ok=True)

def split_audio(audio_data, sr, chunk_duration=30):
    """Split audio into consecutive chunks of ``chunk_duration`` seconds.

    Args:
        audio_data: Sliceable sequence of samples (anything supporting
            ``len()`` and ``[start:end]`` slicing).
        sr: Sample rate, in samples per second.
        chunk_duration: Length of each chunk in seconds.

    Returns:
        A list of slices of ``audio_data``. Every chunk holds
        ``int(chunk_duration * sr)`` samples except possibly the last, which
        holds whatever remains. Empty input yields an empty list.

    Raises:
        ValueError: If ``chunk_duration * sr`` is smaller than one sample.
    """
    samples_per_chunk = int(chunk_duration * sr)
    if samples_per_chunk <= 0:
        # A zero step makes range() fail with an unrelated message, and a
        # negative step would silently produce no chunks at all.
        raise ValueError(
            "chunk_duration * sr must cover at least one sample "
            f"(got chunk_duration={chunk_duration!r}, sr={sr!r})"
        )
    return [
        audio_data[start:start + samples_per_chunk]
        for start in range(0, len(audio_data), samples_per_chunk)
    ]

def transcribe_long_audio(audio_path, transcriber, chunk_duration=30):
    """Transcribe long audio by splitting it into smaller chunks.

    The file is loaded at its native sample rate, cut into pieces with
    ``split_audio``, and each piece is written to a temporary WAV file whose
    path is passed to ``transcriber``.

    Args:
        audio_path: Path of the audio file to load with ``librosa``.
        transcriber: Callable that takes a WAV file path and returns a mapping
            with a ``"text"`` entry.
        chunk_duration: Length of each chunk in seconds.

    Returns:
        The chunk transcriptions joined with single spaces. On any failure the
        error is printed and a string starting with
        ``"Error processing audio:"`` is returned instead of raising.
    """
    try:
        # Load the audio file at its original sample rate
        audio_data, sr = librosa.load(audio_path, sr=None)
        chunks = split_audio(audio_data, sr, chunk_duration)
        transcriptions = []
        # A private temporary directory keeps concurrent calls from
        # overwriting each other's chunk file and is removed even when
        # writing or transcription fails part-way through.
        with tempfile.TemporaryDirectory(prefix="audio_chunks_") as tmp_dir:
            # One path is reused for every chunk so that only a single chunk
            # is on disk at any time.
            chunk_path = os.path.join(tmp_dir, "temp_chunk.wav")
            for chunk in chunks:
                sf.write(chunk_path, chunk, sr)  # Save chunk as WAV
                transcriptions.append(transcriber(chunk_path)["text"])
        return " ".join(transcriptions)
    except Exception as e:
        print(f"Error in transcribe_long_audio: {e}")
        return f"Error processing audio: {e}"

def cleanup_output_dir(max_storage_mb=500, output_dir=None):
    """Remove the oldest files while the directory exceeds ``max_storage_mb``.

    Only regular files directly inside the directory are measured and
    removed; sub-directories are left alone. Files are deleted in ascending
    ``ctime`` order until the remaining total is within the limit.

    Args:
        max_storage_mb: Maximum total size of the files, in mebibytes.
        output_dir: Directory to clean. Defaults to the module-level
            ``OUTPUT_DIR`` when omitted.

    Returns:
        None. Filesystem errors are printed rather than raised, so a failed
        cleanup never breaks the caller.
    """
    directory = OUTPUT_DIR if output_dir is None else output_dir
    limit_bytes = max_storage_mb * 1024 * 1024
    try:
        with os.scandir(directory) as entries:
            # Capture size and ctime up front: neither can be queried once
            # the file has been removed.
            files = [
                (entry.stat().st_ctime, entry.stat().st_size, entry.path)
                for entry in entries
                if entry.is_file()
            ]
        total_size = sum(size for _, size, _ in files)
        if total_size <= limit_bytes:
            return
        for _, size, path in sorted(files, key=lambda item: item[0]):
            os.remove(path)
            total_size -= size
            if total_size <= limit_bytes:
                break
    except OSError as e:
        print(f"Error during cleanup: {e}")

# NOTE(review): the GPU decorator wraps the function that launches the app
# rather than the inference callback - confirm this is the intended usage of
# `spaces.GPU` for this Space.
@spaces.GPU(duration=3)
def main():
    """Load the speech and summarization models and launch the Gradio app.

    The app accepts an uploaded audio file, shows its full transcription and
    a short summary, and offers the uploaded file back for playback.

    Raises:
        Exception: Whatever model loading raises is printed and re-raised,
            because the app cannot work without both models.
    """
    device = 0 if torch.cuda.is_available() else -1

    try:
        transcriber = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=device)
        # Use the same device as the transcriber instead of the pipeline default.
        summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=device)
    except Exception as e:
        print(f"Error loading models: {e}")
        raise

    # Prefix that transcribe_long_audio puts on the string it returns when it fails.
    error_prefix = "Error processing audio:"

    def process_audio(audio_input):
        """Transcribe and summarize one uploaded file.

        Returns a ``(transcription, summary, audio_path)`` tuple for the three
        output components. On failure the first item carries the error
        message, the summary is empty and the audio output is ``None``.
        """
        try:
            print(f"Processing uploaded audio: {audio_input}")
            if not isinstance(audio_input, str):
                raise ValueError("Invalid input type. Please upload a valid audio file.")
            if os.path.isdir(audio_input):
                raise ValueError("Input is a directory, not a file.")

            # Transcribe the uploaded audio file
            transcription = transcribe_long_audio(audio_input, transcriber, chunk_duration=30)
            if transcription.startswith(error_prefix):
                # Transcription failed: show the message, do not summarize it.
                return transcription, "", None

            if transcription.strip():
                # truncation=True keeps long transcripts within the model's
                # maximum input length instead of failing on them.
                summary = summarizer(
                    transcription,
                    max_length=50,
                    min_length=10,
                    do_sample=False,
                    truncation=True,
                )[0]["summary_text"]
            else:
                # Nothing was recognized, so there is nothing to summarize.
                summary = ""

            # Cleanup old files
            cleanup_output_dir()

            return transcription, summary, audio_input
        except Exception as e:
            print(f"Error in process_audio: {e}")
            # None (not "") leaves the audio output empty.
            return f"Error processing audio: {e}", "", None

    with gr.Blocks() as interface:
        with gr.Row():
            with gr.Column():
                # Only support file uploads
                audio_input = gr.Audio(type="filepath", label="Upload Audio File")
                process_button = gr.Button("Process Audio")
            with gr.Column():
                transcription_output = gr.Textbox(label="Full Transcription", lines=10)
                summary_output = gr.Textbox(label="Summary", lines=5)
                audio_output = gr.Audio(label="Playback Processed Audio")

        process_button.click(
            process_audio,
            inputs=[audio_input],
            outputs=[transcription_output, summary_output, audio_output]
        )

    interface.launch(share=False)

# Start the app only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    main()