A Hidden Markov Model (HMM) is a special case of the state-space model in which the latent variables are discrete (multinomial). From its graphical representation, you can view an HMM as a doubly stochastic process: a hidden Markov process over latent variables that you cannot observe directly, and a second stochastic process that produces the sequence of observations given the first.
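
To make the "two stochastic processes" idea concrete, here is a minimal sketch that samples from a toy HMM with NumPy. The function name sample_hmm and every parameter value are made up for illustration; they are not taken from the stock data used later in the article.

```python
import numpy as np

def sample_hmm(start_prob, trans_mat, means, stds, n_steps, seed=0):
    """Draw hidden states and observations from a toy 1-D Gaussian HMM."""
    rng = np.random.default_rng(seed)
    n_states = len(start_prob)
    states = np.empty(n_steps, dtype=int)
    observations = np.empty(n_steps)

    # First stochastic process: the hidden Markov chain over discrete states.
    states[0] = rng.choice(n_states, p=start_prob)
    for t in range(1, n_steps):
        states[t] = rng.choice(n_states, p=trans_mat[states[t - 1]])

    # Second stochastic process: each observation depends only on the
    # hidden state at the same time step.
    for t in range(n_steps):
        observations[t] = rng.normal(means[states[t]], stds[states[t]])
    return states, observations

# Illustrative (made-up) parameters for a two-state model.
start_prob = np.array([0.6, 0.4])
trans_mat = np.array([[0.9, 0.1],
                      [0.2, 0.8]])
means = np.array([0.01, -0.02])
stds = np.array([0.005, 0.02])

states, observations = sample_hmm(start_prob, trans_mat, means, stds, n_steps=10)
print(states)        # the hidden sequence, normally not visible to you
print(observations)  # the sequence you would actually observe
```

In practice you only see the second array; training an HMM means estimating the parameters that most plausibly generated it.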

HMMs are capable of predicting and analyzing time-based phenomena. Hence, they are very useful in fields such as speech recognition, natural language processing, and financial market prediction. In this article, you’ll look into the applications of HMMs in the field of financial market analysis, mainly stock price prediction.

In this article we cover:

  1. Stock Price Prediction
  2. Collecting Stock Price Data
  3. Features for Stock Price Prediction
  4. Predicting Price Using HMM

1. Stock Price Prediction

Stock market prediction has long been an active research area, given the obvious interest of many major companies. Historically, various machine learning algorithms have been applied with varying degrees of success.

However, stock forecasting is still severely limited by the non-stationary, seasonal, and unpredictable nature of the market. Forecasting from previous stock data alone is an even more challenging task, since it ignores several external factors.

HMMs are capable of modeling hidden state transitions from the sequential observed data. The problem of stock prediction can also be thought of as following the same pattern. The price of the stock depends upon a multitude of factors, which generally remain invisible to the investor (hidden variables).

The underlying factors change with company policy, financial conditions, and management decisions, and these changes affect the price of the stock (observed data). So, HMMs are a natural fit for the problem of price prediction.

Now, you can put this to the test by predicting the stock prices for Alphabet Inc. (GOOGL), Facebook (FB), and Apple Inc. (AAPL) with an HMM.

2. Collecting Stock Price Data

Use pystock data (http://data.pystock.com) to get the historical stock price data. Every day, before the US stock exchanges open at 9:30 EST/EDT, the pystock crawler collects the stock prices and financial reports, and pushes the data, such as the previous day’s opening price, closing price, highest price, and lowest price for a given stock, to the repository. This data is day-based, which means that there won’t be any hour or minute-level data.

Download the pystock data for a given year. As the dataset is large, create a Python script to download the data for a given year and run the program simultaneously for three different years to download all the data in parallel:

"""
Usage: get_data.py --year=<year>
"""
import requests
import os
from docopt import docopt

# docopt helps parse the command line arguments in
# a simple manner (http://docopt.org/)
args = docopt(doc=__doc__, argv=None,
              help=True, version=None,
              options_first=False)

year = args['--year']

# Create directory if not present
year_directory_name = 'data/{year}'.format(year=year)
if not os.path.exists(year_directory_name):
    os.makedirs(year_directory_name)

# Fetching file list for the corresponding year
year_data_files = requests.get(
    'http://data.pystock.com/{year}/index.txt'.format(year=year)
).text.strip().split('\n')

for data_file_name in year_data_files:
    file_location = '{year_directory_name}/{data_file_name}'.format(
        year_directory_name=year_directory_name,
        data_file_name=data_file_name)

    # Download each file and write it to disk before moving to the next one
    with open(file_location, 'wb+') as data_file:
        print('>>> Downloading \t {file_location}'.format(file_location=file_location))
        data_file_content = requests.get(
            'http://data.pystock.com/{year}/{data_file_name}'.format(
                year=year, data_file_name=data_file_name)
        ).content
        print('<<< Download Completed \t {file_location}'.format(file_location=file_location))
        data_file.write(data_file_content)

Run the following scripts simultaneously for three different years:

python get_data.py --year 2015
python get_data.py --year 2016
python get_data.py --year 2017

Once the data is downloaded, collect all the data for each of the stocks mentioned earlier by combining the data across all years:

"""
Usage: parse_data.py --company=<company>
"""
import os
import tarfile
import pandas as pd
from pandas import errors as pd_errors
from docopt import docopt

args = docopt(doc=__doc__, argv=None,
              help=True, version=None,
              options_first=False)

years = [2015, 2016, 2017]
company = args['--company']


# Getting the data files list
data_files_list = []
for year in years:
    year_directory = 'data/{year}'.format(year=year)
    for file in os.listdir(year_directory):
        data_files_list.append('{year_directory}/{file}'.format(
            year_directory=year_directory, file=file))


def parse_data(file_name, company_symbol):
    """
    Returns data for the corresponding company

    :param file_name: name of the tar file
    :param company_symbol: company symbol
    :type file_name: str
    :type company_symbol: str
    :return: dataframe for the corresponding company data
    :rtype: pd.DataFrame
    """
    try:
        with tarfile.open(file_name) as tar:
            price_report = pd.read_csv(tar.extractfile('prices.csv'))
        company_price_data = price_report[price_report['symbol'] == company_symbol]
        return company_price_data
    except (KeyError, pd_errors.EmptyDataError):
        return pd.DataFrame()


# Getting the complete data for a given company.
# DataFrame.append was removed in pandas 2.0, so combine with pd.concat.
company_data = pd.concat(
    [parse_data(file_name, company) for file_name in data_files_list])
company_data = company_data.sort_values(by=['date'])

# Create folder for company data if it does not exist
if not os.path.exists('data/company_data'):
    os.makedirs('data/company_data')

# Write data to a CSV file
company_data.to_csv('data/company_data/{company}.csv'.format(company=company),
                    columns=['date', 'open', 'high', 'low', 'close', 'volume', 'adj_close'],
                    index=False)

Run the following scripts to create a .csv file containing all the historical data for the GOOGL, FB, and AAPL stocks:

python parse_data.py --company GOOGL
python parse_data.py --company FB
python parse_data.py --company AAPL

3. Features for Stock Price Prediction

You have very limited features for each day, namely the opening price of the stock for that day, closing price, the highest price of the stock, and the lowest price of the stock. So, use them to compute the stock prices. You can compute the closing stock price for a day, given the opening stock price for that day, and previous d days’ data. Your predictor would have a latency of d days. 
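
One way to read "the previous d days" is as a half-open window of row indices that ends just before the day being predicted. The sketch below uses a hypothetical helper, latency_window, and made-up closing prices purely to illustrate that indexing.

```python
def latency_window(day_index, n_latency_days):
    """Return (start, end) so that rows[start:end] are the days before day_index."""
    start = max(0, day_index - n_latency_days)
    end = day_index  # slice end is exclusive, so day_index itself is left out
    return start, end

# Made-up closing prices for six consecutive days.
closes = [100.0, 101.5, 99.8, 102.2, 103.0, 104.1]

start, end = latency_window(day_index=4, n_latency_days=3)
print(closes[start:end])  # [101.5, 99.8, 102.2]
```

Near the start of the series the window is simply shorter, because the start index is clamped at zero.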

Now, create a predictor called StockPredictor, which will contain all the logic to predict the stock price for a given company during a given day. 

Instead of directly using the opening, closing, low, and high prices of a stock, extract the fractional changes in each of them, which are then used to train your HMM. Define these parameters as follows:

fracchange = (close - open) / open
frachigh = (high - open) / open
fraclow = (open - low) / open

For the stock price predictor HMM, you can represent a single observation as a vector of these parameters, namely Xt = < fracchange, frachigh, fraclow >:

import logging

import numpy as np
import pandas as pd
 
class StockPredictor(object):
    def __init__(self, company, n_latency_days=10):
        self._init_logger()
 
        self.company = company
        self.n_latency_days = n_latency_days
        self.data = pd.read_csv(
            'data/company_data/{company}.csv'.format(company=self.company))
 
 
    def _init_logger(self):
        self._logger = logging.getLogger(__name__)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)
        self._logger.setLevel(logging.DEBUG)
 
 
    @staticmethod
    def _extract_features(data):
        open_price = np.array(data['open'])
        close_price = np.array(data['close'])
        high_price = np.array(data['high'])
        low_price = np.array(data['low'])
 
        # Compute the fractional change in close, high and low prices,
        # which are used as features
        frac_change = (close_price - open_price) / open_price
        frac_high = (high_price - open_price) / open_price
        frac_low = (open_price - low_price) / open_price
 
        return np.column_stack((frac_change, frac_high, frac_low))
 
 
# Predictor for GOOGL stocks
stock_predictor = StockPredictor(company='GOOGL')
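
As a quick sanity check of the feature definitions, the following sketch applies the same arithmetic to two made-up trading days. The standalone function extract_features mirrors _extract_features above; the prices are invented for illustration and are not real market data.

```python
import numpy as np

def extract_features(data):
    """Same fractional-change features as StockPredictor._extract_features."""
    open_price = np.array(data['open'])
    close_price = np.array(data['close'])
    high_price = np.array(data['high'])
    low_price = np.array(data['low'])

    frac_change = (close_price - open_price) / open_price
    frac_high = (high_price - open_price) / open_price
    frac_low = (open_price - low_price) / open_price
    return np.column_stack((frac_change, frac_high, frac_low))

# Made-up prices for two trading days.
toy = {
    'open': [100.0, 200.0],
    'high': [105.0, 202.0],
    'low': [98.0, 190.0],
    'close': [102.0, 195.0],
}

features = extract_features(toy)
print(features)
# Day 1: frac_change = 0.02,   frac_high = 0.05, frac_low = 0.02
# Day 2: frac_change = -0.025, frac_high = 0.01, frac_low = 0.05

# The closing price can be recovered from the open and frac_change.
recovered_close = np.array(toy['open']) * (1 + features[:, 0])
print(recovered_close)
```

Note that frac_high and frac_low are both non-negative by construction, while frac_change can take either sign.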

4. Predicting Price Using HMM

The first step in predicting the price is to train an HMM to compute the parameters from a given sequence of observations. As the observations are a vector of continuous random variables, assume that the emission probability distribution is continuous.

For simplicity, assume that it is a multivariate Gaussian distribution with parameters (μ and Σ). So, you have to determine the transition matrix, A, and the prior probabilities, π, along with μ and Σ, which represent the multivariate Gaussian distribution.

For now, assume that you have four hidden states. In the coming sections, you’ll look into the ways of finding the optimal number of hidden states. Use the GaussianHMM class provided by the hmmlearn package as your HMM and perform parameter estimation using the fit() method provided by it:

from hmmlearn.hmm import GaussianHMM
 
class StockPredictor(object):
    def __init__(self, company, n_latency_days=10, n_hidden_states=4):
        self._init_logger()
 
        self.company = company
        self.n_latency_days = n_latency_days
 
        self.hmm = GaussianHMM(n_components=n_hidden_states)
 
        self.data = pd.read_csv(
            'data/company_data/{company}.csv'.format(company=self.company))
 
    def fit(self):
        self._logger.info('>>> Extracting Features')
        feature_vector = StockPredictor._extract_features(self.data)
        self._logger.info('Features extraction Completed <<<')
 
        self.hmm.fit(feature_vector)

In machine learning, you divide the entire dataset into two categories. The first set, the training dataset, is used to train the model. The second set, the test dataset, is used to provide an unbiased evaluation of a final model fit on the training dataset.

Separating the training dataset from the test dataset lets you check whether the model has overfitted the training data. So, in this case, split the dataset into two categories, train_data for training the model and test_data for evaluating the model. To do so, use the train_test_split function provided by the sklearn.model_selection module:

from sklearn.model_selection import train_test_split

class StockPredictor(object):
    def __init__(self, company, test_size=0.33,
                 n_latency_days=10, n_hidden_states=4):
        self._init_logger()
 
        self.company = company
        self.n_latency_days = n_latency_days
 
        self.hmm = GaussianHMM(n_components=n_hidden_states)
 
        self._split_train_test_data(test_size)
 
    def _split_train_test_data(self, test_size):
        data = pd.read_csv(
            'data/company_data/{company}.csv'.format(company=self.company))
        _train_data, test_data = train_test_split(
            data, test_size=test_size, shuffle=False)
 
        self._train_data = _train_data
        self._test_data = test_data
 
    def fit(self):
        self._logger.info('>>> Extracting Features')
        feature_vector = StockPredictor._extract_features(self._train_data)
        self._logger.info('Features extraction Completed <<<')
 
        self.hmm.fit(feature_vector)

By default, train_test_split splits arrays or matrices into random train and test subsets. As you train your HMM with sequential data, you do not want to split the data randomly. To prevent random splitting of the test and train data, pass shuffle=False as the argument.
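
The effect of an unshuffled split can be illustrated without scikit-learn: the earliest rows go to training and the latest rows go to testing. The helper chronological_split below is hypothetical, and rounding the test share up is an assumption of this sketch rather than a statement about scikit-learn's internals.

```python
import numpy as np

def chronological_split(rows, test_size):
    """Keep the earliest rows for training and the latest rows for testing."""
    n_test = int(np.ceil(len(rows) * test_size))
    n_train = len(rows) - n_test
    return rows[:n_train], rows[n_train:]

days = np.arange(10)  # stand-in for ten consecutive trading days
train, test = chronological_split(days, test_size=0.33)
print(train)  # [0 1 2 3 4 5]
print(test)   # [6 7 8 9]
```

Every training day precedes every test day, so the model is never evaluated on data older than what it was trained on.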

Once your model is trained, you need to predict the stock closing price. As mentioned earlier, you want to predict the stock closing price for a day given that you know the opening price. This means that if you are able to predict fracchange for a given day, you can compute the closing price as follows:

close = open × (1 + fracchange)

Thus, your problem boils down to computing the observation vector Xt+1 = < fracchange, frachigh, fraclow > for a day, given the observation data for t days, x1,…,xt, and the parameters of the HMM, θ. That is, you need to find the value of Xt+1 that maximizes the posterior probability P(Xt+1|X1,…,Xt,θ).

Once you remove all the terms that are independent of Xt+1 from the maximization, you are left with the problem of finding the value of Xt+1 that maximizes the probability P(X1,…,Xt+1|θ). If you assume that fracchange is a continuous variable, this optimization would be computationally difficult.
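
The step from the posterior to the joint probability follows from the definition of conditional probability. Written out (the hat notation for the predicted observation is introduced here only for illustration):

```latex
\begin{aligned}
\hat{X}_{t+1}
  &= \arg\max_{X_{t+1}} P(X_{t+1} \mid X_1, \dots, X_t, \theta) \\
  &= \arg\max_{X_{t+1}} \frac{P(X_1, \dots, X_t, X_{t+1} \mid \theta)}{P(X_1, \dots, X_t \mid \theta)} \\
  &= \arg\max_{X_{t+1}} P(X_1, \dots, X_t, X_{t+1} \mid \theta)
\end{aligned}
```

The denominator depends only on the observations already seen, so it is constant with respect to the candidate value of the next observation and can be dropped from the maximization.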

So, divide these fractional changes into some discrete values ranging between two finite values (as stated in the following table) and find a set of fractional changes, < fracchange, frachigh, fraclow >, that would maximize the probability, P(X1,…,Xt+1|θ):

Observation   Minimum value   Maximum value   Number of points
fracchange    -0.1            0.1             20
frachigh      0               0.1             10
fraclow       0               0.1             10

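So, with the preceding discrete set of values, you need to score 20 x 10 x 10 = 2,000 candidate outcomes for each prediction (the complete script later in this section defaults to 50 steps for fracchange, which gives 5,000 candidates):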

import itertools

class StockPredictor(object):
    # ... the methods shown earlier stay unchanged

    def _compute_all_possible_outcomes(self, n_steps_frac_change,
                                       n_steps_frac_high, n_steps_frac_low):
        frac_change_range = np.linspace(-0.1, 0.1, n_steps_frac_change)
        frac_high_range = np.linspace(0, 0.1, n_steps_frac_high)
        frac_low_range = np.linspace(0, 0.1, n_steps_frac_low)

        # Cartesian product of the three grids: one row per candidate outcome
        self._possible_outcomes = np.array(list(itertools.product(
            frac_change_range, frac_high_range, frac_low_range)))
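
To check the size of the candidate grid quoted in the table, here is a small standalone sketch that builds the same Cartesian product outside the class, using the step counts from the table.

```python
import itertools
import numpy as np

frac_change_range = np.linspace(-0.1, 0.1, 20)
frac_high_range = np.linspace(0, 0.1, 10)
frac_low_range = np.linspace(0, 0.1, 10)

possible_outcomes = np.array(list(itertools.product(
    frac_change_range, frac_high_range, frac_low_range)))

print(possible_outcomes.shape)  # (2000, 3)
print(possible_outcomes[0])     # all three ranges at their minimum
print(possible_outcomes[-1])    # all three ranges at their maximum

# Spacing between neighbouring frac_change candidates
step = frac_change_range[1] - frac_change_range[0]
print(step)
```

Because np.linspace includes both endpoints, 20 points span 19 intervals, so the predicted fracchange can only move in steps of roughly 1% of the opening price. Increasing the number of steps gives a finer prediction at the cost of more candidates to score.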

Now, implement the method to predict the closing price, as follows:

class StockPredictor(object):
    # ... the methods shown earlier stay unchanged

    def _get_most_probable_outcome(self, day_index):
        # Use up to n_latency_days rows immediately before day_index;
        # the slice end is exclusive, so day_index itself is left out
        previous_data_start_index = max(0, day_index - self.n_latency_days)
        previous_data_end_index = day_index
        previous_data = self._test_data.iloc[
            previous_data_start_index: previous_data_end_index]
        previous_data_features = StockPredictor._extract_features(
            previous_data)

        # Score the known history followed by each candidate outcome
        outcome_score = []
        for possible_outcome in self._possible_outcomes:
            total_data = np.vstack(
                (previous_data_features, possible_outcome))
            outcome_score.append(self.hmm.score(total_data))
        most_probable_outcome = self._possible_outcomes[np.argmax(
            outcome_score)]

        return most_probable_outcome

    def predict_close_price(self, day_index):
        open_price = self._test_data.iloc[day_index]['open']
        predicted_frac_change, _, _ = self._get_most_probable_outcome(
            day_index)
        return open_price * (1 + predicted_frac_change)
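
The score() call returns the log-likelihood of a whole observation sequence under the trained model. To show what that quantity is, here is a hand-rolled sketch of the forward algorithm in log space for a Gaussian HMM, assuming diagonal covariances. It is not hmmlearn's implementation; the function names and all parameter values are made up for illustration.

```python
import numpy as np

def log_gaussian_diag(x, means, variances):
    """Log density of x under each state's diagonal-covariance Gaussian."""
    return -0.5 * np.sum(
        np.log(2 * np.pi * variances) + (x - means) ** 2 / variances, axis=1)

def logsumexp(a, axis=None):
    """Numerically stable log(sum(exp(a))) along the given axis."""
    a_max = np.max(a, axis=axis, keepdims=True)
    out = a_max + np.log(np.sum(np.exp(a - a_max), axis=axis, keepdims=True))
    return np.squeeze(out, axis=axis)

def forward_log_likelihood(observations, start_prob, trans_mat, means, variances):
    """log P(X_1, ..., X_T | theta) via the forward algorithm in log space."""
    log_alpha = np.log(start_prob) + log_gaussian_diag(
        observations[0], means, variances)
    for x in observations[1:]:
        # Sum over the previous state i of alpha[i] * A[i, j], per next state j
        log_alpha = logsumexp(
            log_alpha[:, None] + np.log(trans_mat), axis=0
        ) + log_gaussian_diag(x, means, variances)
    return logsumexp(log_alpha)

# Made-up two-state model over (frac_change, frac_high, frac_low).
start_prob = np.array([0.5, 0.5])
trans_mat = np.array([[0.9, 0.1],
                      [0.1, 0.9]])
means = np.array([[0.01, 0.02, 0.01],
                  [-0.01, 0.01, 0.02]])
variances = np.full((2, 3), 1e-4)

# Two days of known history and two candidate outcomes for the next day.
history = np.array([[0.012, 0.02, 0.008],
                    [0.009, 0.018, 0.011]])
candidates = np.array([[0.01, 0.02, 0.01],
                       [-0.05, 0.0, 0.06]])

scores = [forward_log_likelihood(np.vstack((history, c)),
                                 start_prob, trans_mat, means, variances)
          for c in candidates]
best = candidates[int(np.argmax(scores))]
print(best)  # the candidate with the highest sequence log-likelihood
```

The predictor above follows the same pattern: append each candidate to the recent history, score the extended sequence, and keep the candidate with the highest score.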

Predict the closing price for some days and plot both the curves:

"""
Usage: analyse_data.py --company=<company>
"""
import warnings
import logging
import itertools
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from hmmlearn.hmm import GaussianHMM
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from docopt import docopt
 
args = docopt(doc=__doc__, argv=None, help=True,
              version=None, options_first=False)
 
# Suppress warnings in hmmlearn
warnings.filterwarnings("ignore")
# Change plot style to ggplot (for better and more aesthetic visualisation)
plt.style.use('ggplot')
 
 
class StockPredictor(object):
    def __init__(self, company, test_size=0.33,
                 n_hidden_states=4, n_latency_days=10,
                 n_steps_frac_change=50, n_steps_frac_high=10,
                 n_steps_frac_low=10):
        self._init_logger()
 
        self.company = company
        self.n_latency_days = n_latency_days
 
        self.hmm = GaussianHMM(n_components=n_hidden_states)
 
        self._split_train_test_data(test_size)
 
        self._compute_all_possible_outcomes(
            n_steps_frac_change, n_steps_frac_high, n_steps_frac_low)
 
    def _init_logger(self):
        self._logger = logging.getLogger(__name__)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)
        self._logger.setLevel(logging.DEBUG)
 
    def _split_train_test_data(self, test_size):
        data = pd.read_csv(
            'data/company_data/{company}.csv'.format(company=self.company))
        _train_data, test_data = train_test_split(
            data, test_size=test_size, shuffle=False)
 
        self._train_data = _train_data
        self._test_data = test_data
 
    @staticmethod
    def _extract_features(data):
        open_price = np.array(data['open'])
        close_price = np.array(data['close'])
        high_price = np.array(data['high'])
        low_price = np.array(data['low'])
 
        # Compute the fractional change in close, high and low prices,
        # which are used as features
        frac_change = (close_price - open_price) / open_price
        frac_high = (high_price - open_price) / open_price
        frac_low = (open_price - low_price) / open_price
 
        return np.column_stack((frac_change, frac_high, frac_low))
 
    def fit(self):
        self._logger.info('>>> Extracting Features')
        feature_vector = StockPredictor._extract_features(self._train_data)
        self._logger.info('Features extraction Completed <<<')
 
        self.hmm.fit(feature_vector)
 
    def _compute_all_possible_outcomes(self, n_steps_frac_change,
                                       n_steps_frac_high, n_steps_frac_low):
        frac_change_range = np.linspace(-0.1, 0.1, n_steps_frac_change)
        frac_high_range = np.linspace(0, 0.1, n_steps_frac_high)
        frac_low_range = np.linspace(0, 0.1, n_steps_frac_low)
 
        self._possible_outcomes = np.array(list(itertools.product(
            frac_change_range, frac_high_range, frac_low_range)))
 
    def _get_most_probable_outcome(self, day_index):
        # Use up to n_latency_days rows immediately before day_index;
        # the slice end is exclusive, so day_index itself is left out
        previous_data_start_index = max(0, day_index - self.n_latency_days)
        previous_data_end_index = day_index
        previous_data = self._test_data.iloc[
            previous_data_start_index: previous_data_end_index]
        previous_data_features = StockPredictor._extract_features(
            previous_data)

        outcome_score = []
        for possible_outcome in self._possible_outcomes:
            total_data = np.vstack(
                (previous_data_features, possible_outcome))
            outcome_score.append(self.hmm.score(total_data))
        most_probable_outcome = self._possible_outcomes[np.argmax(
            outcome_score)]
 
        return most_probable_outcome
 
    def predict_close_price(self, day_index):
        open_price = self._test_data.iloc[day_index]['open']
        predicted_frac_change, _, _ = self._get_most_probable_outcome(
            day_index)
        return open_price * (1 + predicted_frac_change)
 
    def predict_close_prices_for_days(self, days, with_plot=False):
        # Never ask for more days than the test set contains
        days = min(days, len(self._test_data))
        predicted_close_prices = []
        for day_index in tqdm(range(days)):
            predicted_close_prices.append(self.predict_close_price(day_index))
 
        if with_plot:
            test_data = self._test_data[0: days]
            days = np.array(test_data['date'], dtype="datetime64[ms]")
            actual_close_prices = test_data['close']
 
            fig = plt.figure()
 
            axes = fig.add_subplot(111)
            axes.plot(days, actual_close_prices, 'bo-', label="actual")
            axes.plot(days, predicted_close_prices, 'r+-', label="predicted")
            axes.set_title('{company}'.format(company=self.company))
 
            fig.autofmt_xdate()
 
            plt.legend()
            plt.show()
 
        return predicted_close_prices
 
 
stock_predictor = StockPredictor(company=args['--company'])
stock_predictor.fit()
stock_predictor.predict_close_prices_for_days(500, with_plot=True)

The output is a plot of the actual and predicted closing prices for the chosen company.

Conclusion

You’ve successfully predicted the price of stocks using an HMM. You applied parameter estimation and model evaluation to determine the closing price of a stock. Stock market analysis is just one more example of applying HMMs to time series data.

Thank you for reading!

 
