【Python】PySpark 数据计算 ④ ( RDD#filter 方法 - 过滤 RDD 中的元素 | RDD#distinct 方法 - 对 RDD 中的元素去重 )

2023-08-02 21:18:24





一、RDD#filter 方法




1、RDD#filter 方法简介


RDD#filter 方法 可以 根据 指定的条件 过滤 RDD 对象中的元素 , 并返回一个新的 RDD 对象 ;

RDD#filter 方法 不会修改原 RDD 数据 ;

使用方法 :

new_rdd = old_rdd.filter(func)

上述代码中 ,

  • old_rdd 是 原始的 RDD 对象 ,
  • 调用 filter 方法 , 传入的 func 参数是一个 函数 或者 lambda 匿名函数 , 用于定义过滤条件 ,
    • func 函数返回 True , 则保留元素 ;
    • func 函数返回 False , 则删除元素 ;
  • new_rdd 是过滤后的 RDD 对象 ;

2、RDD#filter 函数语法


RDD#filter 方法 语法 :

rdd.filter(func)

上述 方法 接受一个 函数 作为参数 , 该 函数参数 定义了要过滤的条件 ; 符合条件的 元素 保留 , 不符合条件的删除 ;

下面介绍 filter 函数中的 func 函数类型参数的类型 要求 ;


func 函数 类型说明 :

(T) -> bool

传入 filter 方法中的 func 函数参数 , 其函数类型 是 接受一个 任意类型 元素作为参数 , 并返回一个布尔值 , 该布尔值的作用是表示该元素是否应该保留在新的 RDD 中 ;

  • 返回 True 保留元素 ;
  • 返回 False 删除元素 ;


3、代码示例 - RDD#filter 方法示例


下面代码中的核心代码是 :

# 创建一个包含整数的 RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])

# 使用 filter 方法过滤出偶数, 删除奇数
even_numbers = rdd.filter(lambda x: x % 2 == 0)

# 输出过滤后的结果
print(even_numbers.collect())

上述代码中 , 原始代码是 1 到 9 之间的整数 ;

传入 lambda 匿名函数 , lambda x: x % 2 == 0 , 传入数字 ,

  • 如果是偶数返回 True , 保留元素 ;
  • 如果是 奇数 返回 False , 删除元素 ;

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sc = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sc.version)

# 创建一个包含整数的 RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])

# 使用 filter 方法过滤出偶数, 删除奇数
even_numbers = rdd.filter(lambda x: x % 2 == 0)

# 输出过滤后的结果
print(even_numbers.collect())

# 停止 PySpark 程序
sc.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/08/02 21:07:55 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/02 21:07:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
[2, 4, 6, 8]

Process finished with exit code 0

在这里插入图片描述





二、RDD#distinct 方法




1、RDD#distinct 方法简介


RDD#distinct 方法 用于 对 RDD 中的数据进行去重操作 , 并返回一个新的 RDD 对象 ;

RDD#distinct 方法 不会修改原来的 RDD 对象 ;


使用时 , 直接调用 RDD 对象的 distinct 方法 , 不需要传入任何参数 ;

new_rdd = old_rdd.distinct()

上述代码中 , old_rdd 是原始 RDD 对象 , new_rdd 是元素去重后的新的 RDD 对象 ;


2、代码示例 - RDD#distinct 方法示例


代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sc = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sc.version)

# 创建一个包含整数的 RDD 对象
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 3, 4, 4, 5])

# 使用 distinct 方法去除 RDD 对象中的重复元素
distinct_numbers = rdd.distinct()

# 输出去重后的结果
print(distinct_numbers.collect())

# 停止 PySpark 程序
sc.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/08/02 21:16:35 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/02 21:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\shuffle.py:65: UserWarning: Please install psutil to have better support with spilling
[1, 2, 3, 4, 5]

Process finished with exit code 0

在这里插入图片描述

更多推荐

Shell脚本中文英文多语言国际化和命令行批处理(bash sh cmd bat)中定义函数的简单写法

文章目录命令行脚本参考-bat命令行脚本参考-bash值得学习的知识点1.识别终端使用的语言2.函数的编写3.获取用户的输入4.bat文件老是乱码怎么办有时候为了方便别人使用,我们会选择去编写各种各样的命令行脚本:给Windows用户编写.batcmd批处理脚本,给macOS、Linux用户编写.shbashshell

鼠标、键盘、窗口监听事件

一、画笔paint//画笔publicclassTestPaint{publicstaticvoidmain(String[]args){newMyPaint().loadFrame();​}}​classMyPaintextendsFrame{​publicvoidloadFrame(){setBounds(200,

flex:1详解,以及flex:1和flex:auto的区别

什么是flex:1?在css中,我们经常可以看到这样的写法:.box{display:flex;}.item{flex:1;}这里的flex:1相当于flex:110%,它是一个简写属性,表示项目(flexitem)在弹性容器(flexcontainer)中如何伸缩。它相当于flex:110%,包含了三个子属性:fle

JDK动态代理

Java中的两种常用动态代理方式JDK动态代理和Cglib动态代理是Java中常用的实现动态代理的方式。它们都可以在运行时生成代理类,实现对目标对象的代理操作。JDK动态代理适用于接口代理,Cglib动态代理适用于类代理。Cglib动态代理Cglib动态代理是基于继承的动态代理方式。它通过生成目标类的子类来实现代理,子

3.SpringEL方法调用实例

SpringEL方法调用实例文章目录SpringEL方法调用实例介绍SpringEL在注解的形式SpringEL调用List,Map中的方法**从List中过滤元素****从Map中获取值**SpringEL在XML的形式介绍Spring表达式语言(使用SpEL)允许开发人员使用表达式来执行方法和将返回值以注入的方式到

线路板的性能和技术要求有哪些

PCB加工厂家电路板的性能和技术要求与线路板的结构类型、选用的基材有关。不同类型(刚性和挠性)、不同结构(单面、双面、多层、有或无盲孔、埋孔等)、不同基材的PCB板,性能指标是不同的。它的性能等级,与产品设计一样按使用范围通常分为三个等级,PCB厂家描述产品在复杂性、功能性要求的程度和试验、检验的频度的不同。不同性能等

java 单元测试Junit

所谓单元测试,就是针对最小的功能单元,编写测试代码对其进行正确性测试。为了测试更加方便,有一些第三方的公司或者组织提供了很好用的测试框架,给开发者使用。这里介绍一种Junit测试框架。Junit是第三方公司开源出来的,用于对代码进行单元测试的工具(IDEA已经集成了junit框架)。相比于在main方法中测试有如下几个

[设计模式] 浅谈SOLID设计原则

目录单一职责原则开闭原则里氏替换原则接口隔离原则依赖倒转原则SOLID是一个缩写词,代表以下五种设计原则单一职责原则SingleResponsibilityPrinciple,SRP开闭原则Open-ClosedPrinciple,OCP里氏替换原则LiskovSubstitutionPrinciple,LSP接口隔离

华为OD机试真题 Java 实现【路灯照明问题】【2022Q4 100分】,感谢fly晨发现这个问题,并提供更优质的算法

目录专栏导读一、题目描述二、输入描述三、输出描述四、解题思路特别鸣谢:感谢fly晨发现这个问题,并提供更优质的算法。解题思路如下:五、Java算法源码六、效果展示1、输入2、输出3、思路专栏导读本专栏收录于《华为OD机试(JAVA)真题(A卷+B卷)》。刷的越多,抽中的概率越大,每一题都有详细的答题思路、详细的代码注释

模式识别与人工智能(程序与算法)系列讲解 - 总目录

模式识别与人工智能(程序与算法)系列讲解-总目录作者:安静到无声作者简介:人工智能和硬件设计博士生、CSDN与阿里云开发者博客专家,多项比赛获奖者,发表SCI论文多篇。Thanks♪(・ω・)ノ如果觉得文章不错或能帮助到你学习,可以点赞👍收藏📁评论📒+关注哦!o( ̄▽ ̄)dლ(°◕‵ƹ′◕ლ)希望在传播知识、分享

【算法系列 | 8】深入解析查找算法之—二分查找

序言心若有阳光,你便会看见这个世界有那么多美好值得期待和向往。决定开一个算法专栏,希望能帮助大家很好的了解算法。主要深入解析每个算法,从概念到示例。我们一起努力,成为更好的自己!今天第8讲,讲一下查找算法的二分查找1基础介绍查找算法是很常见的一类问题,主要是将一组数据按照某种规则进行排序。以下是一些常见的查找算法及其应

热文推荐