I want to share my plan and idea for combining asynchronous programming with machine learning. Here is the idea: consider the following code:
import pandas as pd
from sklearn.ensemble import VotingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
import time
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
mylabel = LabelEncoder()
data = pd.read_csv("https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv",
                   usecols=lambda x: x not in ["PassengerId", "Ticket"])
# print(data.columns)
st = time.time()
# Process the data
data['Name'] = data['Name'].map(lambda x: x.split(',')[1].split('.')[0])
data['Age'] = data['Age'].fillna(data['Age'].mean())
data['Sex'] = mylabel.fit_transform(data['Sex'])
data['Cabin'] = data['Cabin'].fillna(data['Cabin'].value_counts().index[0])
data['Cabin'] = data['Cabin'].map(lambda x: x[0])
data['Embarked'] = data['Embarked'].fillna(data['Embarked'].value_counts().index[0])
# print(data.isnull().any())
# print(data.dtypes)
# Convert the remaining categorical columns to integer codes
cat_columns = data.select_dtypes(include=['object']).columns
print(cat_columns)
data[cat_columns] = data[cat_columns].apply(LabelEncoder().fit_transform)
# Separate the target from the features and split into train and test sets
y = data['Survived'].values
X = data.drop('Survived', axis=1).values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)
eclf1 = VotingClassifier(estimators=[('svc', SVC(probability=True)),
                                     ('LR', LogisticRegression(max_iter=5000)),
                                     ('cart', DecisionTreeClassifier())], voting='soft')
eclf1.fit(X_train, y_train)
print(eclf1.score(X_test, y_test))
# get the end time
et = time.time()
# get the execution time
elapsed_time = et - st
print('Execution time:', elapsed_time, 'seconds')
# print(data.head())
This is standard machine-learning code applying a VotingClassifier to the Titanic data (which, by the way, is very small). On my machine it takes about:
Execution time: 0.10833930969238281 seconds
That's pretty fast, right? But a major reason the code runs so quickly is that the dataset is tiny. What if our dataset had millions or even billions of rows, or a size beyond human imagination? This week I started learning the asyncio library; here is the corresponding official Python web link: asynchronous
Now, if we look at the data-processing lines:
# Process the data
data['Name'] = data['Name'].map(lambda x: x.split(',')[1].split('.')[0])
data['Age'] = data['Age'].fillna(data['Age'].mean())
data['Sex'] = mylabel.fit_transform(data['Sex'])
data['Cabin'] = data['Cabin'].fillna(data['Cabin'].value_counts().index[0])
data['Cabin'] = data['Cabin'].map(lambda x: x[0])
data['Embarked'] = data['Embarked'].fillna(data['Embarked'].value_counts().index[0])
Surely we can split this into parts and run them in parallel, right? Processing one column shouldn't have to wait for another column to finish. My question is: how do I do that?
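Here is the direction I have in mind, as an untested sketch: each column's transformation becomes its own task, run in a worker thread via asyncio.to_thread (this requires Python 3.9+), and we gather the results instead of waiting column by column. Note these pandas operations are CPU-bound, so the GIL means this mostly demonstrates the mechanics rather than a guaranteed speedup:

import asyncio
import pandas as pd
from sklearn.preprocessing import LabelEncoder

data = pd.read_csv(
    "https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv",
    usecols=lambda x: x not in ["PassengerId", "Ticket"])

async def process_column(name, func, series):
    # Run the blocking pandas work for one column in a worker thread
    return name, await asyncio.to_thread(func, series)

async def process_data(df):
    # Each column transformation is an independent task
    tasks = [
        process_column('Name', lambda s: s.map(lambda x: x.split(',')[1].split('.')[0]), df['Name']),
        process_column('Age', lambda s: s.fillna(s.mean()), df['Age']),
        process_column('Sex', lambda s: pd.Series(LabelEncoder().fit_transform(s), index=s.index), df['Sex']),
        process_column('Cabin', lambda s: s.fillna(s.value_counts().index[0]).map(lambda x: x[0]), df['Cabin']),
        process_column('Embarked', lambda s: s.fillna(s.value_counts().index[0]), df['Embarked']),
    ]
    # Wait for all columns at once instead of one after another
    for name, result in await asyncio.gather(*tasks):
        df[name] = result
    return df

data = asyncio.run(process_data(data))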
And I have one more question, about this part:
eclf1 = VotingClassifier(estimators=[('svc', SVC(probability=True)),
                                     ('LR', LogisticRegression(max_iter=5000)),
                                     ('cart', DecisionTreeClassifier())], voting='soft')
eclf1.fit(X_train, y_train)
Can I apply asyncio to the VotingClassifier? Or should I train these algorithms separately and then apply a mode function to their predictions? If possible, please give a small example so the details are easier to digest.
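For concreteness, here is roughly what I mean by "train separately, then take the mode", again as an untested sketch. It reuses X_train, X_test, y_train from the code above, and np.bincount assumes non-negative integer labels (Titanic's Survived is 0/1):

import asyncio
import numpy as np
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier

async def fit_in_thread(clf, X, y):
    # fit() returns the estimator itself, so gathering yields the fitted models
    return await asyncio.to_thread(clf.fit, X, y)

async def train_all(X, y):
    clfs = [SVC(probability=True), LogisticRegression(max_iter=5000), DecisionTreeClassifier()]
    # Train the three estimators concurrently instead of one after another
    return await asyncio.gather(*(fit_in_thread(c, X, y) for c in clfs))

fitted = asyncio.run(train_all(X_train, y_train))
# "Mode" of the individual predictions = majority vote per test row
all_preds = np.stack([c.predict(X_test) for c in fitted])
majority = np.apply_along_axis(lambda col: np.bincount(col).argmax(), 0, all_preds)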
Here is the idea, though I haven't tested the code yet (sorry); if you really want it asynchronous, you will need something like:
import pandas as pd
import numpy as np
from sklearn.base import clone
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from sklearn.ensemble import VotingClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
from concurrent.futures import ThreadPoolExecutor

def split_dataframe(df, labels, chunk_size=40):
    # Split the data AND its labels together so each chunk stays aligned
    chunks = []
    for i in range(0, len(df), chunk_size):
        chunks.append((df.iloc[i:i + chunk_size], labels.iloc[i:i + chunk_size]))
    return chunks

def fit_classifier(classifier, train_data, train_labels):
    # Runs inside a worker thread; returns the fitted classifier
    classifier.fit(train_data, train_labels)
    return classifier

# Load the Iris dataset
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = pd.Series(iris.target, name='target')

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Template ensemble; each chunk gets its own clone so the parallel
# fits cannot overwrite one another
eclf_template = VotingClassifier(estimators=[
    ('svc', SVC(probability=True)),
    ('LR', LogisticRegression(max_iter=5000)),
    ('cart', DecisionTreeClassifier())
], voting='soft')

# Split the training set into chunks (train_test_split already shuffled
# the rows, so each chunk should contain all three classes)
chunks = split_dataframe(X_train, y_train)

# Fit an independent clone of the ensemble on each chunk, one task per chunk
with ThreadPoolExecutor() as executor:  # set max_workers here if needed
    fitted_classifiers = list(executor.map(
        lambda chunk: fit_classifier(clone(eclf_template), chunk[0], chunk[1]),
        chunks))

# Combine the per-chunk ensembles by averaging their class probabilities
avg_proba = np.mean([clf.predict_proba(X_test) for clf in fitted_classifiers], axis=0)
predictions = fitted_classifiers[0].classes_[np.argmax(avg_proba, axis=1)]

# Evaluate accuracy
accuracy = accuracy_score(y_test, predictions)
print(f'Accuracy: {accuracy:.2f}')
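Two design notes. The clone() per chunk is the important part: sharing one VotingClassifier instance across threads would not work, because every call to fit() resets a scikit-learn estimator, so the parallel fits would overwrite each other. Also, threads only help where scikit-learn releases the GIL in its compiled code; if your data really grows to millions of rows, a process-based pool (concurrent.futures.ProcessPoolExecutor) or VotingClassifier's own n_jobs parameter, which parallelizes the estimator fits via joblib, would be the more usual route than asyncio.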