全部版块 我的主页
论坛 数据科学与人工智能 数据分析与数据科学 数据分析与数据挖掘
1566 0
2020-10-30
使用scikit-learn和PySpark Pandas UDF进行大规模预测
scikit-learn 是使用Python进行机器学习的绝佳工具,具有实现管道和运行实验的极大灵活性(例如,参见Civis博客系列文章),但它并不是真正为“大数据”上的分布式计算而设计的(例如,数亿条记录或更多)。
至少在Civis处,一种常见的预测建模方案是具有少量或中等数量的标记数据以从中估计模型(例如10
PySpark具有腌制python对象(包括函数)的功能,并将它们应用于在进程,机器等之间分布的数据。此外,它具有类似于pandas的语法,但是将计算的定义与执行分开,类似于TensorFlow。
一个问题是,在a)基于Java的Spark执行过程之间传递数据,该过程在计算机之间发送数据并可以高效地执行转换; b)Python过程(例如,用于scikit-learn进行预测)由于以下原因而产生了一些开销:序列化和进程间通信。一种解决方案是PySpark的DataFrame API中的用户定义函数(UDF)。您可以使用DataFrame API在Java中高效地执行大多数操作(而无需编写Java或Scala!),然后仅在必要时调用会导致Java-Python通信开销的Python UDF 。        
普通的PySpark UDF一次只能运行一个值,这会产生大量的Java-Python通信开销。最近,PySpark添加了Pandas UDF,可通过Apache Arrow将DataFrame列的块有效地转换为Pandas Series对象,从而避免了常规UDF的大量开销。让UDF期望Pandas Series还可以节省scikit-learn在Python和NumPy浮点表示之间的转换,就像常规UDF要做的那样。      
该PySpark文档总体上是好的大概还有熊猫的UDF(一些帖子1,2,3),但也许下面的示例代码将帮助一些人谁拥有部署在PySpark预测scikit学习模型的具体使用情况。最近,在Civis中,我们发现这种工作流程对于多种用例非常有前途(请参阅本SciPy演讲),希望您也能如此。   
最后一点,我要提到值得一提的是,PySpark可以替代Pandas来实现数据框和/或Python的并发功能。PySpark只需要一点点工作就可以处理小型数据集,但是如果需要,可以更轻松地进行扩展。  
pyspark_pandas_udf_sklearn.ipynb 托管与?通过 GitHub上
设置并生成一些数据
在[1]中:
进口 numpy的 是 NP 从 sklearn.datasets 导入 make_classification 从 sklearn.model_selection 进口 train_test_split 从 sklearn.ensemble 进口 RandomForestClassifier 从 sklearn.model_selection 进口 GridSearchCV 进口 大熊猫 作为 PD 进口 pyspark 进口 pyspark.sql.functions 作为 ?F 从 pyspark.sql.types 导入 DoubleType , StringType , ArrayType
在[2]中:
#制作一些虚假数据并训练模型。 n_samples_test  =  100000  n_samples_train  =  1000  n_samples_all  =  n_samples_train  +  n_samples_test  n_features  =  50   X , ?  =  make_classification (N_SAMPLES次= n_samples_all , n_features = n_features , random_state = 123 ) X_train , X_test , y_train , y_test  = \      train_test_split (X
使用scikit-learn训练模型
在[3]中:
param_grid  =  { 'n_estimators' : [ 100 ], 'MAX_DEPTH' : [ 2 , 4 , 无]}  gs_rf  =  GridSearchCV (     RandomForestClassifier (random_state = 42 ),     param_grid = param_grid ,     得分= 'roc_auc' ) 。 适合(X_train ,y_train )打印('ROC AUC:%. 3f ' %gs_rf 。    best_score_ )
ROC AUC:0.959
设置火花环境
在[4]中:
sc  =  pyspark 。SparkContext (appName = “ foo” ) sqlContext  =  pyspark 。SQLContext (sc )
现在加载数据并进行预测。
在实际使用中,读取原始数据后,我们可能会执行大量ETL,但是在这里,我们将其加载。
在[5]中:
df_unlabeled  =  sqlContext 。阅读。实木复合地板(' unlabeled_data ' ) df_unlabeled
出[5]:
DataFrame [id:bigint,feature0:double,feature1:double,feature2:double,feature3:double,feature4:double,feature5:double,feature6:double,feature7:double,feature8:double,feature9:double,feature10:double, Feature11:双重,Feature12:双重,Feature13:双重,Feature14:双重,Feature15:双重,Feature16:双重,Feature17:双重,Feature18:双重,Feature19:双重,Feature20:双重,Feature21:双重,Feature22:双重,Feature23: double,feature24:double,feature25:double,feature26:double,feature27:double,feature28:double,feature29:double,feature30:double,feature31:double,feature32:double,feature33:double,feature34:double,feature35:double, Feature36:双重,Feature37:双重,Feature38:双重,Feature39:双重,Feature40:双重,Feature41:双重,Feature42:双重,Feature43:双重,Feature44:double,feature45:double,feature46:double,feature47:double,feature48:double,feature49:double,__ index_level_0 __:bigint)
使用常规UDF进行预测
首先,我们将尝试常规的UDF。这将一次反序列化一行(即,实例,样本,记录),使用进行预测,然后返回预测,该预测将被序列化并发送回Spark以与所有其他预测结合。
在[6]中:
@F 。UDF (返回类型= DoubleType ()) 高清 predict_udf (*的cols ):      #的cols将在这里彩车的元组。     返回 浮子(gs_rf 。predict_proba ((COLS ,))[ 0 , 1 ])  df_pred_a  =  df_unlabeled 。选择(     ?F 。栏('ID' ),     predict_udf (* COLUMN_NAMES )。别名('预测' ) ) df_pred_a 。接(5 )
出[6]:
[行(id = 0,预测值= 0.96),行(id = 1,预测值0.13),行(id = 2,预测值0.95),行(id = 3,预测值0.43),行(id = 4 ,预测= 0.95)]
使用Pandas UDF进行预测
现在,我们将使用Pandas UDF(即矢量化UDF)。在这种情况下,Spark将一次发送一个具有多个行的pandas Series对象元组。元组将按传递到UDF的顺序每个列/功能具有一个Series。请注意,这些系列对象之一不会一次包含所有行的功能,因为Spark在工作人员之间划分了数据集。分区大小可以调整,但是我们在这里只使用默认值。
在[7]中:
@F 。pandas_udf (返回类型= DoubleType ()) 高清 predict_pandas_udf (*的cols ):      #的cols将在这里pandas.Series的元组。     X  =  pd 。concat (cols , axis = 1 )     返回 pd 。系列(gs_rf 。predict_proba (X )[:, 1 ])  df_pred_b  =  df_unlabeled 。选择(     ?F 。COL('id' ),     predict_pandas_udf (* column_names )。别名('prediction' ) ) df_pred_b 。接(5 )
出[7]:
[行(id = 0,预测值= 0.96),行(id = 1,预测值0.13),行(id = 2,预测值0.95),行(id = 3,预测值0.43),行(id = 4 ,预测= 0.95)]
进行多类预测
上面,我们只为肯定类返回一系列预测,该预测对单个二进制或因变量有效。也可以将多类或多标签模型放入Pandas UDF中。一个只返回一系列数字列表,而不是一系列数字。
在[8]中:
@F 。pandas_udf (returnType = ArrayType (DoubleType ())) def 预测_pandas_udf (* cols ):     X  =  pd 。concat (cols , axis = 1 )     返回 pd 。系列(行。tolist () 为 行 中 gs_rf 。predict_proba (X ))  df_pred_multi  =  (     df_unlabeled 。select(         F.col('id')
出[8]:
[行(id = 0,预测_0 = 0.04,预测_1 = 0.96),行(id = 1,预测_0 = 0.87,预测_1 = 0.13),行(id = 2
1
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

相关推荐
栏目导航
热门文章
推荐文章

说点什么

分享

扫码加好友,拉您进群
各岗位、行业、专业交流群