SpringBoot整合Kafka,通过简单配置实现生产消费功能
admin
2024-03-21 03:47:06
0

文章目录

  • 前提条件
  • 项目环境
  • 创建Topic
  • 配置信息
  • 生产消息
    • 生产自定义分区策略
    • 生产到指定分区
  • 消费消息
    • offset设置方式

*本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考

spring kafka 文档:https://docs.spring.io/spring-kafka/docs/current/reference/html

前提条件

  • 搭建Kafka环境,参考Kafka集群环境搭建及使用
  • Java环境:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 开发工具:IntelliJ IDEA

项目环境

  1. 创建Springboot项目。
  2. pom.xml文件中引入kafka依赖。
org.springframework.kafkaspring-kafka

创建Topic

创建topic命名为testtopic并指定2个分区。

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2

配置信息

application.yml配置文件信息

spring:application:name: kafka_springbootkafka:bootstrap-servers: 127.0.0.1:9092producer:#ACK机制,默认为1 (0,1,-1)acks: -1key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:# 自定义分区策略partitioner:class: org.bg.kafka.PartitionPolicyconsumer:#设置是否自动提交,默认为trueenable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。auto-offset-reset: latest#批量消费时每次poll的数量#max-poll-records: 5listener:#      当每一条记录被消费者监听器处理之后提交#      RECORD,#      当每一批数据被消费者监听器处理之后提交#      BATCH,#      当每一批数据被消费者监听器处理之后,距离上次提交时间大于TIME时提交#      TIME,#      当每一批数据被消费者监听器处理之后,被处理record数量大于等于COUNT时提交#      COUNT,#      #TIME | COUNT 有一个条件满足时提交#      COUNT_TIME,#      #当每一批数据被消费者监听器处理之后,手动调用Acknowledgment.acknowledge()后提交:#      MANUAL,#      # 手动调用Acknowledgment.acknowledge()后立即提交#      MANUAL_IMMEDIATE;ack-mode: manual#批量消费type: batch

更多配置信息查看KafkaProperties

生产消息

@Component
public class Producer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void send(String msg) {kafkaTemplate.send(new ProducerRecord("testtopic", "key111", msg));}
}

生产自定义分区策略

package org.bg.kafka;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;public class PartitionPolicy implements Partitioner {private final ConcurrentMap topicCounterMap = new ConcurrentHashMap();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = this.nextValue(topic);List availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return ((PartitionInfo)availablePartitions.get(part)).partition();} else {return Utils.toPositive(nextValue) % numPartitions;}} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}@Overridepublic void close() {}@Overridepublic void configure(Map map) {}
}

生产到指定分区

ProducerRecord有指定分区的构造方法,设置分区号
public ProducerRecord(String topic, Integer partition, K key, V value)

kafkaTemplate.send(new ProducerRecord("testtopic",1, "key111", msg));

消费消息


/*** 自定义seek参考* https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek*/
@Component
public class Consumer implements ConsumerSeekAware{@KafkaListener(topics = {"testtopic"},groupId = "test_group",clientIdPrefix = "bg",id = "testconsumer")public void onMessage(List> records, Acknowledgment ack){System.out.println(records.size());System.out.println(records.toString());ack.acknowledge();}@Overridepublic void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) {//按照时间戳设置偏移callback.seekToTimestamp(assignments.keySet(),1670233826705L);//设置偏移到最近callback.seekToEnd(assignments.keySet());//设置偏移到最开始callback.seekToBeginning(assignments.keySet());//指定 offsetfor (TopicPartition topicPartition : assignments.keySet()) {callback.seek(topicPartition.topic(),topicPartition.partition(),0L);}}}

offset设置方式

如代码所示,实现ConsumerSeekAware接口,设置offset几种方式:

  • 指定 offset,需要自己维护 offset,方便重试。
  • 指定从头开始消费。
  • 指定 offset 为最近可用的 offset (默认)。
  • 根据时间戳获取 offset,设置 offset。

相关内容

热门资讯

华强科技业绩快报:2025年度... 上证报中国证券网讯(记者 骆民)华强科技披露业绩快报。公司2025年实现营业总收入6.28亿元,同比...
字节跳动60亿美元出售沐瞳科技... Habby用户总消费突破20亿美元 根据Appmagic的估算,Habby的用户总消费已经突破20...
公司互动丨这些公司披露在电子、... 来源:第一财经 2月25日,多家上市公司通过互动平台、披露投资者关系活动记录表等渠道披露公司在电子、...
国际机构上调金价预期 摩根大通... 据路透社等外媒报道,近日多家国际大型机构发布最新研究报告,上调黄金价格预期。摩根大通于周三更新贵金属...
三只羊否认借壳美股上市,国内业... 2月26日,疯狂小杨哥所属MCN三只羊网络在其官网微博发布声明,回应了近日的“三只羊借壳上市”传闻。...
“离大谱!”广东中山,男子向银... 自己明明按时还款,征信却显示逾期;想贷款扩大生产,却被银行拒之门外;打开手机App一看,好家伙,自己...
英伟达财报“炸裂“,黄仁勋:A... 英伟达以一份打破纪录的财报,试图回击外界对AI泡沫的质疑。2月25日美股盘后,英伟达公布最新财报,营...
三只羊否认“借壳上市”,称系海... 红星资本局2月26日消息,红星资本局从三只羊集团获悉,其发布声明否认“借壳美股上市”传闻。 此前三只...
德邦股份向上交所提出终止上市申... 来源:市场投研资讯 (来源:财闻) 本次终止上市事项最终能否通过相关审批及实施尚存在不确定性,敬请广...
和讯投顾华飞凡:没有涨价潮就没... 今日涨价线几乎全天维持强势,化工、能源金属、半导体、国产算力、商业航天轮番上阵,赚钱效应显著。理解其...
受益金属概念,锂电上下游个股普... 华夏时报记者 胡雅文 北京报道 “有色金属股票和期货一直呈现出很强的联动性,春节前碳酸锂和白银的下跌...
韩国股市又暴涨,2026年涨幅... 2月26日韩国股市继续大涨。 韩国综合股价指数收涨3.67%,报6307.32点,盘中更是创下631...
四点半观市 | 机构:2026... 日韩股市2月26日收盘续创历史新高;深成指窄幅震荡微涨0.19% 算力芯片股午后爆发;中韩半导体ET...
红杉资本合伙人:软件行业具备长... 来源:智通财经网 在人工智能引发新一轮市场震荡之际,红杉资本合伙人兼联席负责人Alfred Lin表...
Shopee跨境走过第一个十年... 跨境电商的发展,从来不是一条既定路线。 不同阶段,行业面对的问题并不相同:有时是“能不能做”,有时是...
雷军马年的第一场直播定档2月2... 2月26日,@小米汽车发文: 2月27日(本周五)晚7点半,@雷军 准备了一场「关于安全」的直播,专...
马年茅台可以按农历生产日期选购... 2月26日,“小茅i茅台”发布,自2026年2月26日(丙午年正月初十)起,在i茅台APP“i购”板...
2026年港美上市政策双向优化... 2026年跨境资本市场政策环境持续优化,中国证监会境外上市备案机制不断完善,港交所与纳斯达克上市规则...
携程总裁、董事,双双辞职 2月26日,携程集团公布2025年第四季度及全年未经审计的财务业绩,同时公布一系列董事变动,范敏辞任...