Spark【Spark SQL(四)UDF函数和UDAF函数】

2023-09-14 11:22:53

UDF 函数

        UDF 是我们用户可以自定义的函数,我们通过SparkSession对象来调用 udf 的 register(name:String,func(A1,A2,A3...)) 方法来注册一个我们自定义的函数。其中,name 是我们自定义的函数名称,func 是我们自定义的函数,它可以有很多个参数。

        通过 UDF 函数,我们可以针对某一列数据或者某单元格数据进行针对的处理。

案例 1

定义一个函数,给 Andy 的 name 字段的值前 + "Name: "。

def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setAppName("spark sql udf")
      .setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val df = spark.read.json("data/sql/people.json")
    df.createOrReplaceTempView("people")

    spark.udf.register("prefixName",(name:String)=>{
      if (name.equals("Andy"))
        "Name: " + name
      else
        name
    })
    spark.sql("select prefixName(name) as name,age,sex from people").show()

    spark.stop()
  }

        这里我们定义了一个自定义的 UDF 函数:prefixName,它会判断name字段的值是否为 "Andy",如果是,就会在她的值前+"Name: "。

运行结果:

+----------+---+---+
|      name|age|sex|
+----------+---+---+
|   Michael| 30| 男|
|Name: Andy| 19| 女|
|    Justin| 19| 男|
|Bernadette| 20| 女|
|  Gretchen| 23| 女|
|     David| 27| 男|
|    Joseph| 33| 女|
|     Trish| 27| 女|
|      Alex| 33| 女|
|       Ben| 25| 男|
+----------+---+---+

UDAF 函数

        强类型的DataSet和弱类型的DataFrame都提供了相关聚合函数,如count、countDistinct、avg、max、min。

        UDAF 也就是我们用户的自定义聚合函数。聚合函数就比如 avg、sum这种函数,需要先把所有数据放到一起(缓冲区),再进行统一处理的一个函数。

        实现 UDAF 函数需要有我们自定义的聚合函数的类(主要任务就是计算),我们可以继承 UserDefinedAggregateFunction,并实现里面的八种方法,来实现弱类型的聚合函数。(Spark3.0之后就不推荐使用了,更加推荐强类型的聚合函数)

        我们可以继承Aggregator来实现强类型的聚合函数。

案例1 - 平均年龄

case 类可以直接构建对象,不需要new,因为样例类可以自动生成它的伴生对象和apply方法。

弱类型实现

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructField, StructType}

/**
 * 弱类型
 */
object UDAFTest01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("spark sql udaf")
      .setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val df = spark.read.json("data/sql/people.json")
    df.createOrReplaceTempView("people")

    spark.udf.register("avgAge",new MyAvgUDAF())

    spark.sql("select avgAge(age) from people").show()

    spark.stop()
  }
}
class MyAvgUDAF extends UserDefinedAggregateFunction{

  // 输入数据的结构 IN
  override def inputSchema: StructType = {
   StructType(
     Array(StructField("age",LongType))
   )}

  // 缓冲区数据的结构 BUFFER
  override def bufferSchema: StructType = {
    StructType(
      Array(
        StructField("total",LongType),
        StructField("count",LongType)
      )
    )}

  // 函数计算结果的数据类型 OUT
  override def dataType: DataType = LongType

  // 函数的稳定性 (传入相同的参数结果是否相同)
  override def deterministic: Boolean = true

  // 缓冲区初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    //这两种写法都一样
//    buffer(0) = 0L
//    buffer(1) = 0L
    //第二种方法
    buffer.update(0,0L) //total 给缓冲区的第0个数据结构-total-初始化赋值0L
    buffer.update(1,0L) //count 给缓冲区的第1个数据结构-count-初始化赋值0L
  }

  // 数据过来之后 如何更新缓冲区
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // 第一个参数代表缓冲区的第i个数据结构 0代表total 1代表count
    // 第二个参数是对第一个参数的数据结构进行重新赋值
    // buffer.getLong(0)是取出缓冲区第0个值-也就是total的值,给它+上输入的值中的第0个值(因为我们输入结构只有一个就是age:Long)
    buffer.update(0,buffer.getLong(0)+input.getLong(0))
    buffer.update(1,buffer.getLong(1)+1)  //count 每次数据过来+1
  }

  // 多个缓冲区数据合并
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0,buffer1.getLong(0)+buffer2.getLong(0))
    buffer1.update(1,buffer1.getLong(1)+buffer2.getLong(1))
  }

  // 计算结果操作
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0)/buffer.getLong(1)
  }
}

运行结果:

+-----------+
|avgage(age)|
+-----------+
|         25|
+-----------+

 

强类型实现

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator

/**
 * 强类型
 */
object UDAFTest02 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("spark sql udaf")
      .setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val df = spark.read.json("data/sql/people.json")
    df.createOrReplaceTempView("people")

    spark.udf.register("avgAge",functions.udaf(new MyAvg_UDAF()))

    spark.sql("select avgAge(age) from people").show()

    spark.stop()
  }
}

/**
 * 自定义聚合函数类:
 *  1.继承org.apache.spark.sql.expressions.Aggregator,定义泛型:
 *    IN  : 输入数据类型 Long
 *    BUF : 缓冲区数据类型
 *    OUT : 输出数据类型 Long
 *  2.重写方法
 */
//样例类中的参数默认是 val 所以这里必须指定为var
case class Buff(var total: Long,var count: Long)
class MyAvg_UDAF extends Aggregator[Long,Buff,Long]{

  // zero: Buff zero代表这个方法是用来初始值(0值)
  // Buff是我们的case类 也就是说明这里是用来给 缓冲区进行初始化
  override def zero: Buff = {
    Buff(0L,0L)
  }

  // 根据输入数据更新缓冲区 要求返回-Buff
  override def reduce(buff: Buff, in: Long): Buff = {
    buff.total += in
    buff.count += 1
    buff
  }

  // 合并缓冲区 同样返回buff1
  override def merge(buff1: Buff, buff2: Buff): Buff = {
    buff1.total += buff2.total
    buff1.count += buff2.count
    buff1
  }

  // 计算结果
  override def finish(buff: Buff): Long = {
    buff.total/buff.count
  }

  // 网络传输需要序列化 缓冲区的编码操作 -编码
  override def bufferEncoder: Encoder[Buff] = Encoders.product

  // 输出的编码操作 -解码
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

运行结果:

+-----------+
|avgage(age)|
+-----------+
|         25|
+-----------+

 

早期UDAF强类型聚合函数

SQL:结构化数据查询 & DSL:面向对象查询(有对象有方法,与类型相关,所以通过DSL语句结合起来使用)

早期的UDAF强类型聚合函数使用DSL操作。

定义一个case类对应数据类型,然后通过as[对象]方法将DataFrame转为DataSet类型,然后将我们的UDAF聚合类转为列对象。

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn, functions}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StructField, StructType}

/**
 * 早期的UDAF强类型聚合函数使用DSL操作
 */
object UDAFTest03 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("spark sql udaf")
      .setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._

    val df = spark.read.json("data/sql/people.json")

    val ds: Dataset[User] = df.as[User]

    // 将UDAF强类型聚合函数转为查询的类对象
    val udafCol: TypedColumn[User, Long] = new OldAvg_UDAF().toColumn
    ds.select(udafCol).show()

    spark.stop()
  }
}

/**
 * 自定义聚合函数类:
 *  1.继承org.apache.spark.sql.expressions.Aggregator,定义泛型:
 *    IN  : 输入数据类型 User
 *    BUF : 缓冲区数据类型
 *    OUT : 输出数据类型 Long
 *  2.重写方法
 */
//样例类中的参数默认是 val 所以这里必须指定为var
case class User(name: String,age: Long,sex: String)
case class Buff(var total: Long,var count: Long)
class OldAvg_UDAF extends Aggregator[User,Buff,Long]{

  // zero: Buff zero代表这个方法是用来初始值(0值)
  // Buff是我们的case类 也就是说明这里是用来给 缓冲区进行初始化
  override def zero: Buff = {
    Buff(0L,0L)
  }

  // 根据输入数据更新缓冲区 要求返回-Buff
  override def reduce(buff: Buff, in: User): Buff = {
    buff.total += in.age
    buff.count += 1
    buff
  }

  // 合并缓冲区 同样返回buff1
  override def merge(buff1: Buff, buff2: Buff): Buff = {
    buff1.total += buff2.total
    buff1.count += buff2.count
    buff1
  }

  // 计算结果
  override def finish(buff: Buff): Long = {
    buff.total/buff.count
  }

  // 网络传输需要序列化 缓冲区的编码操作 -编码
  override def bufferEncoder: Encoder[Buff] = Encoders.product

  // 输出的编码操作 -解码
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

运行结果:

+------------------------------------------+
|OldAvg_UDAF(com.study.spark.core.sql.User)|
+------------------------------------------+
|                                        25|
+------------------------------------------+

更多推荐

【ELFK】之zookeeper

一、Zookeeper是什么?zooleeper是一个分布式服务管理框架。存储业务服务节点元数据及信息,并复制;通知客户端在zookeeper上注册的服务节点状态,通过文件系统+通知机制1、Zookeeper工作机制Zookeeper从设计模式角度来理解是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家

8位单片机的优势:永不过时的选择

被广泛应用于各种智能化设备中,8位单片机以其独特的优势,成为了许多应用场景的首选。为什么它们在今天仍然具有重要的应用价值?一、8位单片机的优势:(1)成本效益8位单片机由于其制造成本相对较低,因此价格相对较为亲民。这意味着使用8位单片机可以降低整个产品的成本,提高市场竞争力。(2)可靠性高8位单片机经过多年的发展和优化

c++:三种实例化对象方式

1.隐式创建首先我们定义一个测试类classPerson{private:intage;stringname;public:Person(){cout<<"thisisconstruct~";}Person(intage,stringname){this->age=age;this->name=name;cout<<"

Django之初入门

一)Django简介1.简介Django是一个开源的PythonWeb框架,它以简洁高效的方式帮助开发者构建复杂的Web应用程序。Django采用了MVC(Model-View-Controller)的架构模式,通过强大的工具和功能,提供了一套完整的解决方案,使开发过程更加简单、快速和可扩展。Django拥有丰富的内置

安科瑞电流隔离传感器 BA穿孔交流电流变送器-安科瑞黄安南

一.产品原理和功能介绍BA系列产品应用电磁感应原理,对电网中的交流电流进行实时测量,采用精密恒流技术和线性温度补偿技术,将其隔离变换为标准的直流信号输出采用24伏或12伏安全电压供电,具有过载能力强、高精度、高隔离、高安全性、低功耗等特点,可广泛用于工业自动化领域。可以选配真有效值,也可以对漏电流进行实时测量.BA系列

Spring IOC 容器:掌握 Spring 的核心技术

Spring是一个非常流行和强大的Java开发框架,它可以帮助我们简化和优化Java项目的开发过程。Spring的核心技术之一就是IOC(InversionofControl,控制反转),它可以实现对象之间的解耦,让对象的创建和管理由Spring容器来完成,而不是由对象自己或使用对象的类来完成。这样可以提高代码的可维护

mysql限制用户登录失败次数,限制时间

mysql用户登录限制设置mysql需要进行用户登录次数限制,当使用密码登录超过3次认证链接失败之后,登录锁住一段时间,禁止登录这里使用的mysql:8.1.0这种方式不用重启数据库.配置:首先进入到mysql命令行:然后需要安装两个插件:在mysql命令行中执行:mysql>INSTALLPLUGINCONNECTI

SpringMvc根据返回值类型不同处理响应

目录一、介绍二、返回值为void(1)控制层方法三、返回值为String(1)控制层四、返回值为ModelAndView(1)控制层方法(2)jsp页面一、介绍我们可以通过控制器方法的返回值设置跳转的视图。控制器支持如void,String,ModelAndView类型。二、返回值为void返回值是void会跳转到名字

“深入理解SpringMVC的JSON数据返回和异常处理机制“

目录引言1.SpringMVC之JSON数据返回1.1导入依赖1.2配置弹簧-MVC.xml1.3@ResponseBody注解使用1.4.Jackson2.异常处理机制2.1为什么要全局异常处理2.2异常处理思路2.3SpringMVC异常分类2.4综合案例总结引言在现代Web开发中,SpringMVC是一个广泛使用

【SpringMVC】自定义注解

🎉🎉欢迎来到我的CSDN主页!🎉🎉🏅我是Java方文山,一个在CSDN分享笔记的博主。📚📚🌟在这里,我要推荐给大家我的专栏《SpringMVC》。🎯🎯🚀无论你是编程小白,还是有一定基础的程序员,这个专栏都能满足你的需求。我会用最简单易懂的语言,带你走进SpringMVC的世界,让你从零开始,一步步

设计模式:组合模式

目录组件代码实现优缺点源码中应用总结组合模式是一种结构型设计模式,用于将对象组织成树形结构,以表示“部分-整体”的层次结构。组合模式使得客户端可以统一地处理单个对象和组合对象,而不需要区分它们之间的差异。在组合模式中,有两种主要类型的对象:叶节点和组合节点。叶节点表示树结构中的最小单位,它们没有子节点。而组合节点则可以

热文推荐