这里通过两种方式
springboot初始化bena源码 :
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
/**
*初始化consumer工厂类 创建Consumer
*/
public class KafkaAutoConfiguration {
。。。。
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
。。。。
}
/**
*注入并行监听容器
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
。。。。
/**
*创建MessageListenerContainer 工厂类
*/
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
@Configuration(proxyBeanMethods = false)
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableKafkaConfiguration {
}
}
对于kafkaListenerContainerFactory 的用于创建MessageListenerContainer 消息监听容器类
MessageListenerContainer有两个实现类KafkaMessageListenerContainer,ConcurrentMessageListenerContainer
KafkaMessageListenerContainer:以单线程的方式消费topic中所有partition数据
ConcurrentMessageListenerContainer:以并行的方式消费topic中所有partition数据(开启多线程),每一个线程会对应一个partition所有建议对于ConcurrentMessageListenerContainer的并发数与topic的partition数保持一致
MessageListenerContainer的作用在于对于MessageListener的管理
MessageListener这个接口的作用又是什么呢?用于消费topic中的数据,并对offset进行管理(2.3以后默认是手动提交)
springboot 对于MessageListener又默认8中实现:
1:public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
2:public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
3:public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
4:public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
5:public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
6:public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
7:public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
8:public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public KafkaMessageListenerContainer messageListenerContainer1(@Qualifier("consumerFactory") ConsumerFactory<Integer,String> consumerFactory){
//创建topic分区
TopicPartition topicPartition1 = new TopicPartition("springboot_test_topic",0);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo1 = new TopicPartitionOffset(topicPartition1,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//创建topic分区
TopicPartition topicPartition2 = new TopicPartition("springboot_test_topic",1);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo2 = new TopicPartitionOffset(topicPartition2,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//创建topic分区
TopicPartition topicPartition3 = new TopicPartition("springboot_test_topic",2);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo3 = new TopicPartitionOffset(topicPartition3,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//设置topic的主题和分区 可以指定从哪个分区哪个offset开始消费
//容器配置
ContainerProperties containerProperties = new ContainerProperties(tpo1,tpo2,tpo3);
/**
* 指定
*/
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProperties.setClientId("springboot_test_topic_id");
containerProperties.setGroupId("springboot_test_topic_group");
containerProperties.setAckTime(6000);
containerProperties.getKafkaConsumerProperties().setProperty("enable.auto.commit","false");
containerProperties.setPollTimeout(3000);
containerProperties.setAckOnError(false);
//绑定message 消息监听 手动提交 单条消费
containerProperties.setMessageListener((AcknowledgingMessageListener<Integer, String>) (consumerRecord, acknowledgment) -> {
System.out.println("partition============>"+consumerRecord.partition());
System.out.println("offset============>"+consumerRecord.offset());
System.out.println("key============>"+consumerRecord.key());
System.out.println("value============>"+consumerRecord.value());
//确定消费完成 commit offset
acknowledgment.acknowledge();
});
//构建kafka消费者监听容器
KafkaMessageListenerContainer<Integer,String> kafkaMessageListenerContainer =
new KafkaMessageListenerContainer<Integer,String>(consumerFactory,containerProperties);
//启动消费监听
kafkaMessageListenerContainer.start();
return kafkaMessageListenerContainer;
}
public ConcurrentMessageListenerContainer messageListenerContainer(@Qualifier("consumerFactory") ConsumerFactory<Integer,String> consumerFactory){
String topicName = "springboot_test_topic";
//创建topic分区
TopicPartition topicPartition1 = new TopicPartition(topicName,0);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo1 = new TopicPartitionOffset(topicPartition1,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//创建topic分区
TopicPartition topicPartition2 = new TopicPartition(topicName,1);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo2 = new TopicPartitionOffset(topicPartition2,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//创建topic分区
TopicPartition topicPartition3 = new TopicPartition(topicName,2);
//设置需要消费的分区偏移量
TopicPartitionOffset tpo3 = new TopicPartitionOffset(topicPartition3,0L,TopicPartitionOffset.SeekPosition.BEGINNING);
//ConsumerFactory设置topic的主题和分区
//容器配置
ContainerProperties containerProperties = new ContainerProperties(tpo1,tpo2,tpo3);
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProperties.setClientId("springboot_test_topic_concurrent_id");
containerProperties.setGroupId("springboot_test_topic_concurrent_group");
containerProperties.setAckTime(6000);
containerProperties.getKafkaConsumerProperties().setProperty("enable.auto.commit","false");
containerProperties.setPollTimeout(3000);
containerProperties.setAckOnError(false);
//绑定message 消息监听 自定提交 单条消费 存在丢失数据 以及重复消费的问题
containerProperties.setMessageListener((AcknowledgingMessageListener<Integer, String>) (consumerRecord, acknowledgment) -> {
System.out.println("partition============>"+consumerRecord.partition());
System.out.println("offset============>"+consumerRecord.offset());
System.out.println("key============>"+consumerRecord.key());
System.out.println("value============>"+consumerRecord.value());
//确定消费完成 commit offset
acknowledgment.acknowledge();
});
containerProperties.getTopicPartitionsToAssign();
ConcurrentMessageListenerContainer<Integer,String> cmlc = new ConcurrentMessageListenerContainer(consumerFactory,containerProperties);
//是否设置虽容器自动启动
cmlc.setAutoStartup(true);
cmlc.setBeanName("concurrentMessageListenerContainer");
//设置并发数
cmlc.setConcurrency(3);
//启动消费监听
cmlc.start();
return cmlc;
}
上述两个demo都是通过手动commit offset(建议使用这种方式,保证数据的一致性),通过containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);AckMode设置commit offset方式,有以下几种:
RECORD
: 当listen每处理一条就commit offset 自动提交
BATCH
: 在listen处理完poll()返回的所有记录后提交偏移量,针对于批处理。自动提交
TIME
: 每次间隔ackTime的时间去commit
COUNT
: 累积达到ackCount次的ack去commit
COUNT_TIME
: TIME和COUNT的都满足则执行提交.
MANUAL
: 需要手动确认消息(Acknowledgement.acknowledge()),其它机制与BATCH机制相同
MANUAL_IMMEDIATE
: 当listenr中调用Acknowledgement.acknowledge()方法时立即提交偏移量
测试结果
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
//配置消费者id
String id() default "";
//设置容器工厂类 默认为ConcurrentKafkaListenerContainerFactory 并行
String containerFactory() default "";
//设置需要消费的topic名 支持多topic消费
String[] topics() default {};
//支持topic名正则设置
String topicPattern() default "";
//topic分区类,设置主题和分区(以及可选的初始offset)
TopicPartition[] topicPartitions() default {};
//容器组如果该值被设置,将该容器添加到一个集合中并保存到该bean中,在这个容器集合中可以开启或关闭容器
String containerGroup() default "";
//提供在消费message时报错时Handler类
String errorHandler() default "";
//消费者组id
String groupId() default "";
//2.x ,该id属性(如果存在)将用作Kafka消费者group.id属性覆盖消费者工厂中的已配置属性
boolean idIsGroup() default true;
//客户端id前缀
String clientIdPrefix() default "";
//在spring容器对象中的bean名
String beanRef() default "__listener";
//消费并行度 此值最好等于topic的partition数使用 ${listen.concurrency:3}
String concurrency() default "";
//是否设置为自动启动 "${listen.auto.start:true}" 容器需要实现SmartLifecycle
String autoStartup() default "";
//设置kafka消费者配置
//支持k=v,k:v,k v 模式 如max.poll.interval.ms=3000
String[] properties() default {};
}
@KafkaListener(id = "springboot_test_topic_id"
//,topics = {"springboot_test_topic"}
,topicPartitions = {
//设置 消费topic springboot_test_topic partition为0数据,1分区起始offset为0 注意:partitions或partitionOffsets属性都可以指定分区,但不能两个都指定。
@TopicPartition(topic = "springboot_test_topic",partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "0"),
@PartitionOffset(partition = "1",initialOffset = "0"),
@PartitionOffset(partition = "2",initialOffset = "0"),
})
}
,containerFactory = "kafkaListenerContainerFactory"
,clientIdPrefix = "_listener"
,concurrency = "${listen.concurrency:3}"
,idIsGroup = false
,groupId ="springboot_test_topic-group"
,beanRef = "springboot_test_topic"
,autoStartup = "${listen.auto.start:true}"
// ,errorHandler = "validationErrorHandler"
,properties = {"max.poll.interval.ms=3000",
"max.poll.records=30",
"enable.auto.commit=false",
"bootstrap.servers=localhost:9092"
}
)
public void listen(ConsumerRecord<Integer, String> record
//如果需要使用Acknowledgment 需要在factory中指定ACKmodel
// factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
, Acknowledgment ack,
@Header(KafkaHeaders.GROUP_ID) String groupId,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offset,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
//消息消息
System.out.println("=======key=========>"+record.key());
System.out.println("=======value=========>"+record.value());
//
System.out.println("=======topic=========>"+topic);
System.out.println("=======groupId=========>"+groupId);
System.out.println("=======partition=========>"+partition);
System.out.println("=======offset=========>"+offset);
System.out.println("=======timestamp=========>"+timestamp);
//手动确定 提交offset
ack.acknowledge();
}
KafkaHeaders.OFFSET // 获取当前信息的offset
KafkaHeaders.RECEIVED_MESSAGE_KEY // 获取message key信息
KafkaHeaders.RECEIVED_TOPIC //获取当前信息所在topic信息
KafkaHeaders.RECEIVED_PARTITION_ID //获取当前信息所在partitionId信息
KafkaHeaders.RECEIVED_TIMESTAMP //获取当前信息的时间戳
KafkaHeaders.TIMESTAMP_TYPE //获取当前信息的时间戳类型类型
KafkaHeaders.GROUP_ID //获取当前消费组id
第一步:构建consumer批次监听容器
/**
* 构建并行消费监听容器批次处理
* @param consumerFactory
* @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer,String>> bitchFactory(ConsumerFactory<Integer,String> consumerFactory){
//构建kafka并行消费监听类工厂类 此类通过topic名称创建该topic消费监听
ConcurrentKafkaListenerContainerFactory<Integer,String> concurrentKafkaListenerContainerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
//可通过注解的方式进行设置
concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
//设置拉取时间超时数
concurrentKafkaListenerContainerFactory.getContainerProperties().setPollTimeout(3000);
//是否开启 批次处理
concurrentKafkaListenerContainerFactory.setBatchListener(true);
//设置ack模型机制 当发生error时 不同处理机制针对与offset有不同处理机制
concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return concurrentKafkaListenerContainerFactory;
}
/**
* 注册监听bean
* @return
*/
@Bean("bitchListeners")
public BitchListeners bitchListeners(){
return new BitchListeners("springboot_test_topic");
}
第二步:使用
/**
* 使用SpEL表达式支持特殊的标记:__bean名称
* @author fangyuan
*/
public class BitchListeners {
/**
* topic名称
*
*/
private final String topic;
public BitchListeners(String topic) {
this.topic = topic;
}
public String getTopic() {
return this.topic;
}
/**
* 批处理
*
*/
@KafkaListener(id = "bitchConsumer",
topicPartitions = {
//设置 消费topic springboot_test_topic partition为0数据,1分区起始offset为0 注意:partitions或partitionOffsets属性都可以指定分区,但不能两个都指定。
@TopicPartition(topic = "#{bitchListeners.topic}",partitionOffsets = {
@PartitionOffset(partition = "0",initialOffset = "0"),
@PartitionOffset(partition = "1",initialOffset = "0"),
@PartitionOffset(partition = "2",initialOffset = "0"),
})
}
, containerFactory = "bitchFactory"
// , clientIdPrefix = ""
, idIsGroup = false
, concurrency = "${listen.concurrency:3}"
, groupId = "bitchConsumer-group"
, beanRef = "bitchConsumer_"
, autoStartup = "${listen.auto.start:true}"
, properties = {
"max.poll.interval.ms=3000",
//设置一个批次拉取最大消息数
"max.poll.records=5"})
public void listen(List<ConsumerRecord<Integer, String>> records, Acknowledgment ack) {
System.out.println("该批次拉取消息数====================>"+records.size());
//遍历消息
records.forEach(record->{
//消息消息
System.out.print("=======key=========>"+record.key());
System.out.print("=======value=========>"+record.value());
System.out.print("=======topic=========>"+record.partition());
System.out.print("=======topic=========>"+record.offset());
System.out.println();
});
//手动确定 提交offset
ack.acknowledge();
}
//
// /**
// * @param messages
// */
// @
// KafkaListener(id = "bitchConsumer2"
// , topics = {"__listener.topic"}
// , containerFactory = "bitchFactory"
// , idIsGroup = false
// , concurrency = "${listen.concurrency:3}"
// , groupId = "bitchConsumer-1")
// public void listMsg(List<Message<String>> messages) {
// //
// }
/**
* 添加批次ack确认机制
*
* @param messages
* @param ack
*/
public void listMsgAck(List<Message<String>> messages, Acknowledgment ack) {
}
/**
* @param messages
* @param ack
* @param consumer
*/
public void listMsgAckConsumer(List<Message<String>> messages, Acknowledgment ack,
Consumer<Integer, String> consumer) {
//
}
/**
* 如果使用使用ConsumerRecord 接收消息方法入参数只能为ConsumerRecord (如果启用ack机制可以包含Acknowledgment)
* @param list
*/
public void listCRs(List<ConsumerRecord<Integer, String>> list) {
}
}
Demo项目github地址:https://github.com/fangyuan94/kafkaDemo
手机扫一扫
移动阅读更方便
你可能感兴趣的文章