【Python】PySpark 数据计算 ③ ( RDD#reduceByKey 函数概念 | RDD#reduceByKey 方法工作流程 | RDD#reduceByKey 语法 | 代码示例 )

2023-08-02 07:15:00





一、RDD#reduceByKey 方法




1、RDD#reduceByKey 方法概念


RDD#reduceByKey 方法 是 PySpark 中 提供的计算方法 ,

  • 首先 , 对 键值对 KV 类型 RDD 对象 数据相同 键 key 对应的 值 value 进行分组 ,
  • 然后 , 按照 开发者 提供的 算子 ( 逻辑 / 函数 ) 进行 聚合操作 ;

上面提到的 键值对 KV 型 的数据 , 指的是 二元元组 , 也就是 RDD 对象中存储的数据是 二元元组 ;

元组 可以看做为 只读列表 ;

二元元组 指的是 元组 中的数据 , 只有两个 , 如 :

("Tom", 18)
("Jerry", 12)

PySpark 中 , 将 二元元组 中

  • 第一个元素 称为 键 Key ,
  • 第二个元素 称为 值 Value ;

按照 键 Key 分组 , 就是按照 二元元组 中的 第一个元素 的值进行分组 ;

[("Tom", 18), ("Jerry", 12), ("Tom", 17), ("Jerry", 13)]

将上述列表中的 二元元组 进行分组 , 按照 二元元组 第一个元素进行分组 ,

  • ("Tom", 18)("Tom", 17) 元组分为一组 , 在这一组中 , 将 18 和 17 两个数据进行聚合 , 如 : 相加操作 , 最终聚合结果是 35 ;
  • ("Jerry", 12)("Jerry", 13) 分为一组 ;

如果 键 Key 有 A, B, C 三个 值 Value 要进行聚合 , 首先将 A 和 B 进行聚合 得到 X , 然后将 X 与 C 进行聚合得到新的值 Y ;


具体操作方法是 : 先将相同 键 key 对应的 值 value 列表中的元素进行 reduce 操作 , 返回一个减少后的值,并将该键值对存储在RDD中 ;


2、RDD#reduceByKey 方法工作流程


RDD#reduceByKey 方法 工作流程 : reduceByKey(func) ;

  • 首先 , 对 RDD 对象中的数据 分区 , 每个分区中的相同 键 key 对应的 值 value 被组成一个列表 ;
  • 然后 , 对于 每个 键 key 对应的 值 value 列表 , 使用 reduceByKey 方法提供的 函数参数 func 进行 reduce 操作 , 将列表中的元素减少为一个 ;
  • 最后 , 将减少后的 键值对 存储在新的 RDD 对象中 ;

3、RDD#reduceByKey 函数语法


RDD#reduceByKey 语法 :

reduceByKey(func, numPartitions=None)
  • func 参数 : 用于聚合的函数 ;
  • numPartitions 是可选参数 , 指定 RDD 对象的分区数 ;

传入的 func 函数的类型为 :

(V, V) -> V

V 是泛型 , 指的是任意类型 , 上面的 三个 V 可以是任意类型 , 但是必须是 相同的类型 ;

该函数 接收 两个 V 类型的参数 , 参数类型要相同 , 返回一个 V 类型的返回值 , 传入的两个参数和返回值都是 V 类型的 ;


使用 reduceByKey 方法 , 需要保证函数的

  • 可结合性 ( associativity ) : 将两个具有 相同 参数类型 和 返回类型 的方法结合在一起 , 不会改变它们的行为的性质 ; 两个方法结合使用的结果与执行顺序无关 ;
  • 可重入性 ( commutativity ) : 在多任务环境下 , 一个方法可以被多个任务调用 , 而不会出现数据竞争或状态错误的问题 ;

以便在并行计算时能够正确地聚合值列表 ;





二、代码示例 - RDD#reduceByKey 方法




1、代码示例


在下面的代码中 , 要处理的数据是 列表 , 列表元素是 二元元组 ;

[("Tom", 18), ("Tom", 3), ("Jerry", 12), ("Jerry", 21)]

对 值 Value 进行的聚合操作就是相加 , 也就是把同一个 键 Key 下的多个 Value 值 进行相加操作 ,

# 应用 reduceByKey 操作,将同一个 Key 下的 Value 相加
rdd2 = rdd.reduceByKey(lambda a, b: a + b)

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 将 字符串列表 转为 RDD 对象
rdd = sparkContext.parallelize([("Tom", 18), ("Tom", 3), ("Jerry", 12), ("Jerry", 21)])

# 应用 reduceByKey 操作,将同一个 Key 下的 Value 相加
rdd2 = rdd.reduceByKey(lambda a, b: a + b)

# 打印新的 RDD 中的内容
print(rdd2.collect())

# 停止 PySpark 程序
sparkContext.stop()



2、执行结果


D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
23/08/01 10:16:04 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/01 10:16:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
[('Jerry', 33), ('Tom', 21)]

Process finished with exit code 0

在这里插入图片描述





三、代码示例 - 使用 RDD#reduceByKey 统计文件内容




1、需求分析


给定一个 文本文件 word.txt , 文件内容为 :

Tom Jerry
Tom Jerry Tom
Jack Jerry

在这里插入图片描述

读取文件中的内容 , 统计文件中单词的个数 ;

思路 :

  • 读取数据到 RDD 中 ,
  • 然后 按照空格分割开 再展平 , 获取到每个单词 ,
  • 根据上述单词列表 , 生成一个 二元元组 列表 , 列表中每个元素的 键 Key 为单词 , 值 Value 为 数字 1 ,
  • 对上述 二元元组 列表 进行 聚合操作 , 相同的 键 Key 对应的 值 Value 进行相加 ;

2、代码示例


首先 , 读取文件 , 将 文件转为 RDD 对象 , 该 RDD 对象中 , 列表中的元素是 字符串 类型 , 每个字符串的内容是 整行的数据 ;

# 将 文件 转为 RDD 对象
rdd = sparkContext.textFile("word.txt")
# 内容为 ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry']

然后 , 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表 , 然后展平数据解除嵌套 ;

# 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表
#   然后展平数据解除嵌套
rdd2 = rdd.flatMap(lambda element: element.split(" "))
# 内容为 : ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry']

再后 , 将 rdd 数据 的 列表中的元素 转为二元元组 , 第一个元素设置为 单词 字符串 , 第二个元素设置为 1

# 将 rdd 数据 的 列表中的元素 转为二元元组, 第二个元素设置为 1
rdd3 = rdd2.map(lambda element: (element, 1))
# 内容为 [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1)]

最后 , 应用 reduceByKey 操作 , 对相同 键 Key 对应的 值 Value 进行聚合操作 , 将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数 ;

# 应用 reduceByKey 操作,
#   将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
# [('Tom', 3), ('Jack', 1), ('Jerry', 3)]

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "D:/001_Develop/022_Python/Python39/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 将 文件 转为 RDD 对象
rdd = sparkContext.textFile("word.txt")
print("查看文件内容 : ", rdd.collect())

# 通过 flatMap 展平文件, 先按照 空格 切割每行数据为 字符串 列表
#   然后展平数据解除嵌套
rdd2 = rdd.flatMap(lambda element: element.split(" "))
print("查看文件内容展平效果 : ", rdd2.collect())

# 将 rdd 数据 的 列表中的元素 转为二元元组, 第二个元素设置为 1
rdd3 = rdd2.map(lambda element: (element, 1))
print("转为二元元组效果 : ", rdd3.collect())

# 应用 reduceByKey 操作,
#   将同一个 Key 下的 Value 相加, 也就是统计 键 Key 的个数
rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print("最终统计单词 : ", rdd4.collect())

# 停止 PySpark 程序
sparkContext.stop()


执行结果 :

D:\001_Develop\022_Python\Python39\python.exe D:/002_Project/011_Python/HelloPython/Client.py
23/08/01 11:25:24 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/01 11:25:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
查看文件内容 :  ['Tom Jerry', 'Tom Jerry Tom', 'Jack Jerry']
查看文件内容展平效果 :  ['Tom', 'Jerry', 'Tom', 'Jerry', 'Tom', 'Jack', 'Jerry']
转为二元元组效果 :  [('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jerry', 1), ('Tom', 1), ('Jack', 1), ('Jerry', 1)]
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
D:\001_Develop\022_Python\Python39\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
最终统计单词 :  [('Tom', 3), ('Jack', 1), ('Jerry', 3)]

Process finished with exit code 0

在这里插入图片描述

更多推荐

【三维重建】3D Gaussian Splatting:实时的神经场渲染

文章目录摘要一、前言二、相关工作1.传统的场景重建与渲染2.神经渲染和辐射场3.基于点的渲染和辐射场4.*什么是Tile-basedrasterizer(快速光栅化)三、OVERVIEW四、可微的三维高斯Splatting五、三维高斯自适应密度控制的优化1.优化2.高斯的自适应控制六、高斯分布的快速可微光栅化器(拓展)

DevSecOps内置安全保护

前言随着DevOps的发展,DevOps大幅提升了企业应用迭代的速度。但同时,安全如果不能跟上步伐,不仅会抵消DevOps变革带来的提升,拖慢企业数字化转型进程,还会导致漏洞与风险不约而至。所以安全能力在全球范围内受到的重视越来越高,软件开发内生的安全性成为评价企业DevOps成熟度水平的重要指标。一直以来,业界长期重

自己实现 SpringMVC 底层机制 系列之-实现任务阶段 8- 完成返回 JSON 格式数据-@ResponseBody

😀前言自己实现SpringMVC底层机制系列之-实现任务阶段8-完成返回JSON格式数据-@ResponseBody🏠个人主页:尘觉主页🧑个人简介:大家好,我是尘觉,希望我的文章可以帮助到大家,您的满意是我的动力😉😉在csdn获奖荣誉:🏆csdn城市之星2名⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣⁣

有机化学英文单词组成以及命名规则

当涉及到有机化合物的命名时,有各种不同的组成部分和功能团可以组合在一起。以下是一些常见的有机功能团和它们的英文名称:|中文|英文||--------------|--------------------||甲基|Methyl||乙基|Ethyl||酸|Acid||醇|Alcohol||醚|Ether||醛|Aldehy

ElasticSearch(三)

1.数据聚合聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。例如:什么品牌的手机最受欢迎?这些手机的平均价格、最高价格、最低价格?这些手机每月的销售情况如何?实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现近实时搜索效果。1.1.聚合的种类聚合常见的有三类:桶

大语言模型之十-Byte Pair Encoding

Tokenizer诸如GPT-3/4以及LlaMA/LlaMA2大语言模型都采用了token的作为模型的输入输出,其输入是文本,然后将文本转为token(正整数),然后从一串token(对应于文本)预测下一个token。进入OpenAI官网提供的tokenizer可以看到GPT-3tokenizer采用的方法。这里以H

面试中的数据可视化:如何用数据支持你的观点

🌷🍁博主猫头虎(🐅🐾)带您GotoNewWorld✨🍁🦄博客首页——🐅🐾猫头虎的博客🎐🐳《面试题大全专栏》🦕文章图文并茂🦖生动形象🐅简单易学!欢迎大家来踩踩~🌺🌊《IDEA开发秘籍专栏》🐾学会IDEA常用操作,工作效率翻倍~💐🌊《100天精通Golang(基础入门篇)》🐅学会Gol

中小型互联网公司如何搭建自己的技术架构

适用范围本文主要针对中小型互联网公司,特别适用于手机APP或者pc的后台架构,基本可以支撑5万日活本文会对可能用到的相关技术进行技术选型的说明,以及技术的架构介绍,技术架构的介绍课程后面有地址,可以点进去查看。技术指标说一下一些技术指标的计算过程可以作为其他同学的参考QPS,如果是5万日活,使用集中在每天的4小时,每个

记录一个爬虫过程,从基础爬虫到逆向,再到jsrpc,再到selenium,啥都包括了

这篇文章记录一下我跟一个网站的恩怨纠葛,为了爬这个网站,不断学习新知识,不断尝试,水平提高了不少。总算有点成就了,这里做一个记录,当然还是不完美,期待未来可能技术更精进,能有更好的方法吧。这个网站是:aHR0cDovL3NkLmNoaW5hdm9sdW50ZWVyLm1jYS5nb3YuY24vc3Vic2l0ZS9z

掌握Linux服务器管理技巧与容器化应用 - 从软件服务到虚拟化技术全面解析

文章目录一.Linux软件和服务1.软件包管理和软件更新1.1APT(AdvancedPackageTool)的使用1.2Yum包管理器的使用2.操作系统升级2.1Debian/Ubuntu系统的升级2.2RedHat/CentOS系统的升级二.Linux服务的安装和配置1.Apache服务器的安装和配置步骤1:安装A

使用亚马逊云科技Amazon SageMaker,为营销活动制作广告素材

广告公司可以使用生成式人工智能和文字转图像根基模型,制作创新的广告素材和内容。在本篇文案中,将演示如何使用亚马逊云科技AmazonSageMaker从现有的基本图像生成新图像,这是一项完全托管式服务,用于大规模构建、训练和部署机器学习模型。采用此解决方案,企业无论规模如何,都可以比以往更快地制作新的广告素材,而且大幅降

热文推荐