RNN核心之时间预测
数据如下所示:
这个数据是 12年飞机月流量,可视化得到下面的效果。
1
2
3
4
5
6
7import numpy as np import pandas as pd import matplotlib.pyplot as plt data_csv = pd.read_csv('./data.csv', usecols=[1]) plt.plot(data_csv)
首先我们进行预处理,将数据中 na 的数据去掉,然后将数据标准化到 0 ~ 1 之间。
数据预处理
1
2
3
4
5
6
7
8data_csv = data_csv.dropna() dataset = data_csv.values dataset = dataset.astype('float32') max_value = np.max(dataset) min_value = np.min(dataset) scalar = max_value - min_value dataset = list(map(lambda x: x / scalar, dataset))
接着我们进行数据集的创建,我们想通过前面几个月的流量来预测当月的流量,比如我们希望通过前两个月的流量来预测当月的流量,我们可以将前两个月的流量当做输入,当月的流量当做输出。同时我们需要将我们的数据集分为训练集和测试集,通过测试集的效果来测试模型的性能,这里我们简单的将前面几年的数据作为训练集,后面两年的数据作为测试集。
1
2
3
4
5
6
7
8def create_dataset(dataset, look_back=2): dataX, dataY = [], [] for i in range(len(dataset) - look_back): a = dataset[i:(i + look_back)] dataX.append(a) dataY.append(dataset[i + look_back]) return np.array(dataX), np.array(dataY)
创建好输入输出
1
2data_X, data_Y = create_dataset(dataset)
划分训练集和测试集,70% 作为训练集
1
2
3
4
5
6
7train_size = int(len(data_X) * 0.7) test_size = len(data_X) - train_size train_X = data_X[:train_size] train_Y = data_Y[:train_size] test_X = data_X[train_size:] test_Y = data_Y[train_size:]
最后,我们需要将数据改变一下形状,因为 RNN 读入的数据维度是 (seq, batch, feature),所以要重新改变一下数据的维度,这里只有一个序列,所以 batch 是 1,而输入的 feature 就是我们希望依据的几个月份,这里我们定的是两个月份,所以 feature 就是 2.
1
2
3
4
5
6
7
8
9
10
11
12
13import torch train_X = train_X.reshape(-1, 1, 2) train_Y = train_Y.reshape(-1, 1, 1) test_X = test_X.reshape(-1, 1, 2) train_x = torch.from_numpy(train_X) train_y = torch.from_numpy(train_Y) test_x = torch.from_numpy(test_X) from torch import nn from torch.autograd import Variable
这里定义好模型,模型的第一部分是一个两层的 RNN,每一步模型接受两个月的输入作为特征,得到一个输出特征。接着通过一个线性层将 RNN 的输出回归到流量的具体数值,这里我们需要用 view 来重新排列,因为 nn.Linear 不接受三维的输入,所以我们先将前两维合并在一起,然后经过线性层之后再将其分开,最后输出结果。
定义模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19class lstm_reg(nn.Module): def __init__(self, input_size, hidden_size, output_size=1, num_layers=2): super(lstm_reg, self).__init__() self.rnn = nn.LSTM(input_size, hidden_size, num_layers) # rnn self.reg = nn.Linear(hidden_size, output_size) # 回归 def forward(self, x): x, _ = self.rnn(x) # (seq, batch, hidden) s, b, h = x.shape x = x.view(s*b, h) # 转换成线性层的输入格式 x = self.reg(x) x = x.view(s, b, -1) return x net = lstm_reg(2, 4) criterion = nn.MSELoss() optimizer = torch.optim.Adam(net.parameters(), lr=1e-2)
定义好网络结构,输入的维度是 2,因为我们使用两个月的流量作为输入,隐藏层的维度可以任意指定,这里我们选的 4
开始训练
1
2
3
4
5
6
7
8
9
10
11
12
13for e in range(1000): var_x = Variable(train_x) var_y = Variable(train_y) # 前向传播 out = net(var_x) loss = criterion(out, var_y) # 反向传播 optimizer.zero_grad() loss.backward() optimizer.step() if (e + 1) % 100 == 0: # 每 100 次输出结果 print('Epoch: {}, Loss: {:.5f}'.format(e + 1, loss.data[0]))
训练结果:
训练完成之后,我们可以用训练好的模型去预测后面的结果
1
2
3
4
5
6
7net = net.eval() # 转换成测试模式 data_X = data_X.reshape(-1, 1, 2) data_X = torch.from_numpy(data_X) var_data = Variable(data_X) pred_test = net(var_data) # 测试集的预测结果
改变输出的格式
1
2pred_test = pred_test.view(-1).data.numpy()
画出实际结果和预测的结果
1
2
3
4plt.plot(pred_test, 'r', label='prediction') plt.plot(dataset, 'b', label='real') plt.legend(loc='best')
这里蓝色的是真实的数据集,红色的是预测的结果,我们能够看到,使用 lstm 能够得到比较相近的结果,预测的趋势也与真实的数据集是相同的,因为其能够记忆之前的信息,而单纯的使用线性回归并不能得到较好的结果,从这个例子也说明了 RNN 对于序列有着非常好的性能。
全部代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85import numpy as np import pandas as pd import matplotlib.pyplot as plt import torch from torch import nn from torch.autograd import Variable data_csv = pd.read_csv('./data.csv', usecols=[1]) plt.plot(data_csv) # 数据预处理 data_csv = data_csv.dropna() dataset = data_csv.values dataset = dataset.astype('float32') max_value = np.max(dataset) min_value = np.min(dataset) scalar = max_value - min_value dataset = list(map(lambda x: x / scalar, dataset)) def create_dataset(dataset, look_back=2): dataX, dataY = [], [] for i in range(len(dataset) - look_back): a = dataset[i:(i + look_back)] dataX.append(a) dataY.append(dataset[i + look_back]) return np.array(dataX), np.array(dataY) # 创建好输入输出 data_X, data_Y = create_dataset(dataset) # 划分训练集和测试集,70% 作为训练集 train_size = int(len(data_X) * 0.7) test_size = len(data_X) - train_size train_X = data_X[:train_size] train_Y = data_Y[:train_size] test_X = data_X[train_size:] test_Y = data_Y[train_size:] train_X = train_X.reshape(-1, 1, 2) train_Y = train_Y.reshape(-1, 1, 1) test_X = test_X.reshape(-1, 1, 2) train_x = torch.from_numpy(train_X) train_y = torch.from_numpy(train_Y) test_x = torch.from_numpy(test_X) # 定义模型 class lstm_reg(nn.Module): def __init__(self, input_size, hidden_size, output_size=1, num_layers=2): super(lstm_reg, self).__init__() self.rnn = nn.LSTM(input_size, hidden_size, num_layers) # rnn self.reg = nn.Linear(hidden_size, output_size) # 回归 def forward(self, x): x, _ = self.rnn(x) # (seq, batch, hidden) s, b, h = x.shape x = x.view(s*b, h) # 转换成线性层的输入格式 x = self.reg(x) x = x.view(s, b, -1) return x net = lstm_reg(2, 4) criterion = nn.MSELoss() optimizer = torch.optim.Adam(net.parameters(), lr=1e-2) # 开始训练 for e in range(1000): var_x = Variable(train_x) var_y = Variable(train_y) # 前向传播 out = net(var_x) loss = criterion(out, var_y) # 反向传播 optimizer.zero_grad() loss.backward() optimizer.step() if (e + 1) % 100 == 0: # 每 100 次输出结果 print('Epoch: {}, Loss: {:.5f}'.format(e + 1, loss.data[0])) net = net.eval() # 转换成测试模式 data_X = data_X.reshape(-1, 1, 2) data_X = torch.from_numpy(data_X) var_data = Variable(data_X) pred_test = net(var_data) # 测试集的预测结果 # 改变输出的格式 pred_test = pred_test.view(-1).data.numpy() # 画出实际结果和预测的结果 plt.plot(pred_test, 'r', label='prediction') plt.plot(dataset, 'b', label='real') plt.legend(loc='best')
最后
以上就是舒服发箍最近收集整理的关于RNN之时间预测(数组数据)RNN核心之时间预测数据预处理创建好输入输出划分训练集和测试集,70% 作为训练集定义模型开始训练改变输出的格式画出实际结果和预测的结果的全部内容,更多相关RNN之时间预测(数组数据)RNN核心之时间预测数据预处理创建好输入输出划分训练集和测试集,70%内容请搜索靠谱客的其他文章。
发表评论 取消回复