kafka消费者模式

2023-09-20 22:37:01

一、单线程消费者模式

package nj.zb.kb23.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/*
* 单线程
*/
public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        /*
        earliest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,从头开始消费
         latest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,消费新产生的数据
         none:   当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,抛出异常
         */
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        /*
         * ENABLE_AUTO_COMMIT_CONFIG 设置是否自动提交,获取数据的状态,false手动提交,true自动提交
         * AUTO_COMMIT_INTERVAL_MS_CONFIG   设置提交时间,1000ms
         */
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        /**
         * 设置消费组
         */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
        //单线程
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singleton("kb23"));
        while (true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record:
            records){
                System.out.println("topic:"+record.topic()
                        +" partition:"+record.partition()
                        +" 偏移量:"+record.offset()
                        +" value:"+record.value()
                        +" 时间戳:"+record.timestamp());
            }
            //设置手动提交  earliest/latest都接着上次的内容继续输出,除非有新消息输入
            kafkaConsumer.commitAsync();
        }
    }
}

二、多线程消费者模式

package nj.zb.kb23.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/*
* 多线程
*/
public class MyConsumer2 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        /**
         * 设置消费组
         */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"心心威武");
        //多线程(3个线程)
        for(int i=0;i<=3;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
                    kafkaConsumer.subscribe(Collections.singleton("kb23"));
                    while(true){
                        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println(Thread.currentThread().getName()+
                                    " topic: "+record.topic()+
                                    " partition: "+record.partition()+
                                    " 偏移量: "+record.offset()+
                                    " value: "+record.value()+
                                    " 时间戳: "+record.timestamp());
                        }
                    }
                }
            }).start();
        }
    }
}
"C:\Program Files\Java\jdk1.8.0_144\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 。。。。。。。。。。。。。。。。。。。。。。。。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Thread-3 topic: kb23 partition: 0 偏移量: 0 value: hello java 时间戳: 1695173593009
Thread-3 topic: kb23 partition: 0 偏移量: 1 value: hello c== 时间戳: 1695173606546
Thread-2 topic: kb23 partition: 1 偏移量: 0 value: dufiudhifch 时间戳: 1695174679229
Thread-1 topic: kb23 partition: 2 偏移量: 0 value: hel 时间戳: 1695173599314
Thread-3 topic: kb23 partition: 0 偏移量: 2 value: djfhjsjkhfk 时间戳: 1695174683054
Thread-1 topic: kb23 partition: 2 偏移量: 1 value: hello world 时间戳: 1695173611446
Thread-2 topic: kb23 partition: 1 偏移量: 1 value: hsdakhskfhak 时间戳: 1695174686318
Thread-1 topic: kb23 partition: 2 偏移量: 2 value: hshcdshcdskc 时间戳: 1695174681057
Thread-3 topic: kb23 partition: 0 偏移量: 3 value: jkfdsajklfjalds 时间戳: 1695174689058
Thread-1 topic: kb23 partition: 2 偏移量: 3 value: dhjfhkshkf 时间戳: 1695174684802

三、消费者模式seek方法

package nj.zb.kb23.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

/*
* seek指定开始消费的位置
*/
public class MyConsumerSeek {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        /*
        earliest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,从头开始消费
         latest: 当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,消费新产生的数据
         none:   当各分区下有已提交的offset时,从提交的offset开始消费,无提交的offset时,抛出异常
         */
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        /*
         * ENABLE_AUTO_COMMIT_CONFIG 设置是否自动提交,获取数据的状态,false手动提交,true自动提交
         * AUTO_COMMIT_INTERVAL_MS_CONFIG   设置提交时间,1000ms
         */
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        /**
         * 设置消费组
         */
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group3");
        //单线程
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Collections.singleton("kb23"));

        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size()==0){
            kafkaConsumer.poll(Duration.ofMillis(1000));
            assignment = kafkaConsumer.assignment();
        }
        for (TopicPartition topicPartition :
                assignment) {
            //topic:kb23 tp-0:4 tp-1:5  tp-2:4
            System.out.println(topicPartition.topic()+"\t"+topicPartition.partition());
            if (topicPartition.partition()==0){
                kafkaConsumer.seek(topicPartition,4);
            }else if (topicPartition.partition()==1){
                kafkaConsumer.seek(topicPartition,5);
            }else if (topicPartition.partition()==2){
                kafkaConsumer.seek(topicPartition,4);
            }
        }
        while (true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record:
            records){
                System.out.println("topic:"+record.topic()
                        +" partition:"+record.partition()
                        +" 偏移量:"+record.offset()
                        +" value:"+record.value()
                        +" 时间戳:"+record.timestamp());
            }
        }
    }
}

"C:\Program Files\Java\jdk1.8.0_144\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA.。。。。。。。。。。。。。
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
kb23    2
kb23    1
kb23    0
topic:kb23 partition:2 偏移量:4 value:sjhkdksahkdah 时间戳:1695174687827
topic:kb23 partition:2 偏移量:5 value:hhh1 时间戳:1695175898301
topic:kb23 partition:2 偏移量:6 value:2222 时间戳:1695176003767
topic:kb23 partition:2 偏移量:7 value:444 时间戳:1695176010084
topic:kb23 partition:2 偏移量:8 value:ppp 时间戳:1695177956251
topic:kb23 partition:2 偏移量:9 value:ppp1 时间戳:1695178017439
topic:kb23 partition:2 偏移量:10 value:ppp3 时间戳:1695178021374
topic:kb23 partition:2 偏移量:11 value:ananaq 时间戳:1695179560702
topic:kb23 partition:1 偏移量:5 value:qqq 时间戳:1695175970133

更多推荐

Linux各种命令-查询篇

目录查看文件内容查看存储空间查看python安装目录查Ubuntu版本查看所有文件(含隐藏文件)查IP查看内存使用情况查看GPU使用情况查看CPU使用情况​​​​​​​查看文件内容cat[选项][文件...]-n:显示行号。-b:显示非空行号。-s:合并空白行。-E:在每行结尾添加$符号。-T:将制表符显示为^I。-v

交换机端口镜像详解

交换机端口镜像是一种网络监控技术,它允许将一个或多个交换机端口的网络流量复制并重定向到另一个端口上,以便进行流量监测、分析和记录。通过端口镜像,管理员可以实时查看特定端口上的流量,以进行网络故障排查、安全审计和性能优化。以下是关于交换机端口镜像的详细介绍:工作原理:交换机端口镜像通过在交换机的配置中指定源端口和目标端口

pycharm 中package, directory, sources root, resources root的区别

【遇到的问题】导入yolov5中有utils文件,自己的代码中也有utils文件,使得yolov5中的这部分引用出错了。【解决方案】单独建立detection文件夹,把检测相关的都放在这里,yolov5是github上拉取的源码,发现yolov5中fromutilsimport...有下划线,且会认为是edgeserv

多输入多输出 | MATLAB实现PSO-LSSVM粒子群优化最小二乘支持向量机多输入多输出

多输入多输出|MATLAB实现PSO-LSSVM粒子群优化最小二乘支持向量机多输入多输出目录多输入多输出|MATLAB实现PSO-LSSVM粒子群优化最小二乘支持向量机多输入多输出预测效果基本介绍程序设计往期精彩参考资料预测效果基本介绍MATLAB实现PSO-LSSVM粒子群优化最小二乘支持向量机多输入多输出1.dat

【码银送书第七期】七本考研书籍

八九月的朋友圈刮起了一股晒通知书潮,频频有大佬晒出“研究生入学通知书”,看着让人既羡慕又焦虑。果然应了那句老话——比你优秀的人,还比你努力。心里痒痒,想考研的技术人儿~别再犹豫了。小编咨询了一大波上岸的大佬,这份备考书单给大家参考。专业课(此处特指408,全称计算机专业基础综合)知识点超级多,题目灵活,数据结构、计算机

考研408 | 【计算机组成原理】 数据的表示和运算

进位计数制十进制计数法:推广:r进制计数法任意进制-->十进制:二进制<-->八进制、十六进制:各种进制的常见书写方式:十进制-->任意进制:十进制-->二进制(拼凑法):真值和机器数:总结:BCD码总结:无符号整数的表示和运算无符号整数在计算机中的应用:无符号整数的表示:无符号整数的加法运算:无符号整数的减法运算:总

uniapp运行到IOS真机提示 错误:请查看是否设备未加入到证书列表或者确认证书类型是否匹配

参考文章:请查看是否设备未加入到证书列表或者确认证书类型是否匹配ios开发描述文件必须绑定调试设备,只有授权的设备才可以直接安装基座,所以在申请开发描述文件之前,先添加调试的IOS设备。前往网站https://developer.apple.com,在Devices中,添加手机设备UUID第一步:登录第二步:检查设备列

VHOST-SCSI代码分析(1)VHOST SCSI设备模拟

VHOSTSCSI设备的模拟是由QEMU和HOST共同实现的,QEMU模拟VHOSTSCSI设备配置空间等,而对于虚拟机通知HOST和HOST通知虚拟机机制由HOST内核实现。在QEMU中VHOSTSCSI设备继承关系如下:其它设备以及对应class_init函数和realize具现化实现与VIRTIO-SCSI一致,

Learn Prompt-为什么用 ChatGPT API?

引用人工智能先驱吴恩达先生说过的话:“一个系统需要的远不止一个提示(prompt)或者一个对LLM(大性语言模型)的调用。”API的优点:集成更深:通过API,您可以将ChatGPT集成到自己的系统和工作流中,实现更深层次的定制和控制。个性化的响应:您可以根据特定需求和场景调整模型的响应,例如,通过改变温度(tempe

华策影视AIGC工程师招聘; 百度大模型创业松;主流大语言模型的技术原理细节;AIGC Prompt的七个缺陷 | ShowMeAI日报

👀日报&周刊合集|🎡生产力工具与行业应用大全|🧡点赞关注评论拜托啦!🎯华策影视AIGC工程师招聘,AIGC在「文娱领域」的真正落地逛即刻时发现关注的AI博主@杨昌发布了自己公司的招聘信息,而且附上了团队氛围和工作感受等分享。华策影视是影视行业龙头企业,成立了AIGC应用研究院,重视AI且不算卷。岗位base上海

Learn Prompt-ChatGPT 精选案例:学习各国语言

过去,我们学语言需要花费很多时间来学习各种材料,再联系老师修改口语、作文等,十分费时费力。有了ChatGPT之后,我们就相当于有一个免费的,实时反馈的语言学习助手,大大节省了我们的时间。下面我将以英文的雅思学习为例子,结合口语和写作,介绍如何利用ChatGPT来学习语言。口语​插件安装​正式开始之前,我们要先完成插件的

热文推荐