数据工程101 – Apache Spark中的数据源每个数据工程师都必须知道!
从不同类型的数据源读取和写入数据以及社区创建自己的贡献的能力可以说是Spark的最大优势之一。
作为通用计算引擎,Spark可以处理来自各种数据管理/存储系统的数据,包括HDFS,Hive,Cassandra和Kafka。为了灵活性和高吞吐量,Spark定义了数据源API,它是存储层的抽象。
此数据源API有两个要求:
通用性:支持读/写大多数数据管理/存储系统。
灵活性:根据不同系统的功能自定义和优化其读写路径。
本文将使您更好地了解所有可用的核心数据源。我们将向您介绍各种可与Spark一起使用的数据源,以及更大的社区构建的无数其他数据源。Spark有六个“核心”数据源和社区编写的数百个外部数据源。
目录
Spark数据源API的结构
阅读API结构
编写API结构
您应该了解的Apache Spark数据源
CSV
JSON格式
木地板
兽人
文本
JDBC / ODBC连接
Apache Spark中的核心数据源
以下是您应该了解的Apache Spark核心数据源:
1.CSV
2.JSON
3.Parquet
4.ORC
5.JDBC / ODBC连接
6.纯文本文件
还有一些社区创建的数据源:
1. Cassandra
2. HBase
3. MongoDB
4. AWS Redshift
5. XML
和许多其他
Apache Spark的DataSources API的结构
DataFrameReader.format(...)。option(“ key”,“ value”)。schema(...)。load()
在哪里。格式用于读取所有数据源。
。格式是可选的,因为默认情况下,Spark将使用拼花格式。该选件允许我们设置键值配置以参数化必须如何读取数据。
最后,如果数据源提供架构或您打算提供架构推断,则架构是可选的。
读取API结构
在Spark中读取数据的基本方法是通过DataFrameReader。可以通过SparkSession通过以下所示的read属性来访问它:
火花阅读
由于有了DataFrameReader,因此可以指定多个值。对于不同的数据源,有多个选项集,这些选项决定了如何读取数据。除了一个以外,所有选项都可以省略。至少应为DataFrameReader提供从中读取文件的路径。
spark.read.format(“ csv”)
.option(“ mode”,“ FAILFAST”)
.option(“ inferSchema”,“ true”)
.option(“路径”,“路径/到/文件”)
.schema(someSchema)
。加载()
阅读模式
当使用半结构化数据源时,我们经常会遇到格式错误的数据。读取模式指定遇到此类数据时需要执行的操作。
宽容的遇到损坏的记录并将所有损坏的记录放在字符串列中时,将所有字段设置为null
叫做_corrupt_record
格式错误删除包含格式错误的记录的行
failFast遇到格式错误的记录后立即失败
默认为permissive。
编写API结构
DataFrameWriter.format(...).
option(...).
partitionBy(...).
bucketBy(...).
sortBy(...)
. save ()
.format指定了如何将文件写入数据源。
.option是可选的,因为Spark默认使用镶木地板。
.PartitionBy,.bucketBy,.sortBy仅与基于文件的数据源一起使用,并控制目标处文件结构的布局。
写数据
写入数据与读取数据相同。仅将DataFrameReader替换为DataFrameWriter。
dataFrame.write
使用DataFrameWriter,我们需要提供format,一系列选项和保存路径。我们可以指定许多选项,但至少需要提供目标路径。
dataframe.write.format(“ csv”)
.option(“ mode”,“ OVERWRITE”)
.option(“ dateFormat”,“ yyyy-MM-dd”)
.option(“路径”,“路径/到/文件”)
。保存()
保存模式
附加将输出文件追加到该位置已存在的文件列表中
覆写将完全覆盖那里已经存在的任何数据
errorIfExists如果指定位置已经存在数据或文件,则会引发错误并导致写入失败
忽视如果该位置存在数据或文件,请对当前DataFrame不执行任何操作
如果Spark找到目标路径中存在的数据,则errorIfExists无法写入数据。
您应该了解的不同Apache Spark数据源
CSV
CSV代表以逗号分隔的值。这是一种常见的文本文件格式,其中每一行代表一个记录,并且每个字段由记录内的逗号分隔。CSV格式结构良好,但可能是在生产方案中可使用的最棘手的文件格式之一,因为关于它们的内容和结构的假设不多。
因此,CSV阅读器具有大量选项。这些选项使您能够解决需要转义某些字符的问题,例如,当文件也是逗号分隔或以非常规方式标记的空值时,则在列内使用逗号。
读取CSV文件
spark.read.format(“ csv”)
.option(“ header”,“ true”)
.option(“ mode”,“ FAILFAST”)
.option(“ inferSchema”,“ true”)
.load(“ some / path / to / file.csv”)
如果您有一个标头上带有列名的标头,则需要显式指定true标头选项,API会将标头视为数据记录。
您也可以与他们的全名来指定数据源(即org.apache.spark.sql.csv),但内置信号源,也可以用自己的短名称(csv,json, parquet, jdbc, text等)。
在读取具有指定架构的CSV文件时,文件中的数据可能与架构不匹配。例如,包含城市名称的字段不会解析为整数。结果取决于解析器运行的模式:
PERMISSIVE (默认):为无法正确解析的字段插入null
DROPMALFORMED:删除包含无法解析的字段的行
FAILFAST:如果发现任何格式错误的数据,则中止读取。
下表列出了CSV阅读器上可用的选项:
读/写键潜在价值默认描述
都九月任何单个字符串
字符
,用作每个字段和值的分隔符的单个字符。
都标头真假假布尔值标志,用于声明文件的第一行是否为列名。
读逃逸任何字符串字符\字符Spark应该用于转义文件中的其他字符。
读推理模式真假假指定在读取文件时Spark是否应推断列类型。
读ignoreLeadingWhiteSpace真假假声明是否应跳过读取值的前导空格。
读ignoreTrailingWhiteSpace真假假声明是否应跳过读取值的尾随空格。
都空值JSON数据源选项:任何字符串字符“”声明什么字符代表文件中的空值。
都nanValue任何字符串字符N声明什么字符表示CSV文件中的NaN或缺少字符。
都阳性信息任何字符串或
字符
信息声明哪些字符表示正无穷大。
都负信息任何字符串或
字符
-信息声明哪些字符表示负无穷大。
都压缩或编解码器无,未压缩,
bzip2,放气,
gzip,lz4或snappy
没有声明Spark应当使用哪种压缩编解码器读取或写入文件。
都日期格式任何字符串或
那个角色
符合java的
SimpleDataFormat。
YYYY-MM-dd声明任何日期类型列的日期格式。
都时间戳格式任何字符串或
那个角色
符合java的
SimpleDataFormat。
YYYY-MM-
dd'T'HH:mm
:ss.SSSZZ
声明任何时间戳类型的时间戳格式。
读maxColumns任何整数20480声明文件中的最大列数。
读maxCharsPerColumn任何整数1000000声明一列中的最大字符数。
读转义语真假真正声明Spark是否应转义在行中找到的引号。
读maxMalformedLogPerPartition任何整数10设置Spark将为每个分区记录的格式错误的最大行数。超出此数字的格式错误的记录将被忽略。
写quoteAll真假假指定是否应将所有值都用引号引起来,而不是仅转义具有引号字符的值。
读多线真假假通过此选项,您可以读取多行CSV文件,其中CSV文件中的每个逻辑行都可能跨越文件本身中的多个行。
编写CSV文件
csvFile.write.format(“ csv”)
.mode(“覆盖”)
.option(“ sep”,“ \ t”)\
.save(“ / tmp / my-tsv-file.tsv”)
JSON格式
来自不同编程语言(尤其是Java和JavaScript)的人们必须意识到JavaScript Object Notation或JSON(众所周知的JSON)。在Spark中,当我们引用JSON文件时,我们指的是行分隔的JSON文件。这与每个文件具有较大JSON对象或数组的文件形成对比。
当在Spark中工作时,当我们引用JSON文件时,我们将引用行分隔的JSON文件。行定界与多行权衡是由一个选项控制的:multiLine。当
将此选项设置为true时,您可以将整个文件作为一个JSON对象读取,Spark将完成将其解析为DataFrame的工作。
Spark SQL可以自动推断JSON数据集的架构并将其作为DataFrame加载。可以使用SparkSession.read.jsonJSON文件完成此转换。请注意,以JSON文件形式提供的文件不是典型的JSON文件。每行必须包含一个单独的,自包含的有效JSON对象。
有关更多信息,请参见JSON Lines文本格式,也称为newline分隔的JSON。
JSON数据源选项:
任何单个字符串
读/写键潜在价值默认描述
都没有无,未压缩,bzip2,deflate,gzip,lz4或snappy没有Decl声明Spark应该使用哪种压缩编解码器来读取或写入文件。
都日期格式符合Java的yyyy-MM-dd SimpleDataFormat的任何字符串或字符。声明任何日期类型列的日期格式。
都日期格式符合Java的yyyy-MM-dd SimpleDataFormat的任何字符串或字符。声明任何日期类型列的日期格式。
都originalAsString真假假将所有原始值推断为字符串类型。
都时间戳格式符合Java的yyyy-MM-dd'T'HH:mm:ss.SSSZZ SimpleDataFormat的任何字符串或字符。声明所有属于时间戳类型的列的时间戳格式。
读允许评论真假假忽略JSON记录中的Java / C ++样式注释。
读allowUnquoted-
栏位名称
真假假允许使用不带引号的JSON字段名称。
读allowSingleQuotes真假真正除双引号外,还允许单引号。
读多线真假假允许读取非行分隔的JSON文件。
读allowNumeric-
领先零
真假假允许数字前导零(例如00012)。
读allowBackslash-
转义任何字符
真假假允许使用反斜杠引用机制接受所有字符的引用。
读columnNameOf-
损坏记录
任何字符串spark.sql.column&NameOfCorruptRecord的值允许重命名新字段,该新字段创建了格式错误的字符串,其值是“宽松模式”。这将覆盖配置值。
读取JSON文件
spark.read.format(“ json”)
.option(“ mode”,“ FAILFAST”)\
.option(“ inferSchema”,“ true”)\
.load(“ / data / movie-data / json / 2010-summary.json”)
编写JSON文件
csvFile.write.format(“ json”)
.mode(“覆盖”)
.save(“ / tmp / my-json-file.json”)
实木复合地板文件
Parquet是可用于Hadoop生态系统中任何项目的开源文件格式。与CSV或TSV文件等基于行的文件相比,Apache Parquet旨在提高效率以及数据的高性能扁平列存储格式。
Parquet使用记录粉碎和组装算法,该算法优于嵌套名称空间的简单扁平化。Parquet经过优化,可以批量处理复杂的数据,并采用不同的方式进行有效的数据压缩和编码类型。这种方法最适合需要从大表中读取某些列的查询。Parquet只能读取所需的列,因此大大减少了IO。
读取实木复合地板文件
spark.read.format(“ parquet”)\
.load(“ / data / movie-data / parquet / 2020-summary.parquet”)。show(5)
编写实木复合地板文件
csvFile.write.format(“ parquet”)
.mode(“ overwrite”)\
.save(“ / tmp / my-parquet-file.parquet”
兽人
所述优化行柱状(ORC)文件格式提供了存储数据蜂房一种高度有效的方法。它旨在克服其他Hive文件格式的限制。当Hive读取,写入和处理数据时,使用ORC文件可以提高性能。
与RCFile格式相比,ORC文件格式具有许多优点,例如:
一个文件作为每个任务的输出,从而减轻了NameNode的负担
Hive类型支持,包括日期时间,十进制和复杂类型(结构,列表,映射和联合)
存储在文件中的轻量级索引
跳过不通过谓词过滤的行组
寻求给定的行
基于数据类型的块模式压缩
整数列的游程编码
字符串列的字典编码
使用单独的RecordReaders并发读取同一文件
无需扫描标记即可分割文件的功能
限制读取或写入所需的内存量
使用协议缓冲区存储的元数据,允许添加和删除字段
读取兽人文件
spark.read.format(“ orc”)
.load(“ / data / movie-data / orc / 2020-summary.orc”)。show(5)
编写兽人文件
csvFile.write.format(“ orc”)
.mode(“覆盖”)
.save(“ / tmp / my-json-file.orc”
文字档案
Spark还允许您读取纯文本文件。文件中的每一行都成为DataFrame中的一条记录。然后由您自己进行相应的转换。作为如何执行此操作的示例,假设您需要将某些Apache日志文件解析为某种更结构化的格式,或者您可能想解析一些纯文本以进行自然语言处理。
文本文件因其能够利用本机类型的灵活性而成为Dataset API的重要参数。
读取文字档
spark.read.textFile(“ / data / movie-data / csv / 2020-summary.csv”)
。
编写文本文件
csvFile.select(“ DEST_COUNTRY_NAME”)
.write.text(“ / tmp / simple-text-file.txt”
JDBC / ODBC连接
SQL数据源是功能更强大的连接器之一,因为可以连接多种系统(只要该系统使用SQL即可)。例如,您可以连接到MySQL数据库,PostgreSQL数据库或Oracle数据库。您还可以连接到SQLite,这是我们在此示例中所做的。
当然,数据库不仅是一组原始文件,因此,关于如何连接到数据库,还有更多选项可供考虑。即,您将需要开始考虑诸如身份验证和连接之类的事情(您将需要确定Spark集群的网络是否已连接到数据库系统的网络)。
首先,您需要在spark类路径上包含特定数据库的JDBC驱动程序。例如,要从Spark Shell连接到Postgres,您可以运行以下命令:
./bin/spark-shell \
--driver-class-path postgresql-9.4.1207.jar \
--jars postgresql-9.4.1207.jar
可以使用数据源API将外部或远程数据库中的表作为数据框或临时视图加载。
从JDBC源读取数据
jdbcDF = spark.read \
.format(“ jdbc”)\
.option(“ url”,“ jdbc:postgresql:dbserver”)\
.option(“ dbtable”,“ schema.tablename”)\
.option(“用户”,“用户名”)\
.option(“密码”,“密码”)\
。加载()
将数据保存到JDBC源
jdbcDF.write \
.format(“ jdbc”)\
.option(“ url”,“ jdbc:postgresql:dbserver”)\
.option(“ dbtable”,“ schema.tablename”)\
.option(“用户”,“用户名”)\
.option(“密码”,“密码”)\
。保存()
尾注
我们讨论了可用于在Spark中读写数据的各种资源。这几乎涵盖了您作为Spark的日常用户在数据源方面需要了解的所有内容。出于好奇,有几种方法可以实现您自己的数据源。
题库