【Flink实战】Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

2023-09-13 16:29:10

🚀 作者 :“大数据小禅”

🚀 文章简介 :Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


Flink怎么操作Redis

  • Flink怎么操作redis?

    • 方式一:自定义sink
    • 方式二:使用connector
  • Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法

    • getCommandDescription 选择对应的数据结构和key名称配置
    • getKeyFromData 获取key
    • getValueFromData 获取value
  • 使用

    • 添加依赖
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    
  • 编码

    public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
        }
    
        @Override
        public String getKeyFromData(Tuple2<String, Integer> value) {
            return value.f0;
        }
    
        @Override
        public String getValueFromData(Tuple2<String, Integer> value) {
            return value.f1.toString();
        }
    }
    

Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战

  • Redis环境说明 redis6

    • 使用docker部署redis6.x 看个人主页docker相关文章

      docker run -d  -p 6379:6379 redis
      
  • 编码实战

数据源

public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {


    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x课程");
        list.add("微服务SpringCloud课程");
        list.add("RabbitMQ消息队列");
        list.add("Kafka课程");
        list.add("小滴课堂面试专题第一季");
        list.add("Flink流式技术课程");
        list.add("工业级微服务项目大课训练营");
        list.add("Linux课程");
    }


    /**
     * run 方法调用前 用于初始化连接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * 用于清理之前
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }


    /**
     * 产生数据的逻辑
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {

        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());

            ctx.collect(videoOrder);
        }


    }

    /**
     * 控制任务取消
     */
    @Override
    public void cancel() {

        flag = false;
    }
}

保存的格式与存取的方法

public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {


    /***
     * 选择需要用到的命令,和key名称
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    /**
     * 获取对应的key或者filed
     *
     * @param data
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {

        System.out.println("getKeyFromData=" + data.f0);
        return data.f0;
    }

    /**
     * 获取对应的值
     *
     * @param data
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        System.out.println("getValueFromData=" + data.f1.toString());
        return data.f1.toString();
    }
}

落地

public class Flink07RedisSinkApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //数据源 source
//        DataStream<VideoOrder> ds = env.fromElements(
//                new VideoOrder("21312","java",32,5,new Date()),
//                new VideoOrder("314","java",32,5,new Date()),
//                new VideoOrder("542","springboot",32,5,new Date()),
//                new VideoOrder("42","redis",32,5,new Date()),
//                new VideoOrder("4252","java",32,5,new Date()),
//                new VideoOrder("42","springboot",32,5,new Date()),
//                new VideoOrder("554232","flink",32,5,new Date()),
//                new VideoOrder("23323","java",32,5,new Date())
//        );
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());



        //transformation
       DataStream<Tuple2<String,Integer>> mapDS =  ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
                return new Tuple2<>(value.getTitle(),1);
            }
        });



//        DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
//            @Override
//            public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                out.collect(new Tuple2<>(value.getTitle(),1));
//            }
//        });


       //分组
        KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //统计每组有多少个
        DataStream<Tuple2<String,Integer>> sumDS =  keyByDS.sum(1);

        //控制台打印
        sumDS.print();

        //单机redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();

        sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));


        //DataStream需要调用execute,可以取个名称
        env.execute("custom redis sink job");
    }

}

在这里插入图片描述

更多推荐

matlab读写json文件

Background通常,在matlab中使用mat文件进行数据存储。MAT文件是MATLAB中用来存储数据的二进制文件格式。MAT文件可以包含各种数据类型,包括数字、矩阵、向量、结构体、字符和函数等。但是,当和其他语言有交互时,mat文件会不太方便。而json格式在许多编程语言中,包括MATLAB,都有提供解析和创建

【LQR】离散代数黎卡提方程的求解,附Matlab/python代码(笔记)

LQR的核心是设计QRN,并求解对应的黎卡提方程对于连续状态空间方程系统,先求连续LQR后离散和先离散后求离散LQR方程的结果是不一样的1.离散代数黎卡提方程注:LQR算法中含N项离散系统:在matlab里有现成的函数dlqr(),但为了搞清楚其内核,编写matlab代码展示其求解过程matlab帮助文件里的dlqr(

淘宝拍立淘插件转链和商业化图片生成接口介绍,图片搜索商品接口,按图搜索接口,图片识别商品接口介绍

淘宝拍立淘是淘宝网推出的一种搜索方式,通过拍立淘,用户可以输入文字描述或上传图片来搜索商品。拍立淘通过与淘宝网进行数据接入和授权,使用淘宝提供的API获取商品信息和操作权限,拍立淘使用图像识别技术,通过深度学习算法和计算机视觉技术,对用户拍摄的商品照片进行识别,拍立淘插件转链API用于为淘宝客提供开启拍立淘插件(根据图

基于改进莱维飞行和混沌映射的粒子群优化BP神经网络预测股票价格研究(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。⛳️座右铭:行百里者,半于九十。📋📋📋本文目录如下:🎁🎁🎁目录💥1概述📚2运行结果🎉3参考文献🌈4Matlab代码实现💥1概述基于改进莱维飞行和混沌映射的粒子群优化BP神经网络

SunTorque亮相GAF2023数字化智能装配工程与装备技术大会

智能扭矩系统-智能拧紧系统-智能扭矩控制-SunTorqueGAF2023数字化智能装配工程与装备技术大会在中国上海汽车会展中心盛大开幕,青创智通与装配领域、智能制造、数字化应用等相关先进智造技术的知名企业一齐亮相。本次展会,我们带来了扭矩相关解决方案,包含智能扭矩系统软件、工具存储设备、扭矩校验设备、智能手持终端、扭

neo4j下载安装配置步骤

目录一、介绍简介Neo4j和JDK版本对应二、下载官网下载直接获取三、解压缩安装四、配置环境变量五、启动测试一、介绍简介Neo4j是一款高性能的图数据库,专门用于存储和处理图形数据。它采用节点、关系和属性的图形结构,非常适用于表示和查询复杂的实体关系。Neo4j具有高性能、事务支持、可扩展性和直观的Cypher查询语言

喜报 | 亮相2023数博会,摘得首届数智金融创新大赛优秀奖

河北正定,千年古城,这里不仅有一幕幕刀光剑影,鼓角争鸣的故事,还有驰名中外的人“一寺四塔”,有宜人的气候,也有汇聚高科技的天下英雄会。图源于网络2023年9月6日,河北正定,中国国际数字经济博览会(以下简称数博会)正式开幕,坚定“工业互联网赋能千行百业”的科技信仰,奔向“数字经济引领高质量发展”的未来世界。图源于网络据

探索小程序的世界(专栏导读、基础理论)

文章导读一、为什么要学习小程序开发1.1低门槛1.2市场需求1.3创业机会1.4技术发展趋势二、专栏导读2.1实战系列2.2工具系列2.3游戏系列2.4插件系列三、基础理论3.1微信小程序简易教程框架组件API工具开发者工具项目结构3.2app.json配置pageswindowtabbar3.3App.jsonLau

k8备份与恢复-Velero

简介Velero是一款可以安全的备份、恢复和迁移Kubernetes集群资源和持久卷等资源的备份恢复软件。Velero实现的kubernetes资源备份能力,可以轻松实现Kubernetes集群的数据备份和恢复、复制kubernetes集群资源到其他kubernetes集群或者快速复制生产环境到测试环境等功能,这种备份

4. algorithm

algorithm书写1.algorithm2.algorithm2e1.algorithm在LaTeX中,要显示算法,您可以使用algorithm宏包来排版算法,并使用algorithmic宏包来编写算法的伪代码。以下是显示算法的基本步骤:导入宏包:在LaTeX文档的导言区(preamble)中,导入algorith

idea集成tomcat(Smart Tomcate插件安装)

当我们在tomcat上部署好一个webapp后,如果我们要修改代码,就需要重新进行打包和部署,但往往在工作中是需要频繁修改代码,然后再查看成果的,就需要反复的进行打包和部署的过程,这是很麻烦的通过SmartTomcate插件我们就能解决这个问题,可以直接使用idea图形化界面把代码部署到tomcat上达成“一键打包&部

热文推荐