SpringBoot整合kafka消费者注解详解
admin
2024-01-21 05:28:32
0

目录

目标

实战

简单消费

监听多个主题

监听一个主题,指定分区消费消息

监听多个主题,指定分区,指定起始偏移量消费消息

指定多个分区,指定起始偏移量消费消息

监听多个主题,指定多个分区,指定起始偏移量消费消息

指定多个kafka监听器

手动提交偏移量(需要配置手动提交偏移量配置)


目标

本文不讲解SpringBoot整合kafka,只列举SpringBoot注解消费kafka消息的多种形式。


实战

简单消费

    /*** 指定一个消费者组,一个主题主题。* @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)public void simpleConsumer(ConsumerRecord record) {System.out.println("进入simpleConsumer方法");System.out.printf("分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

监听多个主题

    /*** 指定多个主题。** @param record*/@KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)public void topics(ConsumerRecord record) {System.out.println("进入topics方法");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

监听一个主题,指定分区消费消息

    /*** 监听一个主题,且指定消费主题的哪些分区。* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})},concurrency = "2")public void consumeByPattern(ConsumerRecord record) {System.out.println("consumeByPattern");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

监听多个主题,指定分区,指定起始偏移量消费消息

    /*** 监听多个主题,且指定消费主题的哪些分区,指定分区从某个偏移量开始消费。* 参数详解:消费者组=apple_group;监听主题=ipadTopic;消费的分区=0,1;* 每次重启项目后会自动从0分区,offset=5开始消费,消费者数量=3。** @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3")public void consumeByOffset(ConsumerRecord record) {System.out.println("consumeByOffset");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

指定多个分区,指定起始偏移量消费消息

    /*** 指定多个分区从哪个偏移量开始消费。*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPAD_TOPIC,partitions = {"0","1"},partitionOffsets = {@PartitionOffset(partition = "2", initialOffset = "10"),@PartitionOffset(partition = "3", initialOffset = "0"),})},concurrency = "10")public void consumeByPartitionOffsets(ConsumerRecord record) {System.out.println("consumeByPartitionOffsets");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

监听多个主题,指定多个分区,指定起始偏移量消费消息

    /*** 指定多个主题。参数详解如上面的方法。* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "4")public void topics2(ConsumerRecord record) {System.out.println("topics2");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

指定多个kafka监听器

    /*** 指定多个消费者组。参数详解如上面的方法。** @param record*/@KafkaListeners({@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3"),@KafkaListener(groupId = XM_GROUP,topicPartitions = {@TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = XMPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3")})public void groupIds(ConsumerRecord record) {System.out.println("groupIds");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());}

手动提交偏移量(需要配置手动提交偏移量配置)

    /*** 设置手动提交偏移量** @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP,//3个消费者concurrency = "3")public void setCommitType(ConsumerRecord record, Acknowledgment ack) {System.out.println("setCommitType");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());ack.acknowledge();}

相关内容

热门资讯

盈盈订购APP(四川禄宏商品)...   盈盈订购APP打着“现货订购”的旗号,实际操作却完全照搬期货交易模式——高杠杆、T+0交易、收取...
AI“开启办事时代”!千万资金... 1月15日,两市延续震荡。数据显示,截至11时7分,软件龙头ETF(159899)跌1.46%,石基...
马斯克又整活了 首先,郑重推介我的干货文章,这应该是半年以来写的最深度的——2026年投资策略,没看的小伙伴一定要看...
一针打进肿瘤,全面唤醒免疫系统 在肿瘤免疫治疗的发展过程中,医学界一直在寻找一种既能真正激活免疫系统,又不至于引发严重全身副作用的方...
国内首只千亿黄金ETF诞生 1月15日消息,截至1月14日,华安黄金ETF最新份额达101.62亿份,规模迈上千亿元大关,达10...
5000克银条五天赚1万?揭秘... 近日,一则“有人买入5000克银条,短短五天净赚1万元”的消息在社交平台引发热议。按当前白银价格计算...
麒盛科技及时任董事长唐国海被监... 乐居财经 李兰1月14日,麒盛科技(603610.SH)发布关于公司及相关人员收到浙江证监局警示函的...
西贝将大规模关停102家门店,... 界面新闻记者 | 卢奕贝 马越 李烨 界面新闻编辑 | 牙韩翔 1月15日,社交平台流传一份西贝...
A股“吹哨人”发声!三大信号,... A股迎来了一些变化! 今日早盘,A股市场走势偏弱,但不乏亮点。市场释放出三大信号:一是,在高位题材股...
美国已正式开始出售委内瑞拉石油 △委内瑞拉一处炼油厂 当地时间1月14日,央视记者获悉,一位美国政府官员透露,美国已完成首批委内瑞拉...
追觅员工怒怼CEO俞浩 1月15日消息,员工截图爆料,追觅智能汽车工作群内员工开麦怒怼CEO俞浩。 该员工@俞浩并表示,“...
创业板指数半日跌逾1%,关注创... 截至午间收盘,创业板成长指数下跌0.9%,创业板指数下跌1.0%,创业板中盘200指数下跌2.1%。...
银河通用21亿领跑,十强融资超... 当前,具身智能机器人赛道浪潮奔涌,资本以前所未有的热忱与巨量资金押注未来。据悉,2025年全球仅人形...
KKV被卓悦嫌弃,卖小样的話梅... 作者 | 源Sight 周艺 卓悦和KKV的争执还没分出个对错,另一家美妆集合店HARMAY話梅却...
圣农发展2025年业绩大增 C... 证券时报记者 余胜良 1月14日晚间,圣农发展(002299)发布2025年业绩预告,公司全年业绩实...
重磅观察|2026汽车市场:内... 【文/财圈社&道哥说车 云龙】岁聿云暮,新元肇启。每逢此时,复盘行业脉络、洞见发展肌理,便成为汽车观...
原创 A... 2026年1月14日午间,A股正上演“疯牛狂奔”——沪指冲高超1%,创业板大涨2.5%,两市成交逼近...
阿里千问App全球首发点外卖、... 1月15日,阿里举行千问App发布会, 阿里巴巴(BABA.US)集团副总裁吴嘉宣布,千问App全面...
携程港股、美股放量跌近20% 【大河财立方消息】1月15日,携程港股、美股双双大跌。 截至发稿,港股携程集团跌幅18.53%,美股...
淳厚基金获国资入主!管理规模缩... 淳厚基金获得长宁国投入主,但是市场声誉遭遇重挫,机构投资者纷纷撤出。 文/每日财报 楚风 随着国有...