|
import subprocess |
|
import os |
|
import sys |
|
import gdown |
|
import errno |
|
import shutil |
|
import yt_dlp |
|
import datetime |
|
import torch |
|
import glob |
|
import gradio as gr |
|
import traceback |
|
import lib.infer.infer_libs.uvr5_pack.mdx as mdx |
|
from lib.infer.modules.uvr5.mdxprocess import ( |
|
get_model_list, |
|
id_to_ptm, |
|
prepare_mdx, |
|
run_mdx, |
|
) |
|
import requests |
|
import wget |
|
import ffmpeg |
|
import hashlib |
|
current_script_path = os.path.abspath(__file__) |
|
script_parent_directory = os.path.dirname(current_script_path) |
|
now_dir = os.path.dirname(script_parent_directory) |
|
sys.path.append(now_dir) |
|
import re |
|
from lib.infer.modules.vc.pipeline import Pipeline |
|
|
|
VC = Pipeline |
|
from lib.infer.infer_pack.models import ( |
|
SynthesizerTrnMs256NSFsid, |
|
SynthesizerTrnMs256NSFsid_nono, |
|
SynthesizerTrnMs768NSFsid, |
|
SynthesizerTrnMs768NSFsid_nono, |
|
) |
|
|
|
from assets.configs.config import Config |
|
from lib.infer.modules.uvr5.mdxnet import MDXNetDereverb |
|
from lib.infer.modules.uvr5.preprocess import AudioPre, AudioPreDeEcho |
|
from assets.i18n.i18n import I18nAuto |
|
|
|
i18n = I18nAuto() |
|
from bs4 import BeautifulSoup |
|
from dotenv import load_dotenv |
|
|
|
load_dotenv() |
|
config = Config() |
|
|
|
weight_root = os.getenv("weight_root") |
|
weight_uvr5_root = os.getenv("weight_uvr5_root") |
|
index_root = os.getenv("index_root") |
|
audio_root = "assets/audios" |
|
names = [ |
|
os.path.join(root, file) |
|
for root, _, files in os.walk(weight_root) |
|
for file in files |
|
if file.endswith((".pth", ".onnx")) |
|
] |
|
|
|
sup_audioext = { |
|
"wav", |
|
"mp3", |
|
"flac", |
|
"ogg", |
|
"opus", |
|
"m4a", |
|
"mp4", |
|
"aac", |
|
"alac", |
|
"wma", |
|
"aiff", |
|
"webm", |
|
"ac3", |
|
} |
|
audio_paths = [ |
|
os.path.join(root, name) |
|
for root, _, files in os.walk(audio_root, topdown=False) |
|
for name in files |
|
if name.endswith(tuple(sup_audioext)) and root == audio_root |
|
] |
|
|
|
|
|
uvr5_names = [ |
|
name.replace(".pth", "") |
|
for name in os.listdir(weight_uvr5_root) |
|
if name.endswith(".pth") or "onnx" in name |
|
] |
|
|
|
|
|
def calculate_md5(file_path): |
|
hash_md5 = hashlib.md5() |
|
with open(file_path, "rb") as f: |
|
for chunk in iter(lambda: f.read(4096), b""): |
|
hash_md5.update(chunk) |
|
return hash_md5.hexdigest() |
|
import unicodedata |
|
|
|
def format_title(title): |
|
formatted_title = unicodedata.normalize('NFKD', title).encode('ascii', 'ignore').decode('utf-8') |
|
formatted_title = re.sub(r'[\u2500-\u257F]+', '', title) |
|
formatted_title = re.sub(r'[^\w\s-]', '', title) |
|
formatted_title = re.sub(r'\s+', '_', formatted_title) |
|
return formatted_title |
|
|
|
|
|
def silentremove(filename): |
|
try: |
|
os.remove(filename) |
|
except OSError as e: |
|
if e.errno != errno.ENOENT: |
|
raise |
|
|
|
|
|
def get_md5(temp_folder): |
|
for root, subfolders, files in os.walk(temp_folder): |
|
for file in files: |
|
if ( |
|
not file.startswith("G_") |
|
and not file.startswith("D_") |
|
and file.endswith(".pth") |
|
and not "_G_" in file |
|
and not "_D_" in file |
|
): |
|
md5_hash = calculate_md5(os.path.join(root, file)) |
|
return md5_hash |
|
|
|
return None |
|
|
|
|
|
def find_parent(search_dir, file_name): |
|
for dirpath, dirnames, filenames in os.walk(search_dir): |
|
if file_name in filenames: |
|
return os.path.abspath(dirpath) |
|
return None |
|
|
|
|
|
def find_folder_parent(search_dir, folder_name): |
|
for dirpath, dirnames, filenames in os.walk(search_dir): |
|
if folder_name in dirnames: |
|
return os.path.abspath(dirpath) |
|
return None |
|
|
|
file_path = find_folder_parent(now_dir, "assets") |
|
tmp = os.path.join(file_path, "temp") |
|
shutil.rmtree(tmp, ignore_errors=True) |
|
os.environ["temp"] = tmp |
|
|
|
def get_mediafire_download_link(url): |
|
response = requests.get(url) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
download_button = soup.find('a', {'class': 'input popsok', 'aria-label': 'Download file'}) |
|
if download_button: |
|
download_link = download_button.get('href') |
|
return download_link |
|
else: |
|
return None |
|
|
|
def delete_large_files(directory_path, max_size_megabytes): |
|
for filename in os.listdir(directory_path): |
|
file_path = os.path.join(directory_path, filename) |
|
if os.path.isfile(file_path): |
|
size_in_bytes = os.path.getsize(file_path) |
|
size_in_megabytes = size_in_bytes / (1024 * 1024) |
|
|
|
if size_in_megabytes > max_size_megabytes: |
|
print("###################################") |
|
print(f"Deleting s*** {filename} (Size: {size_in_megabytes:.2f} MB)") |
|
os.remove(file_path) |
|
print("###################################") |
|
|
|
def download_from_url(url): |
|
file_path = find_folder_parent(now_dir, "assets") |
|
print(file_path) |
|
zips_path = os.path.join(file_path, "assets", "zips") |
|
print(zips_path) |
|
os.makedirs(zips_path, exist_ok=True) |
|
print(f"Limit download size in MB {os.getenv('MAX_DOWNLOAD_SIZE')}, duplicate the space for modify the limit") |
|
|
|
if url != "": |
|
print(i18n("Downloading the file: ") + f"{url}") |
|
if "drive.google.com" in url: |
|
if "file/d/" in url: |
|
file_id = url.split("file/d/")[1].split("/")[0] |
|
elif "id=" in url: |
|
file_id = url.split("id=")[1].split("&")[0] |
|
else: |
|
return None |
|
|
|
if file_id: |
|
os.chdir(zips_path) |
|
try: |
|
gdown.download(f"https://drive.google.com/uc?id={file_id}", quiet=False, fuzzy=True) |
|
except Exception as e: |
|
error_message = str(e) |
|
if "Too many users have viewed or downloaded this file recently" in error_message: |
|
os.chdir(file_path) |
|
return "too much use" |
|
elif "Cannot retrieve the public link of the file." in error_message: |
|
os.chdir(file_path) |
|
return "private link" |
|
else: |
|
print(error_message) |
|
os.chdir(file_path) |
|
return None |
|
|
|
elif "/blob/" in url or "/resolve/" in url: |
|
os.chdir(zips_path) |
|
if "/blob/" in url: |
|
url = url.replace("/blob/", "/resolve/") |
|
|
|
response = requests.get(url, stream=True) |
|
if response.status_code == 200: |
|
file_name = url.split("/")[-1] |
|
file_name = file_name.replace("%20", "_") |
|
total_size_in_bytes = int(response.headers.get('content-length', 0)) |
|
block_size = 1024 |
|
progress_bar_length = 50 |
|
progress = 0 |
|
with open(os.path.join(zips_path, file_name), 'wb') as file: |
|
for data in response.iter_content(block_size): |
|
file.write(data) |
|
progress += len(data) |
|
progress_percent = int((progress / total_size_in_bytes) * 100) |
|
num_dots = int((progress / total_size_in_bytes) * progress_bar_length) |
|
progress_bar = "[" + "." * num_dots + " " * (progress_bar_length - num_dots) + "]" |
|
|
|
if progress_percent == 100: |
|
print("\n") |
|
else: |
|
os.chdir(file_path) |
|
return None |
|
elif "mega.nz" in url: |
|
if "#!" in url: |
|
file_id = url.split("#!")[1].split("!")[0] |
|
elif "file/" in url: |
|
file_id = url.split("file/")[1].split("/")[0] |
|
else: |
|
return None |
|
if file_id: |
|
print("Mega.nz is unsupported due mega.py deprecation") |
|
elif "/tree/main" in url: |
|
response = requests.get(url) |
|
soup = BeautifulSoup(response.content, "html.parser") |
|
temp_url = "" |
|
for link in soup.find_all("a", href=True): |
|
if link["href"].endswith(".zip"): |
|
temp_url = link["href"] |
|
break |
|
if temp_url: |
|
url = temp_url |
|
url = url.replace("blob", "resolve") |
|
if "huggingface.co" not in url: |
|
url = "https://huggingface.co" + url |
|
|
|
wget.download(url) |
|
else: |
|
print("No .zip file found on the page.") |
|
elif "cdn.discordapp.com" in url: |
|
file = requests.get(url) |
|
os.chdir("./assets/zips") |
|
if file.status_code == 200: |
|
name = url.split("/") |
|
with open( |
|
os.path.join(name[-1]), "wb" |
|
) as newfile: |
|
newfile.write(file.content) |
|
else: |
|
return None |
|
elif "pixeldrain.com" in url: |
|
try: |
|
file_id = url.split("pixeldrain.com/u/")[1] |
|
os.chdir(zips_path) |
|
print(file_id) |
|
response = requests.get(f"https://pixeldrain.com/api/file/{file_id}") |
|
if response.status_code == 200: |
|
file_name = ( |
|
response.headers.get("Content-Disposition") |
|
.split("filename=")[-1] |
|
.strip('";') |
|
) |
|
os.makedirs(zips_path, exist_ok=True) |
|
with open(os.path.join(zips_path, file_name), "wb") as newfile: |
|
newfile.write(response.content) |
|
os.chdir(file_path) |
|
return "downloaded" |
|
else: |
|
os.chdir(file_path) |
|
return None |
|
except Exception as e: |
|
print(e) |
|
os.chdir(file_path) |
|
return None |
|
elif "mediafire.com" in url: |
|
download_link = get_mediafire_download_link(url) |
|
if download_link: |
|
os.chdir(zips_path) |
|
wget.download(download_link) |
|
else: |
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else: |
|
try: |
|
os.chdir(zips_path) |
|
wget.download(url) |
|
except Exception as e: |
|
os.chdir(file_path) |
|
print(e) |
|
return None |
|
|
|
|
|
|
|
for currentPath, _, zipFiles in os.walk(zips_path): |
|
for Files in zipFiles: |
|
filePart = Files.split(".") |
|
extensionFile = filePart[len(filePart) - 1] |
|
filePart.pop() |
|
nameFile = "_".join(filePart) |
|
realPath = os.path.join(currentPath, Files) |
|
os.rename(realPath, nameFile + "." + extensionFile) |
|
|
|
delete_large_files(zips_path, int(os.getenv("MAX_DOWNLOAD_SIZE"))) |
|
|
|
os.chdir(file_path) |
|
print(i18n("Full download")) |
|
return "downloaded" |
|
else: |
|
return None |
|
|
|
|
|
class error_message(Exception): |
|
def __init__(self, mensaje): |
|
self.mensaje = mensaje |
|
super().__init__(mensaje) |
|
|
|
|
|
def get_vc(sid, to_return_protect0, to_return_protect1): |
|
global n_spk, tgt_sr, net_g, vc, cpt, version |
|
if sid == "" or sid == []: |
|
global hubert_model |
|
if hubert_model is not None: |
|
print("clean_empty_cache") |
|
del net_g, n_spk, vc, hubert_model, tgt_sr |
|
hubert_model = net_g = n_spk = vc = hubert_model = tgt_sr = None |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
if_f0 = cpt.get("f0", 1) |
|
version = cpt.get("version", "v1") |
|
if version == "v1": |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs256NSFsid( |
|
*cpt["config"], is_half=config.is_half |
|
) |
|
else: |
|
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) |
|
elif version == "v2": |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs768NSFsid( |
|
*cpt["config"], is_half=config.is_half |
|
) |
|
else: |
|
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) |
|
del net_g, cpt |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
cpt = None |
|
return ( |
|
{"visible": False, "__type__": "update"}, |
|
{"visible": False, "__type__": "update"}, |
|
{"visible": False, "__type__": "update"}, |
|
) |
|
person = "%s/%s" % (weight_root, sid) |
|
print("loading %s" % person) |
|
cpt = torch.load(person, map_location="cpu") |
|
tgt_sr = cpt["config"][-1] |
|
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] |
|
if_f0 = cpt.get("f0", 1) |
|
if if_f0 == 0: |
|
to_return_protect0 = to_return_protect1 = { |
|
"visible": False, |
|
"value": 0.5, |
|
"__type__": "update", |
|
} |
|
else: |
|
to_return_protect0 = { |
|
"visible": True, |
|
"value": to_return_protect0, |
|
"__type__": "update", |
|
} |
|
to_return_protect1 = { |
|
"visible": True, |
|
"value": to_return_protect1, |
|
"__type__": "update", |
|
} |
|
version = cpt.get("version", "v1") |
|
if version == "v1": |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half) |
|
else: |
|
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) |
|
elif version == "v2": |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half) |
|
else: |
|
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) |
|
del net_g.enc_q |
|
print(net_g.load_state_dict(cpt["weight"], strict=False)) |
|
net_g.eval().to(config.device) |
|
if config.is_half: |
|
net_g = net_g.half() |
|
else: |
|
net_g = net_g.float() |
|
vc = VC(tgt_sr, config) |
|
n_spk = cpt["config"][-3] |
|
return ( |
|
{"visible": True, "maximum": n_spk, "__type__": "update"}, |
|
to_return_protect0, |
|
to_return_protect1, |
|
) |
|
import zipfile |
|
from tqdm import tqdm |
|
|
|
def extract_and_show_progress(zipfile_path, unzips_path): |
|
try: |
|
with zipfile.ZipFile(zipfile_path, 'r') as zip_ref: |
|
total_files = len(zip_ref.infolist()) |
|
with tqdm(total=total_files, unit='files', ncols= 100, colour= 'green') as pbar: |
|
for file_info in zip_ref.infolist(): |
|
zip_ref.extract(file_info, unzips_path) |
|
pbar.update(1) |
|
return True |
|
except Exception as e: |
|
print(f"Error al descomprimir {zipfile_path}: {e}") |
|
return False |
|
|
|
|
|
def load_downloaded_model(url): |
|
parent_path = find_folder_parent(now_dir, "assets") |
|
try: |
|
infos = [] |
|
zips_path = os.path.join(parent_path, "assets", "zips") |
|
unzips_path = os.path.join(parent_path, "assets", "unzips") |
|
weights_path = os.path.join(parent_path, "logs", "weights") |
|
logs_dir = "" |
|
|
|
if os.path.exists(zips_path): |
|
shutil.rmtree(zips_path) |
|
if os.path.exists(unzips_path): |
|
shutil.rmtree(unzips_path) |
|
|
|
os.mkdir(zips_path) |
|
os.mkdir(unzips_path) |
|
|
|
download_file = download_from_url(url) |
|
if not download_file: |
|
print(i18n("The file could not be downloaded.")) |
|
infos.append(i18n("The file could not be downloaded.")) |
|
yield "\n".join(infos) |
|
elif download_file == "downloaded": |
|
print(i18n("It has been downloaded successfully.")) |
|
infos.append(i18n("It has been downloaded successfully.")) |
|
yield "\n".join(infos) |
|
elif download_file == "too much use": |
|
raise Exception( |
|
i18n("Too many users have recently viewed or downloaded this file") |
|
) |
|
elif download_file == "private link": |
|
raise Exception(i18n("Cannot get file from this private link")) |
|
|
|
for filename in os.listdir(zips_path): |
|
if filename.endswith(".zip"): |
|
zipfile_path = os.path.join(zips_path, filename) |
|
print(i18n("Proceeding with the extraction...")) |
|
infos.append(i18n("Proceeding with the extraction...")) |
|
|
|
model_name = os.path.basename(zipfile_path) |
|
logs_dir = os.path.join( |
|
parent_path, |
|
"logs", |
|
os.path.normpath(str(model_name).replace(".zip", "")), |
|
) |
|
|
|
yield "\n".join(infos) |
|
success = extract_and_show_progress(zipfile_path, unzips_path) |
|
if success: |
|
yield f"Extracción exitosa: {model_name}" |
|
else: |
|
yield f"Fallo en la extracción: {model_name}" |
|
yield "\n".join(infos) |
|
else: |
|
print(i18n("Unzip error.")) |
|
infos.append(i18n("Unzip error.")) |
|
yield "\n".join(infos) |
|
return "" |
|
|
|
index_file = False |
|
model_file = False |
|
|
|
for path, subdirs, files in os.walk(unzips_path): |
|
for item in files: |
|
item_path = os.path.join(path, item) |
|
if not "G_" in item and not "D_" in item and item.endswith(".pth"): |
|
model_file = True |
|
model_name = item.replace(".pth", "") |
|
logs_dir = os.path.join(parent_path, "logs", model_name) |
|
if os.path.exists(logs_dir): |
|
shutil.rmtree(logs_dir) |
|
os.mkdir(logs_dir) |
|
if not os.path.exists(weights_path): |
|
os.mkdir(weights_path) |
|
if os.path.exists(os.path.join(weights_path, item)): |
|
os.remove(os.path.join(weights_path, item)) |
|
if os.path.exists(item_path): |
|
shutil.move(item_path, weights_path) |
|
|
|
if not model_file and not os.path.exists(logs_dir): |
|
os.mkdir(logs_dir) |
|
for path, subdirs, files in os.walk(unzips_path): |
|
for item in files: |
|
item_path = os.path.join(path, item) |
|
if item.startswith("added_") and item.endswith(".index"): |
|
index_file = True |
|
if os.path.exists(item_path): |
|
if os.path.exists(os.path.join(logs_dir, item)): |
|
os.remove(os.path.join(logs_dir, item)) |
|
shutil.move(item_path, logs_dir) |
|
if item.startswith("total_fea.npy") or item.startswith("events."): |
|
if os.path.exists(item_path): |
|
if os.path.exists(os.path.join(logs_dir, item)): |
|
os.remove(os.path.join(logs_dir, item)) |
|
shutil.move(item_path, logs_dir) |
|
|
|
result = "" |
|
if model_file: |
|
if index_file: |
|
print(i18n("The model works for inference, and has the .index file.")) |
|
infos.append( |
|
"\n" |
|
+ i18n("The model works for inference, and has the .index file.") |
|
) |
|
yield "\n".join(infos) |
|
else: |
|
print( |
|
i18n( |
|
"The model works for inference, but it doesn't have the .index file." |
|
) |
|
) |
|
infos.append( |
|
"\n" |
|
+ i18n( |
|
"The model works for inference, but it doesn't have the .index file." |
|
) |
|
) |
|
yield "\n".join(infos) |
|
|
|
if not index_file and not model_file: |
|
print(i18n("No relevant file was found to upload.")) |
|
infos.append(i18n("No relevant file was found to upload.")) |
|
yield "\n".join(infos) |
|
|
|
|
|
if os.path.exists(zips_path): |
|
shutil.rmtree(zips_path) |
|
if os.path.exists(unzips_path): |
|
shutil.rmtree(unzips_path) |
|
os.chdir(parent_path) |
|
return result |
|
except Exception as e: |
|
os.chdir(parent_path) |
|
if "too much use" in str(e): |
|
print(i18n("Too many users have recently viewed or downloaded this file")) |
|
yield i18n("Too many users have recently viewed or downloaded this file") |
|
elif "private link" in str(e): |
|
print(i18n("Cannot get file from this private link")) |
|
yield i18n("Cannot get file from this private link") |
|
else: |
|
print(e) |
|
yield i18n("An error occurred downloading") |
|
finally: |
|
os.chdir(parent_path) |
|
|
|
|
|
def load_dowloaded_dataset(url): |
|
parent_path = find_folder_parent(now_dir, "assets") |
|
infos = [] |
|
try: |
|
zips_path = os.path.join(parent_path, "assets", "zips") |
|
unzips_path = os.path.join(parent_path, "assets", "unzips") |
|
datasets_path = os.path.join(parent_path, "datasets") |
|
audio_extenions = [ |
|
"wav", |
|
"mp3", |
|
"flac", |
|
"ogg", |
|
"opus", |
|
"m4a", |
|
"mp4", |
|
"aac", |
|
"alac", |
|
"wma", |
|
"aiff", |
|
"webm", |
|
"ac3", |
|
] |
|
|
|
if os.path.exists(zips_path): |
|
shutil.rmtree(zips_path) |
|
if os.path.exists(unzips_path): |
|
shutil.rmtree(unzips_path) |
|
|
|
if not os.path.exists(datasets_path): |
|
os.mkdir(datasets_path) |
|
|
|
os.mkdir(zips_path) |
|
os.mkdir(unzips_path) |
|
|
|
download_file = download_from_url(url) |
|
|
|
if not download_file: |
|
print(i18n("An error occurred downloading")) |
|
infos.append(i18n("An error occurred downloading")) |
|
yield "\n".join(infos) |
|
raise Exception(i18n("An error occurred downloading")) |
|
elif download_file == "downloaded": |
|
print(i18n("It has been downloaded successfully.")) |
|
infos.append(i18n("It has been downloaded successfully.")) |
|
yield "\n".join(infos) |
|
elif download_file == "too much use": |
|
raise Exception( |
|
i18n("Too many users have recently viewed or downloaded this file") |
|
) |
|
elif download_file == "private link": |
|
raise Exception(i18n("Cannot get file from this private link")) |
|
|
|
zip_path = os.listdir(zips_path) |
|
foldername = "" |
|
for file in zip_path: |
|
if file.endswith(".zip"): |
|
file_path = os.path.join(zips_path, file) |
|
print("....") |
|
foldername = file.replace(".zip", "").replace(" ", "").replace("-", "_") |
|
dataset_path = os.path.join(datasets_path, foldername) |
|
print(i18n("Proceeding with the extraction...")) |
|
infos.append(i18n("Proceeding with the extraction...")) |
|
yield "\n".join(infos) |
|
shutil.unpack_archive(file_path, unzips_path, "zip") |
|
if os.path.exists(dataset_path): |
|
shutil.rmtree(dataset_path) |
|
|
|
os.mkdir(dataset_path) |
|
|
|
for root, subfolders, songs in os.walk(unzips_path): |
|
for song in songs: |
|
song_path = os.path.join(root, song) |
|
if song.endswith(tuple(audio_extenions)): |
|
formatted_song_name = format_title( |
|
os.path.splitext(song)[0] |
|
) |
|
extension = os.path.splitext(song)[1] |
|
new_song_path = os.path.join( |
|
dataset_path, f"{formatted_song_name}{extension}" |
|
) |
|
shutil.move(song_path, new_song_path) |
|
else: |
|
print(i18n("Unzip error.")) |
|
infos.append(i18n("Unzip error.")) |
|
yield "\n".join(infos) |
|
|
|
if os.path.exists(zips_path): |
|
shutil.rmtree(zips_path) |
|
if os.path.exists(unzips_path): |
|
shutil.rmtree(unzips_path) |
|
|
|
print(i18n("The Dataset has been loaded successfully.")) |
|
infos.append(i18n("The Dataset has been loaded successfully.")) |
|
yield "\n".join(infos) |
|
except Exception as e: |
|
os.chdir(parent_path) |
|
if "too much use" in str(e): |
|
print(i18n("Too many users have recently viewed or downloaded this file")) |
|
yield i18n("Too many users have recently viewed or downloaded this file") |
|
elif "private link" in str(e): |
|
print(i18n("Cannot get file from this private link")) |
|
yield i18n("Cannot get file from this private link") |
|
else: |
|
print(e) |
|
yield i18n("An error occurred downloading") |
|
finally: |
|
os.chdir(parent_path) |
|
|
|
|
|
SAVE_ACTION_CONFIG = { |
|
i18n("Save all"): { |
|
'destination_folder': "manual_backup", |
|
'copy_files': True, |
|
'include_weights': False |
|
}, |
|
i18n("Save D and G"): { |
|
'destination_folder': "manual_backup", |
|
'copy_files': False, |
|
'files_to_copy': ["D_*.pth", "G_*.pth", "added_*.index"], |
|
'include_weights': True, |
|
}, |
|
i18n("Save voice"): { |
|
'destination_folder': "finished", |
|
'copy_files': False, |
|
'files_to_copy': ["added_*.index"], |
|
'include_weights': True, |
|
}, |
|
} |
|
|
|
import os |
|
import shutil |
|
import zipfile |
|
import glob |
|
import fnmatch |
|
|
|
import os |
|
import shutil |
|
import zipfile |
|
import glob |
|
|
|
import os |
|
import shutil |
|
import zipfile |
|
|
|
|
|
def save_model(modelname, save_action): |
|
parent_path = find_folder_parent(now_dir, "assets") |
|
zips_path = os.path.join(parent_path, "assets", "zips") |
|
dst = os.path.join(zips_path, f"{modelname}.zip") |
|
logs_path = os.path.join(parent_path, "logs", modelname) |
|
weights_path = os.path.join(logs_path, "weights") |
|
save_folder = parent_path |
|
infos = [] |
|
|
|
try: |
|
if not os.path.exists(logs_path): |
|
raise Exception("No model found.") |
|
|
|
if not "content" in parent_path: |
|
save_folder = os.path.join(parent_path, "logs") |
|
else: |
|
save_folder = "/content/drive/MyDrive/RVC_Backup" |
|
|
|
infos.append(i18n("Save model")) |
|
yield "\n".join(infos) |
|
|
|
if not os.path.exists(save_folder): |
|
os.mkdir(save_folder) |
|
if not os.path.exists(os.path.join(save_folder, "manual_backup")): |
|
os.mkdir(os.path.join(save_folder, "manual_backup")) |
|
if not os.path.exists(os.path.join(save_folder, "finished")): |
|
os.mkdir(os.path.join(save_folder, "finished")) |
|
|
|
if os.path.exists(zips_path): |
|
shutil.rmtree(zips_path) |
|
|
|
os.mkdir(zips_path) |
|
|
|
if save_action == i18n("Choose the method"): |
|
raise Exception("No method chosen.") |
|
|
|
if save_action == i18n("Save all"): |
|
save_folder = os.path.join(save_folder, "manual_backup") |
|
elif save_action == i18n("Save D and G"): |
|
save_folder = os.path.join(save_folder, "manual_backup") |
|
elif save_action == i18n("Save voice"): |
|
save_folder = os.path.join(save_folder, "finished") |
|
|
|
|
|
save_action_config = SAVE_ACTION_CONFIG.get(save_action) |
|
|
|
if save_action_config is None: |
|
raise Exception("Invalid save action.") |
|
|
|
|
|
if save_action_config['copy_files']: |
|
with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED) as zipf: |
|
for root, dirs, files in os.walk(logs_path): |
|
for file in files: |
|
file_path = os.path.join(root, file) |
|
zipf.write(file_path, os.path.relpath(file_path, logs_path)) |
|
else: |
|
|
|
if save_action_config['include_weights']: |
|
if not os.path.exists(weights_path): |
|
infos.append(i18n("Saved without inference model...")) |
|
else: |
|
pth_files = [file for file in os.listdir(weights_path) if file.endswith('.pth')] |
|
if not pth_files: |
|
infos.append(i18n("Saved without inference model...")) |
|
else: |
|
with zipfile.ZipFile(dst, 'w', zipfile.ZIP_DEFLATED) as zipf: |
|
skipped_files = set() |
|
for pth_file in pth_files: |
|
match = re.search(r'(.*)_s\d+.pth$', pth_file) |
|
if match: |
|
base_name = match.group(1) |
|
if base_name not in skipped_files: |
|
print(f'Skipping autosave epoch files for {base_name}.') |
|
skipped_files.add(base_name) |
|
continue |
|
|
|
print(f'Processing file: {pth_file}') |
|
zipf.write(os.path.join(weights_path, pth_file), arcname=os.path.basename(pth_file)) |
|
|
|
yield "\n".join(infos) |
|
infos.append("\n" + i18n("This may take a few minutes, please wait...")) |
|
yield "\n".join(infos) |
|
|
|
|
|
for pattern in save_action_config.get('files_to_copy', []): |
|
matching_files = glob.glob(os.path.join(logs_path, pattern)) |
|
with zipfile.ZipFile(dst, 'a', zipfile.ZIP_DEFLATED) as zipf: |
|
for file_path in matching_files: |
|
zipf.write(file_path, os.path.basename(file_path)) |
|
|
|
|
|
shutil.move(dst, os.path.join(save_folder, f"{modelname}.zip")) |
|
|
|
shutil.rmtree(zips_path) |
|
infos.append("\n" + i18n("Model saved successfully")) |
|
yield "\n".join(infos) |
|
|
|
except Exception as e: |
|
|
|
error_message = str(e) |
|
print(f"Error: {error_message}") |
|
yield error_message |
|
|
|
def load_downloaded_backup(url): |
|
parent_path = find_folder_parent(now_dir, "assets") |
|
try: |
|
infos = [] |
|
logs_folders = [ |
|
"0_gt_wavs", |
|
"1_16k_wavs", |
|
"2a_f0", |
|
"2b-f0nsf", |
|
"3_feature256", |
|
"3_feature768", |
|
] |
|
zips_path = os.path.join(parent_path, "assets", "zips") |
|
unzips_path = os.path.join(parent_path, "assets", "unzips") |
|
weights_path = os.path.join(parent_path, "assets", "logs", "weights") |
|
logs_dir = os.path.join(parent_path, "logs") |
|
|
|
if os.path.exists(zips_path): |
|
shutil.rmtree(zips_path) |
|
if os.path.exists(unzips_path): |
|
shutil.rmtree(unzips_path) |
|
|
|
os.mkdir(zips_path) |
|
os.mkdir(unzips_path) |
|
|
|
download_file = download_from_url(url) |
|
if not download_file: |
|
print(i18n("The file could not be downloaded.")) |
|
infos.append(i18n("The file could not be downloaded.")) |
|
yield "\n".join(infos) |
|
elif download_file == "downloaded": |
|
print(i18n("It has been downloaded successfully.")) |
|
infos.append(i18n("It has been downloaded successfully.")) |
|
yield "\n".join(infos) |
|
elif download_file == "too much use": |
|
raise Exception( |
|
i18n("Too many users have recently viewed or downloaded this file") |
|
) |
|
elif download_file == "private link": |
|
raise Exception(i18n("Cannot get file from this private link")) |
|
|
|
for filename in os.listdir(zips_path): |
|
if filename.endswith(".zip"): |
|
zipfile_path = os.path.join(zips_path, filename) |
|
zip_dir_name = os.path.splitext(filename)[0] |
|
unzip_dir = unzips_path |
|
print(i18n("Proceeding with the extraction...")) |
|
infos.append(i18n("Proceeding with the extraction...")) |
|
shutil.unpack_archive(zipfile_path, unzip_dir, "zip") |
|
|
|
if os.path.exists(os.path.join(unzip_dir, zip_dir_name)): |
|
shutil.move(os.path.join(unzip_dir, zip_dir_name), logs_dir) |
|
else: |
|
new_folder_path = os.path.join(logs_dir, zip_dir_name) |
|
os.mkdir(new_folder_path) |
|
for item_name in os.listdir(unzip_dir): |
|
item_path = os.path.join(unzip_dir, item_name) |
|
if os.path.isfile(item_path): |
|
shutil.move(item_path, new_folder_path) |
|
elif os.path.isdir(item_path): |
|
shutil.move(item_path, new_folder_path) |
|
|
|
yield "\n".join(infos) |
|
else: |
|
print(i18n("Unzip error.")) |
|
infos.append(i18n("Unzip error.")) |
|
yield "\n".join(infos) |
|
|
|
result = "" |
|
|
|
for filename in os.listdir(unzips_path): |
|
if filename.endswith(".zip"): |
|
silentremove(filename) |
|
|
|
if os.path.exists(zips_path): |
|
shutil.rmtree(zips_path) |
|
if os.path.exists(os.path.join(parent_path, "assets", "unzips")): |
|
shutil.rmtree(os.path.join(parent_path, "assets", "unzips")) |
|
print(i18n("The Backup has been uploaded successfully.")) |
|
infos.append("\n" + i18n("The Backup has been uploaded successfully.")) |
|
yield "\n".join(infos) |
|
os.chdir(parent_path) |
|
return result |
|
except Exception as e: |
|
os.chdir(parent_path) |
|
if "too much use" in str(e): |
|
print(i18n("Too many users have recently viewed or downloaded this file")) |
|
yield i18n("Too many users have recently viewed or downloaded this file") |
|
elif "private link" in str(e): |
|
print(i18n("Cannot get file from this private link")) |
|
yield i18n("Cannot get file from this private link") |
|
else: |
|
print(e) |
|
yield i18n("An error occurred downloading") |
|
finally: |
|
os.chdir(parent_path) |
|
|
|
|
|
def save_to_wav(record_button): |
|
if record_button is None: |
|
pass |
|
else: |
|
path_to_file = record_button |
|
new_name = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + ".wav" |
|
new_path = ".assets/audios/" + new_name |
|
shutil.move(path_to_file, new_path) |
|
return new_name |
|
|
|
|
|
def change_choices2(): |
|
audio_paths = [ |
|
os.path.join(root, name) |
|
for root, _, files in os.walk(audio_root, topdown=False) |
|
for name in files |
|
if name.endswith(tuple(sup_audioext)) and root == audio_root |
|
] |
|
return {"choices": sorted(audio_paths), "__type__": "update"}, { |
|
"__type__": "update" |
|
} |
|
|
|
|
|
def uvr( |
|
input_url, |
|
output_path, |
|
model_name, |
|
inp_root, |
|
save_root_vocal, |
|
paths, |
|
save_root_ins, |
|
agg, |
|
format0, |
|
architecture, |
|
): |
|
carpeta_a_eliminar = "yt_downloads" |
|
if os.path.exists(carpeta_a_eliminar) and os.path.isdir(carpeta_a_eliminar): |
|
for archivo in os.listdir(carpeta_a_eliminar): |
|
ruta_archivo = os.path.join(carpeta_a_eliminar, archivo) |
|
if os.path.isfile(ruta_archivo): |
|
os.remove(ruta_archivo) |
|
elif os.path.isdir(ruta_archivo): |
|
shutil.rmtree(ruta_archivo) |
|
|
|
ydl_opts = { |
|
"no-windows-filenames": True, |
|
"restrict-filenames": True, |
|
"extract_audio": True, |
|
"format": "bestaudio", |
|
"quiet": True, |
|
"no-warnings": True, |
|
} |
|
|
|
try: |
|
print(i18n("Downloading audio from the video...")) |
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
info_dict = ydl.extract_info(input_url, download=False) |
|
formatted_title = format_title(info_dict.get("title", "default_title")) |
|
formatted_outtmpl = output_path + "/" + formatted_title + ".wav" |
|
ydl_opts["outtmpl"] = formatted_outtmpl |
|
ydl = yt_dlp.YoutubeDL(ydl_opts) |
|
ydl.download([input_url]) |
|
print(i18n("Audio downloaded!")) |
|
except Exception as error: |
|
print(i18n("An error occurred:"), error) |
|
|
|
actual_directory = os.path.dirname(__file__) |
|
actual_directory = os.path.abspath(os.path.join(actual_directory, "..")) |
|
|
|
vocal_directory = os.path.join(actual_directory, save_root_vocal) |
|
instrumental_directory = os.path.join(actual_directory, save_root_ins) |
|
|
|
vocal_formatted = f"vocal_{formatted_title}.wav.reformatted.wav_10.wav" |
|
instrumental_formatted = f"instrument_{formatted_title}.wav.reformatted.wav_10.wav" |
|
|
|
vocal_audio_path = os.path.join(vocal_directory, vocal_formatted) |
|
instrumental_audio_path = os.path.join( |
|
instrumental_directory, instrumental_formatted |
|
) |
|
|
|
vocal_formatted_mdx = f"{formatted_title}_vocal_.wav" |
|
instrumental_formatted_mdx = f"{formatted_title}_instrument_.wav" |
|
|
|
vocal_audio_path_mdx = os.path.join(vocal_directory, vocal_formatted_mdx) |
|
instrumental_audio_path_mdx = os.path.join( |
|
instrumental_directory, instrumental_formatted_mdx |
|
) |
|
|
|
if architecture == "VR": |
|
try: |
|
print(i18n("Starting audio conversion... (This might take a moment)")) |
|
inp_root = inp_root.strip(" ").strip('"').strip("\n").strip('"').strip(" ") |
|
save_root_vocal = ( |
|
save_root_vocal.strip(" ").strip('"').strip("\n").strip('"').strip(" ") |
|
) |
|
save_root_ins = ( |
|
save_root_ins.strip(" ").strip('"').strip("\n").strip('"').strip(" ") |
|
) |
|
usable_files = [ |
|
os.path.join(inp_root, file) |
|
for file in os.listdir(inp_root) |
|
if file.endswith(tuple(sup_audioext)) |
|
] |
|
if model_name == "onnx_dereverb_By_FoxJoy": |
|
pre_fun = MDXNetDereverb(15, config.device) |
|
else: |
|
func = AudioPre if "DeEcho" not in model_name else AudioPreDeEcho |
|
pre_fun = func( |
|
agg=int(agg), |
|
model_path=os.path.join( |
|
os.getenv("weight_uvr5_root"), model_name + ".pth" |
|
), |
|
device=config.device, |
|
is_half=config.is_half, |
|
) |
|
if inp_root != "": |
|
paths = usable_files |
|
else: |
|
paths = [path.name for path in paths] |
|
for path in paths: |
|
inp_path = os.path.join(inp_root, path) |
|
need_reformat = 1 |
|
done = 0 |
|
try: |
|
info = ffmpeg.probe(inp_path, cmd="ffprobe") |
|
if ( |
|
info["streams"][0]["channels"] == 2 |
|
and info["streams"][0]["sample_rate"] == "44100" |
|
): |
|
need_reformat = 0 |
|
pre_fun._path_audio_( |
|
inp_path, save_root_ins, save_root_vocal, format0 |
|
) |
|
done = 1 |
|
except: |
|
need_reformat = 1 |
|
traceback.print_exc() |
|
if need_reformat == 1: |
|
tmp_path = "%s/%s.reformatted.wav" % ( |
|
os.path.join(os.environ["temp"]), |
|
os.path.basename(inp_path), |
|
) |
|
os.system( |
|
"ffmpeg -i %s -vn -acodec pcm_s16le -ac 2 -ar 44100 %s -y" |
|
% (inp_path, tmp_path) |
|
) |
|
inp_path = tmp_path |
|
try: |
|
if done == 0: |
|
pre_fun.path_audio( |
|
inp_path, save_root_ins, save_root_vocal, format0 |
|
) |
|
print("%s->Success" % (os.path.basename(inp_path))) |
|
except: |
|
try: |
|
if done == 0: |
|
pre_fun._path_audio_( |
|
inp_path, save_root_ins, save_root_vocal, format0 |
|
) |
|
print("%s->Success" % (os.path.basename(inp_path))) |
|
except: |
|
print( |
|
"%s->%s" |
|
% (os.path.basename(inp_path), traceback.format_exc()) |
|
) |
|
except: |
|
print(traceback.format_exc()) |
|
finally: |
|
try: |
|
if model_name == "onnx_dereverb_By_FoxJoy": |
|
del pre_fun.pred.model |
|
del pre_fun.pred.model_ |
|
else: |
|
del pre_fun.model |
|
del pre_fun |
|
return i18n("Finished"), vocal_audio_path, instrumental_audio_path |
|
except: |
|
traceback.print_exc() |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
print("Executed torch.cuda.empty_cache()") |
|
elif architecture == "MDX": |
|
try: |
|
print(i18n("Starting audio conversion... (This might take a moment)")) |
|
inp_root, save_root_vocal, save_root_ins = [ |
|
x.strip(" ").strip('"').strip("\n").strip('"').strip(" ") |
|
for x in [inp_root, save_root_vocal, save_root_ins] |
|
] |
|
|
|
usable_files = [ |
|
os.path.join(inp_root, file) |
|
for file in os.listdir(inp_root) |
|
if file.endswith(tuple(sup_audioext)) |
|
] |
|
try: |
|
if paths != None: |
|
paths = [path.name for path in paths] |
|
else: |
|
paths = usable_files |
|
|
|
except: |
|
traceback.print_exc() |
|
paths = usable_files |
|
print(paths) |
|
invert = True |
|
denoise = True |
|
use_custom_parameter = True |
|
dim_f = 2048 |
|
dim_t = 256 |
|
n_fft = 7680 |
|
use_custom_compensation = True |
|
compensation = 1.025 |
|
suffix = "vocal_" |
|
suffix_invert = "instrument_" |
|
print_settings = True |
|
onnx = id_to_ptm(model_name) |
|
compensation = ( |
|
compensation |
|
if use_custom_compensation or use_custom_parameter |
|
else None |
|
) |
|
mdx_model = prepare_mdx( |
|
onnx, |
|
use_custom_parameter, |
|
dim_f, |
|
dim_t, |
|
n_fft, |
|
compensation=compensation, |
|
) |
|
|
|
for path in paths: |
|
|
|
suffix_naming = suffix if use_custom_parameter else None |
|
diff_suffix_naming = suffix_invert if use_custom_parameter else None |
|
run_mdx( |
|
onnx, |
|
mdx_model, |
|
path, |
|
format0, |
|
diff=invert, |
|
suffix=suffix_naming, |
|
diff_suffix=diff_suffix_naming, |
|
denoise=denoise, |
|
) |
|
|
|
if print_settings: |
|
print() |
|
print("[MDX-Net_Colab settings used]") |
|
print(f"Model used: {onnx}") |
|
print(f"Model MD5: {mdx.MDX.get_hash(onnx)}") |
|
print(f"Model parameters:") |
|
print(f" -dim_f: {mdx_model.dim_f}") |
|
print(f" -dim_t: {mdx_model.dim_t}") |
|
print(f" -n_fft: {mdx_model.n_fft}") |
|
print(f" -compensation: {mdx_model.compensation}") |
|
print() |
|
print("[Input file]") |
|
print("filename(s): ") |
|
for filename in paths: |
|
print(f" -{filename}") |
|
print(f"{os.path.basename(filename)}->Success") |
|
except: |
|
traceback.print_exc() |
|
finally: |
|
try: |
|
del mdx_model |
|
return ( |
|
i18n("Finished"), |
|
vocal_audio_path_mdx, |
|
instrumental_audio_path_mdx, |
|
) |
|
except: |
|
traceback.print_exc() |
|
|
|
print("clean_empty_cache") |
|
|
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
|
|
|
|
def load_downloaded_audio(url): |
|
parent_path = find_folder_parent(now_dir, "assets") |
|
try: |
|
infos = [] |
|
audios_path = os.path.join(parent_path, "assets", "audios") |
|
zips_path = os.path.join(parent_path, "assets", "zips") |
|
|
|
if not os.path.exists(audios_path): |
|
os.mkdir(audios_path) |
|
|
|
download_file = download_from_url(url) |
|
if not download_file: |
|
print(i18n("The file could not be downloaded.")) |
|
infos.append(i18n("The file could not be downloaded.")) |
|
yield "\n".join(infos) |
|
elif download_file == "downloaded": |
|
print(i18n("It has been downloaded successfully.")) |
|
infos.append(i18n("It has been downloaded successfully.")) |
|
yield "\n".join(infos) |
|
elif download_file == "too much use": |
|
raise Exception( |
|
i18n("Too many users have recently viewed or downloaded this file") |
|
) |
|
elif download_file == "private link": |
|
raise Exception(i18n("Cannot get file from this private link")) |
|
|
|
for filename in os.listdir(zips_path): |
|
item_path = os.path.join(zips_path, filename) |
|
if item_path.split(".")[-1] in sup_audioext: |
|
if os.path.exists(item_path): |
|
shutil.move(item_path, audios_path) |
|
|
|
result = "" |
|
print(i18n("Audio files have been moved to the 'audios' folder.")) |
|
infos.append(i18n("Audio files have been moved to the 'audios' folder.")) |
|
yield "\n".join(infos) |
|
|
|
os.chdir(parent_path) |
|
return result |
|
except Exception as e: |
|
os.chdir(parent_path) |
|
if "too much use" in str(e): |
|
print(i18n("Too many users have recently viewed or downloaded this file")) |
|
yield i18n("Too many users have recently viewed or downloaded this file") |
|
elif "private link" in str(e): |
|
print(i18n("Cannot get file from this private link")) |
|
yield i18n("Cannot get file from this private link") |
|
else: |
|
print(e) |
|
yield i18n("An error occurred downloading") |
|
finally: |
|
os.chdir(parent_path) |
|
|
|
|
|
class error_message(Exception): |
|
def __init__(self, mensaje): |
|
self.mensaje = mensaje |
|
super().__init__(mensaje) |
|
|
|
|
|
def get_vc(sid, to_return_protect0, to_return_protect1): |
|
global n_spk, tgt_sr, net_g, vc, cpt, version |
|
if sid == "" or sid == []: |
|
global hubert_model |
|
if hubert_model is not None: |
|
print("clean_empty_cache") |
|
del net_g, n_spk, vc, hubert_model, tgt_sr |
|
hubert_model = net_g = n_spk = vc = hubert_model = tgt_sr = None |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
if_f0 = cpt.get("f0", 1) |
|
version = cpt.get("version", "v1") |
|
if version == "v1": |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs256NSFsid( |
|
*cpt["config"], is_half=config.is_half |
|
) |
|
else: |
|
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) |
|
elif version == "v2": |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs768NSFsid( |
|
*cpt["config"], is_half=config.is_half |
|
) |
|
else: |
|
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) |
|
del net_g, cpt |
|
if torch.cuda.is_available(): |
|
torch.cuda.empty_cache() |
|
cpt = None |
|
return ( |
|
{"visible": False, "__type__": "update"}, |
|
{"visible": False, "__type__": "update"}, |
|
{"visible": False, "__type__": "update"}, |
|
) |
|
person = "%s/%s" % (weight_root, sid) |
|
print("loading %s" % person) |
|
cpt = torch.load(person, map_location="cpu") |
|
tgt_sr = cpt["config"][-1] |
|
cpt["config"][-3] = cpt["weight"]["emb_g.weight"].shape[0] |
|
if_f0 = cpt.get("f0", 1) |
|
if if_f0 == 0: |
|
to_return_protect0 = to_return_protect1 = { |
|
"visible": False, |
|
"value": 0.5, |
|
"__type__": "update", |
|
} |
|
else: |
|
to_return_protect0 = { |
|
"visible": True, |
|
"value": to_return_protect0, |
|
"__type__": "update", |
|
} |
|
to_return_protect1 = { |
|
"visible": True, |
|
"value": to_return_protect1, |
|
"__type__": "update", |
|
} |
|
version = cpt.get("version", "v1") |
|
if version == "v1": |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs256NSFsid(*cpt["config"], is_half=config.is_half) |
|
else: |
|
net_g = SynthesizerTrnMs256NSFsid_nono(*cpt["config"]) |
|
elif version == "v2": |
|
if if_f0 == 1: |
|
net_g = SynthesizerTrnMs768NSFsid(*cpt["config"], is_half=config.is_half) |
|
else: |
|
net_g = SynthesizerTrnMs768NSFsid_nono(*cpt["config"]) |
|
del net_g.enc_q |
|
print(net_g.load_state_dict(cpt["weight"], strict=False)) |
|
net_g.eval().to(config.device) |
|
if config.is_half: |
|
net_g = net_g.half() |
|
else: |
|
net_g = net_g.float() |
|
vc = VC(tgt_sr, config) |
|
n_spk = cpt["config"][-3] |
|
return ( |
|
{"visible": True, "maximum": n_spk, "__type__": "update"}, |
|
to_return_protect0, |
|
to_return_protect1, |
|
) |
|
|
|
|
|
def update_model_choices(select_value): |
|
model_ids = get_model_list() |
|
model_ids_list = list(model_ids) |
|
if select_value == "VR": |
|
return {"choices": uvr5_names, "__type__": "update"} |
|
elif select_value == "MDX": |
|
return {"choices": model_ids_list, "__type__": "update"} |
|
|
|
|
|
def save_drop_model_pth(dropbox): |
|
file_path = dropbox.name |
|
file_name = os.path.basename(file_path) |
|
target_path = os.path.join("logs", "weights", os.path.basename(file_path)) |
|
|
|
if not file_name.endswith('.pth'): |
|
print(i18n("The file does not have the .pth extension. Please upload the correct file.")) |
|
return None |
|
|
|
shutil.move(file_path, target_path) |
|
return target_path |
|
|
|
def extract_folder_name(file_name): |
|
match = re.search(r'nprobe_(.*?)\.index', file_name) |
|
|
|
if match: |
|
return match.group(1) |
|
else: |
|
return |
|
|
|
def save_drop_model_index(dropbox): |
|
file_path = dropbox.name |
|
file_name = os.path.basename(file_path) |
|
folder_name = extract_folder_name(file_name) |
|
|
|
if not file_name.endswith('.index'): |
|
print(i18n("The file does not have the .index extension. Please upload the correct file.")) |
|
return None |
|
|
|
out_path = os.path.join("logs", folder_name) |
|
os.mkdir(out_path) |
|
|
|
target_path = os.path.join(out_path, os.path.basename(file_path)) |
|
|
|
shutil.move(file_path, target_path) |
|
return target_path |
|
|
|
|
|
def download_model(): |
|
gr.Markdown(value="# " + i18n("Download Model")) |
|
gr.Markdown(value=i18n("It is used to download your inference models.")) |
|
with gr.Row(): |
|
model_url = gr.Textbox(label=i18n("Url:")) |
|
with gr.Row(): |
|
download_model_status_bar = gr.Textbox(label=i18n("Status:")) |
|
with gr.Row(): |
|
download_button = gr.Button(i18n("Download")) |
|
download_button.click( |
|
fn=load_downloaded_model, |
|
inputs=[model_url], |
|
outputs=[download_model_status_bar], |
|
) |
|
gr.Markdown(value=i18n("You can also drop your files to load your model.")) |
|
with gr.Row(): |
|
dropbox_pth = gr.File(label=i18n("Drag your .pth file here:")) |
|
dropbox_index = gr.File(label=i18n("Drag your .index file here:")) |
|
|
|
dropbox_pth.upload( |
|
fn=save_drop_model_pth, |
|
inputs=[dropbox_pth], |
|
) |
|
dropbox_index.upload( |
|
fn=save_drop_model_index, |
|
inputs=[dropbox_index], |
|
) |
|
|
|
|
|
def download_backup(): |
|
gr.Markdown(value="# " + i18n("Download Backup")) |
|
gr.Markdown(value=i18n("It is used to download your training backups.")) |
|
with gr.Row(): |
|
model_url = gr.Textbox(label=i18n("Url:")) |
|
with gr.Row(): |
|
download_model_status_bar = gr.Textbox(label=i18n("Status:")) |
|
with gr.Row(): |
|
download_button = gr.Button(i18n("Download")) |
|
download_button.click( |
|
fn=load_downloaded_backup, |
|
inputs=[model_url], |
|
outputs=[download_model_status_bar], |
|
) |
|
|
|
|
|
def update_dataset_list(name): |
|
new_datasets = [] |
|
file_path = find_folder_parent(now_dir, "assets") |
|
for foldername in os.listdir("./datasets"): |
|
if "." not in foldername: |
|
new_datasets.append( |
|
os.path.join( |
|
file_path, "datasets", foldername |
|
) |
|
) |
|
return gr.Dropdown.update(choices=new_datasets) |
|
|
|
|
|
def download_dataset(trainset_dir4): |
|
gr.Markdown(value="# " + i18n("Download Dataset")) |
|
gr.Markdown( |
|
value=i18n( |
|
"Download the dataset with the audios in a compatible format (.wav/.flac) to train your model." |
|
) |
|
) |
|
with gr.Row(): |
|
dataset_url = gr.Textbox(label=i18n("Url:")) |
|
with gr.Row(): |
|
load_dataset_status_bar = gr.Textbox(label=i18n("Status:")) |
|
with gr.Row(): |
|
load_dataset_button = gr.Button(i18n("Download")) |
|
load_dataset_button.click( |
|
fn=load_dowloaded_dataset, |
|
inputs=[dataset_url], |
|
outputs=[load_dataset_status_bar], |
|
) |
|
load_dataset_status_bar.change(update_dataset_list, dataset_url, trainset_dir4) |
|
|
|
|
|
def download_audio(): |
|
gr.Markdown(value="# " + i18n("Download Audio")) |
|
gr.Markdown( |
|
value=i18n( |
|
"Download audios of any format for use in inference (recommended for mobile users)." |
|
) |
|
) |
|
with gr.Row(): |
|
audio_url = gr.Textbox(label=i18n("Url:")) |
|
with gr.Row(): |
|
download_audio_status_bar = gr.Textbox(label=i18n("Status:")) |
|
with gr.Row(): |
|
download_button2 = gr.Button(i18n("Download")) |
|
download_button2.click( |
|
fn=load_downloaded_audio, |
|
inputs=[audio_url], |
|
outputs=[download_audio_status_bar], |
|
) |
|
|
|
|
|
def youtube_separator(): |
|
gr.Markdown(value="# " + i18n("Separate YouTube tracks")) |
|
gr.Markdown( |
|
value=i18n( |
|
"Download audio from a YouTube video and automatically separate the vocal and instrumental tracks" |
|
) |
|
) |
|
with gr.Row(): |
|
input_url = gr.inputs.Textbox(label=i18n("Enter the YouTube link:")) |
|
output_path = gr.Textbox( |
|
label=i18n( |
|
"Enter the path of the audio folder to be processed (copy it from the address bar of the file manager):" |
|
), |
|
value=os.path.abspath(os.getcwd()).replace("\\", "/") + "/yt_downloads", |
|
visible=False, |
|
) |
|
advanced_settings_checkbox = gr.Checkbox( |
|
value=False, |
|
label=i18n("Advanced Settings"), |
|
interactive=True, |
|
) |
|
with gr.Row( |
|
label=i18n("Advanced Settings"), visible=False, variant="compact" |
|
) as advanced_settings: |
|
with gr.Column(): |
|
model_select = gr.Radio( |
|
label=i18n("Model Architecture:"), |
|
choices=["VR", "MDX"], |
|
value="VR", |
|
interactive=True, |
|
) |
|
model_choose = gr.Dropdown( |
|
label=i18n( |
|
"Model: (Be aware that in some models the named vocal will be the instrumental)" |
|
), |
|
choices=uvr5_names, |
|
value="HP5_only_main_vocal", |
|
) |
|
with gr.Row(): |
|
agg = gr.Slider( |
|
minimum=0, |
|
maximum=20, |
|
step=1, |
|
label=i18n("Vocal Extraction Aggressive"), |
|
value=10, |
|
interactive=True, |
|
) |
|
with gr.Row(): |
|
opt_vocal_root = gr.Textbox( |
|
label=i18n("Specify the output folder for vocals:"), |
|
value=((os.getcwd()).replace("\\", "/") + "/assets/audios"), |
|
) |
|
opt_ins_root = gr.Textbox( |
|
label=i18n("Specify the output folder for accompaniment:"), |
|
value=((os.getcwd()).replace("\\", "/") + "/assets/audios/audio-others"), |
|
) |
|
dir_wav_input = gr.Textbox( |
|
label=i18n("Enter the path of the audio folder to be processed:"), |
|
value=((os.getcwd()).replace("\\", "/") + "/yt_downloads"), |
|
visible=False, |
|
) |
|
format0 = gr.Radio( |
|
label=i18n("Export file format"), |
|
choices=["wav", "flac", "mp3", "m4a"], |
|
value="wav", |
|
visible=False, |
|
interactive=True, |
|
) |
|
wav_inputs = gr.File( |
|
file_count="multiple", |
|
label=i18n( |
|
"You can also input audio files in batches. Choose one of the two options. Priority is given to reading from the folder." |
|
), |
|
visible=False, |
|
) |
|
model_select.change( |
|
fn=update_model_choices, |
|
inputs=model_select, |
|
outputs=model_choose, |
|
) |
|
with gr.Row(): |
|
vc_output4 = gr.Textbox(label=i18n("Status:")) |
|
vc_output5 = gr.Audio(label=i18n("Vocal"), type="filepath") |
|
vc_output6 = gr.Audio(label=i18n("Instrumental"), type="filepath") |
|
with gr.Row(): |
|
but2 = gr.Button(i18n("Download and Separate")) |
|
but2.click( |
|
uvr, |
|
[ |
|
input_url, |
|
output_path, |
|
model_choose, |
|
dir_wav_input, |
|
opt_vocal_root, |
|
wav_inputs, |
|
opt_ins_root, |
|
agg, |
|
format0, |
|
model_select, |
|
], |
|
[vc_output4, vc_output5, vc_output6], |
|
) |
|
|
|
def toggle_advanced_settings(checkbox): |
|
return {"visible": checkbox, "__type__": "update"} |
|
|
|
advanced_settings_checkbox.change( |
|
fn=toggle_advanced_settings, |
|
inputs=[advanced_settings_checkbox], |
|
outputs=[advanced_settings], |
|
) |
|
|
|
|
|
|