大数据——Spark SQL

2023-09-22 10:28:57

1、Spark SQL是什么

Spark SQL是Spark中用于处理结构化数据的一个模块,前身是Shark,但本身继承了前身Hive兼容和内存列存储的一些优点。Spark SQL具有以下四个特点:

  1. 综合性(Integrated):Spark中可以加入SQL查询,也可以使用DataFrame API,其中API提供了多种语言选择,Python、R、Java和Scala都支持。
  2. 连接统一性(Uniform Data Access):使用相同的方式连接不同的数据源(Hive、Json和JDBC等等)。
  3. Hive兼容性:能够在已有数据仓库中执行SQL或者Hive查询
  4. 标准化连接(Standard Connectivity):提供了JDBC或者ODBC的数据接口,可以给其他BI工具使用。

Spark SQL的优点

  1. 代码量少:可以直接写SQL语句或者DataFrame 。
  2. 性能更高:在使用DataFrame API时,DataFrame转成RDD时,会进行代码优化,执行效率更高;Spark SQL代码的RDD还行效率比Python、Java等编写的RDD效率高。

2、DataFrame简介

Spark中DataFrame是⼀个分布式的⾏集合,可以想象为⼀个关系型数据库的表,或者⼀个带有列名的Excel表格。不过它跟RDD有以下共同之处:

  • 不可变(Immuatable):跟RDD一样,一旦创建就不能更改你,只能通过transformation生成新的DataFrame;
  • 懒加载(Lazy Evaluations):只有action才会让transformation执行;
  • 分布式(Distributed):也是分布式的。

DataFrame跟RDD的比较

DataFrameRDD
逻辑框架提供详细结构信息,例如列的名称和类型不知道类的内部结构
数据操作API更丰富、效率更高代码少时,速度更快

DataFrame API常用代码

DataFrame的API也分为transformation和action两类

  • transformation 延迟操作
  • action 立即操作

在这里插入图片描述

  1. 创建SparkSession对象
SparkSession.builder.master("local") \
... appName("Word Count") \
... getOrCreate()
# Builder 是 SparkSession 的构造器。 通过 Builder, 可以添加各种配置
# master (master)设置要链接到的spark master节点地址, 传⼊ “local” 代表本地模式, “local[4]”代表本地模式4内核运⾏
# appName (name)为Spark应⽤设置名字
# getOrCreate ()获取⼀个已经存在的 SparkSession 或者如果没有已经存在的, 创建⼀个新的SparkSession
  1. 通过SparkSession创建DataFrame
 sparkSession.createDataFrame
  1. 读取文件生成DataFrame
# json格式
spark.read.json("xxx.json")
spark.read.format('json').load('xxx.json')
# parquet格式
spark.read.parquet("xxx.parquet")
# jdbc格式
spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name")\
.option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
  1. 基于RDD创建DataFrame
# rdd中读取数据
    spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
    sc = spark.sparkContext
    list1 =  [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(list1)
    # 添加数据列名
    people = rdd.map(lambda x:Row(name=x[0], age=int(x[1])))
    # 创建DataFrame
    df_pp = spark.createDataFrame(people)
    print(df_pp.show(2))
  1. 从CSV文件中读取数据
# # rdd中读取数据
spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
df = spark.read.format('csv').option('header','true').load('iris.csv')
df.printSchema()
df.show(5)
print(df.count())
print(df.columns)
  1. 增加列、删除列和提取部分列
# 增加列
df.withColumn('newWidth', df.SepalWidth*2).show()

# 删除列
df.drop('cls').show()

# 提取部分列
df.select('SepalLength','SepalWidth').show()
  1. 统计信息、基本统计功能和分组统计
#统计信息 describe
df.describe().show()
#计算某⼀列的描述信息
df.describe('cls').show() 
# 基本统计信息
df.select('cls').distinct().count()
# 分组统计 
df.groupby('cls').agg({'SepalWidth':'mean','SepalLength':'max'}).show()
# avg(), count(), countDistinct(), first(), kurtosis(),
# max(), mean(), min(), skewness(), stddev(), stddev_pop(),
# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()
  1. 采集数据、拆分数据集和查看两个数据集的差异
# ================采样数据 sample===========
#withReplacement:是否有放回的采样
#fraction:采样⽐例
#seed:随机种⼦
sdf = df.sample(False,0.2,100)

#设置数据⽐例将数据划分为两部分
trainDF, testDF = df.randomSplit([0.6, 0.4])

#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类
diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))
diff_in_train_test.distinct().count()
  1. 自定义函数和交叉表
# 交叉表 crosstab
df.crosstab('cls','SepalLength').show()

# 自定义函数UDF
# 找到数据,做后续处理
traindf, testdf = df.randomSplit([0.7,0.3])
diff_in_train_test = testdf.select('cls').subtract(traindf.select('cls')).distinct().show()
# 找到类,整理到一个列表中
not_exist_cls = traindf.select('cls').subtract(testdf.select('cls')).distinct().rdd.map(lambda x:x[0]).collect()

# 定义一个方法
def shou_remove(x):
    if x in not_exist_cls:
        return -1
    else:
        return x
# 在RDD中可以直接定义函数,交给rdd的transformatioins⽅法进⾏执⾏
# 在DataFrame中需要通过udf将⾃定义函数封装成udf函数再交给DataFrame进⾏调⽤执⾏
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

check = udf(shou_remove, StringType())
resultdf = traindf.withColumn('new_cls',check(traindf['cls'])).filter('new_cls<>-1')

resultdf.show()
  1. 加载json的API

1)以通过反射⾃动推断DataFrame的Schema

# 1) json→RDD→DataFrame
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

jsonString =  [
"""{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]
jsonrdd = sc.parallelize(jsonString) # json 2 rdd
jsondf = spark.read.json(jsonrdd) #rdd 2 dataframe
jsondf.printSchema()
jsondf.show()

# 2) 直接从文件中加载
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

jsondf = spark.read.json('zips.json')
jsondf.printSchema()
jsondf.filter(jsondf.pop>40000).show(10)
jsondf.createOrReplaceTempView('temp_table')
resulfdf = spark.sql('select * from temp_table where pop>40000')
resulfdf.show(10)

2)通过StructType对象指定Schema

spark = SparkSession.builder.appName('json_demo').getOrCreate()
    sc = spark.sparkContext
    jsonSchema = StructType([
     StructField("id", StringType(), True),
     StructField("city", StringType(), True),
     StructField("loc" , ArrayType(DoubleType())),
     StructField("pop", LongType(), True),
     StructField("state", StringType(), True)
    ])
    reader = spark.read.schema(jsonSchema)
    jsondf = reader.json('zips.json')
    jsondf.printSchema()
    jsondf.show(2)
    jsondf.filter(jsondf.pop>40000).show(10)
更多推荐

API(十)时间相关的SDK

一时间相关的SDK①时间记录的必要性1、'案发'现场的时间点2、通过时间判断'性能'3、时间的'不准确'性,日志'落盘'时间-->'缓冲区'导致延迟②使用哪些日期和时间的函数1、lua标准'时间'函数,函数'os.time'、'os.date'和'os.difftime'提供了所有日期和时间2、在openresty的世

vue3硅谷甄选01 | 使用vite创建vue3项目及项目的配置 环境准备 ESLint配置 prettier配置 husky配置 项目集成

文章目录使用vite创建vue3项目及项目的配置1.环境准备2.项目配置ESLint校验代码工具配置-js代码检测工具1.安装ESLint到开发环境devDependencies2.生成配置文件:`.eslint.cjs`**3.安装vue3环境代码校验插件**4.修改.eslintrc.cjs配置文件5.生成ESLi

接口自动化测试框架搭建全部过程

思想:1、基本目录的搭建report:静态输出目录(报告或者日志)data:静态输入目录(可以存放Excel数据,被读取的一些数据)utils:实用方法层(这里存放的是项目的公共方法,一般拿到别的项目可以直接使用,列如:读取Excel中的数据,连接数据库,)apis:接口请求层(这里封装的方法一般都是和项目有关系,列如

MySQL 权限分配

有时候,您需要查看某个用户被授予的权限以便复核。MySQL允许您使用SHOWGRANTS语句来显示分配给用户帐户或角色的权限。MySQLSHOWGRANTS语句介绍以下是SHOWGRANTS语句的基本语法:SHOWGRANTS[FOR{user|role}[USINGrole[,role]...]]在这个语法中:首先,

记录一次DLL分析实战

记录一次DLL分析实战1.VT查看分析报告2.判断文件是否加壳3.查看导入函数4.查看是否有任何其他文件或基于主机的迹象5.使用工具IDAPro进行字符串分析1.VT查看分析报告virustotal全绿,没有报毒:可以看到这个dll是32位的:下面可以看它调用的其他dll:以及它对外提供的函数接口:其中RunCmd很可

Redis简介

1.Nosql作用:应对基于海量用户和海量数据前提下的数据处理问题。​常见Nosql数据库:​RedismemcacheHBaseMongoDB​特征:可扩容,可伸缩,大数据量下高性能,灵活的数据模型,高可用2.Redis特征:1.数据间没有必然的关联关系2.内部采用单线程机制进行工作3.高性能。官方提供测试数据,50

OceanBase杨传辉传递亚运火炬:国产数据库为“智能亚运”提供稳稳支持

9月14日,亚运火炬传递到了浙江台州,OceanBase的CTO杨传辉作为火炬手交接了第89棒火炬。2010年,杨传辉作为创始成员之一参与自研原生分布式数据库OceanBase。十年磨一剑,国产数据库OceanBase交出了一张优秀的成绩单:连续10年稳定支撑双11,承受住了世界级的流量洪峰和稳定性考验;刷新过TPC-

go学习-GMP模型

GMP好理解还是GPM好理解?按照上述图,从上往下,GPM更适合理解GMP模型:Go语言运行时系统中的Goroutine、用于管理Goroutine调度的GoScheduler(P)、机器可用的逻辑处理器数量(M)。理解GPMG每个Goroutine是一个轻量级“线程”,称之为“协程”,可由Go运行时系统并发执行G与P

2023 Google 开发者大会:将大型语言模型部署到你的手机

在2022年末,不到半年时间,各家大语言模型的发展如雨后春笋,截至2023年9月,全球总共有接近100个大语言模型,可谓是百花齐放显而易见,大语言模型凭借出色的AI对话能力,已经逐渐深入各个行业2023Google开发者大会带来了AI专题,Google技术推广工程师魏巍提出“将大语言模型部署到个人终端”,关于这点,在外

[NLP] LLM---<训练中文LLama2(三)>对LLama2进行中文预料预训练

预训练预训练部分可以为两个阶段:第一阶段:冻结transformer参数,仅训练embedding,在尽量不干扰原模型的情况下适配新增的中文词向量。第二阶段:使用LoRA技术,为模型添加LoRA权重(adapter),训练embedding的同时也更新LoRA参数。第一阶段预训练由于第一阶段预训练会冻结transfor

Java基于SpringBoot的校园交友网站的设计与实现

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W+,Csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌文章目录一、效果演示二、前言介绍三、主要技术四、系统设计(部分)4.1、主要功能模块设计4.2、系统登录流程设计五、运行截图5.1、系统功能模块5.1.

热文推荐