SpringBoot整合Kafka,通过简单配置实现生产消费功能
admin
2024-03-21 03:47:06
0

文章目录

  • 前提条件
  • 项目环境
  • 创建Topic
  • 配置信息
  • 生产消息
    • 生产自定义分区策略
    • 生产到指定分区
  • 消费消息
    • offset设置方式

*本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考

spring kafka 文档:https://docs.spring.io/spring-kafka/docs/current/reference/html

前提条件

  • 搭建Kafka环境,参考Kafka集群环境搭建及使用
  • Java环境:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA

项目环境

  1. 创建Springboot项目。
  2. pom.xml文件中引入kafka依赖。
org.springframework.kafkaspring-kafka

创建Topic

创建topic命名为testtopic并指定2个分区。

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2

配置信息

application.yml配置文件信息

spring:application:name: kafka_springbootkafka:bootstrap-servers: 127.0.0.1:9092producer:#ACK机制,默认为1 (0,1,-1)acks: -1key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:# 自定义分区策略partitioner:class: org.bg.kafka.PartitionPolicyconsumer:#设置是否自动提交,默认为trueenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。auto-offset-reset: latest#批量消费时每次poll的数量#max-poll-records: 5listener:#      当每一条记录被消费者监听器处理之后提交#      RECORD,#      当每一批数据被消费者监听器处理之后提交#      BATCH,#      当每一批数据被消费者监听器处理之后,距离上次提交时间大于TIME时提交#      TIME,#      当每一批数据被消费者监听器处理之后,被处理record数量大于等于COUNT时提交#      COUNT,#      #TIME | COUNT 有一个条件满足时提交#      COUNT_TIME,#      #当每一批数据被消费者监听器处理之后,手动调用Acknowledgment.acknowledge()后提交:#      MANUAL,#      # 手动调用Acknowledgment.acknowledge()后立即提交#      MANUAL_IMMEDIATE;ack-mode: manual#批量消费type: batch

更多配置信息查看KafkaProperties

生产消息

@Component
public class Producer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void send(String msg) {kafkaTemplate.send(new ProducerRecord("testtopic", "key111", msg));}
}

生产自定义分区策略

package org.bg.kafka;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;public class PartitionPolicy implements Partitioner {private final ConcurrentMap topicCounterMap = new ConcurrentHashMap();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = this.nextValue(topic);List availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return ((PartitionInfo)availablePartitions.get(part)).partition();} else {return Utils.toPositive(nextValue) % numPartitions;}} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}@Overridepublic void close() {}@Overridepublic void configure(Map map) {}
}

生产到指定分区

ProducerRecord有指定分区的构造方法,设置分区号
public ProducerRecord(String topic, Integer partition, K key, V value)

kafkaTemplate.send(new ProducerRecord("testtopic",1, "key111", msg));

消费消息


/*** 自定义seek参考* https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek*/
@Component
public class Consumer implements ConsumerSeekAware{@KafkaListener(topics = {"testtopic"},groupId = "test_group",clientIdPrefix = "bg",id = "testconsumer")public void onMessage(List> records, Acknowledgment ack){System.out.println(records.size());System.out.println(records.toString());ack.acknowledge();}@Overridepublic void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) {//按照时间戳设置偏移callback.seekToTimestamp(assignments.keySet(),1670233826705L);//设置偏移到最近callback.seekToEnd(assignments.keySet());//设置偏移到最开始callback.seekToBeginning(assignments.keySet());//指定 offsetfor (TopicPartition topicPartition : assignments.keySet()) {callback.seek(topicPartition.topic(),topicPartition.partition(),0L);}}}

offset设置方式

如代码所示,实现ConsumerSeekAware接口,设置offset几种方式:

  • 指定 offset,需要自己维护 offset,方便重试。
  • 指定从头开始消费。
  • 指定 offset 为最近可用的 offset (默认)。
  • 根据时间戳获取 offset,设置 offset。

相关内容

热门资讯

近一年涨364%,近两年468... 来源:今晚吃基 今天前海开源的两则公告引起我的注意。 前海开源沪港深乐享生活、前海开源人工智能主题混...
美伊、霍尔木兹海峡,最新消息!... 特朗普称与伊朗的谈判进展顺利,霍尔木兹海峡通航量上升,油价维持弱势震荡。另外,特朗普要求中东多国与以...
原创 刚... 4月21日下午,当宁德时代超级科技日的大屏幕亮起时,台下不少行业人士都愣了一下。宁德时代宣布,备受瞩...
俄罗斯知名巧克力品牌优化增效 【环球时报综合报道】俄罗斯最大巧克力生产商之一“联合糖果”正优化生产。“联合糖果”公司(旗下品牌包括...
三星半导体员工协商达成年均奖金... 但这份协议对三星而言仍可能是一次胜利,因为其奖金总额低于本土竞争对手SK海力士。 三星与曾威胁发起罢...
Google亲手把搜索框做成了... Google I/O 2026开完了。如果你以为这家公司又在炫酷炫技术,那你猜对了一半——另一半是,...
女子把2万多克黄金存珠宝店,金... 浙江杭州的林女士反映,她是做黄金生意的,从2024年7月开始,分48次陆续将22917.462克黄金...
000638,终止上市!9股获... 今日(5月25日),A股三大指数集体收涨,上证指数报收4152.57点,上涨0.96%;深证成指上涨...
原创 人... 人民币这波行情,最戏剧性的一幕发生在5月13日。当天即期收盘价直接砸到6.7905,正式踏进6.7区...
燕文物流、闪回科技、金龙电机、... 每经记者:李旭馗 每经编辑:袁东 |2026年5月26日 星期二| NO.1燕文物流、闪回科技、金龙...
一代互联网招聘神话,破产了 消费赛道雷声滚滚,招聘赛道也未能幸免。 近日,招聘行业再传重磅消息,曾被无数互联网人视作“跳槽圣地”...
字节反击腾讯称“都是卖猪食的,... 澎湃新闻记者 范佳来 实习生 吴亦菲 抖音副总裁李亮辟谣“反击腾讯”。 近日,有传言称腾讯、字节跳动...
国有大型银行板块5月25日涨0... 证券之星消息,5月25日国有大型银行板块较上一交易日上涨0.02%,中国银行领涨。当日上证指数报收于...
金属包装行业的主流发展趋势 绿色环保、智能化生产、高端化与个性化、行业整合及国际化拓展是当前金属包装行业的主要发展趋势。 绿色...
投资也有流量密码?带你了解自由... 风险提示:基金有风险,投资需谨慎。
美债收益率破5%:全球资产定价... 导读 4月美国通胀数据超预期反弹、美联储新主席沃什近期就任、中东地缘冲突推升油价、美国财政赤字高企与...
烁威光电同步完成两轮Pre-A... 【大河财立方消息】近日,北京烁威光电科技有限公司(以下简称“烁威光电”)同步完成两轮合计金额超亿元融...
库克将迎CEO告别演讲,此后转... 5月25日,知名科技记者马克 · 古尔曼发文称,今年苹果全球开发者大会 (WWDC) 将是库克作为苹...
北京集中约谈17家重点平台企业... 据北京市市场监督管理局5月25日消息,为加强平台经济监管,规范6·18期间平台经营行为,近日,北京市...
原创 日... 你是否听过下面这些管理名言:”永远站在顾客的立场思考问题“、”盯住客户,而不是竞争对手“、”比业绩更...