SpringBoot整合kafka消费者注解详解
admin
2024-01-21 05:28:32
0

目录

目标

实战

简单消费

监听多个主题

监听一个主题,指定分区消费消息

监听多个主题,指定分区,指定起始偏移量消费消息

指定多个分区,指定起始偏移量消费消息

监听多个主题,指定多个分区,指定起始偏移量消费消息

指定多个kafka监听器

手动提交偏移量(需要配置手动提交偏移量配置)


目标

本文不讲解SpringBoot整合kafka,只列举SpringBoot注解消费kafka消息的多种形式。


实战

简单消费

    /*** 指定一个消费者组,一个主题主题。* @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)public void simpleConsumer(ConsumerRecord record) {System.out.println("进入simpleConsumer方法");System.out.printf("分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

监听多个主题

    /*** 指定多个主题。** @param record*/@KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)public void topics(ConsumerRecord record) {System.out.println("进入topics方法");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

监听一个主题,指定分区消费消息

    /*** 监听一个主题,且指定消费主题的哪些分区。* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})},concurrency = "2")public void consumeByPattern(ConsumerRecord record) {System.out.println("consumeByPattern");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

监听多个主题,指定分区,指定起始偏移量消费消息

    /*** 监听多个主题,且指定消费主题的哪些分区,指定分区从某个偏移量开始消费。* 参数详解:消费者组=apple_group;监听主题=ipadTopic;消费的分区=0,1;* 每次重启项目后会自动从0分区,offset=5开始消费,消费者数量=3。** @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3")public void consumeByOffset(ConsumerRecord record) {System.out.println("consumeByOffset");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

指定多个分区,指定起始偏移量消费消息

    /*** 指定多个分区从哪个偏移量开始消费。*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPAD_TOPIC,partitions = {"0","1"},partitionOffsets = {@PartitionOffset(partition = "2", initialOffset = "10"),@PartitionOffset(partition = "3", initialOffset = "0"),})},concurrency = "10")public void consumeByPartitionOffsets(ConsumerRecord record) {System.out.println("consumeByPartitionOffsets");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

监听多个主题,指定多个分区,指定起始偏移量消费消息

    /*** 指定多个主题。参数详解如上面的方法。* @param record*/@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "4")public void topics2(ConsumerRecord record) {System.out.println("topics2");System.out.printf("主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",record.topic(),record.partition(),record.offset(),record.key(),record.value(),record.timestamp());}

指定多个kafka监听器

    /*** 指定多个消费者组。参数详解如上面的方法。** @param record*/@KafkaListeners({@KafkaListener(groupId = APPLE_GROUP,topicPartitions = {@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = IPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3"),@KafkaListener(groupId = XM_GROUP,topicPartitions = {@TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),@TopicPartition(topic = XMPAD_TOPIC, partitions = "1",partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))},concurrency = "3")})public void groupIds(ConsumerRecord record) {System.out.println("groupIds");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());}

手动提交偏移量(需要配置手动提交偏移量配置)

    /*** 设置手动提交偏移量** @param record*/@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP,//3个消费者concurrency = "3")public void setCommitType(ConsumerRecord record, Acknowledgment ack) {System.out.println("setCommitType");System.out.println("内容:" + record.value());System.out.println("分区:" + record.partition());System.out.println("偏移量:" + record.offset());System.out.println("创建消息的时间戳:" + record.timestamp());ack.acknowledge();}

相关内容

热门资讯

【IPO追踪】零成交常态化,解... 港股市场最尴尬的处境,莫过于股价跌跌不休,连交易都无人问津。6月24日,美联股份(02671.HK)...
SK海力士冲刺美国上市,ADR... 若顺利完成,相关交易最快有望于下个月启动,并成为韩国企业历史上规模最大的海外股权融资项目之一。 人工...
伯恩斯坦:锂价上行周期远未结束... 6月23日消息,伯恩斯坦最新研报中上调锂价预测,并将 天齐锂业(002466.SZ)A股目标价从73...
原创 全... 中国与印度尼西亚之间,正在上演一场围绕镍矿资源的激烈博弈。令人颇感意外的是,这场博弈的主动挑起者竟然...
企业出海ESG合规与可持续发展... 中新网北京6月24日电 (记者 尹倩芸)2026年北京市“走出去”系列活动——企业出海ESG合规与可...
原创 中... 聊到现在的楼市,身边人想法差得挺远。有人还在等跌,觉得再观望一阵更稳妥。也有人盯上了官方最近放的几个...
企业微信Agent内测悄启 主... 继微信AI助手开启内测后,企业微信紧跟步伐开启了AI助手的内测,但跟微信嵌入的不是同一个Agent。...
马斯克官宣Starmind太空... IT之家 6 月 24 日消息,埃隆 · 马斯克(Elon Musk)今天(6 月 24 日)在 X...
5000亿市值巨头,盘中涨停 6月24日午后,立讯精密股价直线拉升,盘中触及涨停后开板。截至发稿,该股报75.39元/股,涨8.6...
中兴通讯爱理财?额度2年翻倍至... 图片来源:图虫创意 钱的流向,往往折射出一家企业的战略重心。 6月17日,中兴通讯在深圳召开股东大会...
中国掌控全球过半铜冶炼产能,美... 【文/观察者网 王恺雯】 面对中国在铜冶炼和精炼上的主导地位,美国试图通过关税及工业政策复兴国内铜产...
年内92宗IPO获受理 创业板... 来源:滚动播报 (来源:北京商报) 随着创业板深化改革持续推进,今年创业板IPO申报热度攀升,替代北...
SpaceX跌破IPO首日开盘... 来源:滚动播报 来源:中国基金报 【导读】SpaceX股价连续重挫 中国基金报记者 张舟 Space...
酒庄头条:在郎酒庄园,看见世界... 文/酒庄头条 中国酒业该向何处去?又如何走向国际市场?酿酒业是有根产业历史经典产业,郎酒以全球视野开...
100个选基指标|利润总额,真... 推荐阅读: 100个选基指标丨自然年度收益率,最简单,也最重要(第一期) 100个选基指标丨区间收益...
连云港开发区为AI“制药”按下... 从人才赋能、校地协同到产业落地,今年以来,连云港开发区通过一系列密集举措构建起全链条、立体化的产业赋...
1.08亿控股德维嘉:无锡振华... 一家传统汽车冲压件上市公司,正试图用一笔亿元级别的现金收购,为自己贴上“汽车智能化”的标签。 6月2...
ATFX:SpaceX下跌16... 来源:市场资讯 6月23日,ATFX汇评:美国航空航天代表性个股SpaceX,上市六个交易日,三个交...
嘉实基金李涛:长期视角决策未来... 嘉实成长共赢混合基金经理李涛表示, 投资理财本质上是一场长期主义的远行,权益投资本质是追求赚取产业与...
震裕科技可转债发行申请获深交所... 雷达财经 文|苏静 编|深海 6月23日, 震裕科技(300953)发布关于公司向不特定对象发行可转...