from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import requests
import json
import os
from huggingface_hub import HfApi
import schedule
import threading
import time

app = FastAPI()

# Define the request model
class ChatRequest(BaseModel):
    messages: list = [{"role": "user", "content": "Lol full form"}]
    model: str = "gemini-1.5-pro-latest"
    temperature: float = 1.0
    top_p: float = 0.8
    max_tokens: int = 4000

# Define the upstream URL and headers
url = "https://chat.typegpt.net/api/openai/v1/chat/completions"
headers = {
    "Accept": "application/json, text/event-stream",
    "Content-Type": "application/json",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0",
}

def restart_hf_space():
    HF_TOKEN = os.environ.get("HF_TOKEN", None)
    api = HfApi(token=HF_TOKEN)
    api.restart_space('paola1/proxy', factory_reboot=False)

# Schedule the restart task to run every six hours
schedule.every(6).hours.do(restart_hf_space)

def run_schedule():
    while True:
        schedule.run_pending()
        time.sleep(1)

# Run the scheduled tasks in a separate thread
thread = threading.Thread(target=run_schedule)
thread.daemon = True  # Daemon thread, so it exits when the main thread exits
thread.start()

@app.post("/chat")
async def chat(request: ChatRequest):
    # Build the upstream payload
    payload = {
        "messages": request.messages,
        "stream": True,
        "model": request.model,
        "temperature": request.temperature,
        "top_p": request.top_p,
        "max_tokens": request.max_tokens,
    }

    # Make the POST request with streaming. Note that requests is a blocking
    # client, so the event loop stalls while the upstream responds; an async
    # client such as httpx would avoid this.
    try:
        response = requests.post(url, headers=headers, data=json.dumps(payload), stream=True)

        # Check if the request was successful
        if response.status_code == 200:
            async def event_stream():
                # Relay each content delta to the client as a JSON line
                for line in response.iter_lines():
                    if line:
                        decoded_line = line.decode('utf-8')
                        if decoded_line.startswith("data: "):
                            try:
                                data = json.loads(decoded_line[len('data: '):])
                                content = data.get("choices", [{}])[0].get("delta", {}).get("content", '')
                                if content:
                                    yield f"{json.dumps({'response': content})}\n\n"
                            except (json.JSONDecodeError, IndexError):
                                continue
            return StreamingResponse(event_stream(), media_type="text/event-stream")
        else:
            raise HTTPException(status_code=response.status_code, detail=response.text)
    except HTTPException:
        # Re-raise as-is so the upstream status code is preserved
        raise
    except Exception as e:
        print(e)
        raise HTTPException(status_code=500, detail=str(e))

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()
    # Keep the WebSocket connection alive across multiple requests
    while True:
        try:
            # Receive the chat request from the client
            data = await websocket.receive_text()
            try:
                # parse_raw is the Pydantic v1 API; on Pydantic v2 use
                # ChatRequest.model_validate_json(data) instead
                request = ChatRequest.parse_raw(data)
            except Exception:
                await websocket.send_text(json.dumps({"error": "Invalid request format"}))
                continue

            # Build the upstream payload
            payload = {
                "messages": request.messages,
                "stream": True,
                "model": request.model,
                "temperature": request.temperature,
                "top_p": request.top_p,
                "max_tokens": request.max_tokens,
            }

            # Make the POST request with streaming
            response = requests.post(url, headers=headers, data=json.dumps(payload), stream=True)

            # Check if the request was successful
            if response.status_code == 200:
                # Stream the response line by line
                for line in response.iter_lines():
                    if line:
                        decoded_line = line.decode('utf-8')
                        if decoded_line.startswith("data: "):
                            try:
                                data = json.loads(decoded_line[len('data: '):])
                                content = data.get("choices", [{}])[0].get("delta", {}).get("content", '')
                                if content:
                                    await websocket.send_text(json.dumps({"response": content}))
                            except (json.JSONDecodeError, IndexError):
                                continue
            else:
                await websocket.send_text(json.dumps({"error": "Failed to get a valid response from the server"}))
        except WebSocketDisconnect:
            # Client disconnected; stop the receive loop quietly
            break
        except Exception as e:
            print(e)
            await websocket.send_text(json.dumps({"error": str(e)}))
            break  # Exit the loop if an error occurs

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8083)
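
# Example client for the /chat endpoint (a minimal sketch, not part of the
# proxy itself; assumes the server is running locally on port 8083):
#
#   import requests, json
#   with requests.post(
#       "http://localhost:8083/chat",
#       json={"messages": [{"role": "user", "content": "Hello"}]},
#       stream=True,
#   ) as r:
#       for line in r.iter_lines():
#           if line:  # each non-empty line is one {"response": ...} chunk
#               print(json.loads(line)["response"], end="", flush=True)
#
# For the WebSocket endpoint, connect to ws://localhost:8083/ws/chat, send the
# same JSON body as a text frame, and read the streamed {"response": ...} frames.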