【Python】PySpark 数据计算 ① ( RDD#map 方法 | RDD#map 语法 | 传入普通函数 | 传入 lambda 匿名函数 | 链式调用 )

2023-07-30 21:53:18





一、RDD#map 方法




1、RDD#map 方法引入


在 PySpark 中 RDD 对象 提供了一种 数据计算方法 RDD#map 方法 ;

该 RDD#map 函数 可以对 RDD 数据中的每个元素应用一个函数 , 该 被应用的函数 ,

  • 可以将每个元素转换为另一种类型 ,
  • 也可以针对 RDD 数据的 原始元素进行 指定操作 ;

计算完毕后 , 会返回一个新的 RDD 对象 ;


2、RDD#map 语法


map 方法 , 又称为 map 算子 , 可以将 RDD 中的数据元素 逐个进行处理 , 处理的逻辑 需要用外部 通过 参数传入 map 函数 ;


RDD#map 语法 :

rdd.map(fun)

传入的 fun 是一个函数 , 其函数类型为 :

(T) -> U

上述 函数 类型 前面的 小括号 及其中的内容 , 表示 函数 的参数类型 ,

  • () 表示不传入参数 ;
  • (T) 表示传入 1 个参数 ;

同时 T 类型是 泛型 , 表示任意类型 , 也就是说 该函数的 参数 可以是任意类型的 ;


上述 函数 类型 右箭头 后面的 U , -> U 表示的是 函数 返回值类型 ,

  • (T) -> U 表示 参数 类型为 T , 返回值类型为 U , T 和 U 类型都是任意类型 , 可以是一个类型 , 也可以是不同的类型 ;
  • (T) -> T 函数类型中 , T 可以是任意类型 , 但是如果确定了参数 , 那么返回值必须也是相同的类型 ;

U 类型也是 泛型 , 表示任意类型 , 也就是说 该函数的 参数 可以是任意类型的 ;


3、RDD#map 用法


RDD#map 方法 , 接收一个 函数 作为参数 , 计算时 , 该 函数参数 会被应用于 RDD 数据中的每个元素 ;

下面的 代码 , 传入一个 lambda 匿名函数 , 将 RDD 对象中的元素都乘以 10 ;

# 将 RDD 对象中的元素都乘以 10
rdd.map(lambda x: x * 10)  

4、代码示例 - RDD#map 数值计算 ( 传入普通函数 )


在下面的代码中 ,

首先 , 创建了一个包含整数的 RDD ,

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

然后 , 使用 map() 方法将每个元素乘以 10 ;

# 为每个元素执行的函数
def func(element):
    return element * 10


# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(func)

最后 , 打印新的 RDD 中的内容 ;

# 打印新的 RDD 中的内容
print(rdd2.collect())

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])


# 为每个元素执行的函数
def func(element):
    return element * 10


# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(func)

# 打印新的 RDD 中的内容
print(rdd2.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 21:39:59 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 21:39:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
[10, 20, 30, 40, 50]

Process finished with exit code 0

在这里插入图片描述


5、代码示例 - RDD#map 数值计算 ( 传入 lambda 匿名函数 )


在下面的代码中 ,

首先 , 创建了一个包含整数的 RDD ,

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

然后 , 使用 map() 方法将每个元素乘以 10 , 这里传入了 lambda 函数作为参数 , 该函数接受一个整数参数 element , 并返回 element * 10 ;

# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(lambda element: element * 10)

最后 , 打印新的 RDD 中的内容 ;

# 打印新的 RDD 中的内容
print(rdd2.collect())

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(lambda element: element * 10)

# 打印新的 RDD 中的内容
print(rdd2.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 21:46:53 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 21:46:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
[10, 20, 30, 40, 50]

Process finished with exit code 0

在这里插入图片描述


6、代码示例 - RDD#map 数值计算 ( 链式调用 )


在下面的代码中 , 先对 RDD 对象中的每个元素数据都乘以 10 , 然后再对计算后的数据每个元素加上 5 , 最后对最新的计算数据每个元素除以 2 , 整个过程通过函数式编程 , 链式调用完成 ;

核心代码如下 :

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(lambda element: element * 10)\
    .map(lambda element: element + 5)\
    .map(lambda element: element / 2)

# 打印新的 RDD 中的内容
print(rdd2.collect())

代码示例 :

"""
PySpark 数据处理
"""

# 导入 PySpark 相关包
from pyspark import SparkConf, SparkContext
# 为 PySpark 配置 Python 解释器
import os
os.environ['PYSPARK_PYTHON'] = "Y:/002_WorkSpace/PycharmProjects/pythonProject/venv/Scripts/python.exe"

# 创建 SparkConf 实例对象 , 该对象用于配置 Spark 任务
# setMaster("local[*]") 表示在单机模式下 本机运行
# setAppName("hello_spark") 是给 Spark 程序起一个名字
sparkConf = SparkConf() \
    .setMaster("local[*]") \
    .setAppName("hello_spark")

# 创建 PySpark 执行环境 入口对象
sparkContext = SparkContext(conf=sparkConf)

# 打印 PySpark 版本号
print("PySpark 版本号 : ", sparkContext.version)

# 创建一个包含整数的 RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 应用 map 操作,将每个元素乘以 10
rdd2 = rdd.map(lambda element: element * 10)\
    .map(lambda element: element + 5)\
    .map(lambda element: element / 2)

# 打印新的 RDD 中的内容
print(rdd2.collect())

# 停止 PySpark 程序
sparkContext.stop()

执行结果 :

Y:\002_WorkSpace\PycharmProjects\pythonProject\venv\Scripts\python.exe Y:/002_WorkSpace/PycharmProjects/HelloPython/hello.py
23/07/30 21:50:29 WARN Shell: Did not find winutils.exe: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/30 21:50:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
PySpark 版本号 :  3.4.1
[7.5, 12.5, 17.5, 22.5, 27.5]

Process finished with exit code 0

在这里插入图片描述

更多推荐

基于TensorFlow+CNN+协同过滤算法的智能电影推荐系统——深度学习算法应用(含微信小程序、ipynb工程源码)+MovieLens数据集(一)

目录前言总体设计系统整体结构图系统流程图运行环境Python环境TensorFlow环境方法一方法二后端服务器Django环境配置微信小程序环境相关其它博客工程源代码下载其它资料下载前言本项目专注于MovieLens数据集,并采用TensorFlow中的2D文本卷积网络模型。它结合了协同过滤算法来计算电影之间的余弦相似

人工智能安全-5-网络入侵检测

0提纲概述数据集数据预处理特征工程天池AI上的实践棘手问题1概述入侵检测是网络安全中的经典问题,入侵是指攻击者违反系统安全策略,试图破坏计算资源的完整性、机密性或可用性的任何行为。不管是哪种类型的入侵检测系统(IDS),其工作过程大体是相同的,可以分为三个主要的环节,即信息收集、分类检测和决策,其中,分类检测和决策环节

Git操作

前期准备:1、安装2、身份认证gitconfig--globaluser.name"your_username"gitconfig--globaluser.emailyour_email@domain.comgitconfig--list查看所有配置常见的Git命令gitstatus.命令用于显示工作目录和暂存区的状态

React useRequest解读

源码结构:可以看到虽然是一个hooks(具有一定功能且具备状态的单一函数)但是各种文件功能分得也是很细的,方便抽离和复用useRequest.ts抽离的原则还是单一功能原则可以看出真正的hooks实现是在Implement里对于类型type的引入我们需要显示声明这是importtypeuseRequestImpleme

华清 Qt day5 9月21

QT+=coreguisqlnetwork/*****************************************************************/#ifndefWIDGET_H#defineWIDGET_H#include<QWidget>#include<QWidget>#include

为何学linux及用处

目前企业使用的操作系统无非就是国产类的,windows和linux类。我们要提升自己的技能,需要学习这两款。我记得在大学时期,学习过windows以及linux,但当时觉得又不常用,就学的模棱两可。毕业之后,你会发现,其实这两种操作系统是很主流的。为什么学?下面就是一些工作中遇到的例子分享一下。我记得在企业中有次遇到数

【python第7课 实例,类】

文章目录一、实例1.1实例的变量1.2实例方法1.3构造方法1.4析构函数1.4预置实例属性:二,类1.1类变量1.2类方法1.3静态方法1.4类属性的增删改查一、实例1.1实例的变量使用示例classdog:def__init__(self,k,c,a):self.kinds=kself.color=cself.ag

【Hash表】两数之和-力扣 1 题

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。推荐:kuan的首页,持续学习,不断总结,共同进步,活到老学到老导航檀越剑指大厂系列:全面总结java核心技术点,如集合,jvm,并发编程redis,kaf

Docker笔记

安装卸载旧版本以及相关的依赖项sudoyumremovedocker\docker-client\docker-client-latest\docker-common\docker-latest\docker-latest-logrotate\docker-logrotate\docker-engine安装所需的软件包

leetcode分类刷题:二叉树(一、简单的层序遍历)

二叉树的深度优先遍历题目是让我有点晕,先把简单的层序遍历总结下吧:配合队列进行的层序遍历在逻辑思维上自然直观,不容易出错102.二叉树的层序遍历本题是二叉树的层序遍历模板:每次循环将一层节点出队,再将一层节点入队,也是所有可用层序遍历解二叉树题目的模板,只需要在模板里稍加改动即可解题fromtypingimportLi

Docker学习大纲

Docker是一个用于自动部署应用程序在轻量级容器中的平台。下面列出一些Docker的基础和必知概念。1.容器(Containers)容器是独立的应用程序运行环境。命令:dockerrunhello-world解析:该命令会从DockerHub下载一个叫做“hello-world”的镜像,并运行一个容器。2.镜像(Im

热文推荐