计算机毕设 flink大数据淘宝用户行为数据实时分析与可视化

2023-09-18 12:42:22


0 前言

🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

🚩 flink大数据淘宝用户行为数据实时分析与可视化

🥇学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部版本jar
kafka4.1flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch7.6flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql5.7flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器:datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH (
    'connector.type' = 'kafka', 
    'connector.version' = 'universal',  
    'connector.topic' = 'user_behavior',  
    'connector.startup-mode' = 'earliest-offset', 
    'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 
    'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 
    'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'update-mode' = 'append',
    'format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',    
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (
    sub_category_id BIGINT,
    parent_category_name STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (
    category_name  STRING,
    buy_cnt  BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

更多推荐

软信天成:如何提高云数据仓库的数据质量?

随着云计算的深入普及、5G和边缘计算等技术带来了数据量的爆发增长,数据驱动商业运作向实时化和自动化迈进,越来越多的企业开始考虑嵌入基于云计算的企业数据仓库,以Snowflake、MicrosoftSynapseAnalytics、GoogleBigQuery、AmazonRedshift等为代表的云数据仓库或成为数据时

神经网络案例分析

神经网络(neuralnetwork)是一种模拟人脑神经思维方式的数据模型,神经网络有多种,包括BP神经网络、卷积神经网络,多层感知器MLP等,最为经典为神经网络为多层感知器MLP(Multi-LayerPerception),SPSSAU默认使用该模型。类似其它的机器学习模型(比如决策树、随机森林、支持向量机SVM等

STM32 CAN使用记录:FDCAN基础通讯

文章目录目的基础说明关键配置与代码轮询方式中断方式收发测试示例链接总结目的CAN是非常常用的一种数据总线,被广泛用在各种车辆系统中。这篇文章将对STM32中FDCAN的使用做个示例。CAN的一些基础介绍与使用可以参考下面文章:《CAN基础概念》https://blog.csdn.net/Naisu_kun/articl

贸易战的影响:跨境电商的“黑洞”风险与机遇

当今全球贸易局势充满了不确定性和动荡。贸易战的阴云笼罩下,跨境电商企业面临着前所未有的挑战,但与此同时,也蕴藏着巨大的机遇。本文将深入探讨贸易战对跨境电商的影响,以及企业在这个新现实中如何应对风险并寻找机遇。贸易战的背景贸易战是国家之间为争夺贸易利益而采取的一系列贸易政策和关税措施的冲突。近年来,中美之间的贸易战引起了

大模型股票交易-挖掘新闻和情绪价值

埃隆·马斯克(ElonMusk)的星际飞船于2023年4月20日升空后爆炸。想象一下,当时您正在观察股市,突然出现新闻,您会如何交易TSLA股票?我希望您不要与我争论,您作为交易者(而不是投资者)要做的第一件事就是摆脱现有的多头头寸并可能做空股票。让我们看看这样的交易是否有利可图。根据此链接SpaceXrocketla

飞驰的高铁-第15届蓝桥杯第一次STEMA测评Scratch真题精选

[导读]:超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成,后续会不定期解读蓝桥杯真题,这是Scratch蓝桥杯真题解析第150讲。飞驰的高铁,本题是2023年8月20日举行的第15届蓝桥杯STEMA测评Scratch编程中级组编程第2题,题目要求编程实现模拟高铁飞驰前进的效果。当按下数字1时,画面中的景

游戏创业小知识:游戏运营的步骤和流程

游戏运营是确保游戏在持续运行中保持活跃和成功的过程。以下是游戏运营的一般步骤流程:1.游戏发布前准备游戏选择:了解并熟悉游戏的核心概念、目标受众和游戏玩法。开发团队:组建开发团队,包括程序员、设计师、艺术家和声音设计师等。技术基础设施:建立游戏服务器、数据库和其他必要的技术基础设施。资金筹集:获取足够的资金来支持游戏的

原生微信小程序中进行 API 请求

原生微信小程序中进行API请求当在原生微信小程序中进行API请求时,封装请求可以提高代码的可维护性和可扩展性。在本篇博客中,我们将一步步介绍如何进一步封装请求,并添加请求超时、拦截器和请求取消功能。第一步:基本请求封装首先,我们创建一个用于发送HTTP请求的基本封装。在微信小程序中,我们使用wx.request发送请求

day41 jdk8新特性Stream流 数据库安装

流(Stream)中保存了对集合或者数组数据的操作,和集合类似,但是集合中保存的是数据。Stream不能保存数据一、创建流通过Collection对象的stream()或者parallelStream()通过Arrays类的stream(Array[]<T>)方法通过Stream接口of()iterate()gener

Java版本spring cloud + spring boot企业电子招投标系统源代码

项目说明随着公司的快速发展,企业人员和经营规模不断壮大,公司对内部招采管理的提升提出了更高的要求。在企业里建立一个公平、公开、公正的采购环境,最大限度控制采购成本至关重要。符合国家电子招投标法律法规及相关规范,以及审计监督要求;通过电子化平台提高招投标工作的公开性和透明性;通过电子化招投标,使得招标采购的质量更高、速度

计算机视觉与深度学习-经典网络解析-ZFNet-[北邮鲁鹏]

这里写目录标题ZFNet主要改进减小第一层卷积核将第二、第三个卷积层的卷积步长都设置为2增加了第三、第四个卷积层的卷积核个数ZFNetZFNet是一种基于AlexNet的模型,由MatthewD.Zeiler和RobFergus在2013年提出。相对于AlexNet,ZFNet结构与AlexNet网络结构基本一致,进行

热文推荐