【Flink实战】Flink自定义的Source 数据源案例-并行度调整结合WebUI

2023-09-13 16:16:26

🚀 作者 :“大数据小禅”

🚀 文章简介 :【Flink实战】玩转Flink里面核心的Source Operator实战

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


什么是Flink的并行度

  • Flink的并行度是指在Flink应用程序中并行执行任务的级别或程度。它决定了任务在Flink集群中的并发执行程度,即任务被划分成多少个并行的子任务。

  • 在Flink中,可以通过设置并行度来控制任务的并行执行。并行度是根据数据或计算的特性来确定的,可以根据任务的特点和所需的处理能力进行调优。

  • 将一个任务的并行度设置为N意味着将该任务分成N个并行的子任务,这些子任务可以在Flink集群的不同节点上同时执行。Flink会根据配置的并行度自动对任务进行数据切分和任务调度,以实现高效的并行处理。

  • 选择合适的并行度需要在平衡性、吞吐量和可伸缩性之间权衡。较高的并行度可以提高任务的处理能力和吞吐量,但也会增加系统的资源需求和管理成本。较低的并行度可能导致资源浪费和性能瓶颈。

  • 在设计Flink应用程序时,可以根据任务之间的依赖关系、数据流量、数据分布以及可用的资源来选择合适的并行度。可以通过调整并行度来优化任务的性能,平衡任务的负载,提高整体的处理能力。-

Flink自定义的Source 数据源案例-并行度调整结合WebUI

  • 开启webui
    取消掉默认并行度为1,因为默认的并行度是8,也就是8个线程 默认的并行度就是系统的核数在这里插入图片描述
    在这里插入图片描述
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
  • 设置不同的并行度
    Solt的数量就是设置的最大并行度的数量
    在这里插入图片描述
    在这里插入图片描述
public static void main(String[] args) throws Exception {

        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(2);

        DataStream<VideoOrder> videoOrderDS =  env.addSource(new VideoOrderSource());

        DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
            @Override
            public boolean filter(VideoOrder videoOrder) throws Exception {
                return videoOrder.getMoney()>5;
            }
        }).setParallelism(3);

        filterDS.print().setParallelism(4);

        //DataStream需要调用execute,可以取个名称
        env.execute("source job");
    }


数据流中最大的并行度,就是算子链中最大算子的数量,比如source 2个并行度,filter 4个,sink 4个,最大就是4
在这里插入图片描述
在这里插入图片描述

更多推荐

解决高并发问题

在处理项目中的高并发问题时,可以采取以下几种方法:后端处理:大部分的高并发处理是在后端进行的。可以通过优化数据库查询、增加缓存机制(如集成Redis)、使用分布式技术(如分布式缓存、分布式锁)、使用消息队列等方式来提高系统的并发处理能力。此外,还可以通过水平扩展(增加服务器数量)或垂直扩展(增加服务器的硬件配置)来提高

Kafka【命令行操作】

Kafka命令行操作Kafka主要包括三大部分:生产者、主题分区节点、消费者。1、Topic命令行操作也就是我们kafka下的脚本kafka-topics.sh的相关操作。常用命令行操作参数描述--bootstrap-server<String:servertoconnectto>连接的KafkaBroker主机名称和

【Android Framework系列】第15章 Fragment+ViewPager与Viewpager2相关原理

1前言上一章节【AndroidFramework系列】第14章Fragment核心原理(AndroidX版本)我们学习了Fragment的核心原理,本章节学习常用的Fragment+ViewPager以及Fragment+ViewPager2的相关使用和一些基本的源码分析。2Fragment+ViewPager我们常用

腾讯mini项目-【指标监控服务重构】2023-07-19

今日已办OpenTelemetryLogs通过日志记录API支持日志收集集成现有的日志记录库和日志收集工具Overview日志记录API-LoggingAPI,允许您检测应用程序并生成结构化日志旨在与其他telemertydata(例如metric和trace)配合使用,以提供统一的可观测性解决方案结构化日志记录,允许

预处理代码

一、缺失值处理删除缺失值:data1=data.dropna()#丢弃缺失值#dropna()删除缺失值所在行(axis=0)或列(axis=1),默认为axis=0补全示例数据:importpandasaspdimportnumpyasnpdata=pd.DataFrame({'name':['Bob','Mary'

一篇搞定,Kettle详细教程

文章目录第一章Kettle概述1.1Kettle发展历程1.2Kettle简介1.3Kettle相关俗语1.4Kettle设计与组成1.5Kettle功能模块1.6Kettle的执行Transformation(转换)1.7Kettle商业和社区版区别1.8数据集成与ETL1.9ETL工具比较第二章Kettle安装部署

【python基础】编写/运行hello world项目

1.编写helloworld项目编程界每种语言的第一个程序往往都是输出helloworld。因此我们来看看,如何用Python输出helloworld。1.如果你是初学者,main.py中的代码暂时是无法看懂的,所以可以把main中的源代码直接删除。如下所示这里我们要学习python的第一个知识点,print输出函数。

机器学习(14)---逻辑回归(含手写公式、推导过程和手写例题)

逻辑回归一、逻辑回归概述二、模型、策略和优化(手写)三、w和b的梯度下降公式推导四、例题分析4.1题目4.2解答一、逻辑回归概述1.逻辑回归也称作logistic回归分析,是一种广义的线性回归分析模型,属于机器学习中的监督学习。其推导过程与计算方式类似于回归的过程,但实际上主要是用来解决二分类问题(当然也可以解决多分类

项目实战— pytorch搭建CNN处理MNIST数据集

项目文件夹介绍项目文件夹CNN_MNIST_practice文件夹是整个项目的文件夹,里面存放了六个子文件夹以及四个.py程序,接下来我们分别来介绍这些文件的内容。其中minist_all_CPU.py是CPU版本的模型训练+测试程序,而minist_all_GPU.py则是GPU版本的模型训练+测试程序。minist

Spring MyBatis【一篇搞定】

SpringMyBatis​SpringMyBatis在Spring中是一个非常重要的知识,将前端传递的数据存储起来,或者查询数据库⾥⾯的数据;简单来说MyBatis是更简单完成程序和数据库交互的⼯具,也就是更简单的操作和读取数据库⼯具。文章目录SpringMyBatis一、MyBatis简介二、学习MyBatis重要

用 Github Codespaces 免费搭建本地开发测试环境

如何丝滑地白嫖一个本地开发环境?怎么新建一个代码空间?1:通过Github网页新建2:通过VSCode插件新建为代码创建相应的开发测试环境如何丝滑地白嫖一个本地开发环境?使用Codespaces为开发者解决这样的痛点:为项目设置和维护一个或一组开发工作站。在“第一次提交”发生之前浪费的时间。开发工作站之间的配置/工具/

热文推荐