머신러닝/주가 예측

딥러닝으로 삼성전자 주가 예측하기 - (lstm)

byoelcardi 2022. 3. 25. 22:12

 

딥러닝이라는 것 자체가 과거를 토대로 미래를 예측하는 것이기 때문에 과거의 주가 데이터를 이용하여 미래의 주가를 예측하려는 연구들은 상당히 많이 이루어지고 있습니다.

 

그것이 쉽게 이루어진다면 모델만 돌리면 돈이 들어오니 인공지능 개발자들은 모두 돈을 많이 벌겠죠? 하지만 인공지능 전문가들 사이에서는 어느 정도 예측은 가능하지만 수익으로 연결하는 것은 쉽지 않다는 의견이 지배적입니다. "딥러닝 모델들로 주식 데이터를 예측하면 어떻게 될까?!" 정도의 정보를 제공하는 글로 봐주시면 감사하겠습니다. 이 글은 시계열 데이터를 다룰 때 가장 유명한 모델인 LSTM(Long short-term memory)를 이용한 글입니다. 순차적으로 다른 모델들을 이용한 글을 업로드할 예정입니다. 한 달에 한 개의 글을 업로드할 것 같습니다. 

 


 

본 글은 구글 colab을 통해서 진행했습니다.

!pip install yfinance
%tensorflow_version 1.x

colab에 설치되어있지 않는 라이브러리는 yfinance 라이브러리입니다. 명령어를 통해서 설치해줍시다. 그리고 colab은 기본적으로 tensorflow 2.0 버전으로 설정되어있습니다. Tensorflow 1.0 버전을 사용하기 위해서 Colab에 다시 설치하는 분들이 있는데 그럴 필요 없이 2번째 줄에 작성된 명령어로 1.0 버전을 사용할 수 있습니다.

 

본 예제들을 실행하기 위한 라이브러리는 import 부분은 다음과 같습니다.

import sys
import warnings
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime
from datetime import timedelta
from tqdm import tqdm
sns.set()
tf.compat.v1.random.set_random_seed(1234)
%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import yfinance as yf

라이브러리를 import 하였다면 삼성전자 주가 데이터를 yfinance 라이브러리를 이용하여 추출합시다.

 

samsung_df = yf.download('005930.KS',
                      start='2020-01-01',
                      end='2022-03-03',
                      progress=False)
df = samsung_df

다음 코드를 통하여 날짜별 종가(시장이 마감했을 때의 가격) 데이터를 얻을 수 있습니다. 조금 더 자세히 살펴봅시다.

 

종목 코드, 며칠부터 며칠까지 입력해주시면 입력 정보에 맞춰서 데이터가 추출됩니다. 따라서 해당 코드는 2020년 1월 1일부터 2022년 3월 3일까지의 삼성전자 주가 정보를 추출하는 코드입니다.

결괏값은 다음과 같습니다.

그 이후 데이터에 대하여 모델이 더 잘 학습할 수 있도록 정규화를 해주도록 하겠습니다.

minmax = MinMaxScaler().fit(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = minmax.transform(df.iloc[:, 4:5].astype('float32')) # Close index
df_log = pd.DataFrame(df_log)

df.iloc를 통하여 close(종가) 가격만 이용합니다.

 

 

테스트 데이터(df_test)는 최근 30개의 데이터로 테스트를 진행하고 그 외의 데이터는 학습(df_train)에 이용하겠습니다.

test_size = 30
simulation_size = 5

df_train = df_log.iloc[:-test_size]
df_test = df_log.iloc[-test_size:]

모델 생성

class Model:
    def __init__(
        self,
        learning_rate,
        num_layers,
        size,
        size_layer,
        output_size,
        forget_bias = 0.1,
    ):
        def lstm_cell(size_layer):
            return tf.nn.rnn_cell.LSTMCell(size_layer, state_is_tuple = False)

        rnn_cells = tf.nn.rnn_cell.MultiRNNCell(
            [lstm_cell(size_layer) for _ in range(num_layers)],
            state_is_tuple = False,
        )
        self.X = tf.placeholder(tf.float32, (None, None, size))
        self.Y = tf.placeholder(tf.float32, (None, output_size))
        drop = tf.contrib.rnn.DropoutWrapper(
            rnn_cells, output_keep_prob = forget_bias
        )
        self.hidden_layer = tf.placeholder(
            tf.float32, (None, num_layers * 2 * size_layer)
        )
        self.outputs, self.last_state = tf.nn.dynamic_rnn(
            drop, self.X, initial_state = self.hidden_layer, dtype = tf.float32
        )
        self.logits = tf.layers.dense(self.outputs[-1], output_size)
        self.cost = tf.reduce_mean(tf.square(self.Y - self.logits))
        self.optimizer = tf.train.AdamOptimizer(learning_rate).minimize(
            self.cost
        )
        
def calculate_accuracy(real, predict):
    real = np.array(real) + 1
    predict = np.array(predict) + 1
    percentage = 1 - np.sqrt(np.mean(np.square((real - predict) / real)))
    return percentage * 100

def anchor(signal, weight):
    buffer = []
    last = signal[0]
    for i in signal:
        smoothed_val = last * weight + (1 - weight) * i
        buffer.append(smoothed_val)
        last = smoothed_val
    return buffer

모델의 이용할 파라미터는 다음과 같습니다.

num_layers = 1
size_layer = 128
timestamp = 5
epoch = 300
dropout_rate = 0.8
future_day = test_size
learning_rate = 0.01

 

 

 forecast 함수는 함수명에서 유추 가능하겠지만 앞의 것을 학습하고 학습한 것을 토대로 데이터를 예측하는 함수입니다.

def forecast():
    tf.reset_default_graph()
    modelnn = Model(
        learning_rate, num_layers, df_log.shape[1], size_layer, df_log.shape[1], dropout_rate
    )
    #세션 선언문
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer())
    date_ori = pd.to_datetime(df.iloc[:, 0]).tolist()
	
    #tqdm은 현재 loop가 얼마나 돌아가고 있는지 보여주는 라이브러리
    pbar = tqdm(range(epoch), desc = 'train loop')
    for i in pbar:
        init_value = np.zeros((1, num_layers * 2 * size_layer))
        total_loss, total_acc = [], []
        for k in range(0, df_train.shape[0] - 1, timestamp):
            index = min(k + timestamp, df_train.shape[0] - 1)
            batch_x = np.expand_dims(
                df_train.iloc[k : index, :].values, axis = 0
            )
            batch_y = df_train.iloc[k + 1 : index + 1, :].values
            logits, last_state, _, loss = sess.run(
                [modelnn.logits, modelnn.last_state, modelnn.optimizer, modelnn.cost],
                feed_dict = {
                    modelnn.X: batch_x,
                    modelnn.Y: batch_y,
                    modelnn.hidden_layer: init_value,
                },
            )        
            init_value = last_state
            total_loss.append(loss)
            total_acc.append(calculate_accuracy(batch_y[:, 0], logits[:, 0]))
        pbar.set_postfix(cost = np.mean(total_loss), acc = np.mean(total_acc))
    
    future_day = test_size

    output_predict = np.zeros((df_train.shape[0] + future_day, df_train.shape[1]))
    output_predict[0] = df_train.iloc[0]
    upper_b = (df_train.shape[0] // timestamp) * timestamp
    init_value = np.zeros((1, num_layers * 2 * size_layer))

    for k in range(0, (df_train.shape[0] // timestamp) * timestamp, timestamp):
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(
                    df_train.iloc[k : k + timestamp], axis = 0
                ),
                modelnn.hidden_layer: init_value,
            },
        )
        init_value = last_state
        output_predict[k + 1 : k + timestamp + 1] = out_logits

    if upper_b != df_train.shape[0]:
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(df_train.iloc[upper_b:], axis = 0),
                modelnn.hidden_layer: init_value,
            },
        )
        output_predict[upper_b + 1 : df_train.shape[0] + 1] = out_logits
        future_day -= 1
        date_ori.append(date_ori[-1] + timedelta(days = 1))

    init_value = last_state
    
    for i in range(future_day):
        o = output_predict[-future_day - timestamp + i:-future_day + i]
        out_logits, last_state = sess.run(
            [modelnn.logits, modelnn.last_state],
            feed_dict = {
                modelnn.X: np.expand_dims(o, axis = 0),
                modelnn.hidden_layer: init_value,
            },
        )
        init_value = last_state
        output_predict[-future_day + i] = out_logits[-1]
        date_ori.append(date_ori[-1] + timedelta(days = 1))
    
    output_predict = minmax.inverse_transform(output_predict)
    deep_future = anchor(output_predict[:, 0], 0.3)
    
    return deep_future[-test_size:]

simulation_size 만큼 횟수를 반복하고 결과를 results에 저장합니다.

results = []
for i in range(simulation_size):
    print('simulation %d'%(i + 1))
    results.append(forecast())

 

저장된 결과들을 아래의 코드들을 통해 시각화해서 확인해보겠습니다.

accuracies = [calculate_accuracy(df['Close'].iloc[-test_size:].values, r) for r in results]

plt.figure(figsize = (15, 5))
for no, r in enumerate(results):
    plt.plot(r, label = 'forecast %d'%(no + 1))
plt.plot(df['Close'].iloc[-test_size:].values, label = 'true trend', c = 'black')
plt.legend()
plt.title('average accuracy: %.4f'%(np.mean(accuracies)))
plt.show()

 

true trend가 실제값, forecast가 예측값입니다.

 

df_result = pd.DataFrame(results)
df_result

result는 list이기 때문에 데이터프레임으로 바꿔서 각각의 forecast N이 어떻게 예측했는지 수치를 확인할 수 있습니다.