Google Colab上有关数据科学家的必读指南,介绍如何使用PySpark!
Google Colab是处理大型数据集和运行复杂模型的数据科学家的救星。
对于数据工程师来说,PySpark简直就是半神半兽!
那么,当我们把这两个各自所属类别中最优秀的球员,并将它们组合在一起时,会发生什么呢?
我们(几乎)为您所有的数据科学和
机器学习问题提供了完美的解决方案!
python colab
在本文中,我们将介绍如何在Google Colaboratory笔记本中运行PySpark。我们还将执行一些大多数数据科学问题共有的基本数据探索任务。所以,让我们开始吧!
注–我假设您已经熟悉Spark和Google Colab的基础知识。如果没有,我建议在阅读这篇文章之前先阅读以下文章:
PySpark初学者
Google Colab入门
目录
将Google云端硬盘连接到Colab
从Google云端硬盘读取数据
在Google Colab中设置PySpark
将数据加载到PySpark
了解数据
使用PySpark数据框进行数据探索
显示列详细信息
显示行
数据框中的行数
显示特定列
描述列
分类列的不同值
与Groupby汇总
计算和删除空值
保存到文件
将驱动器连接到Colab
使用Colab时,您要做的第一件事就是安装Google云端硬盘。这将使您能够访问Colab笔记本中Drive上的任何目录。
从google.colab导入驱动器
drive.mount('/ content / drive')
完成此操作后,下一个显而易见的步骤就是加载数据。
奖励-您可以在本文中找到一些针对Google Colab的惊人技巧!
从驱动器读取数据
现在,我假设您将使用足够大的数据集。因此,将数据上传到云端硬盘的最佳方法是使用zip格式。只需将zip文件夹拖放到您要在云端硬盘上的任何目录内即可。
解压缩这些数据根本不是麻烦。您只需要提供zip文件夹的路径以及!unzip命令即可。
!unzip“ /内容/驱动器/我的驱动器/ AV文章/ Colab上的PySpark / black_friday_train.zip”
如果您不确定该文件夹的确切位置,可以从Colab的侧面板上将其检出。
PySpark Colab-colab文件路径
好的,让我们设置Spark
在Colab中设置PySpark
Spark用Scala编程语言编写,需要Java虚拟机(JVM)才能运行。因此,我们的首要任务是下载Java。
!apt-get install openjdk-8-jdk-headless -qq> / dev / null
接下来,我们将在此处从Hadoop 2.7安装Apache Spark 3.0.1 。
!wget -q
https://www-us.apache.org/dist/s ... 1-bin-hadoop2.7.tgz
现在,我们只需要解压缩该文件夹。
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
注–在撰写本文时,3.0.1是Apache Spark的最新版本。但是Spark正在飞速发展。因此,如果在执行此代码时存在Spark的较新版本,则只需在任何地方看到3.0.1都将其替换为最新版本。
我们需要安装的最后一件事是findspark库。它将在系统上找到Spark并将其作为常规库导入。
!pip install -q findspark
现在我们已经在Colab中安装了所有必需的依赖项,现在是时候设置环境路径了。这将使我们能够在Colab环境中运行Pyspark。
导入操作系统
os.environ [“ JAVA_HOME”] =“ / usr / lib / jvm / java-8-openjdk-amd64”
os.environ [“ SPARK_HOME”] =“ /content/spark-3.0.1-bin-hadoop2.7”
是时候进行真正的测试了!
我们需要在系统中找到Spark。为此,我们导入findspark并使用findspark.init()方法。
导入findspark
findspark.init()
奖励–如果您想知道Spark的安装位置,请使用findspark.find()
findspark.find()
现在,我们可以从pyspark.sql导入SparkSession并创建一个SparkSession,这是Spark的入口点。
您可以使用appName()为会话命名,并根据需要使用config()添加一些配置。
从pyspark.sql导入SparkSession
spark = SparkSession.builder \
.master(“本地”)\
.appName(“ Colab”)\
.config('spark.ui.port','4050')\
.getOrCreate()
最后,打印SparkSession变量。
火花
PySpark Colab-火花变量
如果一切顺利,您应该可以查看上述输出。
如果要查看Spark UI,则必须包含几行代码才能为UI页面创建公共URL。
!wget
https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython()。system_raw('./ ngrok http 4050&')
!curl -s http:// localhost:4040 / api / tunnels
Spark ui公共网址
现在,您应该能够在创建的链接中查看作业及其阶段。
PySpark Colab-Spark UI
大!现在让我们开始使用PySpark!
将数据加载到PySpark
首先,我们需要加载数据集。我们将使用read.csv模块。提供的inferSchema参数将使Spark能够自动确定每一列的数据类型,但是它必须对数据进行一次遍历。如果您不希望发生这种情况,那么可以改为在schema参数中显式提供架构。
df = spark.read.csv(“ train.csv”,标头=真,inferSchema =真)
这将创建一个Spark数据框。
奖励– Spark中有多个数据源,您可以在本文中了解所有这些数据源!
了解数据
我们从DataHack平台获得了黑色星期五数据集。过去一个月有零售公司的各种客户的购买摘要。我们向客户提供了人口统计信息,购买明细和总购买金额。目的是预测每个客户针对各种产品的购买金额。
PySpark Colab-数据集
使用PySpark DF进行数据探索
现在是时候使用PySpark数据框功能浏览我们的数据了。在此过程中,我们将继续将其与Pandas数据框进行比较。
显示列详细信息
探索性
数据分析的第一步是检查数据框的架构。这将使您对数据框中的列及其数据类型有一个鸟瞰图。
df.printSchema()
PySpark Colab-Spark DF模式
显示行
现在,您显然显然也希望查看实际数据。
就像在Pandas Dataframe中一样,您具有df.head()函数,在这里您具有show()函数。您可以在括号内提供要打印的行数。
df.show(5)
PySpark Colab-Spark df Show
DF中的行数
如果您想知道数据框中的总行数,只需使用count()函数。
df.count()
550068
显示特定列
有时您可能想查看数据框中的某些特定列。为此,您可以利用Spark的SQL功能。
使用select()函数,您可以提及要查看的任何列。
df.select(“ User_ID”,“性别”,“年龄”,“职业”).show(5)
df选择
描述列
通常,当我们使用数字功能时,我们希望查看有关数据框的统计信息。的描述()函数是最适合用于这样的目的。
它与Panda的describe函数非常相似,但统计值要少得多,并且字符串列也得到了描述。
df.describe()。show()
与Google Colab上的PySpark合作,为数据科学家服务!pyspark描述
分类列的不同值
当您要确定数据框类别列中的唯一值时,distinct()将派上用场。
df.select(“ City_Category”)。distinct()。show()
火花df与众不同
与Groupby汇总
我们可以使用groupBy函数对数据框列值进行分组,然后对它们应用聚合函数以得出一些有用的见解。
在这里,我们可以在数据框中对各个城市类别进行分组,并确定每个城市类别的总购买量。为此,我们必须使用Spark SQL函数模块中的sum聚合函数。
从pyspark.sql导入功能为F
df.groupBy(“ City_Category”)。agg(F.sum(“ Purchase”))。show()
spark df groupby汇总
计算和删除空值
现在我们都知道,现实世界的数据并不能忽略缺失的值。因此,始终检查丢失的值并删除它们(如果存在)是明智的。
df.select([F.count(F.when(F.isnull(c),c))。alias(c)for df.columns中的c])。show()
我们有一些带有空值的列。因此,最好用一些值替换它们。根据我们的数据集,“产品类别”列中的空值可能表示用户未购买产品。因此,最好将空值替换为0。
我们将使用fillna()函数替换空值。由于Spark数据帧是不可变的,因此我们需要将结果存储在新的数据帧中。
df = df.fillna({'Product_Category_2':0,'Product_Category_3':0})
我们可以再次检查空值以验证更改。
df.select([F.count(F.when(F.isnull(c),c))。alias(c)for df.columns中的c])。show()
计算pyspark中的空值
完善!数据框中没有更多的空值。
保存到文件
最后,在完成所有分析后,如果要将结果保存到新的CSV文件中,则可以使用write.csv()函数进行操作。
df.write.csv(“ /内容/驱动器/我的驱动器/ AV文章/ Colab上的PySpark / preprocessed_data”)
但是这里有一个陷阱。不会保存单个CSV,而是保存多个CSV,具体取决于数据帧的分区数量。因此,如果有2个分区,则每个分区将保存两个CSV文件。
df.rdd.getNumPartitions()
2
将文件保存在pyspark中
奖励–我在这里将Spark数据帧转换为RDD。两者有什么区别?看看 这篇文章!
但是,当我们不得不再次加载这些文件时,这不是很方便。因此,我们可以将Spark df转换为旧的Pandas df,然后使用常规的to_csv()方法存储结果。
#火花df于熊猫df
df_pd = df.toPandas()
#存储结果
df_pd.to_csv(“ /内容/驱动器/我的驱动器/ AV文章/ Colab上的PySpark / pandas_preprocessed_data.csv”)
题库