import yfinance as yf
import datetime
import pandas as pd
import numpy as np
from finta import TA
import matplotlib.pyplot as plt
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score, mean_squared_error
from sklearn import metrics
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC, SVR
from sklearn.tree import DecisionTreeRegressor
# Define the ticker symbol and period of interest
ticker = "AAPL"
period = "1y"
# finta TA method names used to build the model's technical-indicator features
INDICATORS = ['RSI', 'MACD', 'STOCH', 'ADL', 'ATR', 'MOM', 'MFI', 'ROC', 'OBV', 'CCI', 'EMV', 'VORTEX']
# Fetch the data using yfinance
data = yf.download(ticker, period=period)
# Remove the current (incomplete) month's data. The year must be checked too:
# a 1-year window contains this calendar month twice (now and 12 months ago),
# and filtering on month alone would drop last year's rows as well.
_now = pd.Timestamp.now()
data = data[~((data.index.month == _now.month) & (data.index.year == _now.year))]
# Save the data as a CSV file
data.to_csv(f"{ticker}_{period}.csv", index=True)
# finta expects lower-case OHLCV column names
data.rename(columns={"Close": 'close', "High": 'high', "Low": 'low', 'Volume': 'volume', 'Open': 'open'}, inplace=True)
# Quick visual sanity check of the last 60 closes; explicit copy avoids
# pandas SettingWithCopy warnings on the slice.
tmp = data.iloc[-60:].copy()
tmp['close'].plot()
plt.show()
def _get_indicator_data(data):
    """
    Compute technical-indicator features with finta and append them to *data*.

    :param data: OHLCV DataFrame with lower-case column names
                 ('open', 'high', 'low', 'close', 'volume')
    :return: DataFrame containing only the engineered feature columns
    """
    for indicator in INDICATORS:
        # getattr dispatch instead of eval: same effect, no code-injection
        # surface, and a clearer error if an indicator name is wrong.
        ind_data = getattr(TA, indicator)(data)
        if not isinstance(ind_data, pd.DataFrame):
            ind_data = ind_data.to_frame()
        data = data.merge(ind_data, left_index=True, right_index=True)
    # finta names this column with a trailing dot; normalize it
    data.rename(columns={"14 period EMV.": '14 period EMV'}, inplace=True)
    # Price relative to its exponential moving average (trend-relative features).
    # NOTE(review): ewm's first positional argument is `com`, not `span` — if a
    # true N-period EMA was intended this should be ewm(span=N); kept as-is to
    # preserve the existing feature values. TODO confirm intent.
    data['ema50'] = data['close'] / data['close'].ewm(50).mean()
    data['ema21'] = data['close'] / data['close'].ewm(21).mean()
    data['ema15'] = data['close'] / data['close'].ewm(14).mean()
    data['ema5'] = data['close'] / data['close'].ewm(5).mean()
    # Instead of using the actual volume value (which changes over time), we normalize it with a moving volume average
    data['normVol'] = data['volume'] / data['volume'].ewm(5).mean()
    # Remove raw columns that won't be used as features. 'Adj Close' may be
    # absent (e.g. yfinance auto_adjust=True), so ignore missing columns
    # instead of raising KeyError.
    data.drop(columns=['open', 'high', 'low', 'volume', 'Adj Close'],
              inplace=True, errors='ignore')
    return data
# Replace raw OHLCV data with the engineered indicator features
data = _get_indicator_data(data)
# captures 5 rows corresponding to the 5 days to predict future values with
# (held out here, before labels are added, for a "live" prediction demo later)
live_pred_data = data.iloc[-30:-25]
def _produce_prediction(data, window):
"""
Function that produces the 'truth' values
At a given row, it looks 'window' rows ahead to see if the price increased (1) or decreased (0)
:param window: number of days, or rows to look ahead to see what the price did
"""
prediction = (data.shift(-window)['close'] >= data['close'])
prediction = prediction.iloc[:-window]
data['pred'] = prediction.astype(int)
return data
# Label each row by whether price rises over the next 10 trading days
data = _produce_prediction(data, window=10)
# 'close' itself must not be a feature: it would leak the target
del (data['close'])
data = data.dropna() # Some indicators produce NaN values for the first few rows, we just remove them here
# create random forest model
def _train_random_forest(X_train, y_train, X_test, y_test):
    """
    Grid-search a RandomForestClassifier over n_estimators, print the best
    parameters and test-set metrics, and return the best estimator.
    """
    # Candidate values for the number of trees
    param_grid = {'n_estimators': [110, 130, 140, 150, 160, 180, 200]}
    # 5-fold grid search over the candidates
    search = GridSearchCV(RandomForestClassifier(), param_grid, cv=5)
    search.fit(X_train, y_train)
    best_model = search.best_estimator_
    # Report which n_estimators won
    print(search.best_params_)
    y_pred = best_model.predict(X_test)
    print(classification_report(y_test, y_pred))
    print(confusion_matrix(y_test, y_pred))
    return best_model
# knn model
def _train_KNN(X_train, y_train, X_test, y_test):
    """
    Grid-search a KNeighborsClassifier over n_neighbors (1-24), print the
    best parameters and test-set metrics, and return the best estimator.
    """
    # Candidate neighborhood sizes
    param_grid = {'n_neighbors': np.arange(1, 25)}
    # 5-fold grid search over the candidates
    search = GridSearchCV(KNeighborsClassifier(), param_grid, cv=5)
    search.fit(X_train, y_train)
    best_model = search.best_estimator_
    # Report which n_neighbors won
    print(search.best_params_)
    y_pred = best_model.predict(X_test)
    print(classification_report(y_test, y_pred))
    print(confusion_matrix(y_test, y_pred))
    return best_model
# ensemble model
def _ensemble_model(rf_model, knn_model, X_train, y_train, X_test, y_test):
    """
    Combine the KNN and random-forest models into a hard-voting ensemble,
    fit it on the training data, print test-set metrics, and return it.
    """
    voters = [('knn', knn_model), ('rf', rf_model)]
    ensemble = VotingClassifier(voters, voting='hard')
    ensemble.fit(X_train, y_train)
    # Overall test accuracy, then per-class metrics
    print(ensemble.score(X_test, y_test))
    y_pred = ensemble.predict(X_test)
    print(classification_report(y_test, y_pred))
    print(confusion_matrix(y_test, y_pred))
    return ensemble
# put together with cross validation method
def cross_Validation(data):
    """
    Walk-forward evaluation: slide a 40-row train/test window through the
    data in 10-row steps, training RF, KNN, and a voting ensemble on each
    window, then print average accuracies and run the last fitted ensemble
    on the held-out live_pred_data rows.

    :param data: feature DataFrame containing the 'pred' label column
    """
    num_train = 10  # Increment of how many starting points (len(data) / num_train = number of train-test sets)
    len_train = 40  # Length of each train-test set
    # Lists to store the results from each model
    rf_RESULTS = []
    knn_RESULTS = []
    ensemble_RESULTS = []
    ensemble_model = None  # last fitted ensemble; stays None if no full window fits
    i = 0
    while True:
        # Partition the data into chunks of size len_train every num_train days
        df = data.iloc[i * num_train: (i * num_train) + len_train]
        i += 1
        print(i * num_train, (i * num_train) + len_train)
        # Stop once a complete window can no longer be filled
        # (use len_train instead of a duplicated magic number)
        if len(df) < len_train:
            break
        y = df['pred']
        features = [x for x in df.columns if x not in ['pred']]
        X = df[features]
        # shuffle=False preserves chronological order (no look-ahead leakage)
        X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=7 * len(X) // 10, shuffle=False)
        rf_model = _train_random_forest(X_train, y_train, X_test, y_test)
        knn_model = _train_KNN(X_train, y_train, X_test, y_test)
        ensemble_model = _ensemble_model(rf_model, knn_model, X_train, y_train, X_test, y_test)
        rf_prediction = rf_model.predict(X_test)
        knn_prediction = knn_model.predict(X_test)
        ensemble_prediction = ensemble_model.predict(X_test)
        print('rf prediction is ', rf_prediction)
        print('knn prediction is ', knn_prediction)
        print('ensemble prediction is ', ensemble_prediction)
        print('truth values are ', y_test.values)
        rf_accuracy = accuracy_score(y_test.values, rf_prediction)
        knn_accuracy = accuracy_score(y_test.values, knn_prediction)
        ensemble_accuracy = accuracy_score(y_test.values, ensemble_prediction)
        print(rf_accuracy, knn_accuracy, ensemble_accuracy)
        rf_RESULTS.append(rf_accuracy)
        knn_RESULTS.append(knn_accuracy)
        ensemble_RESULTS.append(ensemble_accuracy)
    # Guard: with no complete window, the averages below would divide by zero
    # and ensemble_model would be unfitted.
    if ensemble_model is None or not rf_RESULTS:
        print('Not enough data for a single train-test window')
        return
    print('RF Accuracy = ' + str(sum(rf_RESULTS) / len(rf_RESULTS)))
    print('KNN Accuracy = ' + str(sum(knn_RESULTS) / len(knn_RESULTS)))
    print('Ensemble Accuracy = ' + str(sum(ensemble_RESULTS) / len(ensemble_RESULTS)))
    print(live_pred_data.head())
    # Drop 'close' without mutating the module-level slice; errors='ignore'
    # avoids a KeyError if the column was already removed upstream.
    live_features = live_pred_data.drop(columns=['close'], errors='ignore')
    # NOTE: the classifiers output 0/1 direction labels (up/down), not prices.
    prediction = ensemble_model.predict(live_features)
    print(prediction)
cross_Validation(data)
# Question (translated from Chinese, converted to a comment so the file parses):
# "I want to print predicted prices such as 152.34, 134.40, ..., but the
# prediction prints as [0, 0, 1, 0, 1]. How can I get price values so I can
# plot the predicted stock prices? The code also raises errors I can't solve."
# Answer in brief: these models are *classifiers* — 0/1 means predicted price
# direction (down/up), not a price. To predict prices, train a regressor
# (e.g. SVR or DecisionTreeRegressor, already imported) with 'close' as the
# continuous target instead of the binary 'pred' label.