我是靠谱客的博主 深情灯泡,最近开发中收集的这篇文章主要介绍天池入门赛-心跳信号分类预测-PyTorch CNN模型天池赛-心跳信号分类预测,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

天池入门赛-心跳信号分类预测-PyTorch CNN模型

  • 天池赛-心跳信号分类预测
    • 赛题简介
    • 评测标准
    • 数据分析
    • CNN模型
    • 损失函数
    • 主要代码
    • 结语
    • 代码

天池赛-心跳信号分类预测

在这里插入图片描述

赛题简介

赛题以预测心电图心跳信号类别为任务,数据集报名后可见并可下载,该数据来自某平台心电图数据记录,总数据量超过20万,主要为1列心跳信号序列数据,其中每个样本的信号序列采样频次一致,长度相等。为了保证比赛的公平性,将会从中抽取10万条作为训练集,2万条作为测试集A,2万条作为测试集B,同时会对心跳信号类别(label)信息进行脱敏。

FieldDescription
id为心跳信号分配的唯一标识
heartbeat_signals心跳信号序列
label心跳信号类别(0、1、2、3)

评测标准

选手需提交4种不同心跳信号预测的概率,选手提交结果与实际心跳类型结果进行对比,求预测的概率与真实值差值的绝对值(越小越好)。

具体计算公式如下:
针对某一个信号,若真实值为[ y 1 y_1 y1, y 2 y_2 y2, y 3 y_3 y3, y 4 y_4 y4], 模型预测概率值为[ a 1 a_1 a1, a 2 a_2 a2, a 3 a_3 a3, a 4 a_4 a4], 那么该模型的平均指标 a b s − s u m abs-sum abssum

a b s − s u m abs-sum abssum = ∑ y = 1 n ∑ i = 1 4 ∣ y i − a i ∣ displaystylesum_{y=1}^{n}displaystylesum_{i=1}^{4} |y_i -a_i| y=1ni=14yiai
例如,心跳信号为1, 会通过编码转成[0, 1, 0, 0], 预测不同心跳信号概率为[0.1, 0.7, 0.1, 0.1], 那么这个预测结果的 a b s − s u m abs-sum abssum
a b s − s u m abs-sum abssum = ∣0.1−0∣+∣0.7−1∣+∣0.1−0∣+∣0.1−0∣=0.6

数据分析

这一部分在天池nootbook上已有作者提供代码示例,在此引用。链接如下:
Task 2 数据分析

CNN模型

这个CNN模型是用PyTorch框架实现的。
大概思路如下:
在这里插入图片描述
希望上图能够帮助新手理解卷积层和采样层是个什么意思,图片大小有限没能画出kernel和kernel怎么移动的。对于每一层输入的shape可以结合笔者代码中的注释进行推算。
也可参考PyTorch的文档和Source code进行理解,链接如下:
nn.Conv1d

class Model(nn.Module):
    def __init__(self):
        """
            CNN模型构造
        """
        super(Model, self).__init__()
        self.conv_layer1 = nn.Sequential(
            # input shape(32, 1, 205) -> [batch_size, channel, features]
            # 参考->https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html#torch.nn.Conv1d
            nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1),   # 卷积后(16, 1, 205)
            nn.BatchNorm1d(16),
            nn.ReLU()
        )
        # 下采样down-sampling
        self.sampling_layer1 = nn.Sequential(
            # input shape(32, 16, 205)
            nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),  # size随便选的, 这里output应该是(32, 32, 102)
        )

        self.conv_layer2 = nn.Sequential(
            nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1),   # 输出(32, 64, 102)
            nn.BatchNorm1d(64),
            nn.ReLU()
        )

        self.sampling_layer2 = nn.Sequential(
            nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1),  # 输出(32, 128, 102)
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),  # 输出(32, 64, 51)
        )

        self.conv_layer3 = nn.Sequential(
            nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding=1),  # 输出(32, 256, 51)
            nn.BatchNorm1d(256),
            nn.ReLU()
        )

        self.sampling_layer3 = nn.Sequential(
            nn.Conv1d(in_channels=256, out_channels=512, kernel_size=3, padding=1),  # 输出(32, 512, 51)
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),  # 输出(32, 512, 25)
        )
        # 全连接层
        self.full_layer = nn.Sequential(
            nn.Linear(in_features=512*25, out_features=256*25),
            nn.ReLU(),
            nn.Linear(in_features=256*25, out_features=128*25),
            nn.ReLU(),
            nn.Linear(in_features=128*25, out_features=64*25),
            nn.ReLU(),
            nn.Linear(in_features=64*25, out_features=4)
        )
        # 这个是输出label预测概率, 不知道这写法对不对
        self.pred_layer = nn.Softmax(dim=1)

    def forward(self, x):
        """
            前向传播
        :param x: batch
        :return: training == Ture 返回的是全连接层输出, training == False 加上一个Softmax(), 返回各个label概率.
        """
        x = x.unsqueeze(dim=1)  # 升维. input shape(32, 205), output shape(32, 1, 205)
        x = self.conv_layer1(x)
        x = self.sampling_layer1(x)
        x = self.conv_layer2(x)
        x = self.sampling_layer2(x)
        x = self.conv_layer3(x)
        x = self.sampling_layer3(x)
        x = x.view(x.size(0), -1)   # output(32, 12800)
        x = self.full_layer(x)

        if self.training:
            return x	# CrossEntropyLoss自带LogSoftmax, 所以训练的时候不用输出概率(我也不知道这个写法对不对, 我是试错出来的.)
        else:
            return self.pred_layer(x)
   

损失函数

Cross Entropy Loss:
这个损失函数是多分类问题经常使用的, 需要注意的是Cross Entropy Loss结合了LogSoftmax和NLLLoss, 如果你在输出层使用了Softmax可能会导致你的模型无法拟合.
参考链接: Cross Entropy Loss
在这里插入图片描述
L1 Loss:
在评分标准中有题提到这个比赛采用的是 a b s − s u m abs-sum abssum = ∑ y = 1 n ∑ i = 1 4 ∣ y i − a i ∣ displaystylesum_{y=1}^{n}displaystylesum_{i=1}^{4} |y_i -a_i| y=1ni=14yiai
这个其实就是PyTorch中的nn.L1Loss()或者F.l1_loss(). PyTorch默认的是mean absolute error (MAE), 但根据文档你只要将reduction设置为 reduction=‘sum’ 就可以了. 这不就成了sum absolute error(SAE)了.
L1 Loss的参考链接: nn.L1Loss
在这里插入图片描述

主要代码

一下是主要代码供大家参考.

def train_loop(dataloader, model, loss_fn, optimizer):
    """
        模型训练部分
    :param dataloader: 训练数据集
    :param model: 训练用到的模型
    :param loss_fn: 评估用的损失函数
    :param optimizer: 优化器
    :return: None
    """
    for batch, x_y in enumerate(dataloader):
        X, y = x_y[:, :205].type(torch.float64), torch.tensor(x_y[:, 205], dtype=torch.long, device='cuda:0')
        # 开启梯度
        with torch.set_grad_enabled(True):
            # Compute prediction and loss
            pred = model(X.float())
            loss = loss_fn(pred, y)
            optimizer.zero_grad()
            # Backpropagation
            loss.backward()
            optimizer.step()


def test_loop(dataloader, model, loss_fn):
    """
        模型测试部分
    :param dataloader: 测试数据集
    :param model: 测试模型
    :param loss_fn: 损失函数
    :return: None
    """
    size = len(dataloader.dataset)
    test_loss, correct, l1_loss = 0, 0, 0
    # 用来计算abs-sum. 等于PyTorch L1Loss-->
    # https://pytorch.org/docs/stable/generated/torch.nn.L1Loss.html#torch.nn.L1Loss
    l1loss_fn = AbsSumLoss()
    with torch.no_grad():   # 关掉梯度
        model.eval()
        for x_y in dataloader:
            X, y = x_y[:, :205].type(torch.float64), torch.tensor(x_y[:, 205], dtype=torch.long, device='cuda:0')
            # 注意Y和y的区别, Y用来计算L1 loss, y是CrossEntropy loss.
            Y = torch.zeros(size=(len(y), 4), device='cuda:0')
            for i in range(len(Y)):
                Y[i][y[i]] = 1

            pred = model(X.float())
            test_loss += loss_fn(pred, y).item()    # 这个是CrossEntropy loss
            l1_loss += l1loss_fn(pred, Y).item()    # 这个是abs-sum/L1 loss
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()  # 这个是计算准确率的, 取概率最大值的下标.

    test_loss /= size   # 等于CrossEntropy的reduction='mean', 这里有些多此一举可删掉.
    correct /= size
    print(f"Test Results:nAccuracy: {(100*correct):>0.1f}% abs-sum loss: {l1_loss:>8f} CroEtr loss: {test_loss:>8f}")


def prediction(net, loss):
    """
        对数据进行预测
    :param net: 训练好的模型
    :param loss: 模型的测试误差值, 不是损失函数. 可以去掉, 这里是用来给预测数据命名方便区分.
    :return: None
    """
    with torch.no_grad():
        net.eval()
        pred_loader = torch.utils.data.DataLoader(dataset=pred_data)
        res = []
        for x in pred_loader:
            x = torch.tensor(x, device='cuda:0', dtype=torch.float64)
            output = net(x.float())
            res.append(output.cpu().numpy().tolist())

        res = [i[0] for i in res]
        res_df = pd.DataFrame(res, columns=['label_0', 'label_1', 'label_2', 'label_3'])
        res_df.insert(0, 'id', value=range(100000, 120000))

        res_df.to_csv('res-loss '+str(loss)+'.csv', index=False)


class Model(nn.Module):
    def __init__(self):
        """
            CNN模型构造
        """
        super(Model, self).__init__()
        self.conv_layer1 = nn.Sequential(
            # input shape(32, 1, 205) -> [batch_size, channel, features]
            # 参考->https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html#torch.nn.Conv1d
            nn.Conv1d(in_channels=1, out_channels=16, kernel_size=3, padding=1),   # 卷积后(32, 16, 205)
            nn.BatchNorm1d(16),
            nn.ReLU()
        )
        # 下采样down-sampling
        self.sampling_layer1 = nn.Sequential(
            # input shape(32, 16, 205)
            nn.Conv1d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),  # size随便选的, 这里output应该是(32, 32, 102)
        )

        self.conv_layer2 = nn.Sequential(
            nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1),   # 输出(32, 64, 102)
            nn.BatchNorm1d(64),
            nn.ReLU()
        )

        self.sampling_layer2 = nn.Sequential(
            nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1),  # 输出(32, 128, 102)
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),  # 输出(32, 64, 51)
        )

        self.conv_layer3 = nn.Sequential(
            nn.Conv1d(in_channels=128, out_channels=256, kernel_size=3, padding=1),  # 输出(32, 256, 51)
            nn.BatchNorm1d(256),
            nn.ReLU()
        )

        self.sampling_layer3 = nn.Sequential(
            nn.Conv1d(in_channels=256, out_channels=512, kernel_size=3, padding=1),  # 输出(32, 512, 51)
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),  # 输出(32, 512, 25)
        )
        # 全连接层
        self.full_layer = nn.Sequential(
            nn.Linear(in_features=512*25, out_features=256*25),
            nn.ReLU(),
            nn.Linear(in_features=256*25, out_features=128*25),
            nn.ReLU(),
            nn.Linear(in_features=128*25, out_features=64*25),
            nn.ReLU(),
            nn.Linear(in_features=64*25, out_features=4)
        )
        # 这个是输出label预测概率, 不知道这写法对不对
        self.pred_layer = nn.Softmax(dim=1)

    def forward(self, x):
        """
            前向传播
        :param x: batch
        :return: training == Ture 返回的是全连接层输出, training == False 加上一个Softmax(), 返回各个label概率.
        """
        x = x.unsqueeze(dim=1)  # 升维. input shape(32, 205), output shape(32, 1, 205)
        x = self.conv_layer1(x)
        x = self.sampling_layer1(x)
        x = self.conv_layer2(x)
        x = self.sampling_layer2(x)
        x = self.conv_layer3(x)
        x = self.sampling_layer3(x)
        x = x.view(x.size(0), -1)   # output(32, 12800)
        x = self.full_layer(x)

        if self.training:
            return x    # CrossEntropyLoss自带LogSoftmax, 训练的时候不用输出概率(我也不知道这个写法对不对, 我是试错出来的.)
        else:
            return self.pred_layer(x)


class AbsSumLoss(nn.Module):
    def __init__(self):
        """
            可以直接用PyTorch的nn.L1Loss, 这个我写的时候不知道。
        """
        super(AbsSumLoss, self).__init__()

    def forward(self, output, target):
        loss = F.l1_loss(target, output, reduction='sum')

        return loss


if __name__ == '__main__':
    set_random_seed(1996)   # 设定随机种子
    # 加载数据集
    data = pd.read_csv('train.csv')
    data = process_data(data)
    pred_data = pd.read_csv('testA.csv')
    pred_data = get_pred_x(pred_data)

    # 初始化模型
    lr_rate = 1e-5
    w_decay = 1e-6
    n_epoch = 100
    b_size = 32
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    net = Model()
    net.to(device)
    optimizer = torch.optim.Adam(params=net.parameters(), lr=lr_rate, weight_decay=w_decay)
    loss_fn = nn.CrossEntropyLoss(reduction='sum')

    # 拆分训练测试集
    train, test = train_test_split(data, test_size=0.2)
    train, test = torch.cuda.FloatTensor(train), torch.cuda.FloatTensor(test)
    train_loader = torch.utils.data.DataLoader(dataset=train, batch_size=b_size)
    test_loader = torch.utils.data.DataLoader(dataset=test, batch_size=b_size)

    for epoch in range(n_epoch):
        start = time.time()
        print(f"n----------Epoch {epoch + 1}----------")
        train_loop(train_loader, net, loss_fn, optimizer)
        test_loop(test_loader, net, loss_fn)
        end = time.time()
        print('training time: ', end-start)

    # predict

结语

我也是个新手, 没什么经验, 难免存在错误和纰漏还请各位大佬指正. 比赛还在进行中, 如有新的发现和经验会在后续和大家继续分享.

代码

上传了代码,CNN最后B榜的表现是16名。
模型代码

最后

以上就是深情灯泡为你收集整理的天池入门赛-心跳信号分类预测-PyTorch CNN模型天池赛-心跳信号分类预测的全部内容,希望文章能够帮你解决天池入门赛-心跳信号分类预测-PyTorch CNN模型天池赛-心跳信号分类预测所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(41)

评论列表共有 0 条评论

立即
投稿
返回
顶部