"""Gradio app: split an uploaded text file into word chunks, send each chunk
with a user prompt to a hosted chat model, then publish the responses as a
Hugging Face dataset and offer them as a downloadable ZIP archive."""

import logging
import os
import zipfile
from datetime import datetime

import gradio as gr
from datasets import Dataset
from gradio_client import Client
from huggingface_hub import HfApi

# Set up logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Configuration values used throughout the app.
MODEL_SPACE = "MiniMaxAI/MiniMax-Text-01"
DATASET_REPO = "TeacherPuffy/book"
OUTPUT_DIR = "outputs"
ZIP_PATH = "outputs.zip"
DEFAULT_CHUNK_WORDS = 3000

# Initialize the Gradio client
client = Client(MODEL_SPACE)


def call_api(prompt):
    """Send ``prompt`` to the ``/chat`` endpoint of the client and return its result.

    Raises:
        gr.Error: if the underlying ``client.predict`` call raises.
    """
    try:
        # Log only the first 100 characters of the prompt.
        logger.info("Calling API with prompt: %s...", prompt[:100])
        result = client.predict(
            message=prompt,
            max_tokens=12800,
            temperature=0.1,
            top_p=0.9,
            api_name="/chat",
        )
        logger.info("API call successful.")
        return result
    except Exception as e:
        logger.error("API call failed: %s", e)
        raise gr.Error(f"API call failed: {str(e)}") from e


def segment_text(file_path, chunk_size=DEFAULT_CHUNK_WORDS):
    """Read a text file and split it into chunks of at most ``chunk_size`` words.

    The file is decoded as UTF-8; if that fails with a ``UnicodeDecodeError``
    it is re-read as latin-1.

    Args:
        file_path: path of the text file to read.
        chunk_size: maximum number of whitespace-separated words per chunk.

    Returns:
        A list of strings, each the words of one chunk joined by single
        spaces. The list is empty when the file contains no words.

    Raises:
        ValueError: if ``chunk_size`` is not a positive integer.
        gr.Error: if the file cannot be read.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer.")

    logger.info("Reading file: %s", file_path)
    try:
        try:
            # Try reading with UTF-8 encoding first
            with open(file_path, "r", encoding="utf-8") as f:
                text = f.read()
            logger.info("File read successfully with UTF-8 encoding.")
        except UnicodeDecodeError:
            logger.warning("UTF-8 encoding failed. Trying latin-1 encoding.")
            # Fallback to latin-1 encoding if UTF-8 fails
            with open(file_path, "r", encoding="latin-1") as f:
                text = f.read()
            logger.info("File read successfully with latin-1 encoding.")
    except Exception as e:
        # The outer handler also covers a failure of the latin-1 fallback.
        logger.error("Failed to read file: %s", e)
        raise gr.Error(f"Failed to read file: {str(e)}") from e

    # Split the text into chunks of chunk_size words
    words = text.split()
    chunks = [
        " ".join(words[start:start + chunk_size])
        for start in range(0, len(words), chunk_size)
    ]
    logger.info("Segmented text into %d chunks.", len(chunks))
    return chunks


def _save_results(results):
    """Write each result to ``outputs/output_<idx>.txt`` and return the paths written."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    output_files = []
    for idx, result in enumerate(results):
        output_file = os.path.join(OUTPUT_DIR, f"output_{idx}.txt")
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(result)
        logger.info("Saved result to %s", output_file)
        output_files.append(output_file)
    return output_files


def _upload_results(results):
    """Push ``results`` as a one-column ("text") dataset to the Hugging Face Hub."""
    try:
        logger.info("Uploading results to Hugging Face dataset...")
        token = os.environ.get("HUGGINGFACE_TOKEN")
        if not token:
            raise ValueError("Hugging Face token not found in environment variables.")
        dataset = Dataset.from_dict({"text": results})
        # Pass the token explicitly so the upload authenticates with the
        # value that was just validated.
        dataset.push_to_hub(DATASET_REPO, token=token)
        logger.info("Results uploaded to Hugging Face dataset successfully.")
    except Exception as e:
        logger.error("Failed to upload to Hugging Face: %s", e)
        raise gr.Error(f"Failed to upload to Hugging Face: {str(e)}") from e


def _create_zip(output_files):
    """Archive ``output_files`` (stored under their base names) into ``outputs.zip``."""
    try:
        logger.info("Creating ZIP file...")
        with zipfile.ZipFile(ZIP_PATH, "w") as zipf:
            # Only archive files written by this run, so leftovers from
            # earlier runs in the output directory are not included.
            for path in output_files:
                zipf.write(path, os.path.basename(path))
        logger.info("ZIP file created successfully.")
    except Exception as e:
        logger.error("Failed to create ZIP file: %s", e)
        raise gr.Error(f"Failed to create ZIP file: {str(e)}") from e


def process_text(file, prompt):
    """Process an uploaded text file chunk by chunk and package the results.

    Each chunk is sent to the model as ``"<prompt>\\n\\n<chunk>"``. The
    responses are saved as text files, uploaded as a Hugging Face dataset,
    and zipped.

    Args:
        file: the uploaded file; either an object with a ``name`` attribute
            holding its path, or the path itself.
        prompt: instruction text prepended to every chunk.

    Returns:
        A ``(zip_path, status_message)`` tuple for the two Gradio outputs.

    Raises:
        gr.Error: on any failure; the message names the step that failed.
    """
    try:
        logger.info("Starting text processing...")

        if file is None:
            raise gr.Error("No file uploaded. Please upload a text file.")

        # Segment the text file into chunks
        file_path = file.name if hasattr(file, "name") else file
        chunks = segment_text(file_path)
        if not chunks:
            # Avoid pushing an empty dataset and producing an empty archive.
            raise gr.Error("The uploaded file contains no text to process.")

        # Perform API calls for each chunk
        results = []
        total = len(chunks)
        for number, chunk in enumerate(chunks, start=1):
            logger.info("Processing chunk %d/%d", number, total)
            try:
                results.append(call_api(f"{prompt}\n\n{chunk}"))
                logger.info("Chunk %d processed successfully.", number)
            except Exception as e:
                logger.error("Failed to process chunk %d: %s", number, e)
                raise gr.Error(f"Failed to process chunk {number}: {str(e)}") from e

        output_files = _save_results(results)
        _upload_results(results)
        _create_zip(output_files)

        return ZIP_PATH, "Results uploaded to Hugging Face dataset and ZIP file created."
    except gr.Error:
        # Already a user-facing error with a specific message; do not wrap it again.
        raise
    except Exception as e:
        logger.error("An error occurred during processing: %s", e)
        raise gr.Error(f"An error occurred: {str(e)}") from e


# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("## Text File Processor with API Calls")
    with gr.Row():
        file_input = gr.File(label="Upload Text File")
        prompt_input = gr.Textbox(label="Enter Prompt")
    with gr.Row():
        output_zip = gr.File(label="Download ZIP File")
        output_message = gr.Textbox(label="Status Message")
    submit_button = gr.Button("Submit")
    submit_button.click(
        process_text,
        inputs=[file_input, prompt_input],
        outputs=[output_zip, output_message],
    )

# Launch the Gradio app with a public link
demo.launch(share=True)