spring kafka 文档:https://docs.spring.io/spring-kafka/docs/current/reference/html
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
创建名为 testtopic 的 topic,并指定 2 个分区。
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2
application.yml配置文件信息
spring:
  application:
    name: kafka_springboot
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      # ACK mechanism, default is 1 (valid values: 0, 1, -1)
      acks: -1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # Custom partition strategy
        partitioner:
          class: org.bg.kafka.PartitionPolicy
    consumer:
      # Whether offsets are auto-committed; default is true
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Where to start consuming for a new consumer group or when the committed offset is lost:
      # earliest: consume from the oldest message. latest (default): consume the most recent
      # available message. none: throw an exception when no committed offset is found.
      auto-offset-reset: latest
      # Max records per poll when batch-consuming
      #max-poll-records: 5
    listener:
      # Commit after each record is processed by the listener:
      # RECORD,
      # Commit after each batch is processed by the listener:
      # BATCH,
      # Commit after each batch is processed, once more than TIME has elapsed since the last commit:
      # TIME,
      # Commit after each batch is processed, once at least COUNT records have been processed:
      # COUNT,
      # Commit when either the TIME or the COUNT condition is met:
      # COUNT_TIME,
      # Commit after each batch is processed, when Acknowledgment.acknowledge() is called manually:
      # MANUAL,
      # Commit immediately when Acknowledgment.acknowledge() is called manually:
      # MANUAL_IMMEDIATE
      ack-mode: manual
      # Batch consumption
      type: batch
更多配置信息查看KafkaProperties
@Component
public class Producer {

    /**
     * Template used to publish messages; parameterized as String/String to match the
     * StringSerializer configured for both key and value in application.yml.
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a message to the "testtopic" topic with the fixed key "key111".
     * Because the key is non-null, the custom partitioner routes every message
     * to the same partition (murmur2 hash of the key).
     *
     * @param msg the message payload to send
     */
    public void send(String msg) {
        kafkaTemplate.send(new ProducerRecord<>("testtopic", "key111", msg));
    }
}
package org.bg.kafka;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;public class PartitionPolicy implements Partitioner {private final ConcurrentMap topicCounterMap = new ConcurrentHashMap();@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = this.nextValue(topic);List availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();return ((PartitionInfo)availablePartitions.get(part)).partition();} else {return Utils.toPositive(nextValue) % numPartitions;}} else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private int nextValue(String topic) {AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);if (null == counter) {counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);if (currentCounter != null) {counter = currentCounter;}}return counter.getAndIncrement();}@Overridepublic void close() {}@Overridepublic void configure(Map map) {}
}
ProducerRecord有指定分区的构造方法,设置分区号
public ProducerRecord(String topic, Integer partition, K key, V value) 。
kafkaTemplate.send(new ProducerRecord("testtopic",1, "key111", msg));
/*** 自定义seek参考* https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek*/
@Component
public class Consumer implements ConsumerSeekAware{@KafkaListener(topics = {"testtopic"},groupId = "test_group",clientIdPrefix = "bg",id = "testconsumer")public void onMessage(List> records, Acknowledgment ack){System.out.println(records.size());System.out.println(records.toString());ack.acknowledge();}@Overridepublic void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) {//按照时间戳设置偏移callback.seekToTimestamp(assignments.keySet(),1670233826705L);//设置偏移到最近callback.seekToEnd(assignments.keySet());//设置偏移到最开始callback.seekToBeginning(assignments.keySet());//指定 offsetfor (TopicPartition topicPartition : assignments.keySet()) {callback.seek(topicPartition.topic(),topicPartition.partition(),0L);}}}
如上代码所示,实现 ConsumerSeekAware 接口后,即可在 onPartitionsAssigned 中通过以上几种方式设置 offset。