Learn to predict stock prices using HMM in this article by Ankur Ankan, an open source enthusiast, and Abinash Panda, a data scientist who has worked at multiple start-ups.

A Hidden Markov Model (HMM) is a special case of the state space model in which the latent variables are discrete, multinomial variables. Viewed through its graphical representation, an HMM is a doubly stochastic process: a hidden Markov process (over the latent variables) that you cannot observe directly, and a second stochastic process that produces the sequence of observations given the first.
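To make the two layers of randomness concrete, here is a minimal sketch that samples from a toy HMM with NumPy. The two-state model, its probabilities, and the function name `sample_hmm` are hypothetical and chosen only for illustration; they are not taken from the stock model built later in the article.

```python
import numpy as np


def sample_hmm(start_prob, trans_mat, means, stds, n_steps, seed=0):
    """Draw hidden states and 1-D Gaussian observations from a toy HMM."""
    rng = np.random.default_rng(seed)
    n_states = len(start_prob)
    states = np.empty(n_steps, dtype=int)
    observations = np.empty(n_steps)

    # First stochastic process: the hidden Markov chain over latent states
    states[0] = rng.choice(n_states, p=start_prob)
    for t in range(1, n_steps):
        states[t] = rng.choice(n_states, p=trans_mat[states[t - 1]])

    # Second stochastic process: each observation depends only on the
    # hidden state at the same time step
    for t in range(n_steps):
        observations[t] = rng.normal(means[states[t]], stds[states[t]])

    return states, observations


# Hypothetical two-state model (a "calm" and a "volatile" regime)
start_prob = np.array([0.8, 0.2])
trans_mat = np.array([[0.9, 0.1],
                      [0.3, 0.7]])
means = np.array([0.0, 0.0])
stds = np.array([0.01, 0.05])

states, observations = sample_hmm(start_prob, trans_mat, means, stds, n_steps=10)
print(states)                     # hidden in practice; shown here for illustration
print(np.round(observations, 3))  # the only part an observer would see
```

In a real application you see only `observations`; the point of fitting an HMM is to recover the parameters, and hence the likely hidden states, from them.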

HMMs are capable of predicting and analyzing time-based phenomena. Hence, they are very useful in fields such as speech recognition, natural language processing, and financial market prediction. In this article, you’ll look into the applications of HMMs in the field of financial market analysis, mainly stock price prediction.

## Stock Price Prediction

Stock market prediction has long been an active research area, given the obvious interest of many major companies. Historically, various machine learning algorithms have been applied with varying degrees of success. However, stock forecasting is still severely limited by the non-stationary, seasonal, and unpredictable nature of the market. Forecasting from just the previous stock data is even more challenging, since it ignores several outside factors.

HMMs are capable of modeling hidden state transitions from sequential observed data. The problem of stock prediction can be thought of as following the same pattern. The price of a stock depends upon a multitude of factors that generally remain invisible to the investor (hidden variables). The transitions between these underlying factors change with company policy, financial conditions, and management decisions, and they affect the price of the stock (observed data). So, HMMs are a natural fit for the problem of price prediction.

Now, you can put this to the test by predicting the stock prices for Alphabet Inc. (GOOGL), Facebook (FB), and Apple Inc. (AAPL) with an HMM.

## Collecting Stock Price Data

Use pystock data (http://data.pystock.com) to get the historical stock price data. Every day, before the US stock exchanges open at 9:30 EST/EDT, the pystock crawler collects the stock prices and financial reports, and pushes the data, such as the previous day’s opening price, closing price, highest price, and lowest price for a given stock, to the repository. This data is day-based, which means that there won’t be any hour or minute-level data.

Download the pystock data for a given year. As the dataset is large, create a Python script that downloads the data for a single year, and run it simultaneously for three different years to download all the data in parallel:

```python
"""
Usage: get_data.py --year=<year>
"""
import os

import requests
from docopt import docopt

# docopt parses the command-line arguments from the usage string above
# (http://docopt.org/)
args = docopt(doc=__doc__, argv=None, help=True,
              version=None, options_first=False)

year = args['--year']

# Create the directory for this year if it is not present
year_directory_name = 'data/{year}'.format(year=year)
if not os.path.exists(year_directory_name):
    os.makedirs(year_directory_name)

# Fetch the list of data files for the corresponding year
year_data_files = requests.get(
    'http://data.pystock.com/{year}/index.txt'.format(year=year)
).text.strip().split('\n')

for data_file_name in year_data_files:
    file_location = '{year_directory_name}/{data_file_name}'.format(
        year_directory_name=year_directory_name,
        data_file_name=data_file_name)

    with open(file_location, 'wb+') as data_file:
        print('>>> Downloading \t {file_location}'.format(
            file_location=file_location))
        data_file_content = requests.get(
            'http://data.pystock.com/{year}/{data_file_name}'.format(
                year=year, data_file_name=data_file_name)
        ).content
        print('<<< Download Completed \t {file_location}'.format(
            file_location=file_location))
        data_file.write(data_file_content)
```

Run the following scripts simultaneously for three different years:

```
python get_data.py --year 2015
python get_data.py --year 2016
python get_data.py --year 2017
```

Once the data is downloaded, combine the data from all three years to get the complete price history for each of the stocks mentioned earlier:

```python
"""
Usage: parse_data.py --company=<company>
"""
import os
import tarfile

import pandas as pd
from pandas import errors as pd_errors
from docopt import docopt

args = docopt(doc=__doc__, argv=None, help=True,
              version=None, options_first=False)

years = [2015, 2016, 2017]
company = args['--company']

# Get the list of data files
data_files_list = []
for year in years:
    year_directory = 'data/{year}'.format(year=year)
    for file in os.listdir(year_directory):
        data_files_list.append(
            '{year_directory}/{file}'.format(
                year_directory=year_directory, file=file))


def parse_data(file_name, company_symbol):
    """
    Returns data for the corresponding company

    :param file_name: name of the tar file
    :param company_symbol: company symbol
    :type file_name: str
    :type company_symbol: str
    :return: dataframe for the corresponding company data
    :rtype: pd.DataFrame
    """
    try:
        with tarfile.open(file_name) as tar:
            price_report = pd.read_csv(tar.extractfile('prices.csv'))
        return price_report[price_report['symbol'] == company_symbol]
    except (KeyError, pd_errors.EmptyDataError):
        # The archive has no prices.csv, or the file is empty
        return pd.DataFrame()


# Get the complete data for the given company.
# DataFrame.append was removed in pandas 2.0, so pd.concat is used here.
company_data = pd.concat(
    [parse_data(file_name, company) for file_name in data_files_list])
company_data = company_data.sort_values(by=['date'])

# Create the folder for company data if it does not exist
if not os.path.exists('data/company_data'):
    os.makedirs('data/company_data')

# Write the data to a CSV file
company_data.to_csv(
    'data/company_data/{company}.csv'.format(company=company),
    columns=['date', 'open', 'high', 'low', 'close', 'volume', 'adj_close'],
    index=False)
```

Run the following commands to create a .csv file containing all the historical data for each of the GOOGL, FB, and AAPL stocks:

```
python parse_data.py --company GOOGL
python parse_data.py --company FB
python parse_data.py --company AAPL
```

## Features for Stock Price Prediction

You have very few features for each day: the opening price, the closing price, the highest price, and the lowest price of the stock. Use them to predict the stock price. Specifically, you can compute the closing price for a day given that day's opening price and the data for the previous d days. Your predictor would therefore have a latency of d days.

Now, create a predictor called StockPredictor, which will contain all the logic to predict the stock price for a given company during a given day.

Instead of using the opening, closing, low, and high prices of a stock directly, extract the fractional changes in each of them and use these to train your HMM. Define these parameters as follows:

$$frac_{change} = \frac{close - open}{open}, \qquad frac_{high} = \frac{high - open}{open}, \qquad frac_{low} = \frac{open - low}{open}$$

For the stock price predictor HMM, you can represent a single observation as a vector of these parameters, namely $X_t = \langle frac_{change}, frac_{high}, frac_{low} \rangle$:

```python
import logging

import numpy as np
import pandas as pd


class StockPredictor(object):
    def __init__(self, company, n_latency_days=10):
        self._init_logger()

        self.company = company
        self.n_latency_days = n_latency_days

        self.data = pd.read_csv(
            'data/company_data/{company}.csv'.format(company=self.company))

    def _init_logger(self):
        self._logger = logging.getLogger(__name__)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)
        self._logger.setLevel(logging.DEBUG)

    @staticmethod
    def _extract_features(data):
        open_price = np.array(data['open'])
        close_price = np.array(data['close'])
        high_price = np.array(data['high'])
        low_price = np.array(data['low'])

        # Compute the fractional change in close, high, and low prices,
        # which are used as features
        frac_change = (close_price - open_price) / open_price
        frac_high = (high_price - open_price) / open_price
        frac_low = (open_price - low_price) / open_price

        return np.column_stack((frac_change, frac_high, frac_low))


# Predictor for GOOGL stocks
stock_predictor = StockPredictor(company='GOOGL')
```
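As a quick sanity check of the feature definitions, the following sketch computes the three fractional changes for two made-up trading days. The prices and the helper name `fractional_features` are hypothetical; the arithmetic mirrors `_extract_features` above.

```python
import numpy as np


def fractional_features(open_price, high_price, low_price, close_price):
    """Return one row per day: (frac_change, frac_high, frac_low)."""
    frac_change = (close_price - open_price) / open_price
    frac_high = (high_price - open_price) / open_price
    frac_low = (open_price - low_price) / open_price
    return np.column_stack((frac_change, frac_high, frac_low))


# Hypothetical prices for two trading days
open_price = np.array([100.0, 200.0])
high_price = np.array([105.0, 204.0])
low_price = np.array([98.0, 190.0])
close_price = np.array([102.0, 196.0])

features = fractional_features(open_price, high_price, low_price, close_price)
# Day 1: closed 2% above the open, peaked 5% above it, dipped 2% below it
# Day 2: closed 2% below the open, peaked 2% above it, dipped 5% below it
print(features)
```

Note that `frac_high` and `frac_low` are both non-negative by construction, whereas `frac_change` can take either sign.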

## Predicting Price Using HMM

The first step in predicting the price is to train an HMM, that is, to compute its parameters from a given sequence of observations. As the observations are vectors of continuous random variables, assume that the emission probability distribution is continuous. For simplicity, assume that it is a multivariate Gaussian distribution with parameters μ and Σ. So, you have to estimate the transition matrix, A, and the prior probabilities, π, along with μ and Σ, which parameterize the multivariate Gaussian distribution.

For now, assume that you have four hidden states. In the coming sections, you’ll look into the ways of finding the optimal number of hidden states. Use the GaussianHMM class provided by the hmmlearn package as your HMM and perform parameter estimation using the fit() method provided by it:

```python
from hmmlearn.hmm import GaussianHMM


class StockPredictor(object):
    def __init__(self, company, n_latency_days=10, n_hidden_states=4):
        self._init_logger()

        self.company = company
        self.n_latency_days = n_latency_days

        self.hmm = GaussianHMM(n_components=n_hidden_states)

        self.data = pd.read_csv(
            'data/company_data/{company}.csv'.format(company=self.company))

    def fit(self):
        self._logger.info('>>> Extracting Features')
        feature_vector = StockPredictor._extract_features(self.data)
        self._logger.info('Features extraction Completed <<<')

        self.hmm.fit(feature_vector)
```
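The number of hidden states controls how many parameters `fit()` has to estimate. The sketch below counts them and plugs the count into the Bayesian information criterion (BIC), one common way to compare models with different numbers of states. Both choices are assumptions made for illustration: the article does not say which criterion it has in mind, and the count assumes diagonal covariance matrices (to the best of our knowledge, the default `covariance_type` of `GaussianHMM`). The function names are hypothetical.

```python
import math


def gaussian_hmm_free_parameters(n_states, n_features):
    """Count free parameters of a Gaussian HMM with diagonal covariances."""
    start = n_states - 1                      # prior probabilities sum to 1
    transitions = n_states * (n_states - 1)   # each row of A sums to 1
    means = n_states * n_features             # one mean vector per state
    variances = n_states * n_features         # diagonal of each covariance
    return start + transitions + means + variances


def bic(log_likelihood, n_parameters, n_samples):
    """Bayesian information criterion; lower values are preferred."""
    return -2.0 * log_likelihood + n_parameters * math.log(n_samples)


# Four hidden states and three features, as in the predictor above
k = gaussian_hmm_free_parameters(n_states=4, n_features=3)
print(k)  # 3 + 12 + 12 + 12 = 39

# Hypothetical log-likelihood and sample count, for illustration only
print(bic(log_likelihood=-100.0, n_parameters=k, n_samples=500))
```

In practice you would fit one model per candidate number of states, take each model's log-likelihood on the training features, and keep the model with the lowest criterion value.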

In machine learning, you typically divide the dataset into two parts. The first, the training dataset, is used to train the model. The second, the test dataset, is used to provide an unbiased evaluation of the final model fit on the training dataset. Keeping the test dataset separate from the training dataset lets you detect whether the model has overfit the training data. So, in this case, split the dataset into train_data for training the model and test_data for evaluating it. To do so, use the train_test_split function provided by the sklearn.model_selection module:

```python
from sklearn.model_selection import train_test_split


class StockPredictor(object):
    def __init__(self, company, test_size=0.33,
                 n_latency_days=10, n_hidden_states=4):
        self._init_logger()

        self.company = company
        self.n_latency_days = n_latency_days

        self.hmm = GaussianHMM(n_components=n_hidden_states)

        self._split_train_test_data(test_size)

    def _split_train_test_data(self, test_size):
        data = pd.read_csv(
            'data/company_data/{company}.csv'.format(company=self.company))
        _train_data, test_data = train_test_split(
            data, test_size=test_size, shuffle=False)

        self._train_data = _train_data
        self._test_data = test_data

    def fit(self):
        self._logger.info('>>> Extracting Features')
        feature_vector = StockPredictor._extract_features(self._train_data)
        self._logger.info('Features extraction Completed <<<')

        self.hmm.fit(feature_vector)
```

By default, train_test_split splits arrays or matrices into random train and test subsets. Because you train your HMM on sequential data, you do not want to split the data randomly. To prevent random splitting of the train and test data, pass shuffle=False as an argument.
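To see what a non-shuffled split amounts to, here is a minimal pure-Python sketch: the earliest rows form the training set and the most recent rows form the test set. The function name `chronological_split` is hypothetical, and the rounding rule (round the test size up) is an assumption intended to mirror how scikit-learn treats a fractional `test_size`.

```python
import math


def chronological_split(rows, test_size=0.33):
    """Split an ordered sequence into (earlier, later) parts without shuffling."""
    n_test = math.ceil(len(rows) * test_size)
    n_train = len(rows) - n_test
    return rows[:n_train], rows[n_train:]


# Ten consecutive trading days, identified by index
days = list(range(10))
train_days, test_days = chronological_split(days)
print(train_days)  # [0, 1, 2, 3, 4, 5]
print(test_days)   # [6, 7, 8, 9]
```

Every test day comes after every training day, so the model is never evaluated on days that precede the data it was trained on.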

Once your model is trained, you need to predict the stock closing price. As mentioned earlier, you want to predict the closing price for a day given that you know the opening price. This means that if you are able to predict $frac_{change}$ for a given day, you can compute the closing price as follows:

$$close = open \times (1 + frac_{change})$$

Thus, your problem boils down to computing the observation vector $X_{t+1} = \langle frac_{change}, frac_{high}, frac_{low} \rangle$ for a day given the observation data for the previous t days, $X_1, \dots, X_t$, and the parameters of the HMM, $\theta$. That is, you need to find the value of $X_{t+1}$ that maximizes the posterior probability $P(X_{t+1} \mid X_1, \dots, X_t, \theta)$:

$$\hat{X}_{t+1} = \arg\max_{X_{t+1}} P(X_{t+1} \mid X_1, \dots, X_t, \theta) = \arg\max_{X_{t+1}} \frac{P(X_1, \dots, X_{t+1} \mid \theta)}{P(X_1, \dots, X_t \mid \theta)}$$

The denominator does not depend on $X_{t+1}$, so once you remove it from the maximization, you are left with the problem of finding the value of $X_{t+1}$ that maximizes $P(X_1, \dots, X_{t+1} \mid \theta)$. If you treat $frac_{change}$ as a continuous variable, this optimization is computationally difficult. So, restrict each fractional change to a set of discrete values ranging between two finite bounds (as stated in the following table) and find the combination $\langle frac_{change}, frac_{high}, frac_{low} \rangle$ that maximizes the probability $P(X_1, \dots, X_{t+1} \mid \theta)$:

| Observation | Minimum value | Maximum value | Number of points |
| --- | --- | --- | --- |
| $frac_{change}$ | -0.1 | 0.1 | 20 |
| $frac_{high}$ | 0 | 0.1 | 10 |
| $frac_{low}$ | 0 | 0.1 | 10 |

So, with the preceding discrete set of values, you need to evaluate 20 × 10 × 10 = 2,000 candidate outcomes:

```python
    def _compute_all_possible_outcomes(self, n_steps_frac_change,
                                       n_steps_frac_high, n_steps_frac_low):
        frac_change_range = np.linspace(-0.1, 0.1, n_steps_frac_change)
        frac_high_range = np.linspace(0, 0.1, n_steps_frac_high)
        frac_low_range = np.linspace(0, 0.1, n_steps_frac_low)

        self._possible_outcomes = np.array(list(itertools.product(
            frac_change_range, frac_high_range, frac_low_range)))
```
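The same grid can be built outside the class to confirm its size. This standalone sketch uses the point counts from the table; nothing in it goes beyond `np.linspace` and `itertools.product`.

```python
import itertools

import numpy as np

# Point counts taken from the table of discrete values
frac_change_range = np.linspace(-0.1, 0.1, 20)
frac_high_range = np.linspace(0, 0.1, 10)
frac_low_range = np.linspace(0, 0.1, 10)

# Cartesian product: one row per candidate (frac_change, frac_high, frac_low)
possible_outcomes = np.array(list(itertools.product(
    frac_change_range, frac_high_range, frac_low_range)))

print(possible_outcomes.shape)  # (2000, 3)
print(possible_outcomes[0])     # the first candidate: frac_change=-0.1, others 0
```

One consequence of using an even number of points on a symmetric interval is that a `frac_change` of exactly zero is not among the candidates; the two closest values sit just below and just above zero.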

Now, implement the method to predict the closing price, as follows:

```python
    def _get_most_probable_outcome(self, day_index):
        # Use up to n_latency_days of observations preceding day_index.
        # iloc slices exclude the end index, so the window ends at the
        # day before day_index.
        previous_data_start_index = max(0, day_index - self.n_latency_days)
        previous_data_end_index = day_index
        previous_data = self._test_data.iloc[
            previous_data_start_index:previous_data_end_index]
        previous_data_features = StockPredictor._extract_features(
            previous_data)

        # Score each candidate outcome appended to the recent observations
        outcome_score = []
        for possible_outcome in self._possible_outcomes:
            total_data = np.vstack(
                (previous_data_features, possible_outcome))
            outcome_score.append(self.hmm.score(total_data))
        most_probable_outcome = self._possible_outcomes[np.argmax(
            outcome_score)]

        return most_probable_outcome

    def predict_close_price(self, day_index):
        open_price = self._test_data.iloc[day_index]['open']
        predicted_frac_change, _, _ = self._get_most_probable_outcome(
            day_index)
        return open_price * (1 + predicted_frac_change)
```
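The search above relies on `hmm.score`, which returns the log-likelihood of an observation sequence under the model. To show what that quantity is, the following sketch implements the forward algorithm in log space with NumPy and uses it to pick the best of three candidates. It is an illustration rather than hmmlearn's implementation: it assumes diagonal covariances, and the two-state model, the observations, and all function names are hypothetical.

```python
import numpy as np


def log_gaussian_diag(x, means, variances):
    """Log density of x under each state's diagonal Gaussian, shape (n_states,)."""
    return -0.5 * np.sum(
        np.log(2.0 * np.pi * variances) + (x - means) ** 2 / variances, axis=1)


def log_sum_exp(values, axis):
    """Numerically stable log(sum(exp(values))) along one axis."""
    peak = np.max(values, axis=axis, keepdims=True)
    return np.squeeze(peak, axis=axis) + np.log(
        np.sum(np.exp(values - peak), axis=axis))


def sequence_log_likelihood(observations, start_prob, trans_mat, means, variances):
    """Forward algorithm: log P(X_1, ..., X_T | theta)."""
    log_alpha = np.log(start_prob) + log_gaussian_diag(
        observations[0], means, variances)
    for x in observations[1:]:
        # Sum over the previous state i of alpha[i] * A[i, j], in log space
        log_alpha = log_sum_exp(
            log_alpha[:, None] + np.log(trans_mat), axis=0
        ) + log_gaussian_diag(x, means, variances)
    return log_sum_exp(log_alpha, axis=0)


# Hypothetical two-state model over the three features
start_prob = np.array([0.5, 0.5])
trans_mat = np.array([[0.9, 0.1],
                      [0.2, 0.8]])
means = np.array([[0.01, 0.02, 0.01],
                  [-0.01, 0.01, 0.02]])
variances = np.full((2, 3), 1e-4)

# Two recent observations followed by one of three candidate outcomes
previous = np.array([[0.012, 0.018, 0.009],
                     [0.008, 0.021, 0.011]])
candidates = np.array([[0.01, 0.02, 0.01],
                       [-0.01, 0.01, 0.02],
                       [0.05, 0.00, 0.00]])

scores = [
    sequence_log_likelihood(
        np.vstack((previous, candidate)),
        start_prob, trans_mat, means, variances)
    for candidate in candidates
]
best_candidate = candidates[int(np.argmax(scores))]
print(best_candidate)
```

Because the recent observations lie close to the first state's mean and that state tends to persist, the candidate equal to that mean receives the highest score here.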

Predict the closing price for some days and plot both the curves:

```python
"""
Usage: analyse_data.py --company=<company>
"""
import warnings
import logging
import itertools

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from hmmlearn.hmm import GaussianHMM
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from docopt import docopt

args = docopt(doc=__doc__, argv=None, help=True,
              version=None, options_first=False)

# Suppress warnings from hmmlearn
warnings.filterwarnings("ignore")

# Change plot style to ggplot (for better and more aesthetic visualisation)
plt.style.use('ggplot')


class StockPredictor(object):
    def __init__(self, company, test_size=0.33,
                 n_hidden_states=4, n_latency_days=10,
                 n_steps_frac_change=50, n_steps_frac_high=10,
                 n_steps_frac_low=10):
        self._init_logger()

        self.company = company
        self.n_latency_days = n_latency_days

        self.hmm = GaussianHMM(n_components=n_hidden_states)

        self._split_train_test_data(test_size)

        self._compute_all_possible_outcomes(
            n_steps_frac_change, n_steps_frac_high, n_steps_frac_low)

    def _init_logger(self):
        self._logger = logging.getLogger(__name__)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)
        self._logger.setLevel(logging.DEBUG)

    def _split_train_test_data(self, test_size):
        data = pd.read_csv(
            'data/company_data/{company}.csv'.format(company=self.company))
        _train_data, test_data = train_test_split(
            data, test_size=test_size, shuffle=False)

        self._train_data = _train_data
        self._test_data = test_data

    @staticmethod
    def _extract_features(data):
        open_price = np.array(data['open'])
        close_price = np.array(data['close'])
        high_price = np.array(data['high'])
        low_price = np.array(data['low'])

        # Compute the fractional change in close, high, and low prices,
        # which are used as features
        frac_change = (close_price - open_price) / open_price
        frac_high = (high_price - open_price) / open_price
        frac_low = (open_price - low_price) / open_price

        return np.column_stack((frac_change, frac_high, frac_low))

    def fit(self):
        self._logger.info('>>> Extracting Features')
        feature_vector = StockPredictor._extract_features(self._train_data)
        self._logger.info('Features extraction Completed <<<')

        self.hmm.fit(feature_vector)

    def _compute_all_possible_outcomes(self, n_steps_frac_change,
                                       n_steps_frac_high, n_steps_frac_low):
        frac_change_range = np.linspace(-0.1, 0.1, n_steps_frac_change)
        frac_high_range = np.linspace(0, 0.1, n_steps_frac_high)
        frac_low_range = np.linspace(0, 0.1, n_steps_frac_low)

        self._possible_outcomes = np.array(list(itertools.product(
            frac_change_range, frac_high_range, frac_low_range)))

    def _get_most_probable_outcome(self, day_index):
        # Use up to n_latency_days of observations preceding day_index.
        # iloc slices exclude the end index, so the window ends at the
        # day before day_index.
        previous_data_start_index = max(0, day_index - self.n_latency_days)
        previous_data_end_index = day_index
        previous_data = self._test_data.iloc[
            previous_data_start_index:previous_data_end_index]
        previous_data_features = StockPredictor._extract_features(
            previous_data)

        # Score each candidate outcome appended to the recent observations
        outcome_score = []
        for possible_outcome in self._possible_outcomes:
            total_data = np.vstack(
                (previous_data_features, possible_outcome))
            outcome_score.append(self.hmm.score(total_data))
        most_probable_outcome = self._possible_outcomes[np.argmax(
            outcome_score)]

        return most_probable_outcome

    def predict_close_price(self, day_index):
        open_price = self._test_data.iloc[day_index]['open']
        predicted_frac_change, _, _ = self._get_most_probable_outcome(
            day_index)
        return open_price * (1 + predicted_frac_change)

    def predict_close_prices_for_days(self, days, with_plot=False):
        predicted_close_prices = []
        for day_index in tqdm(range(days)):
            predicted_close_prices.append(self.predict_close_price(day_index))

        if with_plot:
            test_data = self._test_data[0: days]
            days = np.array(test_data['date'], dtype="datetime64[ms]")
            actual_close_prices = test_data['close']

            fig = plt.figure()

            axes = fig.add_subplot(111)
            axes.plot(days, actual_close_prices, 'bo-', label="actual")
            axes.plot(days, predicted_close_prices, 'r+-', label="predicted")
            axes.set_title('{company}'.format(company=self.company))

            fig.autofmt_xdate()

            plt.legend()
            plt.show()

        return predicted_close_prices


stock_predictor = StockPredictor(company=args['--company'])
stock_predictor.fit()
stock_predictor.predict_close_prices_for_days(500, with_plot=True)
```

The script plots the actual and the predicted closing prices for the chosen company on the same axes.

## Conclusion

You’ve successfully predicted the price of stocks using an HMM. You applied parameter estimation and model evaluation methods to determine the closing price of a stock. Using HMMs for stock market analysis is just one more example of applying HMMs to time series data.