大数据-玩转数据-Flink CEP编程

2023-09-18 15:22:33

一、Flink CEP

FlinkCEP(Complex event processing for Flink) 是在Flink实现的复杂事件处理库。它可以让你在无界流中检测出特定的数据,有机会掌握数据中重要的那部分。
是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。

  1. 目标:从有序的简单事件流中发现一些高阶特征
  2. 输入:一个或多个由简单事件构成的事件流
  3. 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
  4. 输出:满足规则的复杂事件

二、Flink CEP应用场景

风险控制:对用户异常行为模式进行实时检测,当一个用户发生了不该发生的行为,判定这个用户是不是有违规操作的嫌疑。
策略营销:用预先定义好的规则对用户的行为轨迹进行实时跟踪,对行为轨迹匹配预定义规则的用户实时发送相应策略的推广。
运维监控:灵活配置多指标、多依赖来实现更复杂的监控模式。

三、CEP开发基本步骤

导入CEP相关依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

代码案例

package com.lyh.flink11;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.List;
import java.util.Map;

public class Flink_CEP_S {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        SingleOutputStreamOperator<WaterSensor> stream = env.readTextFile("input/sensor.txt")
                .map(line -> {
                    String[] datas = line.split(",");
                    return new WaterSensor(
                            datas[0],
                            Long.valueOf(datas[1]),
                            Integer.valueOf(datas[2])
                    );
                }).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((element, timeStamp) -> element.getTs()));

        Pattern<WaterSensor, WaterSensor> sensor_1 = Pattern.<WaterSensor>begin("sensor_1")
                .where(new SimpleCondition<WaterSensor>() {
                    @Override
                    public boolean filter(WaterSensor value) throws Exception {
                        return "sensor_1".equals(value.getId());
                    }
                });
        PatternStream<WaterSensor> pattern = CEP.pattern(stream, sensor_1);
        pattern.select(new PatternSelectFunction<WaterSensor, String>() {
            @Override
            public String select(Map<String, List<WaterSensor>> map) throws Exception {
                return map.toString();
            }
        }).print();
env.execute();
    }
}

四、运行结果

在这里插入图片描述

更多推荐

润和软件HopeStage与华宇信息TAS应用中间件完成产品兼容性互认证

近日,江苏润和软件股份有限公司(以下简称“润和软件”)HopeStage操作系统与北京华宇信息技术有限公司(以下简称“华宇信息”)TAS应用中间件软件完成产品兼容性测试。测试结果表明,企业级通用操作系统HopeStageV1.0产品与TAS应用中间件软件产品可以顺利适配、相互良好兼容、稳定运行。这标志着润和软件Hope

小程序中如何查看会员卡的注册时间

会员系统是小程序中非常重要的一部分,可以帮助企业更好地管理客户,并提供更好的服务。在实际应用中,我们经常需要查看会员的注册时间,以便更好地了解客户的行为和需求。本文将介绍小程序如何查看会员的注册时间。1.找到指定的会员卡。在管理员后台->会员管理处,找到需要查看注册时间的会员卡。也支持对会员卡按卡号、手机号和等级进行搜

第十一章:Java集合

目录11.1:java集合框架概述11.2:Collection接口方法11.3:Iterator迭代器接口11.4:Collection子接口一:List11.5:Collection子接口二:Set11.6:Map接口11.7:Collections工具类11.8:增强for循环11.1:java集合框架概述Jav

网络安全(黑客)自学

前言我是去年8月22日才正式学习网络安全的,因为在国营单位工作了4年,在广东一个月工资只有5000块,而且看不到任何晋升的希望,如果想要往上走,那背后就一定要有关系才行。而且国营单位的气氛是你干的多了,领导觉得你有野心,你干的不多,领导却觉得你这个人不错。我才24周岁,实在的受不了这种工作氛围,情绪已经压制了很多久,一

Linux小程序-进度条

进度条我将实现三个版本:1简单原理版本2实际工程实践版本3c语言扩展-设计颜色首先我们需要有一些前置知识:关于行缓冲区和回车换行行缓冲区:c/c++语言会针对标准输出给我们提供默认的缓冲区,这次的角色是输出缓冲区输出的内容不会立马显示,而是放置在输出缓冲区内,只有当缓冲区刷新时我们才会看到输出的内容,而我们平时打印内容

腾讯云16核服务器性能测评_轻量和CVM配置大全

腾讯云16核服务器配置大全,CVM云服务器可选择标准型S6、标准型SA3、计算型C6或标准型S5等,目前标准型S5云服务器有优惠活动,性价比高,计算型C6云服务器16核性能更高,轻量16核32G28M带宽优惠价3468元15个月,腾讯云服务器网分享腾讯云16核CPU服务器可以选择的云服务器CVM规格列表:目录腾讯云16

七天学会C语言-第五天(函数)

1.调用有参函数有参函数是一种接受输入参数(参数值)并执行特定操作的函数。通过向函数传递参数,你可以将数据传递给函数,让函数处理这些数据并返回结果。例1:编写一程序,要求用户输入4个数字,输出前两个数中的最大数、后两个数中的最大数以及四个数中的最大数。#include<stdio.h>doublemax(doublex

jmeter学习文档

JMeter学习(一)工具简单介绍一、JMeter介绍ApacheJMeter是100%纯JAVA桌面应用程序,被设计为用于测试客户端/服务端结构的软件(例如web应用程序)。它可以用来测试静态和动态资源的性能,例如:静态文件,JavaServlet,CGIScripts,JavaObject,数据库和FTP服务器等等

【深度学习实验】线性模型(五):使用Pytorch实现线性模型:基于鸢尾花数据集,对模型进行评估(使用随机梯度下降优化器)

目录一、实验介绍二、实验环境1.配置虚拟环境2.库版本介绍三、实验内容0.导入库1.线性模型linear_model2.损失函数loss_function3.鸢尾花数据预处理4.初始化权重和偏置5.优化器6.迭代7.测试集预测8.实验结果评估9.完整代码一、实验介绍线性模型是机器学习中最基本的模型之一,通过对输入特征进

计算机组成原理——基础入门总结(二)

上一期的路径:基础入门总结(一)目录一.输入输出系统和IO控制方式二.存储系统的基本概念三.cache的基本概念和原理四.CPU的功能和基本结构五.总线概述一.输入输出系统和IO控制方式IO设备又可以被统一称为外部设备~IO接口:由被称为IO控制器、设备控制器:负责协调主机与外部设备之间的数据传输。IO控制器具有统一的

web:[HCTF 2018]WarmUp

题目点进页面,页面只有一张滑稽脸,没有其他的提示信息查看网页源代码,发现source.php,尝试访问一下跳转至该页面,页面显示为一段php代码,需要进行代码审计<?phphighlight_file(__FILE__);classemmm{publicstaticfunctioncheckFile(&$page){/

热文推荐