(二十八)大数据实战——Flume数据采集之kafka数据生产与消费集成案例

2023-09-14 00:03:56

前言

本节内容我们主要介绍一下flume数据采集和kafka消息中间键的整合。通过flume监听nc端口的数据,将数据发送到kafka消息的first主题中,然后在通过flume消费kafka中的主题消息,将消费到的消息打印到控制台上。集成使用flume作为kafka的生产者和消费者。关于nc工具、flume以及kafka的安装部署,这里不在赘述,请读者查看作者往期博客内容。整体架构如下:

正文

  • 启动Kafka集群,创建first主题

- 启动Kafka集群

- 创建first主题

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first --partitions 3 --replication-factor 3

- 查看first主题详情

kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic first

  • 在hadoop101服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建nc监听服务

 - 创建nc监听的flume任务:job-netcat-flume-kafka.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop101
a1.sources.r1.port = 1111
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器flume安装目录/opt/module/apache-flume-1.9.0/job下创建kafka监听r任务

-  创建kafka监听的flume任务:job-kafka-flume-console.conf

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  • 在hadoop102服务器启动kafka监听任务job-kafka-flume-console.conf

- 启动job-kafka-flume-console.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-kafka-flume-console.conf -Dflume.root.logger=INFO,console

  •  在hadoop101服务器启动nc监听任务job-netcat-flume-kafka.conf

 - 启动job-netcat-flume-kafka.conf任务

bin/flume-ng agent -c conf/ -n a1 -f job/job-netcat-flume-kafka.conf -Dflume.root.logger=INFO,console

  •  使用netcat工具发送数据到nc服务1111端口

- 发送nc消息

  • 查看结果 

- 控制台结果

结语

该案例证明了flume1成功采集到了nc监听端口的数据,并将数据发送到了kafka主题first中,flume2成功从kafka主题中消费到了数据并打印到了控制台。关于Flume数据采集之kafka数据生产与消费的集成案例到这里就结束了,我们下期见。。。。。。

更多推荐

【 Ubuntu】systemd服务自启

要在Ubuntu启动后执行一个守护脚本,您可以使用Shell脚本编写一个systemd服务单元。systemd是Ubuntu中常用的服务管理工具,可以在系统启动时自动启动和管理服务。下面是一个示例的守护脚本和systemd服务单元的步骤:创建守护脚本:创建一个Shell脚本文件,例如mydaemon.sh,并在其中编写

markdown学习笔记

markdown学习笔记1.文字(依靠HTML)1.1文字缩进-空格转义符单字符空:&emsp;半字符空:&ensp;1.2文字对齐「居中:」<center>居中</center>or<palign="center">居中</p>「左对齐:」<palign="left">左对齐</p>「右对齐:」<palign="ri

VUE build:gulp打包:测试、正式环境

目录项目结构GulpVUE使用GulpVue安装GulpVue定义Gulp.jspackage.jsonbuild文件夹config文件夹static-config文件夹项目结构GulpGulp是一个自动化构建工具,可以帮助前端开发者通过自动化任务来管理工作流程。Gulp使用Node.js的代码编写,可以更加灵活地管理

STM32 基础学习——GPIO位结构(江科大老师教程)

一、GPIO内部结构1、GPIO外设名称是由GPIOA、GPIOB、GPIOC等命名,共有16个引脚2、每个GPIO模块内,主要包含了寄存器和驱动器这些东西3、寄存器写1,对应的端口就是高电平。写0,对应的端口就是低电平4、寄存器只负责存储数据这是GPIO结构图,总体来说上半部分是输入部分,下半部分是输出部分这是部分是

Bartender for Mac菜单栏图标自定义

Bartender是一款可以帮助用户更好地管理和组织菜单栏图标的macOS软件。它允许用户隐藏和重新排列菜单栏图标,从而减少混乱和杂乱。以下是Bartender的主要特点:菜单栏图标隐藏:Bartender允许用户隐藏菜单栏图标,只在需要时显示。这样可以减少菜单栏的拥挤和视觉干扰,使界面更加整洁和专注。可自定义的菜单栏

Layui快速入门之第九节 表格事件的使用

目录一:事件二:头部工具栏事件三:排序切换事件四:列拖拽宽度后的事件五:列筛选(显示或隐藏)后的事件六:行单击和双击事件七:行右键菜单事件八:单元格编辑事件九:单元格工具事件十:复选框事件十一:单选框事件十二:尾部分页栏事件一:事件table.on('event(filter)',callback);参数event(f

Spring后处理器-BeanPostProcessor

Spring后处理器-BeanPostProcessorBean被实例化后,到最终缓存到名为singletonObjects单例池之前,中间会经过bean的初始化过程((该后处理器的执行时机)),例如:属性的填充、初始化方法init的执行等,其中有一个对外拓展的点BeanPostProcessor,我们称之为bean后

2D游戏开发和3D游戏开发有什么不同?

2D游戏开发和3D游戏开发是两种不同类型的游戏制作方法,它们之间有一些显著的区别:1.图形和视觉效果:2D游戏开发:2D游戏通常使用二维图形,游戏世界和角色通常在一个平面上显示。这种类型的游戏具有平面的外观,就像经典的平台游戏,如《超级马里奥》或《糖果传奇》。3D游戏开发:3D游戏使用三维图形,玩家可以在三维环境中自由

MySQL学习系列(3)-每天学习10个知识

目录1.全文搜索(Full-TextSearch)vs.LIKE操作符2.MySQL中的大数据量处理3.分区(Partitioning)在MySQL中的作用和用法4.MySQL中的数据复制(Replication)5.索引的覆盖和索引下推6.预处理语句(PreparedStatements)7.视图和存储过程8.MyS

C语言知识阶段性总结项目:电子词典

项目需求使用TCP实现客户端和服务端通信使用sqlite存放用户信息客户端需要有登录、注册、查询单词、账号查询记录功能服务器需要实时显示在线用户解决方案使用sqlite创建三个数据库,分别存放用户账号密码,单词表,用户查询记录使用链表存放在线用户的信息,在子线程中循环遍历,达到实时显示在线用户的效果主要的功能代码头文件

大数据(九):数据可视化(一)

专栏介绍结合自身经验和内部资料总结的Python教程,每天3-5章,最短1个月就能全方位的完成Python的学习并进行实战开发,学完了定能成为大佬!加油吧!卷起来!全部文章请访问专栏:《Python全栈教程(0基础)》再推荐一下最近热更的:《大厂测试高频面试题详解》该专栏对近年高频测试相关面试题做详细解答,结合自己多年

热文推荐