【Flink实战】Flink对接Kafka Connetor使用docker部署kafka

2023-09-13 16:40:22

🚀 作者 :“大数据小禅”

🚀 文章简介 :Flink对接Kafka Connetor第一步 使用docker部署kafka

🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬


什么是Docker

  • Docker 是一个开源的容器化平台,用于将应用程序和其依赖的环境打包成一个独立的容器,以实现应用程序的快速部署、可移植性和可伸缩性。

0 传统的应用部署方式通常需要在目标环境中手动设置各种依赖项和配置,可能面临不同操作系统或软件版本之间的兼容性问题。而 Docker 可以通过容器的方式隔离应用程序和其依赖的环境,使得应用程序能够在任意系统上以相同的方式运行,并且不受目标环境的影响。

Docker 的主要概念包括以下几个方面:

  1. 镜像(Image):镜像是一个只读的模板,它包含了运行一个应用程序所需的所有文件系统、库和依赖项。镜像可以从一个基础镜像上构建,也可以通过 Dockerfile 中的指令一步步构建。镜像是 Docker 容器的基础。

  2. 容器(Container):容器是镜像的运行实例,它包含了应用程序以及其所需的运行环境。容器是隔离运行的,其使用的文件系统和网络资源都是独立的。容器可以快速创建、启动、停止和销毁,它提供了一种轻量级、快速和可移植的应用部署方式。

  3. Dockerfile:Dockerfile 是一个文本文件,包含了构建镜像时的一系列指令。通过 Dockerfile,可以定义容器中的文件、环境变量、运行命令等。使用 Dockerfile 可以实现自动化构建、重现性和标准化的镜像构建过程。

  4. 仓库(Repository):仓库是用于存储和共享镜像的地方。Docker Hub 是 Docker 官方提供的公共仓库,可以从中获取各种常用的镜像。也可以配置私有仓库来存储和管理自己的镜像。

Docker常用命令

  • docker安装教程查看个人主页的docker教程
    查看已经下载的镜像:

  • docker images
    删除指定的镜像:

  • docker rmi [IMAGE ID]
    其中,[IMAGE ID]是要删除的镜像的ID或名称。可以通过docker images命令查看对应的IMAGE ID。

删除所有未被使用的镜像:

  • docker image prune
    这个命令会删除所有没有被任何容器使用的镜像。
    注意:删除镜像操作无法撤销,因此请谨慎操作。

要查看正在运行的容器,可以使用以下命令:

  • docker ps
    该命令将列出当前正在运行的所有容器的详细信息,包括容器ID、镜像名称、启动时间、状态等。
    如果要查看所有的容器,包括已停止的容器,可以在命令中添加-a选项:

  • docker ps -a
    这个命令将列出所有的容器,包括正在运行和已停止的容器。

要删除一个容器,可以使用以下命令:

  • docker rm [CONTAINER ID]
    其中,[CONTAINER ID]是要删除的容器的ID。可以通过docker ps -a命令查看对应的CONTAINER ID。

Docker安装过程

cd /etc/yum.repos.d
//使用阿里云yum源
wget http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
依次执行:
查看docker安装包:yum list | grep docker
安装Docker Ce 社区版本:yum install -y docker-ce.x86_64
设置开机启动:systemctl enable docker
更新xfsprogs:yum -y update xfsprogs
启动docker:systemctl start docker
查看版本:docker version
查看详细信息:docker info

Docker部署kafka

  • 配置阿里云加速

    • 控制台->容器镜像服务->镜像加速器

在这里插入图片描述

  • 每个账号一个加速地址 https://cr.console.aliyun.com/cn-shenzhen/instances/mirrors

  • mkdir -p /etc/docker/
    vi /etc/docker/daemon.json
    
    {
      "registry-mirrors": ["https://pw7d3bwu.mirror.aliyuncs.com"]
    }
    
    重启:systemctl daemon-reload && systemctl restart docker
    
  • 快速搭建

  • #zk
    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    
    #kafka
    docker run -d --name xdclass_kafka \
    -p 9092:9092 \
    -e KAFKA_BROKER_ID=0 \
    -e KAFKA_ZOOKEEPER_CONNECT=192.168.192.100:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.192.100:9092 \
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
    
    #查看容器
    docker ps
    
    #进入容器内部,创建topic
    docker exec -it 9b5070d79dfa /bin/bash
    
    cd /opt/kafka
    bin/kafka-topics.sh --create --zookeeper 192.168.192.100:2181 --replication-factor 1 --partitions 1 --topic xdclass-topic
    
    
    #创建生产者发送消息
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic xdclass-topic
    
    
    #运行一个消费者
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xdclass-topic --from-beginning
    
    #开两个窗口可测试生产者消费者成功链接
    

在这里插入图片描述

  • 测试案例
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
public class FlilnkKafkaApp01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        //kafka地址
        props.setProperty("bootstrap.servers", "192.168.192.100:9092");
        //组名 随意指定
        props.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> kafka =new FlinkKafkaConsumer<>("xdclass-topic", new SimpleStringSchema(), props);
        DataStreamSource<String> stream = env.addSource(kafka);
        stream.print();
        env.execute("FlinkKafkaApp");
    }
}

更多推荐

Vue知识系列(7)每天10个小知识点

目录系列文章目录Vue知识系列(1)每天10个小知识点Vue知识系列(2)每天10个小知识点Vue知识系列(3)每天10个小知识点Vue知识系列(4)每天10个小知识点Vue知识系列(5)每天10个小知识点Vue知识系列(6)每天10个小知识点知识点61.**Vue**过滤器的概念、作用、特性、优点、缺点、区别、使用场

如何利用 MidJourney 进行 AI 艺术创作(详细教程)

文章目录什么是MidJourney?MidJourney入门第1步:设置Discord第2步:注册订阅MidJourney了解MidJourney的工具和功能在MidJourney中进行AI艺术创作Tips小技巧简明扼要使用样式和媒介组合概念关于MidJourney的思考什么是MidJourney?Midjourney

三维展示技术让未来项目更加裸眼3D可视化展示在观众面前

三维展示系统分为三维虚拟模型和三维实体模型,三维虚拟模型多为软件制图和后期成像进行展示供人们观赏,三维实体模型为采用各种可塑性材料根据三维虚拟模型制作出的实物,相比三维虚拟模型,三维实体模型更具有收藏与展示的价值,三维实体模型可用于楼盘展示,车型展示与人物艺术展示。现有技术中三维实体模型中的三维模型展示只有简单的保护罩

云原生Kubernetes:K8S存储卷

目录一、理论1.存储卷2.emptyDir存储卷3.hostPath卷4.NFS共享存储二、实验1.emptyDir存储卷2.hostPath卷3.NFS共享存储三、问题1.生成pod一直pending四、总结一、理论1.存储卷(1)概念容器磁盘上的文件的生命周期是短暂的,这就使得在容器中运行重要应用时会出现一些问题。

[2023.09.13]: Rust Lang,避不开的所有权问题

Rust的所有权问题,在我学Rust的时候就跳过了,因为我知道这玩意儿没有场景就不好理解。没想到场景很快就出现了。在开发Yew应用组件的时候,涉及到了事件,闭包,自然就引出了所有权问题。话不多说,下面让我们直接进入代码场景,去体验并了解Rust的所有权机制吧。下面这段代码是能够正常工作的。这段代码的逻辑意图也很简单,这

Linux MQTT智能家居项目(网络基础知识)

文章目录前言一、IP和端口的作用1.IP2.端口二、路由器的转发作用三、MQTT概念总结前言本篇文章带大家来做一个LinuxMQTT智能家居项目,这个项目将会讲解到网络的基础知识和MQTT协议一些相关的知识。一、IP和端口的作用1.IPIP(InternetProtocol):IP是一种网络层协议,它负责在互联网中标识

Redis 面试题

Redis面试题Q:Redis有哪些优势?速度快,因为数据存在内存中支持丰富数据类型,支持string,list,set,sortedset,hash支持事务,操作都是原子性,所谓的原子性就是对数据的更改要么全部执行,要么全部不执行丰富的特性:可用于缓存,消息,按key设置过期时间,过期后将会自动删除单线程,单进程,采

机器学习(17)---支持向量机(SVM)

支持向量机一、概述1.1介绍1.2工作原理1.3三层理解二、sklearn.svm.SVC2.1查看数据集2.2contour函数2.3画决策边界:制作网格2.4建模画图三、非线性情况推广3.1查看数据集3.2线性画图3.3为非线性数据增加维度并绘制3D图像四、核函数一、概述1.1介绍1.支持向量机(SVM,也称为支持

【新版】系统架构设计师 - 案例分析 - 数据库设计

个人总结,仅供参考,欢迎加好友一起讨论文章目录架构-案例分析-数据库设计数据库基础数据库设计概述E-R模型概念结构设计逻辑结构设计规范化(范式)反规范化技术数据库事务并发控制索引视图物化视图存储过程触发器数据库性能优化分布式数据库系统分布式数据库特点分布透明性两阶段提交协议2PC分区分表分库分区技术数据库主从复制步骤b

Jumia、Shein流量逐渐上升,测评自养号如何实现订单突破?

Jumia是全球领先的非洲跨境电商平台,也是非洲第一家在美国主板上市的非洲科技企业。作为100%面向非洲市场的互联网公司,业务范围覆盖尼日利亚、肯尼亚等11个非洲国家。Shein是一家全球领先的时尚和生活方式在线零售商,通过按需生产的模式赋能供应商共同打造敏捷柔性供应链,从而减少浪费,并向全球消费者提供丰富且具有性价比

分布式锁的三种实现方式!

分布式锁是一种用于保证分布式系统中多个进程或线程同步访问共享资源的技术。同时它又是面试中的常见问题,所以我们本文就重点来看分布式锁的具体实现(含实现代码)。在分布式系统中,由于各个节点之间的网络通信延迟、故障等原因,可能会导致数据不一致的问题。分布式锁通过协调多个节点的行为,保证在任何时刻只有一个节点可以访问共享资源,

热文推荐