序
本文主要解析一下spring for kafka对原生的kafka client consumer的封装与集成。
consumer工厂
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
protected KafkaConsumercreateKafkaConsumer(String clientIdSuffix) { if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) { return createKafkaConsumer(); } else { Map modifiedClientIdConfigs = new HashMap<>(this.configs); modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix); return createKafkaConsumer(modifiedClientIdConfigs); } } protected KafkaConsumer createKafkaConsumer(Map configs) { return new KafkaConsumer (configs, this.keyDeserializer, this.valueDeserializer); }
ConcurrentKafkaListenerContainerFactory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java
关于consumer的主要封装在ConcurrentKafkaListenerContainerFactory这个里头。原生的KafkaConsumer是线程不安全的,无法并发操作,所以spring在这里又包装了一层,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java
public class ConcurrentMessageListenerContainerextends AbstractMessageListenerContainer { private final ConsumerFactory consumerFactory; private final List > containers = new ArrayList<>(); @Override protected void doStart() { if (!isRunning()) { ContainerProperties containerProperties = getContainerProperties(); TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions(); if (topicPartitions != null && this.concurrency > topicPartitions.length) { this.logger.warn("When specific partitions are provided, the concurrency must be less than or " + "equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length); this.concurrency = topicPartitions.length; } setRunning(true); for (int i = 0; i < this.concurrency; i++) { KafkaMessageListenerContainer container; if (topicPartitions == null) { container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties); } else { container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties, partitionSubset(containerProperties, i)); } if (getBeanName() != null) { container.setBeanName(getBeanName() + "-" + i); } if (getApplicationEventPublisher() != null) { container.setApplicationEventPublisher(getApplicationEventPublisher()); } container.setClientIdSuffix("-" + i); container.start(); this.containers.add(container); } } } //......}
KafkaMessageListenerContainer
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
public class KafkaMessageListenerContainerextends AbstractMessageListenerContainer { private final ConsumerFactory consumerFactory; private final TopicPartitionInitialOffset[] topicPartitions; private ListenerConsumer listenerConsumer; private ListenableFuture listenerConsumerFuture; private GenericMessageListener listener; private GenericAcknowledgingMessageListener acknowledgingMessageListener; private String clientIdSuffix; @Override protected void doStart() { if (isRunning()) { return; } ContainerProperties containerProperties = getContainerProperties(); if (!this.consumerFactory.isAutoCommit()) { AckMode ackMode = containerProperties.getAckMode(); if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) { Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0"); } if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME)) && containerProperties.getAckTime() == 0) { containerProperties.setAckTime(5000); } } Object messageListener = containerProperties.getMessageListener(); Assert.state(messageListener != null, "A MessageListener is required"); if (messageListener instanceof GenericAcknowledgingMessageListener) { this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener ) messageListener; } else if (messageListener instanceof GenericMessageListener) { this.listener = (GenericMessageListener ) messageListener; } else { throw new IllegalStateException("messageListener must be 'MessageListener' " + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName()); } if (containerProperties.getConsumerTaskExecutor() == null) { SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor( (getBeanName() == null ? "" : getBeanName()) + "-C-"); containerProperties.setConsumerTaskExecutor(consumerExecutor); } if (containerProperties.getListenerTaskExecutor() == null) { SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor( (getBeanName() == null ? 
"" : getBeanName()) + "-L-"); containerProperties.setListenerTaskExecutor(listenerExecutor); } this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener); setRunning(true); this.listenerConsumerFuture = containerProperties .getConsumerTaskExecutor() .submitListenable(this.listenerConsumer); } @Override protected void doStop(final Runnable callback) { if (isRunning()) { this.listenerConsumerFuture.addCallback(new ListenableFutureCallback
每个KafkaMessageListenerContainer都自己创建一个ListenerConsumer,然后自己创建一个独立的kafka consumer,每个ListenerConsumer在线程池里头运行,这样来实现并发。
每个ListenerConsumer里头都有一个recordsToProcess队列,从原始的kafka consumer poll出来的记录会放到这个队列里头,然后有一个ListenerInvoker线程循环超时等待从recordsToProcess取出记录,然后交给应用程序的KafkaListener标注的方法去执行
/**
 * Runnable that repeatedly polls the enclosing {@code ListenerConsumer}'s
 * {@code recordsToProcess} queue and passes each non-null batch to {@code invokeListener}.
 */
private final class ListenerInvoker implements SchedulingAwareRunnable {

    /** Released when {@link #run()} exits, so {@link #stop()} can wait for it. */
    private final CountDownLatch exitLatch = new CountDownLatch(1);

    private volatile boolean active = true;

    /** The thread currently executing {@link #run()}; used by {@link #stop()} to interrupt it. */
    private volatile Thread executingThread;

    ListenerInvoker() {
        super();
    }

    /**
     * Polls the queue with a one-second timeout for as long as this invoker is active.
     * An interrupt received while still active is logged and ignored; an interrupt received
     * after deactivation restores the interrupt flag and lets the loop end.
     */
    @Override
    public void run() {
        Assert.isTrue(this.active, "This instance is not active anymore");
        if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
        }
        try {
            this.executingThread = Thread.currentThread();
            while (this.active) {
                try {
                    ConsumerRecords<K, V> records =
                            ListenerConsumer.this.recordsToProcess.poll(1, TimeUnit.SECONDS);
                    // Re-check: stop() may have been called while we were blocked in poll().
                    if (this.active) {
                        if (records != null) {
                            invokeListener(records);
                        } else {
                            if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                ListenerConsumer.this.logger.trace("No records to process");
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    if (!this.active) {
                        Thread.currentThread().interrupt();
                    } else {
                        ListenerConsumer.this.logger.debug("Interrupt ignored");
                    }
                }
            }
        } finally {
            this.active = false;
            this.exitLatch.countDown();
        }
    }

    @Override
    public boolean isLongLived() {
        return true;
    }

    /**
     * Deactivates the invoker and waits up to the container's shutdown timeout for
     * {@link #run()} to exit; if it has not exited by then, the executing thread is
     * interrupted.
     */
    private void stop() {
        if (ListenerConsumer.this.logger.isDebugEnabled()) {
            ListenerConsumer.this.logger.debug("Stopping invoker");
        }
        this.active = false;
        try {
            if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS)
                    && this.executingThread != null) {
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug("Interrupting invoker");
                }
                this.executingThread.interrupt();
            }
        } catch (InterruptedException e) {
            if (this.executingThread != null) {
                this.executingThread.interrupt();
            }
            Thread.currentThread().interrupt();
        }
        if (ListenerConsumer.this.logger.isDebugEnabled()) {
            ListenerConsumer.this.logger.debug("Invoker stopped");
        }
    }

}
这里的invokeListener就是调用listener的onMessage方法
KafkaListener注解
这里我们来看看,标注KafkaListener的方法,最后是怎么包装成ListenerInvoker这个类里头调用的listener的
KafkaListenerAnnotationBeanPostProcessor
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
这个类会扫描bean的KafkaListener注解,然后将其信息注册到KafkaListenerEndpointRegistrar

/**
 * Scans the bean's target class for {@code @KafkaListener} annotations and registers an
 * endpoint for each one found.
 *
 * <p>Method-level annotations are processed one by one via {@code processKafkaListener}.
 * When the class itself carries listener annotations, its {@code @KafkaHandler} methods are
 * collected and handed to {@code processMultiMethodListeners}. Bean classes with no
 * annotated methods are remembered in {@code nonAnnotatedClasses} and skipped next time.
 *
 * @param bean the initialized bean
 * @param beanName the bean's name
 * @return the same bean instance, unchanged
 */
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<Method> multiMethods = new ArrayList<Method>();
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

                    @Override
                    public Set<KafkaListener> inspect(Method method) {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    }

                });
        if (hasClassLevelListeners) {
            Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                    new ReflectionUtils.MethodFilter() {

                        @Override
                        public boolean matches(Method method) {
                            return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
                        }

                    });
            multiMethods.addAll(methodsWithHandler);
        }
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(bean.getClass());
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
        } else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                for (KafkaListener listener : entry.getValue()) {
                    processKafkaListener(listener, method, bean, beanName);
                }
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
        }
        if (hasClassLevelListeners) {
            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
        }
    }
    return bean;
}

/**
 * Builds a {@code MethodKafkaListenerEndpoint} for one annotated method and passes it to
 * {@code processListener}.
 *
 * @param kafkaListener the annotation found on the method
 * @param method the annotated method
 * @param bean the bean that owns the method
 * @param beanName the bean's name
 */
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}

/**
 * Copies the annotation's attributes (id, topics, topic partitions, topic pattern, container
 * group) onto the endpoint, resolves the optional container factory bean, and registers the
 * endpoint with the registrar.
 *
 * @param endpoint the endpoint to populate and register
 * @param kafkaListener the annotation supplying the attributes
 * @param bean the listener bean
 * @param adminTarget the annotated element, used only in the error message
 * @param beanName the bean's name
 * @throws BeanInitializationException if a container factory bean name is given but no such
 *         bean exists
 */
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, Object adminTarget, String beanName) {
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    endpoint.setTopics(resolveTopics(kafkaListener));
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    String group = kafkaListener.containerGroup();
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }

    // A null factory is passed through to the registrar when no bean name is given.
    KafkaListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
        } catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                    + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                    + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
        }
    }

    this.registrar.registerEndpoint(endpoint, factory);
}
KafkaListenerEndpointRegistrar
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
/** * Register a new {@link KafkaListenerEndpoint} alongside the * {@link KafkaListenerContainerFactory} to use to create the underlying container. *The {@code factory} may be {@code null} if the default factory has to be * used for that endpoint. * @param endpoint the {@link KafkaListenerEndpoint} instance to register. * @param factory the {@link KafkaListenerContainerFactory} to use. */ public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory
factory) { Assert.notNull(endpoint, "Endpoint must be set"); Assert.hasText(endpoint.getId(), "Endpoint id must be set"); // Factory may be null, we defer the resolution right before actually creating the container KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory); synchronized (this.endpointDescriptors) { if (this.startImmediately) { // Register and start immediately this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor), true); } else { this.endpointDescriptors.add(descriptor); } } }
这里将KafkaListenerEndpoint包装为KafkaListenerEndpointDescriptor,注册到名为endpointDescriptors的KafkaListenerEndpointDescriptor集合中
KafkaListenerEndpointRegistrar
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean { @Override public void afterPropertiesSet() { registerAllEndpoints(); } protected void registerAllEndpoints() { synchronized (this.endpointDescriptors) { for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) { this.endpointRegistry.registerListenerContainer( descriptor.endpoint, resolveContainerFactory(descriptor)); } this.startImmediately = true; // trigger immediate startup } }}
这个类实现了InitializingBean接口的afterPropertiesSet方法(初始化bean的时候执行),在这个里头根据endpointDescriptors挨个调用registerListenerContainer进行注册
KafkaListenerEndpointRegistry
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware, ApplicationListener{ protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR private final Map listenerContainers = new ConcurrentHashMap (); //...... public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory factory, boolean startImmediately) { Assert.notNull(endpoint, "Endpoint must not be null"); Assert.notNull(factory, "Factory must not be null"); String id = endpoint.getId(); Assert.hasText(id, "Endpoint id must not be empty"); synchronized (this.listenerContainers) { Assert.state(!this.listenerContainers.containsKey(id), "Another endpoint is already registered with id '" + id + "'"); MessageListenerContainer container = createListenerContainer(endpoint, factory); this.listenerContainers.put(id, container); if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) { List containerGroup; if (this.applicationContext.containsBean(endpoint.getGroup())) { containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class); } else { containerGroup = new ArrayList (); this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup); } containerGroup.add(container); } if (startImmediately) { startIfNecessary(container); } } } protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory factory) { MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); if (listenerContainer instanceof InitializingBean) { try { ((InitializingBean) listenerContainer).afterPropertiesSet(); } catch (Exception ex) { throw new BeanInitializationException("Failed to initialize message listener container", ex); } } int containerPhase = listenerContainer.getPhase(); if (containerPhase < Integer.MAX_VALUE) { // a custom phase value if (this.phase < Integer.MAX_VALUE && 
this.phase != containerPhase) { throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " + this.phase + " vs " + containerPhase); } this.phase = listenerContainer.getPhase(); } return listenerContainer; } @Override public void start() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); } } @Override public void stop() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { listenerContainer.stop(); } }}
注册的时候将endpoint转换为MessageListenerContainer,然后放到listenerContainers的map当中
AbstractKafkaListenerContainerFactory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
/**
 * Creates a container for the endpoint and applies this factory's settings to it.
 *
 * <p>Only settings that are non-null on the factory are applied. Container-level settings
 * (auto-startup, phase, event publisher, bean name taken from the endpoint id) go onto the
 * container. Listener-level settings (record filter, ack-discarded, retry template, recovery
 * callback, batch flag) are pushed onto the endpoint when it is an
 * {@code AbstractKafkaListenerEndpoint}. The endpoint then sets up the container's listener,
 * and {@code initializeContainer} is called last.
 *
 * @param endpoint the endpoint to create a container for
 * @return the configured container
 */
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);

    if (this.autoStartup != null) {
        instance.setAutoStartup(this.autoStartup);
    }
    if (this.phase != null) {
        instance.setPhase(this.phase);
    }
    if (this.applicationEventPublisher != null) {
        instance.setApplicationEventPublisher(this.applicationEventPublisher);
    }
    if (endpoint.getId() != null) {
        instance.setBeanName(endpoint.getId());
    }

    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint;
        if (this.recordFilterStrategy != null) {
            aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
        }
        if (this.ackDiscarded != null) {
            aklEndpoint.setAckDiscarded(this.ackDiscarded);
        }
        if (this.retryTemplate != null) {
            aklEndpoint.setRetryTemplate(this.retryTemplate);
        }
        if (this.recoveryCallback != null) {
            aklEndpoint.setRecoveryCallback(this.recoveryCallback);
        }
        if (this.batchListener != null) {
            aklEndpoint.setBatchListener(this.batchListener);
        }
    }

    endpoint.setupListenerContainer(instance, this.messageConverter);
    initializeContainer(instance);

    return instance;
}
上面调用的是endpoint.setupListenerContainer,这里主要看它内部调用的setupMessageListener方法
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) { Object messageListener = createMessageListener(container, messageConverter); Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener"); if (this.retryTemplate != null) { if (messageListener instanceof AcknowledgingMessageListener) { messageListener = new RetryingAcknowledgingMessageListenerAdapter<>( (AcknowledgingMessageListener) messageListener, this.retryTemplate, this.recoveryCallback); } else { messageListener = new RetryingMessageListenerAdapter<>((MessageListener ) messageListener, this.retryTemplate, (RecoveryCallback
这个messageListener包含了原始endpoint携带的bean以及method转换成的InvocableHandlerMethod,然后注入到MessageListenerContainer(ConcurrentMessageListenerContainer),然后这里就跟上头的ConcurrentMessageListenerContainer这个衔接上,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例
MethodKafkaListenerEndpoint
createMessageListener这个方法将endpoint包含的原始标注KafkaListener注解的bean以及方法,包装为InvocableHandlerMethod,注入到MessagingMessageListenerAdapter当中
public class MethodKafkaListenerEndpointextends AbstractKafkaListenerEndpoint { private Object bean; private Method method; private MessageHandlerMethodFactory messageHandlerMethodFactory; //...... @Override protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container, MessageConverter messageConverter) { Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(messageConverter); messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); return messageListener; } /** * Create a {@link HandlerAdapter} for this listener adapter. * @param messageListener the listener adapter. * @return the handler adapter. */ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter messageListener) { InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod()); return new HandlerAdapter(invocableHandlerMethod); }}
这个类会将原始的bean跟方法包装为InvocableHandlerMethod这个类,然后注入到MessagingMessageListenerAdapter当中
小结
- 对于生产者来说,封装KafkaProducer到KafkaTemplate相对简单
- 对于消费者来说,由于spring是采用注解的形式去标注消息处理方法的,所以这里稍微费劲一点:
- 先在KafkaListenerAnnotationBeanPostProcessor中扫描bean,然后注册到KafkaListenerEndpointRegistrar
- 而KafkaListenerEndpointRegistrar在afterPropertiesSet的时候去创建MessageListenerContainer
- messageListener包含了原始endpoint携带的bean以及method转换成的InvocableHandlerMethod
- ConcurrentMessageListenerContainer这个衔接上,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例
- 每个KafkaMessageListenerContainer都自己创建一个ListenerConsumer,然后自己创建一个独立的kafka consumer,每个ListenerConsumer在线程池里头运行,这样来实现并发
- 每个ListenerConsumer里头都有一个recordsToProcess队列,从原始的kafka consumer poll出来的记录会放到这个队列里头,
- 然后有一个ListenerInvoker线程循环超时等待从recordsToProcess取出记录,然后调用messageListener的onMessage方法(即KafkaListener注解标注的方法)
ListenerConsumer是重点,里头还有包括offset的提交,这里改天再详解一下。