生产者
说明
KafkaTemplate封装了一个生成器,并提供了方便的方法来发送数据到kafka主题。 提供了异步和同步方法,异步方法返回一个Future。
其构造方法有:
1 | ListenableFuture<SendResult<K, V>> sendDefault(V data); |
前3个方法需要向Temple提供默认主题
配置
使用Producer配置类
1 |
|
示例
1 |
|
消费者
说明
可以通过配置MessageListenerContainer并提供MessageListener或通过使用@KafkaListener注释来接收消息。
MessageListenerContainer有两个实现:
配置
使用Consumer配置类
1 |
|
消息接收
Java实现
直接使用kafka0.10 client去收发消息
1 | @Test |
使用MessageListener接口
继承MessageListener接口
1 | public class CustomMessageListener implements MessageListener<Integer, String> { |
使用@KafkaListener注解
1 | @KafkaListener(id = "foo", topics = "myTopic") |
总结
- 对于生产者来说,封装KafkaProducer到KafkaTemplate相对简单
- 对于消费者来说,由于spring是采用注解的形式去标注消息处理方法
- 先在KafkaListenerAnnotationBeanPostProcessor中扫描bean,然后注册到KafkaListenerEndpointRegistrar
- 而KafkaListenerEndpointRegistrar在afterPropertiesSet的时候去创建MessageListenerContainer
- messageListener包含了原始endpoint携带的bean以及method转换成的InvocableHandlerMethod
- ConcurrentMessageListenerContainer这个衔接上,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例
- 每个KafkaMessageListenerContainer都自己创建一个ListenerConsumer,然后自己创建一个独立的kafka consumer,每个ListenerConsumer在线程池里头运行,这样来实现并发
- 每个ListenerConsumer里头都有一个recordsToProcess队列,从原始的kafka consumer poll出来的记录会放到这个队列里头,
- 然后有一个ListenerInvoker线程循环超时等待从recordsToProcess取出记录,然后调用messageListener的onMessage方法(即KafkaListener注解标准的方法)
