import gradio as gr import requests import re import os import json import time import threading from googleapiclient.discovery import build from huggingface_hub import InferenceClient from pytube import YouTube import whisper import logging # 로그 설정 logging.basicConfig(level=logging.INFO) # Whisper 모델 로드 model = whisper.load_model("base") # YouTube API 키 API_KEY = 'AIzaSyDUz3wkGal0ewRtPlzeMit88bV4hS4ZIVY' # YouTube API 서비스 빌드 youtube = build('youtube', 'v3', developerKey=API_KEY) # Hugging Face API 설정 client = InferenceClient(model="meta-llama/Meta-Llama-3-70B-Instruct", token=os.getenv("HF_TOKEN")) WEBHOOK_URL = "https://connect.pabbly.com/workflow/sendwebhookdata/IjU3NjUwNTZhMDYzMDA0MzA1MjZhNTUzMzUxM2Ii_pc" COMMENTS_FILE = 'comments.json' DEFAULT_SYSTEM_PROMPT = "대화시 반드시 나의 이름 'GPTube'를 밝히며 한글로 인사를하라. 반드시 '한글'(한국어)로 250 토큰 이내로 답변을 생성하고 출력하라. Respond to the following YouTube comment in a friendly and helpful manner:" stop_event = threading.Event() # 스레드 중지를 위한 이벤트 def load_existing_comments(): if os.path.exists(COMMENTS_FILE): with open(COMMENTS_FILE, 'r') as file: return json.load(file) return [] def save_comments(comments): with open(COMMENTS_FILE, 'w') as file: json.dump(comments, file) def download_audio(video_url): try: yt = YouTube(video_url) audio = yt.streams.filter(only_audio=True).first() if audio is None: logging.error('오디오 스트림을 찾을 수 없습니다.') return None audio_path = audio.download(output_path=".") file_stats = os.stat(audio_path) logging.info(f'오디오 파일 크기(Bytes): {file_stats.st_size}') if file_stats.st_size <= 30000000: # 파일 크기 제한 확인 base, ext = os.path.splitext(audio_path) new_file = base + '.mp3' os.rename(audio_path, new_file) return new_file else: logging.error('파일 크기가 너무 큽니다. 1.5시간 이하의 비디오만 지원됩니다.') return None except Exception as e: logging.error(f"오디오 다운로드 중 오류 발생: {str(e)}") return None def generate_transcript(audio_path): try: if not audio_path or not os.path.exists(audio_path): raise ValueError("유효한 오디오 파일 경로가 아닙니다.") result = model.transcribe(audio_path) return result['text'].strip() except Exception as e: logging.error(f"전사 중 오류 발생: {str(e)}") return f"전사 중 오류가 발생했습니다: {str(e)}" def generate_reply(comment_text, system_prompt): prompt = f"{system_prompt}\n\nComment: {comment_text}\n\nReply:" response = client.text_generation( prompt=prompt, max_new_tokens=250, temperature=0.7, top_p=0.9 ) if isinstance(response, dict) and 'generated_text' in response: return response['generated_text'] return response def send_webhook(data): response = requests.post(WEBHOOK_URL, json=data) return response.status_code, response.text def get_video_comments(video_id): try: comments = [] request = youtube.commentThreads().list( part='snippet', videoId=video_id, maxResults=100, # 댓글 읽어들이는 수 정의 textFormat='plainText' ) response = request.execute() while request is not None: for item in response['items']: snippet = item['snippet']['topLevelComment']['snippet'] comment = { 'comment_id': item['snippet']['topLevelComment']['id'], 'author': snippet['authorDisplayName'], 'published_at': snippet['publishedAt'], 'text': snippet['textDisplay'], 'reply_count': item['snippet']['totalReplyCount'] } comments.append(comment) if 'nextPageToken' in response: request = youtube.commentThreads().list( part='snippet', videoId=video_id, pageToken=response['nextPageToken'], maxResults=100, # 댓글 읽어들이는 수 정의 textFormat='plainText' ) response = request.execute() else: break return comments except Exception as e: return [{'error': str(e)}] def fetch_comments(video_url, system_prompt): log_entries = [] video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', video_url) if video_id_match: video_id = video_id_match.group(1) audio_path = download_audio(video_url) if not audio_path: return "오디오를 다운로드할 수 없습니다." transcript = generate_transcript(audio_path) existing_comments = load_existing_comments() new_comments = get_video_comments(video_id) if not new_comments or 'error' in new_comments[0]: return "댓글을 찾을 수 없거나 오류가 발생했습니다." recent_new_comments = [c for c in new_comments if c['comment_id'] not in {c['comment_id'] for c in existing_comments} and c['reply_count'] == 0] if recent_new_comments: for most_recent_comment in recent_new_comments: combined_prompt = f"{transcript}\n\n{system_prompt}" reply_text = generate_reply(most_recent_comment['text'], combined_prompt) webhook_data = { "comment_id": most_recent_comment['comment_id'], "author": most_recent_comment['author'], "published_at": most_recent_comment['published_at'], "text": most_recent_comment['text'], "reply_text": reply_text } webhook_status, webhook_response = send_webhook(webhook_data) log_entries.append(f"최근 댓글: {most_recent_comment['text']}\n\n답변 생성: {reply_text}\n\n웹훅 응답: {webhook_status} - {webhook_response}") existing_comments.append(most_recent_comment) save_comments(existing_comments) else: log_entries.append("새로운 댓글이 없습니다.") else: log_entries.append("유효하지 않은 YouTube URL입니다.") return "\n\n".join(log_entries) def background_fetch_comments(): while not stop_event.is_set(): result = fetch_comments("https://www.youtube.com/watch?v=dQw4w9WgXcQ", DEFAULT_SYSTEM_PROMPT) # URL과 프롬프트 실제 사용 예시 print(result) time.sleep(10) def start_background_fetch(): threading.Thread(target=background_fetch_comments).start() def stop_background_fetch(): stop_event.set() def get_text(video_url): audio_path = download_audio(video_url) if not audio_path: return "오디오를 다운로드할 수 없습니다." transcript = generate_transcript(audio_path) return transcript # Gradio 인터페이스 정의 demo = gr.Blocks() with demo: gr.Markdown("