Spring Boot + Kafka: Producing Messages and Batch Consumption

This article walks through producing messages and consuming them in batches with Spring Boot + Kafka. It is a learning demo, so let's go straight to the code.

POM dependencies

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.2.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <!-- Exclude the default Logback binding so log4j2 is the only SLF4J backend -->
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.4.RELEASE</version>
    </dependency>
</dependencies>

Configuration file (application.properties)

#============== kafka ===================
# Kafka broker address(es); multiple brokers can be listed, comma-separated
spring.kafka.bootstrap-servers=10.101.38.213:8092

# Default topic for KafkaTemplate
spring.kafka.template.default-topic=topic-test


#=============== producer =======================
# Number of retries
spring.kafka.producer.retries=3
# Batch size in bytes (16 KB)
spring.kafka.producer.batch-size=16384
# 32 MB send buffer
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.properties.linger.ms=1
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer



#=============== consumer =======================
# Default consumer group id
spring.kafka.consumer.group-id=etl
# Where to start when there is no committed offset; earliest reads the partition from the beginning
spring.kafka.consumer.auto-offset-reset=earliest
# Maximum number of records returned in a single poll
spring.kafka.consumer.max-poll-records=5
# How long (ms) the broker may block a fetch request when there is not enough data to satisfy fetch.min.bytes
spring.kafka.consumer.fetch-max-wait=10000
# Offset committing; auto-commit is disabled here because the listener commits manually
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=10000
# Session timeout in ms (custom key, read via @Value in KafkaConfig)
spring.kafka.consumer.session-timeout=15000

# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#=============== listener =======================
# Number of threads in the listener container, to raise consumer concurrency
spring.kafka.listener.concurrency=1
# Timeout (ms) used when polling the consumer
spring.kafka.listener.poll-timeout=50000
# Whether to consume in batches (custom key, read via @Value in KafkaConfig)
spring.kafka.listener.batch-listener=true
topic.name=springDemo

Kafka configuration

If you don't need batch consumption and only produce messages through KafkaTemplate, this explicit configuration class is unnecessary: Spring Boot's auto-configuration creates a KafkaTemplate bean for us from the properties file alone.
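As a minimal sketch of that simpler path (assuming only the properties above and no custom KafkaConfig; the SimpleSender class is mine, not part of the demo), a component can inject the auto-configured template and publish to the default topic with sendDefault():

package com.austin.brant.kafka.demo.provider;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical helper, not part of the original demo: relies purely on
// Spring Boot auto-configuration and spring.kafka.template.default-topic.
@Component
public class SimpleSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message) {
        // sendDefault() publishes to the configured default topic (topic-test)
        kafkaTemplate.sendDefault(message);
    }
}

For batch consumption, though, the demo wires the container factory explicitly: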

package com.austin.brant.kafka.demo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

/**
 * @author austin-brant
 * @since 2019/7/15 21:45
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Producer retry count
    @Value("${spring.kafka.producer.retries}")
    private String producerRetries;

    @Value("${spring.kafka.producer.batch-size}")
    private String producerBatchSize;

    @Value("${spring.kafka.producer.properties.linger.ms}")
    private String producerLingerMs;

    @Value("${spring.kafka.producer.buffer-memory}")
    private String producerBufferMemory;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    // Note: the demo reuses the fetch-max-wait value as max.poll.interval.ms below
    @Value("${spring.kafka.consumer.fetch-max-wait}")
    private Integer maxPollIntervals;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.listener.concurrency}")
    private Integer concurrency;

    @Value("${spring.kafka.listener.poll-timeout}")
    private Long pollTimeout;

    @Value("${spring.kafka.consumer.session-timeout}")
    private String sessionTimeout;

    @Value("${spring.kafka.listener.batch-listener}")
    private Boolean batchListener;

    /**
     * ProducerFactory
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
        configs.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configs);
    }

    /**
     * KafkaTemplate (autoFlush = true, so every send is flushed immediately)
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory(), true);
    }

    /**
     * KafkaListenerContainerFactory used for batch consumption
     */
    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<Object, Object>(consumerConfigs()));
        factory.setBatchListener(batchListener); // enable batch listening
        factory.setConcurrency(concurrency); // number of concurrent consumer threads
        // Manual ack: the listener commits offsets itself via the Acknowledgment parameter
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); // records per batch
        // max.poll.interval.ms: the maximum time allowed between polls before the
        // consumer is considered dead (the demo reuses the fetch-max-wait value here)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervals);

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); // where to start with no committed offset

        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit); // auto commit? (false here)
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); // auto-commit interval

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);

        // props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

Producer

package com.austin.brant.kafka.demo.provider;

import java.util.Date;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.austin.brant.kafka.demo.model.Message;

import lombok.extern.slf4j.Slf4j;

/**
 * Producer
 *
 * @author austin-brant
 * @since 2019/7/15 19:39
 */
@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,
                Message.builder()
                        .id(System.currentTimeMillis())
                        .msg(message)
                        .sendTime(new Date()).build().toString());
        // send() is asynchronous; the callback reports the final outcome
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("send message [{}] to topic [{}] failed", message, topic, throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                log.info("send message [{}] to topic [{}] success", message, topic);
            }
        });
        log.info("send message end");
    }

    public void batchSend(String topic, List<String> messages) {
        messages.forEach(it -> kafkaTemplate.send(topic, it));
    }
}
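
The Message model imported above is not shown in the post (it lives in the full repo). A minimal Lombok sketch consistent with the builder calls in KafkaProducer might look like this:

package com.austin.brant.kafka.demo.model;

import java.util.Date;

import lombok.Builder;
import lombok.Data;

// Assumed shape, reconstructed from the builder calls in KafkaProducer;
// see the linked repo for the actual class.
@Data
@Builder
public class Message {

    private Long id;       // message id (the producer uses the current timestamp)

    private String msg;    // message payload

    private Date sendTime; // time the message was sent
}

@Data supplies the generated toString() that the producer serializes and sends as the record value.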

Consumer

package com.austin.brant.kafka.demo.consumer;

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * Consumer
 *
 * @author austin-brant
 * @since 2019/7/15 19:58
 */
@Slf4j
@Component
public class KafkaConsumer {

    // Single-record listener, kept for comparison:
    // @KafkaListener(topics = "${topic.name}")
    // public void listen(ConsumerRecord<String, String> record) {
    //     consumer(record);
    // }

    @KafkaListener(topics = {"${topic.name}"}, containerFactory = "batchFactory", id = "consumer")
    public void listen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        log.info("batch listen size {}.", records.size());
        try {
            records.forEach(this::consumer);
        } finally {
            // Commit the offsets manually. Note: acknowledging in finally commits
            // the whole batch even if processing threw an exception.
            ack.acknowledge();
        }
    }

    /**
     * Consume a single record
     */
    public void consumer(ConsumerRecord<String, String> record) {
        log.info("topic: {}, value: {}", record.topic(), record.value());
    }
}
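
To see production and batch consumption end to end, a small driver can pump a burst of messages through the producer; with max-poll-records=5, the listener should then log batches of at most five records. A sketch (the DemoRunner class and message text are mine, not part of the original demo):

package com.austin.brant.kafka.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import com.austin.brant.kafka.demo.provider.KafkaProducer;

// Hypothetical driver, not part of the original post: sends a burst of
// messages on startup so the batch listener has something to pick up.
@Component
public class DemoRunner implements CommandLineRunner {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Value("${topic.name}")
    private String topic;

    @Override
    public void run(String... args) {
        // With max-poll-records=5, these should arrive in batches of at most 5
        for (int i = 0; i < 20; i++) {
            kafkaProducer.send(topic, "hello-" + i);
        }
    }
}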

Full source code: https://github.com/austin-brant/kafka-spring-boot-demo