|
|
|
|
|
|
|
|
|
|
|
import json |
|
from tqdm import tqdm |
|
import os |
|
import torchaudio |
|
from utils import audio |
|
import csv |
|
import random |
|
|
|
from utils.util import has_existed |
|
from text import _clean_text |
|
import librosa |
|
import soundfile as sf |
|
from scipy.io import wavfile |
|
|
|
from pathlib import Path |
|
import numpy as np |
|
|
|
|
|
def textgird_extract(
    corpus_directory,
    output_directory,
    mfa_path=os.path.join("mfa", "montreal-forced-aligner", "bin", "mfa_align"),
    lexicon=os.path.join("mfa", "lexicon", "librispeech-lexicon.txt"),
    acoustic_model_path=os.path.join(
        "mfa", "montreal-forced-aligner", "pretrained_models", "english.zip"
    ),
    jobs="8",
):
    """Run the MFA aligner executable over a corpus and write its output.

    The executable is invoked as
    ``<mfa_path> <corpus> <lexicon> <acoustic_model> <output> -j <jobs> --clean``.

    Args:
        corpus_directory: Directory handed to the aligner as its corpus.
        output_directory: Directory for the aligner output; created
            (including parents) if it does not exist.
        mfa_path: Path to the aligner executable. A relative path is
            resolved against the current working directory.
        lexicon: Path to the pronunciation lexicon file.
        acoustic_model_path: Path to the acoustic model archive.
        jobs: Value passed to the aligner's ``-j`` option.

    Raises:
        AssertionError: If the corpus directory, the executable, the lexicon
            or the acoustic model does not exist.
    """
    # Imported locally so this function does not depend on the module's
    # top-level import list.
    import subprocess

    # Explicit raises instead of ``assert`` so the checks survive ``python -O``.
    if not os.path.exists(corpus_directory):
        raise AssertionError("Please check the directionary contains *.wav, *.lab")
    if not (
        os.path.exists(mfa_path)
        and os.path.exists(lexicon)
        and os.path.exists(acoustic_model_path)
    ):
        raise AssertionError(f"Please download the MFA tools to {mfa_path} firstly")

    Path(output_directory).mkdir(parents=True, exist_ok=True)
    print(f"MFA results are save in {output_directory}")

    # Prefix "./" only for relative paths; prepending it to an absolute path
    # would silently turn it into a path below the working directory.
    if os.path.isabs(mfa_path):
        executable = mfa_path
    else:
        executable = os.path.join(os.curdir, mfa_path)

    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact.
    command = [
        executable,
        corpus_directory,
        lexicon,
        acoustic_model_path,
        output_directory,
        "-j",
        str(jobs),
        "--clean",
    ]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as err:
        # Stay non-raising, like the previous os.system call, but report it.
        print(f"Failed to launch MFA ({executable}): {err}")
        return
    if completed.returncode != 0:
        print(f"MFA exited with a non-zero status: {completed.returncode}")
|
|
|
|
|
def get_lines(file):
    """Read a UTF-8 text file and return its lines with whitespace stripped.

    Args:
        file: Path of the text file to read.

    Returns:
        A list with one entry per line of the file, each passed through
        ``str.strip()``. Reading progress is reported through ``tqdm``.
    """
    with open(file, encoding="utf-8") as handle:
        return [raw_line.strip() for raw_line in tqdm(handle)]
|
|
|
|
|
def get_uid2utt(ljspeech_path, dataset, cfg):
    """Build one metadata record per utterance and sum up the audio length.

    Args:
        ljspeech_path: Dataset root; audio is read from
            ``<ljspeech_path>/wavs/<uid>.wav``.
        dataset: Iterable of ``|``-separated metadata lines. Field 0 is used
            as the utterance id and field 2 as the text.
        cfg: Unused by this function; kept for interface compatibility.

    Returns:
        A ``(records, hours)`` tuple. ``records`` is a list of dicts with the
        keys ``Dataset``, ``index``, ``Singer``, ``Uid``, ``Text``, ``Path``
        and ``Duration``; ``hours`` is the summed duration divided by 3600.
    """
    records = []
    seconds_total = 0

    for position, entry in enumerate(tqdm(dataset)):
        fields = entry.split("|")
        uid, text = fields[0], fields[2]
        wav_path = os.path.join(ljspeech_path, "wavs/{}.wav".format(uid))

        # Duration = number of samples on the last axis / sample rate.
        waveform, sample_rate = torchaudio.load(wav_path)
        duration = waveform.size(-1) / sample_rate

        records.append(
            {
                "Dataset": "LJSpeech",
                "index": position,
                "Singer": "LJSpeech",
                "Uid": uid,
                "Text": text,
                "Path": wav_path,
                "Duration": duration,
            }
        )
        seconds_total += duration

    return records, seconds_total / 3600
|
|
|
|
|
def split_dataset(lines, test_rate=0.05, test_size=None):
    """Randomly split ``lines`` into a training part and a test part.

    The input is left untouched: a copy is shuffled with the global
    ``random`` state, so for a given seed the split is the same one an
    in-place shuffle of ``lines`` would produce.

    Args:
        lines: Sequence of items to split.
        test_rate: Fraction of the items placed in the test set. Only used
            when ``test_size`` is ``None``.
        test_size: Absolute number of test items; overrides ``test_rate``.

    Returns:
        A ``(train_set, test_set)`` tuple of two new lists.
    """
    if test_size is None:
        test_size = int(len(lines) * test_rate)

    # Shuffle a copy so the caller's list keeps its original order.
    shuffled = list(lines)
    random.shuffle(shuffled)

    test_set = shuffled[:test_size]
    train_set = shuffled[test_size:]
    return train_set, test_set
|
|
|
|
|
# Full-scale magnitude used when converting float audio to 16-bit PCM.
max_wav_value = 32768.0


def _to_int16_pcm(wav):
    """Peak-normalise a float waveform and convert it to int16 samples."""
    peak = np.max(np.abs(wav)) if wav.size else 0.0
    # A silent (all-zero) clip has no peak to normalise against; dividing by
    # zero would fill the output with NaN.
    if peak > 0:
        wav = wav / peak * max_wav_value
    # +32768 is not representable in int16, so clamp before casting rather
    # than letting the positive peak overflow.
    return np.clip(wav, -max_wav_value, max_wav_value - 1).astype(np.int16)


def prepare_align(dataset, dataset_path, cfg, output_path):
    """Write normalised ``.wav`` / ``.lab`` pairs and run forced alignment.

    For every line of ``<dataset_path>/metadata.csv`` (``|``-separated, field
    0 = file stem, field 2 = text) the matching ``wavs/<stem>.wav`` is loaded
    at ``cfg.sample_rate``, peak-normalised, and stored as 16-bit PCM in
    ``<output_path>/<dataset>/<cfg.raw_data>/LJSpeech/``, next to a ``.lab``
    file holding the text after ``_clean_text`` with ``cfg.text_cleaners``.
    Entries whose two outputs already exist, or whose source wav is missing,
    are skipped. Finally ``textgird_extract`` is run on the output directory
    and writes to ``<output_path>/<dataset>/TextGrid``.

    Args:
        dataset: Dataset name used as a sub-directory of ``output_path``.
        dataset_path: Root directory holding ``metadata.csv`` and ``wavs/``.
        cfg: Config object providing ``raw_data``, ``sample_rate`` and
            ``text_cleaners``.
        output_path: Root directory for all generated files.
    """
    in_dir = dataset_path
    out_dir = os.path.join(output_path, dataset, cfg.raw_data)
    sampling_rate = cfg.sample_rate
    cleaners = cfg.text_cleaners
    speaker_dir = os.path.join(out_dir, "LJSpeech")

    with open(os.path.join(dataset_path, "metadata.csv"), encoding="utf-8") as f:
        for line in tqdm(f):
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            parts = line.split("|")
            base_name = parts[0]

            output_wav_path = os.path.join(speaker_dir, "{}.wav".format(base_name))
            output_lab_path = os.path.join(speaker_dir, "{}.lab".format(base_name))
            if os.path.exists(output_wav_path) and os.path.exists(output_lab_path):
                continue

            wav_path = os.path.join(in_dir, "wavs", "{}.wav".format(base_name))
            if not os.path.exists(wav_path):
                continue

            text = _clean_text(parts[2], cleaners)
            os.makedirs(speaker_dir, exist_ok=True)

            # ``sr`` must be passed by keyword: it is keyword-only in
            # librosa >= 0.10.
            wav, _ = librosa.load(wav_path, sr=sampling_rate)
            wavfile.write(output_wav_path, sampling_rate, _to_int16_pcm(wav))

            # Same encoding as the metadata file, independent of the locale.
            with open(output_lab_path, "w", encoding="utf-8") as f1:
                f1.write(text)

    textgird_extract(
        corpus_directory=out_dir,
        output_directory=os.path.join(output_path, dataset, "TextGrid"),
    )
|
|
|
|
|
def main(output_path, dataset_path, cfg):
    """Create the train/test metadata JSON files for LJSpeech.

    Writes ``singers.json`` (always), then, unless ``has_existed`` reports
    both ``train.json`` and ``test.json``, reads ``metadata.csv``, splits it
    with ``split_dataset`` and writes the records returned by ``get_uid2utt``
    for each split. All files go to ``<output_path>/LJSpeech``.

    Args:
        output_path: Root directory for the generated files.
        dataset_path: LJSpeech root containing ``metadata.csv`` and ``wavs/``.
        cfg: Config object, forwarded to ``get_uid2utt``.
    """
    print("-" * 10)
    print("Dataset splits for {}...\n".format("LJSpeech"))

    dataset = "LJSpeech"
    save_dir = os.path.join(output_path, dataset)
    os.makedirs(save_dir, exist_ok=True)

    train_output_file = os.path.join(save_dir, "train.json")
    test_output_file = os.path.join(save_dir, "test.json")
    singer_dict_file = os.path.join(save_dir, "singers.json")

    # Single-speaker corpus: the lookup table has exactly one entry.
    speaker = "LJSpeech"
    speakers = [dataset + "_" + speaker]
    singer_lut = {name: i for i, name in enumerate(sorted(speakers))}
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding is
    # pinned instead of depending on the platform locale.
    with open(singer_dict_file, "w", encoding="utf-8") as f:
        json.dump(singer_lut, f, indent=4, ensure_ascii=False)

    if has_existed(train_output_file) and has_existed(test_output_file):
        return

    lines = get_lines(os.path.join(dataset_path, "metadata.csv"))
    train_set, test_set = split_dataset(lines)

    splits = (
        ("Train", train_set, train_output_file),
        ("Test", test_set, test_output_file),
    )
    for label, subset, output_file in splits:
        res, hours = get_uid2utt(dataset_path, subset, cfg)
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(res, f, indent=4, ensure_ascii=False)
        print("{}_hours= {}".format(label, hours))
|
|