PyTorch 循环神经网络 LSTM 时间序列预测

官方文档:

https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html

示例:

预测某只股票未来5天的收盘价

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import copy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
from torch.utils.data.dataset import Dataset
from torch.utils.data.dataloader import DataLoader

class Args:
    """Hyper-parameters and paths shared by the data, training and test code."""

    # Number of consecutive past values packed into one input feature vector.
    input_size = 50
    # Width of the LSTM hidden state.
    hidden_size = 4
    # Number of future values predicted per sample.
    output_size = 5
    # Number of stacked LSTM layers.
    num_layers = 2
    # Samples per mini-batch handed to the DataLoader.
    batch_size = 16
    # File the best model's state_dict is written to / read from.
    model_path = "./tpem3_model.pth"

class MyDataset(Dataset):
    """Minimal map-style dataset that exposes an indexable collection as-is."""

    def __init__(self, data):
        # Keep a reference to the samples and cache their count once.
        self.data = data
        self.len = len(data)

    def __len__(self):
        """Return the number of samples recorded at construction time."""
        return self.len

    def __getitem__(self, index):
        """Return the stored sample at ``index`` unchanged."""
        return self.data[index]

# Model
class LSTM(torch.nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        output_size: Number of values produced by the linear head.
        num_layers: Number of stacked LSTM layers.
        batch_size: Kept for backward compatibility and stored on the
            instance; ``forward`` sizes its initial states from the actual
            input, so this value no longer has to match the batch passed in.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers, batch_size=1):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.output_size = output_size
        self.num_directions = 1  # 1 for a unidirectional LSTM, 2 for bidirectional
        self.batch_size = batch_size

        # batch_first=True swaps the first two dimensions of input and output:
        #   input  (batch_size, seq_len, input_size)
        #   output (batch_size, seq_len, num_directions * hidden_size)
        # h_n and c_n keep their (layers * directions, batch, hidden) layout.
        self.lstm = torch.nn.LSTM(self.input_size, self.hidden_size, self.num_layers, batch_first=True)
        self.linear = torch.nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input_seq):
        """Return predictions of shape (batch, output_size) for a batched input.

        ``input_seq`` is expected as (batch, seq_len, input_size).
        """
        # Size the initial states from the batch actually received so that a
        # partial last batch or single-sample inference does not fail, and
        # create them on the same device / dtype as the input.
        state_shape = (
            self.num_directions * self.num_layers,
            input_seq.size(0),
            self.hidden_size,
        )
        # NOTE: the initial states are random on every call (as in the
        # original code), so repeated forward passes are not bit-identical.
        h0 = torch.randn(state_shape, device=input_seq.device, dtype=input_seq.dtype)
        c0 = torch.randn(state_shape, device=input_seq.device, dtype=input_seq.dtype)
        output, _ = self.lstm(input_seq, (h0, c0))
        # Only the last time step feeds the prediction head.
        return self.linear(output[:, -1, :])

# Load the raw data
def load_data():
    """Read the ``close_qfq`` column of the stock CSV, skipping its last 5 rows.

    Returns:
        A pandas DataFrame with the single column ``close_qfq``.
    """
    # ``skipfooter`` is not supported by the default C parser; asking for the
    # python engine explicitly avoids the ParserWarning pandas emits when it
    # has to fall back on its own.
    data_csv = pd.read_csv(
        './tushare_stk_factor_000001.csv',
        usecols=['close_qfq'],
        skipfooter=5,
        engine='python',
    )
    return data_csv

# Restore trained weights
def load_model(model, path):
    """Load the state_dict stored at ``path`` into ``model`` and return ``model``.

    Only parameters are read from the file; the network structure comes from
    the ``model`` instance supplied by the caller.
    """
    state = torch.load(path)
    model.load_state_dict(state)
    return model

# Clean and scale the raw data
def filer_data(data_csv):
    """Drop missing rows, reverse the row order and scale by the value range.

    Args:
        data_csv: DataFrame of numeric values (one row per observation).

    Returns:
        ``(dataset, max_value, min_value)`` where ``dataset`` is a list of
        float32 row arrays, each divided by ``max_value - min_value``.
        Note that the minimum is not subtracted, so the scaled values are not
        confined to [0, 1].

    Raises:
        ValueError: If no rows remain after dropping missing values, or if
            all values are equal (the range would be zero).
    """
    values = data_csv.dropna().values.astype('float32')
    if values.size == 0:
        raise ValueError("filer_data: no rows left after dropping missing values")

    # Reverse the row order (first row becomes last).
    values = np.flipud(values)

    max_value = np.max(values)
    min_value = np.min(values)
    scalar = max_value - min_value
    if scalar == 0:
        # Dividing by a zero range would silently fill the data with inf/nan.
        raise ValueError("filer_data: all values are identical, cannot scale by a zero range")

    # One vectorised division; list() keeps the original list-of-rows result.
    dataset = list(values / scalar)
    return dataset, max_value, min_value

# Build the training / validation / test loaders
def create_dataset(args, dataset):
    """Split ``dataset`` chronologically and wrap each part in a DataLoader.

    The first 70% of the rows form the training part, the final
    ``input_size + output_size + 1`` rows form the test part, and the rows in
    between form the validation part. Each part is cut into sliding windows
    of ``input_size`` inputs followed by ``output_size`` labels.

    Returns:
        ``(tra, val, tes)`` DataLoaders. Training and validation loaders are
        shuffled and drop an incomplete last batch; the test loader does
        neither.
    """
    window = args.input_size + args.output_size
    total = len(dataset)
    split_at = int(total * 0.7)
    tail_start = total - window - 1

    train_part = dataset[:split_at]
    valid_part = dataset[split_at:tail_start]
    test_part = dataset[tail_start:]

    def process(dataset, batch_size, shuffle, drop_last):
        """Turn one contiguous part into a DataLoader of [input, label] pairs."""
        samples = []
        for start in range(len(dataset) - window):
            mid = start + args.input_size
            features = [row[0] for row in dataset[start:mid]]
            targets = [row[0] for row in dataset[mid:mid + args.output_size]]
            # Input has shape [1, input_size]; label is a flat vector of
            # output_size values.
            x = torch.FloatTensor([features])
            y = torch.FloatTensor(np.array(targets)).view(-1)
            samples.append([x, y])

        return DataLoader(
            dataset=MyDataset(samples),  # samples wrapped as a Dataset
            batch_size=batch_size,       # samples per batch
            shuffle=shuffle,             # reshuffle at every epoch?
            num_workers=0,               # load in the main process
            drop_last=drop_last,         # discard an incomplete final batch?
            pin_memory=False,
        )

    tra = process(train_part, args.batch_size, True, True)
    val = process(valid_part, args.batch_size, True, True)
    tes = process(test_part, args.batch_size, False, False)

    return tra, val, tes

# Validation
def valid(model, val, criterion):
    """Return the mean of ``criterion`` over every batch yielded by ``val``.

    The model is switched to eval mode and no gradients are recorded.
    """
    model.eval()
    with torch.no_grad():
        batch_losses = [criterion(model(v_x), v_y).item() for v_x, v_y in val]
    return np.mean(batch_losses)

# Training
def train(args, tra, val, max_epoch=200):
    """Train an LSTM, keep the best-validating copy, save it and return it.

    Args:
        args: Object providing ``input_size``, ``hidden_size``,
            ``output_size``, ``num_layers``, ``batch_size`` and ``model_path``.
        tra: Iterable of ``(inputs, labels)`` training batches.
        val: Iterable of ``(inputs, labels)`` validation batches.
        max_epoch: Number of epochs to run (default 200).

    Returns:
        The model copy with the lowest validation loss seen after the first
        quarter of training, or the final model if no such copy was recorded.
        Its state_dict is also written to ``args.model_path``.
    """
    valid_loss = []
    best_model = None
    min_val_loss = np.inf

    model = LSTM(args.input_size, args.hidden_size, args.output_size, args.num_layers, args.batch_size)
    # Mean-squared-error loss. Only ``torch`` is imported by this file, so the
    # fully qualified name is required.
    criterion = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
    # Cut the learning rate by 10x at 3/5 and 4/5 of training. Milestones are
    # epoch indices, so keep them as integers.
    milestones = sorted({max_epoch * 3 // 5, max_epoch * 4 // 5})
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=milestones, gamma=0.1)

    for epoch in range(max_epoch):
        model.train()
        for t_x, t_y in tra:
            out = model(t_x)
            loss = criterion(out, t_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        mean_valid_loss = valid(model, val, criterion)
        valid_loss.append(mean_valid_loss)

        # Report progress after validation so a loss value always exists
        # (reading valid_loss[-1] before the first validation raised
        # IndexError at epoch 0).
        if epoch % 50 == 0:
            print('-----------Epoch: {}, Loss: {:.5f}-----------'.format(epoch + 1, mean_valid_loss))

        # Only start tracking the best model after the first quarter of
        # training.
        if epoch > max_epoch / 4 and mean_valid_loss < min_val_loss:
            min_val_loss = mean_valid_loss
            best_model = copy.deepcopy(model)
            print('best_model Epoch: {}, Loss: {:.5f}'.format(epoch + 1, min_val_loss))

        scheduler.step()

    # No snapshot is taken if the loss never improved in the tracked window
    # (e.g. NaN losses or very short runs); fall back to the final weights
    # instead of failing on ``None.state_dict()``.
    if best_model is None:
        best_model = model

    # Save parameters only (not the network structure).
    torch.save(best_model.state_dict(), args.model_path)
    return best_model


# Testing
def test(args, dataset, dtr, model=None):
    """Run the model over ``dtr`` and plot its predictions against ``dataset``.

    Args:
        args: Object providing the model sizes and ``model_path``.
        dataset: Full scaled series; its last ``input_size`` entries are
            plotted as the reference curve.
        dtr: Iterable of ``(inputs, labels)`` test batches.
        model: Optional trained model. When omitted, a fresh LSTM is built
            and its weights are loaded from ``args.model_path``.

    The figure is only drawn here; the caller is responsible for
    ``plt.show()``.
    """
    if model is None:
        model = LSTM(args.input_size, args.hidden_size, args.output_size, args.num_layers, 1)
        model = load_model(model, args.model_path)

    model.eval()  # switch to inference mode

    # Collect every predicted value as its own one-element row.
    pred = []
    with torch.no_grad():
        for seq, _target in dtr:
            y_pred = model(seq)
            pred.extend(y_pred.cpu().numpy().astype(np.float32).reshape(-1, 1))

    # Predictions are drawn over the last len(pred) x-positions of the
    # reference window.
    plt.plot(range(args.input_size - len(pred), args.input_size), pred, 'r', label='prediction')
    plt.plot(dataset[len(dataset) - args.input_size:], 'b', label='real')
    plt.legend(loc='best')

if __name__ == '__main__':
    # Pipeline: configuration -> raw data -> scaled series -> loaders.
    args = Args()
    data_csv = load_data()
    dataset, max_value, min_value = filer_data(data_csv)
    tra, val, tes = create_dataset(args, dataset)

    # train() writes the best weights to args.model_path; test() is called
    # without a model, so it rebuilds the network and reloads those weights.
    train(args, tra, val)
    test(args, dataset, tes)

    # Display the figure prepared by test().
    plt.show()

预测结果: