我是靠谱客的博主 帅气硬币,最近开发中收集的这篇文章主要介绍DTLN模型代码简介前言run_training.pyrun_evaluation.pyDTLN_model.py,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

前言

本文将主要介绍DTLN模型的代码,可以在github上下载其模型完整的代码。该模型代码由tensorflow2.0实现,主要包含训练文件run_training.py、推理文件run_evaluation.py和模型文件DTLN_model.py。

run_training.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from DTLN_model import DTLN_model
import os

# 配置gup,选择需要使用的显卡序列号,如果注释掉,tensorflow默认使用所有显卡
os.environ["CUDA_VISIBLE_DEVICES"]='0'
# 使代码变得确定,排除每次运行的不确定性
os.environ['TF_DETERMINISTIC_OPS'] = '1'


# 设置训练数据集和验证集的路径
path_to_train_mix = '/path/to/noisy/training/data/'
path_to_train_speech = '/path/to/clean/training/data/'
path_to_val_mix = '/path/to/noisy/validation/data/'
path_to_val_speech = '/path/to/clean/validation/data/'



# 模型名称
runName = 'DTLN_model'
# 创建模型实例
modelTrainer = DTLN_model()
# 建立模型
modelTrainer.build_DTLN_model()
# 编译模型
modelTrainer.compile_model()
# 训练模型
modelTrainer.train_model(runName, path_to_train_mix, path_to_train_speech, 
                         path_to_val_mix, path_to_val_speech)

run_evaluation.py

# -*- coding: utf-8 -*-
import soundfile as sf
import librosa
import numpy as np
import os
import argparse
from DTLN_model import DTLN_model



def process_file(model, audio_file_name, out_file_name):
   
    # 读取.wav文件,并通过网络模型实现语音降噪
    
    # 读取音频文件,采样率为16000,单通道读入
    in_data,fs = librosa.core.load(audio_file_name, sr=16000, mono=True)
    # 进行推理
    predicted = model.predict_on_batch(
        np.expand_dims(in_data,axis=0).astype(np.float32))
    # 缩减推理结果的维度
    predicted_speech = np.squeeze(predicted)
    # 将结果写入到out_file_name中
    sf.write(out_file_name, predicted_speech,fs)
      

def process_folder(model, folder_name, new_folder_name):
    # 该方法主要对推理的输入和输出路径进行处理
    
    file_names = [];
    directories = [];
    new_directories = [];
    # 获得输入文件路径的各部分
    for root, dirs, files in os.walk(folder_name):
        for file in files:
            # 将所有.wav结尾的文件添加到file_names里面,同时将其所在路径添加到directories里面
            if file.endswith(".wav"):
                file_names.append(file)
                directories.append(root)
                # 创建新的目录名,并添加到new_directories
                new_directories.append(root.replace(folder_name, new_folder_name))
                # 如果新的目录不存在则创建它
                if not os.path.exists(root.replace(folder_name, new_folder_name)):
                    os.makedirs(root.replace(folder_name, new_folder_name))
    # 迭代遍历所有 .wav 文件
    for idx in range(len(file_names)):
        # 进行推理
        process_file(model, os.path.join(directories[idx],file_names[idx]), 
                     os.path.join(new_directories[idx],file_names[idx]))
        print(file_names[idx] + ' processed successfully!')
    
           


if __name__ == '__main__':
    # 设置参数
    parser = argparse.ArgumentParser(description='data evaluation')
    # 设置推理的输入数据路径
    parser.add_argument('--in_folder', '-i', default='./data/input/',
                        help='folder with input files')
    # 推理结果输出路径
    parser.add_argument('--out_folder', '-o', default='./data/output/',
                        help='target folder for processed files')
    # 加载训练好的模型路径
    parser.add_argument('--model', '-m', default='./pretrained_model/weights-improvement-37--17.45.h5',  help='weights of the enhancement model in .h5 format')
    args = parser.parse_args()
    # 确定模型类型
    if args.model.find('_norm_') != -1:
        norm_stft = True
    else:
        norm_stft = False
    # 创建模型实例
    modelClass = DTLN_model();
    # 建立模型
    modelClass.build_DTLN_model(norm_stft=norm_stft)
    # 加载模型
    modelClass.model.load_weights(args.model)
    # 进入推理函数
    process_folder(modelClass.model, args.in_folder, args.out_folder)

DTLN_model.py

模型文件主要包含三个类,audio_generator()、DTLN_model()和 InstantLayerNormalization(Layer),第一个类主要是对语音信号进行处理,第三个是模型中一维卷积用到的ILN层,在这里不做过多介绍,在进行实验时也不需要改动,本文主要对DTLN_model()类进行简单的介绍。

# -*- coding: utf-8 -*-

import os, fnmatch
import tensorflow.keras as keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Activation, Dense, LSTM, Dropout, 
    Lambda, Input, Multiply, Layer, Conv1D
from tensorflow.keras.callbacks import ReduceLROnPlateau, CSVLogger, 
    EarlyStopping, ModelCheckpoint
import tensorflow as tf
import soundfile as sf
from wavinfo import WavInfoReader
from random import shuffle, seed
import numpy as np





class DTLN_model():
    '''
    Class to create and train the DTLN model
    '''
    
    def __init__(self):
        '''
        Constructor
        '''

        # 定义损失函数
        self.cost_function = self.snr_cost
        # empty property for the model
        self.model = []
        # 定义各种参数
        self.fs = 16000
        self.batchsize = 32
        self.len_samples = 15
        self.activation = 'sigmoid'
        self.numUnits = 128
        self.numLayer = 2
        self.blockLen = 512
        self.block_shift = 128
        self.dropout = 0.25
        self.lr = 1e-3
        self.max_epochs = 200
        self.encoder_size = 256
        self.eps = 1e-7
        # 设置seed减少训练运行之间的不变性
        os.environ['PYTHONHASHSEED']=str(42)
        seed(42)
        np.random.seed(42)
        tf.random.set_seed(42)
        # some line to correctly find some libraries in TF 2.x
        physical_devices = tf.config.experimental.list_physical_devices('GPU')
        if len(physical_devices) > 0:
            for device in physical_devices:
                tf.config.experimental.set_memory_growth(device, enable=True)
        

    @staticmethod
    def snr_cost(s_estimate, s_true):
        '''
        计算负信噪比
        '''

        # calculating the SNR
        snr = tf.reduce_mean(tf.math.square(s_true), axis=-1, keepdims=True) / 
            (tf.reduce_mean(tf.math.square(s_true-s_estimate), axis=-1, keepdims=True)+1e-7)
        # using some more lines, because TF has no log10
        num = tf.math.log(snr) 
        denom = tf.math.log(tf.constant(10, dtype=num.dtype))
        loss = -10*(num / (denom))
        # returning the loss
        return loss
        

    def lossWrapper(self):
        '''
        A wrapper function which returns the loss function. This is done to
        to enable additional arguments to the loss function if necessary.
        '''
        def lossFunction(y_true,y_pred):
            # calculating loss and squeezing single dimensions away
            loss = tf.squeeze(self.cost_function(y_pred,y_true))
            # calculate mean over batches
            loss = tf.reduce_mean(loss)
            # return the loss
            return loss
        # returning the loss function as handle
        return lossFunction
    
    

    '''
    In the following some helper layers are defined.
    '''  
    
    def stftLayer(self, x):
        '''
        短时傅里叶层
        '''
        
        # creating frames from the continuous waveform
        frames = tf.signal.frame(x, self.blockLen, self.block_shift)
        # calculating the fft over the time frames. rfft returns NFFT/2+1 bins.
        stft_dat = tf.signal.rfft(frames)
        # calculating magnitude and phase from the complex signal
        mag = tf.abs(stft_dat)
        phase = tf.math.angle(stft_dat)
        # returning magnitude and phase as list
        return [mag, phase]
    
    def fftLayer(self, x):
        '''
        快速傅里叶层
        '''
        
        # expanding dimensions
        frame = tf.expand_dims(x, axis=1)
        # calculating the fft over the time frames. rfft returns NFFT/2+1 bins.
        stft_dat = tf.signal.rfft(frame)
        # calculating magnitude and phase from the complex signal
        mag = tf.abs(stft_dat)
        phase = tf.math.angle(stft_dat)
        # returning magnitude and phase as list
        return [mag, phase]

 
        
    def ifftLayer(self, x):
        '''
        逆傅里叶层
        '''
        
        # calculating the complex representation
        s1_stft = (tf.cast(x[0], tf.complex64) * 
                    tf.exp( (1j * tf.cast(x[1], tf.complex64))))
        # returning the time domain frames
        return tf.signal.irfft(s1_stft)  
    
    
    def overlapAddLayer(self, x):
        '''
        这一层从帧信号重建波形
        '''

        # calculating and returning the reconstructed waveform
        return tf.signal.overlap_and_add(x, self.block_shift)
    
        

    def seperation_kernel(self, num_layer, mask_size, x, stateful=False):
        '''
        建立LSTM的函数
        '''

        # creating num_layer number of LSTM layers
        for idx in range(num_layer):
            x = LSTM(self.numUnits, return_sequences=True, stateful=stateful)(x)
            # using dropout between the LSTM layer for regularization 
            if idx<(num_layer-1):
                x = Dropout(self.dropout)(x)
        # creating the mask with a Dense and an Activation layer
        mask = Dense(mask_size)(x)
        mask = Activation(self.activation)(mask)
        # returning the mask
        return mask
    

    def build_DTLN_model(self, norm_stft=False):
        '''
        这里是建立模型的函数
        Method to build and compile the DTLN model. The model takes time domain 
        batches of size (batchsize, len_in_samples) and returns enhanced clips 
        in the same dimensions. As optimizer for the Training process the Adam
        optimizer with a gradient norm clipping of 3 is used. 
        The model contains two separation cores. The first has an STFT signal 
        transformation and the second a learned transformation based on 1D-Conv 
        layer. 
        '''
        
        # 输入数据
        time_dat = Input(batch_shape=(None, None))
        # 通过短时傅里叶变换得到输入信号频域下的幅度和相位
        mag,angle = Lambda(self.stftLayer)(time_dat)
        # normalizing log magnitude stfts to get more robust against level variations
        if norm_stft:
            mag_norm = InstantLayerNormalization()(tf.math.log(mag + 1e-7))
        else:
            # behaviour like in the paper
            mag_norm = mag
        # 幅度信号通过LSTM网络,self.numLayer为LSTM的层数,论文是2层,self.blockLen//2+1是LSTM模块的输出维度,论文为257,最后获得一个mask1。
        mask_1 = self.seperation_kernel(self.numLayer, (self.blockLen//2+1), mag_norm)
        # 将mask1和幅度信号点乘进行第一次降噪
        estimated_mag = Multiply()([mag, mask_1])
        # 第一次降噪后的信号经过逆傅里叶变换到时域
        estimated_frames_1 = Lambda(self.ifftLayer)([estimated_mag,angle])
        # 用一维卷积在时域上提取特征
        encoded_frames = Conv1D(self.encoder_size,1,strides=1,use_bias=False)(estimated_frames_1)
        # 正则化
        encoded_frames_norm = InstantLayerNormalization()(encoded_frames)
        # 同样经过LSTM层计算mask2
        mask_2 = self.seperation_kernel(self.numLayer, self.encoder_size, encoded_frames_norm)
        # 第二次降噪
        estimated = Multiply()([encoded_frames, mask_2]) 
        # 将信号转换到原始输入形状
        decoded_frames = Conv1D(self.blockLen, 1, padding='causal',use_bias=False)(estimated)
        # 将每次输出信号进行叠加
        estimated_sig = Lambda(self.overlapAddLayer)(decoded_frames)

        
        # create the model
        self.model = Model(inputs=time_dat, outputs=estimated_sig)
        # show the model summary
        print(self.model.summary())
        
   
        
    def compile_model(self):
        '''
        编译模型
        '''
        
        # use the Adam optimizer with a clipnorm of 3
        optimizerAdam = keras.optimizers.Adam(lr=self.lr, clipnorm=3.0)
        # compile model with loss function
        self.model.compile(loss=self.lossWrapper(), optimizer=optimizerAdam)
        
    def create_saved_model(self, weights_file, target_name):
        '''
        保存模型的方法
        '''
        # check for type
        if weights_file.find('_norm_') != -1:
            norm_stft = True
        else:
            norm_stft = False
        # build model    
        self.build_DTLN_model_stateful(norm_stft=norm_stft)
        # load weights
        self.model.load_weights(weights_file)
        # save model
        tf.saved_model.save(self.model, target_name)
        
   
    
    def train_model(self, runName, path_to_train_mix, path_to_train_speech, 
                    path_to_val_mix, path_to_val_speech):
        '''
        模型训练函数
        '''
        
        # 保存模型的路径
        savePath = './models_'+ runName+'/' 
        if not os.path.exists(savePath):
            os.makedirs(savePath)
        # 训练日志文件
        csv_logger = CSVLogger(savePath+ 'training_' +runName+ '.log')
        # 为自适应学习率创建回调,验证loss三个epoch不变,lr减半
        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5,
                              patience=3, min_lr=10**(-10), cooldown=1)
        # 创建用于自动停止的回调,验证loss十个epoch不变,自动停止训练
        early_stopping = EarlyStopping(monitor='val_loss', min_delta=0, 
            patience=10, verbose=0, mode='auto', baseline=None)
        # 创建模型检查点,便于训练异常中断时,可以从断点继续训练
        checkpointer = ModelCheckpoint(savePath+runName+'.h5',
                                       monitor='val_loss',
                                       verbose=1,
                                       save_best_only=True,
                                       save_weights_only=True,
                                       mode='auto',
                                       save_freq='epoch'
                                       )

        # 计算样本中音频块的长度
        len_in_samples = int(np.fix(self.fs * self.len_samples / 
                                    self.block_shift)*self.block_shift)
        # 创建训练数据生成器
        generator_input = audio_generator(path_to_train_mix, 
                                          path_to_train_speech, 
                                          len_in_samples, 
                                          self.fs, train_flag=True)
        dataset = generator_input.tf_data_set
        dataset = dataset.batch(self.batchsize, drop_remainder=True).repeat()
        # 计算一个epoch的训练步数
        steps_train = generator_input.total_samples//self.batchsize
        #创建验证数据生成器
        generator_val = audio_generator(path_to_val_mix,
                                        path_to_val_speech, 
                                        len_in_samples, self.fs)
        dataset_val = generator_val.tf_data_set
        dataset_val = dataset_val.batch(self.batchsize, drop_remainder=True).repeat()
        # 计算验证步骤的数量
        steps_val = generator_val.total_samples//self.batchsize
        # 训练模型,keras使用self.model.fit()来训练模型
        self.model.fit(
            x=dataset, 
            batch_size=None,
            steps_per_epoch=steps_train, 
            epochs=self.max_epochs,
            verbose=1,
            validation_data=dataset_val,
            validation_steps=steps_val, 
            callbacks=[checkpointer, reduce_lr, csv_logger, early_stopping],
            max_queue_size=50,
            workers=4,
            use_multiprocessing=True)
        # clear out garbage
        tf.keras.backend.clear_session()

这里主要针对训练代码、验证代码已经模型构建的代码进行简单的注释,项目完整代码中还包含一些移植到tflite的代码,笔者没有去研究,有兴趣的朋友可以移步到github下载完整代码。

最后

以上就是帅气硬币为你收集整理的DTLN模型代码简介前言run_training.pyrun_evaluation.pyDTLN_model.py的全部内容,希望文章能够帮你解决DTLN模型代码简介前言run_training.pyrun_evaluation.pyDTLN_model.py所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(76)

评论列表共有 0 条评论

立即
投稿
返回
顶部