搭建Flink集群、集群HA高可用以及配置历史服务器

2023-09-21 15:22:05

Flink集群搭建

集群规划

节点node01node02node03
角色JobManager
TaskManager
TaskManagerTaskManager

下载并解压安装包

wget https://repo.huaweicloud.com/apache/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz

在node01节点下载flink安装包,同时解压、重命名。

tar  -zxvf flink-1.17.0-bin-scala_2.12.tgz 
mv flink-1.17.0 flink

修改集群配置

进入flink的conf目录,修改集群配置

vim /usr/local/program/flink/conf/flink-conf.yaml

1.修改flink-conf.yaml文件

JobManager节点配置

# jobmanager.rpc.address: localhost
# jobmanager.bind-host: localhost
jobmanager.rpc.address: node01
jobmanager.bind-host: 0.0.0.0

# rest.address: localhost
# rest.bind-address: localhost
rest.address: node01
rest.bind-address: 0.0.0.0

TaskManager节点配置

# taskmanager.host: localhost
# taskmanager.bind-host: localhost

taskmanager.host: node01
taskmanager.bind-host: 0.0.0.0

注意:需要在/etc/hosts文件中配置各个节点信息

172.29.234.1	node01	node01
172.29.234.2	node02	node02
172.29.234.3	node03	node03

2.修改workers文件

指定node01、node02、node03等节点为TaskManager

# localhost
node01
node02
node03

3.修改masters文件

# localhost:8081
node01:8081

分发安装目录

node01节点安装、配置好后,将Flink安装目录分发给另外两个节点服务器。

[root@node01 program]# pwd
/usr/local/program
[root@node01 program]# ls
flink                            jdk8

[root@node01 program]# scp -r flink node02:/usr/local/program/flink

[root@node01 program]# scp -r flink node03:/usr/local/program/flink

在node02、node03节点,修改flink-conf.yaml 配置

1.node02节点

# taskmanager.host: localhost

taskmanager.host: node02

2.node03节点

# taskmanager.host: localhost

taskmanager.host: node03

启动集群

Flink附带了相关的bash脚本,可以用于启动、停止集群。

# 启动集群
./bin/start-cluster.sh

# 停止集群
./bin/stop-cluster.sh

node01节点服务器上执行start-cluster.sh脚本以启动Flink集群

[root@node01 bin]# cd /usr/local/program/flink/bin

[root@node01 bin]# ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host node01.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.

查看进程情况

[root@node01 bin]# jps
6788 StandaloneSessionClusterEntrypoint
7256 Jps
7116 TaskManagerRunner
[root@node02 conf]# jps
16884 TaskManagerRunner
16959 Jps
[root@node03 conf]# jps
17139 TaskManagerRunner
17214 Jps

访问Web UI

当如上所示一样后,代表启动成功,此时可以访问http://node01:8081对flink集群和任务进行监控管理。

在这里插入图片描述
注意:关闭防火墙,否则可能无法访问,或者集群的TaskManager数量、Slot数量显示异常

systemctl stop firewalld

提交任务

[root@node01 bin]# flink run ../examples/streaming/WordCount.jar

查看运行结果

[root@node01 bin]# tail flink-*-taskexecutor-*.out

也可以通过Flink的 Web UI来监视集群的状态和正在运行的作业
在这里插入图片描述

Flink集群HA高可用

概述

集群实际上只有一个JobManager,是存在单点故障的,官方提供了Standalone Cluster HA模式来实现集群高可用。

集群可以有多个JobManager,但只有一个处于active状态,其余的则处于备用状态,Flink使用 ZooKeeper来选举出Active JobManager,并依赖其来提供一致性协调服务,所以需要预先安装 ZooKeeper 。

Flink本身提供了内置ZooKeeper插件,可以直接修改conf/zoo.cfg,并且使用 /bin/start-zookeeper-quorum.sh直接启动。

集群规划

节点node01node02node03
角色JobManager
TaskManager
JobManager
TaskManager
TaskManager

配置flink

基于Flink集群的node01节点配置的情况下,修改conf/flink-conf.yaml文件,增加如下配置:

# 配置使用zookeeper来开启高可用模式
high-availability.type: zookeeper

# 配置zookeeper的地址,采用zookeeper集群时,可以使用逗号来分隔多个节点地址
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181

# 在zookeeper上存储flink集群元信息的路径
high-availability.zookeeper.path.root: /flink

# 集群id 放置集群的所有必需协调数据
high-availability.cluster-id: /cluster_one

# 持久化存储JobManager元数据的地址,zookeeper上存储的只是指向该元数据的指针信息
high-availability.storageDir: hdfs://node01:9000/flink/recovery

配置master、workers

修改conf/masters文件,配置master节点

node01:8081
node02:8081

修改conf/workers文件,配置worker节点

node01
node02
node03

配置ZK

编辑vim zoo.cfg文件

server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

分发安装目录

node01节点安装、配置好后,将Flink安装目录分发给另外两个节点服务器。

[root@node01 program]# pwd
/usr/local/program
[root@node01 program]# ls
flink                            jdk8

[root@node01 program]# scp -r flink node02:/usr/local/program/flink

[root@node01 program]# scp -r flink node03:/usr/local/program/flink

在node02、node03节点,修改flink-conf.yaml 配置

1.node02节点

jobmanager.rpc.address: node02

taskmanager.host: node02

2.node03节点

taskmanager.host: node03

启动HA集群

分发Flink相关配置到其他节点,然后确保Hadoop和ZooKeeper已经启动后,使用以下命令来启动集群:

[root@node01 flink]# bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node01.
Starting standalonesession daemon on host node02.
Starting taskexecutor daemon on host node01.
Starting taskexecutor daemon on host node02.
Starting taskexecutor daemon on host node03.

访问http://node01:8081
在这里插入图片描述
访问http://node02:8081
在这里插入图片描述

测试

查看ZK:JobManager节点信息
在这里插入图片描述
kill node01节点上的JobManager进程

[root@node01 flink]# jps
2564 DataNode
3508 NodeManager
18741 Jps
7784 QuorumPeerMain
16666 TaskManagerRunner
2363 NameNode
16300 StandaloneSessionClusterEntrypoint
3117 ResourceManager
[root@node01 flink]# kill -9 16300

查看Active JobManager是否变化
在这里插入图片描述

Flink参数配置

flink-conf.yaml文件中有大量的配置参数,基本常见参数如下:

# jobmanager地址	
jobmanager.rpc.address: node01

# JobManagerJVM 堆内存大小,默认为 1024m 
jobmanager.heap.size: 1024m

# rpc通信端口
jobmanager.rpc.port: 6123

# 进程使用的全部内存大小,可以根据集群规模进行适当调整
jobmanager.memory.process.size:1600m

# TaskmanagerJVM 堆内存大小,默认为 1024m 
taskmanager.heap.size: 1024m

# 进程使用的全部内存大小,可以根据集群规模进行适当调整
taskmanager.memory.process.size: 1728m

# 每个TaskManager能够分配的Slot数量进行配置,默认为1 
# 通常设置为 CPU 核心的数量,或其一半
# Slot就是TaskManager中具体运行一个任务所分配的计算资源
taskmanager.numberOfTaskSlots: 1

# flink任务执行的并行度,默认为1
# 优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量
parallelism.default: 1

# 重启策略
jobmanager.execution.failover-strategy: region

# 存储临时文件的路径,如果没有配置,则默认采用服务器的临时目录,如 LInux/tmp 目录
io.tmp.dirs: /tmp

参考Flink的官方手册:更多配置

配置历史服务器

概述

运行Flink job的集群一旦停止,只能去yarn或本地磁盘上查看日志,对于Job任务信息的查看、异常问题的排查非常不友好。

Flink提供了历史服务器,用来在相应的Flink集群关闭后查询已完成作业的统计信息。通过History Server可以查询这些已完成作业的统计信息,无论是正常退出还是异常退出。

Flink任务停止后,JobManager会将已经完成任务的统计信息进行存档,History Server进程则在任务停止后可以对任务统计信息进行查询。

配置

创建存储目录

[root@node01 flink]# hadoop fs -mkdir -p /logs/flink-job

在flink-config.yaml中添加如下配置

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
jobmanager.archive.fs.dir: hdfs://node01:9000/logs/flink-job

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0
historyserver.web.address: node01

# The port under which the web-based HistoryServer listens.
#historyserver.web.port: 8082
historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.archive.fs.dir: hdfs://node01:9000/logs/flink-job

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000
historyserver.archive.fs.refresh-interval: 5000

启动、停止历史服务器

启动历史服务器

[root@node01 flink]# bin/historyserver.sh start
Starting historyserver daemon on host node01.

停止历史服务器

[root@node01 flink]# bin/historyserver.sh stop
Stopping historyserver daemon (pid: 30749) on host node01.

提交一个Job任务

[root@node01 flink]# bin/flink run -t yarn-per-job -c com.atguigu.wc.WordCountStreamUnboundedDemo  /root/FlinkTutorial-1.17-1.0-SNAPSHOT.jar

2023-06-12 23:41:00,719 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:41:00,742 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2023-06-12 23:41:00,761 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Cannot use kerberos delegation token manager, no valid kerberos credentials provided.
2023-06-12 23:41:00,766 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Submitting application master application_1686577483648_0012
2023-06-12 23:41:00,792 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl        [] - Submitted application application_1686577483648_0012
2023-06-12 23:41:00,792 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Waiting for the cluster to be allocated
2023-06-12 23:41:00,793 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Deploying cluster, current state ACCEPTED
2023-06-12 23:41:04,565 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - YARN application has been deployed successfully.
2023-06-12 23:41:04,565 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node02:38887 of application 'application_1686577483648_0012'.
Job has been submitted with JobID cd41d983c93d8eb906c9aa899dcdefd0

访问http://node01:8088/cluster查看Hadoop
在这里插入图片描述
访问Web UI查看提交任务信息
在这里插入图片描述

查看历史Job信息

在浏览器地址栏输入:http://node01:8082 查看已经停止的 job 的统计信息
在这里插入图片描述
停止提交任务

[root@node01 flink]# bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1686577483648_0012 cd41d983c93d8eb906c9aa899dcdefd0

访问http://node01:9870/explorer.html#/logs/flink-job查看HDFS中的归档文件
在这里插入图片描述
等一段时间,几分钟后查看历史服务器
在这里插入图片描述
查看Job具体信息
在这里插入图片描述

更多推荐

【智能家居-大模型】行业专题报告:ChatGPT等大模型催化智能家居行业发展

(报告出品方/作者:华安证券,马远方)1智能家居:ChatGPT等大模型为行业发展带来新机遇1.1现状:智能家居产品的用户体验(交互能力、智能化水平)及安全性待提升智能家居:智能化的家庭系统,增加生活便利、舒适、安全。智能家居是指通过互联网技术和智能设备,将家庭设备、家庭电器、家庭安全等各种家居设施连接,实现智能控制、

计算机竞赛 深度学习+python+opencv实现动物识别 - 图像识别

文章目录0前言1课题背景2实现效果3卷积神经网络3.1卷积层3.2池化层3.3激活函数:3.4全连接层3.5使用tensorflow中keras模块实现卷积神经网络4inception_v3网络5最后0前言🔥优质竞赛项目系列,今天要分享的是🚩**基于深度学习的动物识别算法**该项目较为新颖,适合作为竞赛课题方向,学

Servlet

1Servlet1.1概念Servlet是JavaEE规范之一。规范就是接口Servlet是JavaWeb三大组件之一。三大组件分别是:Servlet程序、Filter过滤器、Listener监听器。Servlet服务于HTTP协议的服务端的一个小程序,“接收请求,解析请求,根据请求执行业务逻辑,做出响应”1.2实现功

【Spatial-Temporal Action Localization(七)】论文阅读2022年

文章目录1.TubeR:TubeletTransformerforVideoActionDetection摘要和结论引言:针对痛点和贡献模型框架TubeREncoder:TubeRDecoder:Task-SpecificHeads:2.HolisticInteractionTransformerNetworkforA

使用vue-cli搭建spa项目

目录一.什么是vue-cli二.安装vue-cli三.使用脚手架vue-cli(2.X版)来构建项目四.vue项目结构说明五.基于spa项目完成路由六.基于spa项目完成嵌套路由好啦!今天的分享就到这啦!!一.什么是vue-cliVueCLI是一个基于Vue.js的官方脚手架工具,用于快速启动、构建和管理Vue.js项

【论文阅读 05】图像异常检测研究现状综述

1图像异常检测任务图像异常检测任务根据异常的形态可以分为定性异常的分类和定量异常的定位两个类别.定性异常的分类:整体地给出是否异常的判断,无需准确定位异常的位置。如图2左上图所示,左侧代表正常图像,右侧代表异常图像,在第1行中,模型仅使用服饰数据集中衣服类型的样本进行训练,则其他类别的样本图像(鞋子等)对模型来说都是需

论文阅读 - Natural Language is All a Graph Needs

目录摘要IntroductionRelatedWork3InstructGLM3.1Preliminary3.2InstructionPromptDesign3.3节点分类的生成指令调整3.4辅助自监督链路预测4Experiments4.1ExperimentalSetup4.2MainResults4.2.1ogbn

机器学习—非零中心化、非零中心化会带来的问题

众所周知,激活函数最好具有关于零点对称的特性,不关于零点对称会导致收敛变慢。这种说法看到几次了,但对于背后的原因却一直比较模糊,今天就来捋一捋。神经元模型如图1所示是神经网络中一个典型的神经元设计,它完全仿照人类大脑中神经元之间传递数据的模式设计。大脑中,神经元通过若干树突(dendrite)的突触(synapse),

【数据结构练习】链表面试题集锦二

目录前言:1.链表分割2.相交链表3.环形链表4.环形链表II前言:数据结构想要学的好,刷题少不了,我们不仅要多刷题,还要刷好题!为此我开启了一个必做好题锦集的系列,每篇大约5题左右。此为第二篇选择题篇,该系列会不定期更新敬请期待!1.链表分割代码:publicclassPartition{publicListNode

Hadoop-sqoop

sqoop1.Sqoop简介及原理简介:Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysq1.postgresql..)间进行数据的传递,可以将一个关系型数据库(例如:MySQL,Oracle,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关

JVM基础知识(内存区域划分,类加载,GC垃圾回收)

目录内存区域划分JVM中的栈JVM中的堆程序计数器方法区(元数据区)给一段代码,某个变量在哪个区域上?类加载类加载时机双亲委派模型GC垃圾回收机制GC实际工作过程1.找到垃圾/判定垃圾1.可达性分析(Java中的做法)2.引用计数2.清理垃圾1.标记清除2.复制算法3.标记整理分代回收(复制算法+标记整理)内存区域划分

热文推荐