全部版块 我的主页
论坛 数据科学与人工智能 大数据分析 数据仓库技术
1154 0
2024-02-26
pan---baidu----com/s/17TOzIVi-6yG24_QAmK60BQ 提取码: mxcp

离线计算作为大数据计算领域领军技能,在成本、稳定性、数据一致性等方面有着绝对优势。吃透Spark离线技术及相关生态,就掌握了大数据工程师的高薪密码。本文章将结合生产级项目,一栈式点亮:数据收集(DataX)、数据湖(Iceberg)、数据分析(Spark)、智能调度(DS)、数据服务(DBApi)、AI大模型(ChatGPT)、可视化(Davinci)等离线处理核心技能及生态体系,带你打通硬核技能,拓宽上升通道。

首先,我们先来认识spark:
1、什么是spark
  Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。目前,Spark生态系统已经发展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子项目,Spark是基于内存计算的大数据并行计算框架。

2、spark有什么用?
 大数据处理和分析:Spark提供了高性能和可扩展的分布式计算能力,可以处理大规模的数据集。它支持批处理、实时流处理和交互式查询等多种数据处理模式,使得开发人员能够高效地处理和分析大数据。

  数据转换和清洗:Spark提供丰富的数据处理操作和函数,如映射、过滤、聚合、排序等,使开发人员能够方便地对数据进行转换、清洗和预处理,以满足特定的数据需求。

  机器学习和数据挖掘:Spark提供了机器学习库(如MLlib)和图计算库(如GraphX),支持在大规模数据集上进行机器学习和数据挖掘。开发人员可以使用Spark进行特征提取、模型训练和预测等任务。

  实时流处理:Spark提供了Spark Streaming模块,支持实时数据流的处理和分析。开发人员可以使用Spark Streaming来处理实时数据流,如日志流、传感器数据流等,并进行实时计算、聚合和窗口操作等。

3、spark的使用场景有哪些
批处理:Spark可以处理大规模的数据集,并提供了丰富的数据处理和转换功能,适用于各种批处理任务,如数据清洗、ETL、数据分析等。

实时流处理:Spark的流处理模块Spark Streaming可以实时处理数据流,并提供了低延迟的处理能力,适用于实时推荐、实时分析、日志处理等应用场景。

机器学习:Spark提供了机器学习库MLlib,包括各种常用的机器学习算法和工具,可以在大规模数据上进行机器学习任务,如分类、回归、聚类、推荐等。

图计算:Spark的图计算库GraphX可以处理大规模图结构数据,并提供了各种图算法和操作,适用于社交网络分析、网络图谱等应用。

SQL查询:Spark支持使用SQL进行数据查询和分析,可以直接在Spark上运行SQL查询,与传统的关系型数据库类似,适用于数据分析和报表生成等任务。

分布式文件系统:Spark可以与分布式文件系统(如HDFS)集成,可以直接读取和处理分布式文件系统中的数据,适用于大规模数据集的处理和分析。

总的来说,Spark适用于大规模数据的处理和分析,支持多种类型的数据处理和计算任务,包括批处理、实时流处理、机器学习、图计算等领域。

4、Spark常用代码
①创建RDD方法
有两个不同的方式可以创建新的RDD
from pyspark import SparkConf, SparkContext


conf = SparkConf().setAppName("createWholeTextFile").setMaster("local
  • ")
    sc = SparkContext(conf=conf)

    file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100")
    print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions())) # 100 100个文件100个分区

    # 用于读取小文件并自动压缩分区
    wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100")
    print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions())) # 2 把100个文件压缩到2个分区

    result = wholefile_rdd.take(1)
    # print(result) # (location, value)的形式
    # 获取前面的路径
    path_list = wholefile_rdd.map(lambda x: x[0]).collect()
    sc.stop()

    ②专门读取小文件wholeTextFiles
    from pyspark import SparkConf, SparkContext


    conf = SparkConf().setAppName("createWholeTextFile").setMaster("local
  • ")
    sc = SparkContext(conf=conf)

    file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100")
    print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions())) # 100 100个文件100个分区

    # 用于读取小文件并自动压缩分区
    wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100")
    print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions())) # 2 把100个文件压缩到2个分区

    result = wholefile_rdd.take(1)
    # print(result) # (location, value)的形式
    # 获取前面的路径
    path_list = wholefile_rdd.map(lambda x: x[0]).collect()
    sc.stop()

    ③rdd的分区数
    from pyspark import SparkConf, SparkContext


    if __name__ == '__main__':
        # spark入口申请资源
        conf = SparkConf().setAppName("createRDD").setMaster("local[5]")
        # 应该充分使用资源,线程数设置成CPU核心数的2-3倍
        # conf.set("spark.default.parallelism",  10)
        sc = SparkContext(conf=conf)

        # 创建rdd的第一种方法
        collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
        print(collection_rdd.collect())
        # 获取分区数
        print("rdd number of partitions ", collection_rdd.getNumPartitions())
        # 解释:
        # 设置了5个核心,默认是5个分区,如果是local
  • 默认是2个分区
        #  conf.set("spark.default.parallelism", 10)优先使用此值
        # 如果sc.parallelize也设置了分区,那么最优先使用api设置的分区数

        # 如果是读取文件夹下面的文件,sc.textFile, minPartitions失效,有多少个文件就有多少个分区,下面100个文件返回了100个分区
        file_rdd = sc.textFile("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100",
                               minPartitions=3)
        print("file_rdd numpartitions {}".format(file_rdd.getNumPartitions()))  # 100 100个文件100个分区

        # 用于读取小文件并自动压缩分区,minPartitions参数是生效的。
        wholefile_rdd = sc.wholeTextFiles("/export/workspace/bigdata-pyspark_2.3.0/PySpark-SparkCore_2.3.0/data/ratings100",
                                          minPartitions=3)
        print("wholefile_rdd numpartitions {}".format(wholefile_rdd.getNumPartitions()))  # 2 把100个文件压缩到3个分区

        # 打印不同分区数据
        collection_rdd = sc.parallelize([1, 2, 3, 4, 5, 6], numSlices=7)
        print("collection_rdd number of partitions ", collection_rdd.getNumPartitions())
        # 6个数据7个分区,有一个分区是空的 per partition content [[], [1], [2], [3], [4], [5], [6]]
        print("per partition content", collection_rdd.glom().collect())

        # 关闭spark context
        sc.stop()


  • 二维码

    扫码加我 拉你入群

    请注明:姓名-公司-职位

    以便审核进群资格,未注明则拒绝

    相关推荐
    栏目导航
    热门文章
    推荐文章

    说点什么

    分享

    扫码加好友,拉您进群
    各岗位、行业、专业交流群