이번 포스트는 LSTM-seq2seq를 이용한 삼성전자 주가 예측 모델에 대해서 설명드리고자 합니다. LSTM을 이용한 삼성전자 주가 예측 관련 포스트는 아래 링크를 참고하여 주시고 해당 포스트에서는 seq2seq 모델을 이용하여 주가를 예측하여 보겠습니다.
https://byeolcardi.tistory.com/4
딥러닝으로 삼성전자 주가 예측하기 - (lstm)
딥러닝이라는 것 자체가 과거를 토대로 미래를 예측하는 것이기 때문에 과거의 주가 데이터를 이용하여 미래의 주가를 예측하려는 연구들은 상당히 많이 이루어지고 있습니다. 그것이 쉽게 이
byeolcardi.tistory.com
시퀀스-투-시퀀스(Sequence-to-Sequence)라고 불리는 seq2seq는 다양한 도메인에서 사용되지만 NLP분야에서 정말 많이 사용됩니다. seq2seq 모델로 만들어진 번역기가 있다고 가정해보겠습니다. 'I am a student'라는 영어 문장을 같은 뜻의 프랑스 문장인 'je suis étudiant'라는 문장을 만드는 seq2seq 모델 기반 번역기를 그림으로 표현한다면 아래와 같습니다.
여기의 I , am , a, student는 LSTM으로 이루어진 인코더로 입력받습니다. LSTM으로 구성되어 있기 때문에 문장의 순서 정보도 모델 학습에 이용할 수 있습니다.
간략하게 설명드린 seq2seq 모델을 통하여 삼성전자 주가 예측을 진행해보겠습니다.
import sys
import warnings
if not sys.warnoptions:
warnings.simplefilter('ignore')
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
from datetime import timedelta
from tqdm import tqdm
import yfinance as yf
sns.set()
tf.compat.v1.random.set_random_seed(1234)
Lstm cell을 구성하기 위한 tensorflow(해당 코드는 1.15.1 version을 기준으로 작성되어 있습니다.), 주가 데이터를 가져오기 위한 yfinanace 라이브러리, 기타 필요한 라이브러리들을 import 시켜줍니다.
df = yf.download('005930.KS',
start='2021-05-01',
end='2022-05-31',
progress=False)
df = df.reset_index()
2021년 5월 1일부터 2022년 5월 31일까지의 주가 데이트를 이용합시다. 그리고 yfinance 라이브러리는 Date(날짜)가 기본 index로 되어있습니다. reset_index()를 통하여 새로운 index를 추가해줍니다.
학습이 잘되도록 데이터를 정규화시켜주도록 합니다.
minmax = MinMaxScaler().fit(df.iloc[:, 4:5].astype('float32'))
df_log = minmax.transform(df.iloc[:, 4:5].astype('float32'))
df_log = pd.DataFrame(df_log)
그리고 학습의 size를 결정합니다.
test_size = 30
simulation_size = 10
df_train = df_log.iloc[:-test_size]
df_test = df_log.iloc[-test_size:]
df.shape, df_train.shape, df_test.shape
파라미터를 다음과 같이 준다면 지난 30일 동안의 데이터를 가지고 다음날의 데이터를 예측합니다.
class Model:
def __init__(
self,
learning_rate,
num_layers,
size,
size_layer,
output_size,
forget_bias = 0.1,
):
def lstm_cell(size_layer):
return tf.nn.rnn_cell.LSTMCell(size_layer, state_is_tuple = False)
rnn_cells = tf.nn.rnn_cell.MultiRNNCell(
[lstm_cell(size_layer) for _ in range(num_layers)],
state_is_tuple = False,
)
self.X = tf.placeholder(tf.float32, (None, None, size))
self.Y = tf.placeholder(tf.float32, (None, output_size))
drop = tf.contrib.rnn.DropoutWrapper(
rnn_cells, output_keep_prob = forget_bias
)
self.hidden_layer = tf.placeholder(
tf.float32, (None, num_layers * 2 * size_layer)
)
_, last_state = tf.nn.dynamic_rnn(
drop, self.X, initial_state = self.hidden_layer, dtype = tf.float32
)
with tf.variable_scope('decoder', reuse = False):
rnn_cells_dec = tf.nn.rnn_cell.MultiRNNCell(
[lstm_cell(size_layer) for _ in range(num_layers)], state_is_tuple = False
)
drop_dec = tf.contrib.rnn.DropoutWrapper(
rnn_cells_dec, output_keep_prob = forget_bias
)
self.outputs, self.last_state = tf.nn.dynamic_rnn(
drop_dec, self.X, initial_state = last_state, dtype = tf.float32
)
self.logits = tf.layers.dense(self.outputs[-1], output_size)
self.cost = tf.reduce_mean(tf.square(self.Y - self.logits))
self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(
self.cost
)
def calculate_accuracy(real, predict):
real = np.array(real) + 1
predict = np.array(predict) + 1
percentage = 1 - np.sqrt(np.mean(np.square((real - predict) / real)))
return percentage * 100
def anchor(signal, weight):
buffer = []
last = signal[0]
for i in signal:
smoothed_val = last * weight + (1 - weight) * i
buffer.append(smoothed_val)
last = smoothed_val
return buffer
seq2seq 모델을 구축하는 코드입니다. LSTM과 비교하면 다음 부분에서 차이가 있습니다.(기준은 앞서서 말씀드린 포스트입니다.)
_, last_state = tf.nn.dynamic_rnn(
drop, self.X, initial_state = self.hidden_layer, dtype = tf.float32
)
with tf.variable_scope('decoder', reuse = False):
rnn_cells_dec = tf.nn.rnn_cell.MultiRNNCell(
[lstm_cell(size_layer) for _ in range(num_layers)], state_is_tuple = False
)
drop_dec = tf.contrib.rnn.DropoutWrapper(
rnn_cells_dec, output_keep_prob = forget_bias
)
차이가 발생하는 부분은 decoder를 선언해주는 부분입니다. cell의 경우에는 똑같이 MulitRNNCell로 선언해 줍니다.
그리고 학습을 통하여 예측을 하는 함수를 선언해줍니다.
def forecast():
tf.reset_default_graph()
modelnn = Model(
learning_rate, num_layers, df_log.shape[1], size_layer, df_log.shape[1], dropout_rate
)
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
date_ori = pd.to_datetime(df.iloc[:, 0]).tolist()
pbar = tqdm(range(epoch), desc = 'train loop')
for i in pbar:
init_value = np.zeros((1, num_layers * 2 * size_layer))
total_loss, total_acc = [], []
for k in range(0, df_train.shape[0] - 1, timestamp):
index = min(k + timestamp, df_train.shape[0] - 1)
batch_x = np.expand_dims(
df_train.iloc[k : index, :].values, axis = 0
)
batch_y = df_train.iloc[k + 1 : index + 1, :].values
logits, last_state, _, loss = sess.run(
[modelnn.logits, modelnn.last_state, modelnn.optimizer, modelnn.cost],
feed_dict = {
modelnn.X: batch_x,
modelnn.Y: batch_y,
modelnn.hidden_layer: init_value,
},
)
init_value = last_state
total_loss.append(loss)
total_acc.append(calculate_accuracy(batch_y[:, 0], logits[:, 0]))
pbar.set_postfix(cost = np.mean(total_loss), acc = np.mean(total_acc))
future_day = test_size
output_predict = np.zeros((df_train.shape[0] + future_day, df_train.shape[1]))
output_predict[0] = df_train.iloc[0]
upper_b = (df_train.shape[0] // timestamp) * timestamp
init_value = np.zeros((1, num_layers * 2 * size_layer))
for k in range(0, (df_train.shape[0] // timestamp) * timestamp, timestamp):
out_logits, last_state = sess.run(
[modelnn.logits, modelnn.last_state],
feed_dict = {
modelnn.X: np.expand_dims(
df_train.iloc[k : k + timestamp], axis = 0
),
modelnn.hidden_layer: init_value,
},
)
init_value = last_state
output_predict[k + 1 : k + timestamp + 1] = out_logits
if upper_b != df_train.shape[0]:
out_logits, last_state = sess.run(
[modelnn.logits, modelnn.last_state],
feed_dict = {
modelnn.X: np.expand_dims(df_train.iloc[upper_b:], axis = 0),
modelnn.hidden_layer: init_value,
},
)
output_predict[upper_b + 1 : df_train.shape[0] + 1] = out_logits
future_day -= 1
date_ori.append(date_ori[-1] + timedelta(days = 1))
init_value = last_state
for i in range(future_day):
o = output_predict[-future_day - timestamp + i:-future_day + i]
out_logits, last_state = sess.run(
[modelnn.logits, modelnn.last_state],
feed_dict = {
modelnn.X: np.expand_dims(o, axis = 0),
modelnn.hidden_layer: init_value,
},
)
init_value = last_state
output_predict[-future_day + i] = out_logits[-1]
date_ori.append(date_ori[-1] + timedelta(days = 1))
output_predict = minmax.inverse_transform(output_predict)
deep_future = anchor(output_predict[:, 0], 0.3)
return deep_future[-test_size:]
학습을 진행하고 결과를 확인해 보면 다음과 같습니다.
results = []
for i in range(simulation_size):
print('simulation %d'%(i + 1))
results.append(forecast())
accuracies = [calculate_accuracy(df['Close'].iloc[-test_size:].values, r) for r in results]
plt.figure(figsize = (15, 5))
for no, r in enumerate(results):
plt.plot(r, label = 'forecast %d'%(no + 1))
plt.plot(df['Close'].iloc[-test_size:].values, label = 'true trend', c = 'black')
plt.legend()
plt.title('average accuracy: %.4f'%(np.mean(accuracies)))
plt.show()
5번째로 예측한 forecasst 5와 10번째로 예측한 forcecast 10의 경우에는 터무니없이 값이 티는 것을 볼 수 있습니다만. 나머지의 경우는 비교적 유사하게 값을 예측하는 것을 확인하실 수 있습니다.
Gemma3 기반 LoRA 튜닝으로 주식 예측 모델 만들기 (0) | 2025.04.17 |
---|---|
keras를 통한 머신러닝 간단한 애플 주가 예측 (0) | 2023.07.27 |
딥러닝으로 삼성전자 주가 예측하기 - (GRU) (0) | 2022.05.13 |
딥러닝으로 삼성전자 주가 예측하기 - (bidirectional RNN) (0) | 2022.04.06 |
딥러닝으로 삼성전자 주가 예측하기 - (lstm) (0) | 2022.03.25 |