这是代码:
import os
import random
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, Input
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, mean_absolute_error
# Directory scanned for the input CSV files (one dataframe is built per file)
DATADIR = "./Dataset"
# Rows dated before this cutoff are used for training/validation;
# rows dated on or after it form the test set
TRAIN_TEST_CUTOFF = '2016-04-21'
# Fraction of the pre-cutoff rows used for training; the rest is validation
TRAIN_VALID_RATIO = 0.75
# https://datascience.stackexchange.com/questions/45165/how-to-get-accuracy-f1-precision-and-recall-for-a-keras-model
# to implement F1 score for validation in a batch
def recall_m(y_true, y_pred):
    """Batch-wise recall: rounded true positives over rounded actual positives.

    Both the element-wise product and the labels are clipped to [0, 1] and
    rounded before summing; the backend epsilon keeps the denominator non-zero.
    """
    hit_count = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_count = K.sum(K.round(K.clip(y_true, 0, 1)))
    return hit_count / (actual_count + K.epsilon())
def precision_m(y_true, y_pred):
    """Batch-wise precision: rounded true positives over rounded predicted positives.

    Both the element-wise product and the predictions are clipped to [0, 1]
    and rounded before summing; the backend epsilon keeps the denominator
    non-zero.
    """
    hit_count = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    predicted_count = K.sum(K.round(K.clip(y_pred, 0, 1)))
    return hit_count / (predicted_count + K.epsilon())
def f1_m(y_true, y_pred):
    """Batch-wise F1 score built from precision_m() and recall_m()."""
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    numerator = p * r
    # epsilon guards against 0/0 when both precision and recall are zero
    denominator = p + r + K.epsilon()
    return 2 * (numerator / denominator)
def f1macro(y_true, y_pred):
    """Mean of the F1 score of the positive class and of the negative class."""
    positive_f1 = f1_m(y_true, y_pred)
    # Score the negative class by flipping both the labels and the
    # (clipped) predictions
    flipped_true = 1 - y_true
    flipped_pred = 1 - K.clip(y_pred, 0, 1)
    negative_f1 = f1_m(flipped_true, flipped_pred)
    return (positive_f1 + negative_f1) / 2
def cnnpred_2d(seq_len=60, n_features=82, n_filters=(8,8,8), droprate=0.1):
    """Build the 2D-CNNpred model according to the paper.

    The network accepts inputs of shape (seq_len, n_features, 1) and ends in
    a single sigmoid unit. n_filters gives the filter counts of the three
    convolution layers; droprate is the dropout rate applied before the
    output layer.
    """
    relu = "relu"
    stack = [
        Input(shape=(seq_len, n_features, 1)),
        # first convolution spans the whole feature axis of a single step
        Conv2D(n_filters[0], kernel_size=(1, n_features), activation=relu),
    ]
    # two convolution + max-pooling stages along the sequence axis
    for stage in (1, 2):
        stack.append(Conv2D(n_filters[stage], kernel_size=(3, 1), activation=relu))
        stack.append(MaxPool2D(pool_size=(2, 1)))
    stack += [Flatten(), Dropout(droprate), Dense(1, activation="sigmoid")]
    return Sequential(stack)
def datagen(data, seq_len, batch_size, targetcol, kind,
            cutoff=None, train_valid_ratio=None):
    """Generate random batches of samples for a Keras model, forever.

    Each sample is a window of ``seq_len`` consecutive rows of one randomly
    chosen dataframe (all columns except ``targetcol``), labelled with the
    ``targetcol`` value of the window's last row.

    Args:
        data: dict mapping a name to a dataframe.
        seq_len: number of rows in each window.
        batch_size: number of samples per yielded batch.
        targetcol: name of the label column.
        kind: ``'train'`` to sample window ends from the first part of the
            pre-cutoff rows, ``'valid'`` to sample from the remaining part.
        cutoff: rows with index < cutoff are eligible; defaults to
            TRAIN_TEST_CUTOFF.
        train_valid_ratio: fraction of the eligible rows that belongs to the
            training part; defaults to TRAIN_VALID_RATIO.

    Yields:
        ``(X, y)`` with ``X`` of shape (batch_size, seq_len, n_inputs, 1)
        and ``y`` of shape (batch_size,).

    Raises:
        NotImplementedError: if ``kind`` is neither 'train' nor 'valid'.
        ValueError: if a dataframe is too small for ``seq_len`` or offers no
            usable window end, or if ``data`` is empty. All dataframes are
            checked once on the first ``next()`` call rather than only when
            they happen to be picked.
    """
    if kind not in ("train", "valid"):
        raise NotImplementedError
    # Fall back to the module-level configuration when not overridden
    if cutoff is None:
        cutoff = TRAIN_TEST_CUTOFF
    if train_valid_ratio is None:
        train_valid_ratio = TRAIN_VALID_RATIO

    # Precompute once per dataframe what used to be rebuilt for every single
    # sample: the feature/label arrays and the admissible window-end rows.
    pool = []
    for df in data.values():
        input_cols = [c for c in df.columns if c != targetcol]
        positions = np.flatnonzero(df.index < cutoff)
        split = int(len(positions) * train_valid_ratio)
        if split <= seq_len:
            raise ValueError(
                "Training data too small for sequence length {}".format(seq_len))
        positions = positions[:split] if kind == "train" else positions[split:]
        # a window ending at row n needs seq_len-1 rows in front of it
        positions = positions[positions >= seq_len - 1]
        if len(positions) == 0:
            raise ValueError(
                "No '{}' samples available for sequence length {}".format(
                    kind, seq_len))
        pool.append((df[input_cols].values, df[targetcol].values, positions))
    if not pool:
        raise ValueError("data must contain at least one dataframe")

    windows, labels = [], []
    while True:
        # Pick one dataframe from the pool, then one window end inside it
        features, targets, positions = random.choice(pool)
        n = int(positions[random.randrange(len(positions))])
        windows.append(features[n - seq_len + 1:n + 1])
        labels.append(targets[n])
        # if we get enough for a batch, dispatch
        if len(windows) == batch_size:
            yield np.expand_dims(np.array(windows), 3), np.array(labels)
            windows, labels = [], []
def testgen(data, seq_len, targetcol):
"Return array of all test samples"
batch = []
for key, df in data.items():
input_cols = [c for c in df.columns if c != targetcol]
# find the start of test sample
t = df.index[df.index >= TRAIN_TEST_CUTOFF][0]
n = (df.index == t).argmax()
# extract sample using a sliding window
for i in range(n+1, len(df)+1):
frame = df.iloc[i-seq_len:i]
batch.append([frame[input_cols].values, frame[targetcol][-1]])
X, y = zip(*batch)
return np.expand_dims(np.array(X),3), np.array(y)
# Read data into pandas DataFrames
data = {}
for filename in os.listdir(DATADIR):
    if not filename.lower().endswith(".csv"):
        continue  # read only the CSV files
    filepath = os.path.join(DATADIR, filename)
    X = pd.read_csv(filepath, index_col="Date", parse_dates=True)
    # basic preprocessing: get the name, the classification
    # .iloc[0] because the frame is indexed by date, so [0] is not a label
    name = X["Name"].iloc[0]
    del X["Name"]
    cols = X.columns
    # Target is 1 when the next row's Close is higher than this row's Close
    next_return = X["Close"].pct_change().shift(-1)
    X["Target"] = (next_return > 0).astype(int)
    # A missing next-row return compares as False and would otherwise be
    # labelled 0 (this always affects the last row); drop such rows instead
    # of keeping a fabricated label
    X.drop(index=X.index[next_return.isna()], inplace=True)
    X.dropna(inplace=True)
    # Fit the standard scaler using the training dataset only
    index = X.index[X.index < TRAIN_TEST_CUTOFF]
    index = index[:int(len(index) * TRAIN_VALID_RATIO)]
    scaler = StandardScaler().fit(X.loc[index, cols])
    # Save scale transformed dataframe
    X[cols] = scaler.transform(X[cols])
    data[name] = X

seq_len = 60
batch_size = 128
n_epochs = 20
n_features = 82

# Produce CNNpred as a binary classification problem
model = cnnpred_2d(seq_len, n_features)
model.compile(optimizer="adam", loss="mae", metrics=["acc", f1macro])
model.summary()  # print model structure to console

# Set up callbacks and fit the model
# We use custom validation score f1macro() and hence monitor for "val_f1macro"
checkpoint_path = "./cp2d-{epoch}-{val_f1macro:.2f}.h5"
callbacks = [
    ModelCheckpoint(checkpoint_path,
                    monitor='val_f1macro', mode="max",
                    verbose=0, save_best_only=True, save_weights_only=False,
                    save_freq="epoch")
]
model.fit(datagen(data, seq_len, batch_size, "Target", "train"),
          validation_data=datagen(data, seq_len, batch_size, "Target", "valid"),
          epochs=n_epochs, steps_per_epoch=400, validation_steps=10, verbose=1,
          callbacks=callbacks)

# Prepare test data
test_data, test_target = testgen(data, seq_len, "Target")

# Test the model
test_out = model.predict(test_data)
# flatten the (n_samples, 1) network output to match test_target's shape
test_pred = (test_out > 0.5).astype(int).ravel()
# scikit-learn metrics take (y_true, y_pred) in that order
print("accuracy:", accuracy_score(test_target, test_pred))
print("MAE:", mean_absolute_error(test_target, test_pred))
print("F1:", f1_score(test_target, test_pred))
这是错误:
File ~\OneDrive\Desktop\SAMIR\Nile University Undergraduate\Final Year Project\CNNpred-
Keras-main\2DCNNPredOG.py:6 in <module>
import tensorflow as tf
File ~\anaconda3\lib\site-packages\tensorflow\__init__.py:37 in <module>
from tensorflow.python.tools import module_util as _module_util
File ~\anaconda3\lib\site-packages\tensorflow\python\__init__.py:42 in <module>
from tensorflow.python import data
File ~\anaconda3\lib\site-packages\tensorflow\python\data\__init__.py:21 in <module>
from tensorflow.python.data import experimental
File ~\anaconda3\lib\site-packages\tensorflow\python\data\experimental\__init__.py:95 in
<module>
from tensorflow.python.data.experimental import service
File ~\anaconda3\lib\site-
packages\tensorflow\python\data\experimental\service\__init__.py:387 in <module>
from tensorflow.python.data.experimental.ops.data_service_ops import distribute
File ~\anaconda3\lib\site-
packages\tensorflow\python\data\experimental\ops\data_service_ops.py:23 in <module>
from tensorflow.python.data.experimental.ops import compression_ops
File ~\anaconda3\lib\site-
packages\tensorflow\python\data\experimental\ops\compression_ops.py:16 in <module>
from tensorflow.python.data.util import structure
File ~\anaconda3\lib\site-packages\tensorflow\python\data\util\structure.py:22 in <module>
from tensorflow.python.data.util import nest
File ~\anaconda3\lib\site-packages\tensorflow\python\data\util\nest.py:36 in <module>
from tensorflow.python.framework import sparse_tensor as _sparse_tensor
File ~\anaconda3\lib\site-packages\tensorflow\python\framework\sparse_tensor.py:24 in
<module>
from tensorflow.python.framework import constant_op
File ~\anaconda3\lib\site-packages\tensorflow\python\framework\constant_op.py:25 in <module>
from tensorflow.python.eager import execute
File ~\anaconda3\lib\site-packages\tensorflow\python\eager\execute.py:23 in <module>
from tensorflow.python.framework import dtypes
File ~\anaconda3\lib\site-packages\tensorflow\python\framework\dtypes.py:29 in <module>
_np_bfloat16 = _pywrap_bfloat16.TF_bfloat16_type()
TypeError: Unable to convert function return value to a Python type! The signature was
() -> handle
如何解决这个问题?
检查你的numpy是否是最新版本。
import numpy
print(numpy.__version__)
如果你的numpy不是最新版本,请尝试升级到最新版本。
pip install numpy --upgrade
我最近解决了这个错误。它并不一定需要最新版本的 numpy:我安装的是 tensorflow==2.10 和 numpy==2.0.1,结果抛出了上面的错误;把 numpy 降级到 1.26.4(pip install numpy==1.26.4)之后问题就解决了。