Kafka集群部署以及与SpringBoot整合发布订阅
============================
1. 业务背景
=======
我们的项目中设备数据接收系统从MQTT消息服务订阅到设备数据之后,需要推到大数据平台,以便于大数据平台进行实时计算生产数据报告,给用户展示。所以我们数据接收系统跟大数据平台通信我们需要使用消息队列来异步解耦。
现在主流的MQ有2种:Kafka和RocketMQ。毫无疑问我们项目接收的设备数据量大、频率快,同时后面要基于设备上报的数据做流式数据分析处理,很显然使用Kafka更合适。因为kafka更适合大数据量日志数据收集、流式处理分析,对接Flink等大数据处理工具分析处理生成报告,同时吞吐量高,适用场景比较符合我们的业务场景。RocketMQ适用于可靠性要求高,消息顺序和事务消息、延迟消息等业务处理场景。同时业界主流物联网设备数据收集分析处理在消息队列选型上面也是使用Kafka,所以最终我们选择Kafka作为我们业务场景中的消息队列。
2. 部署Kafka集群
我们部署Kraft模式集群。使用3个节点来部署集群,节点规划如下:
| 节点 | ip | 角色 |
| — | — | — |
| 节点1 | 192.168.56.200 | controller,broker |
| 节点2 | 192.168.56.202 | controller,broker |
| 节点3 | 192.168.56.203 | controller,broker |
先下载kafka,我们下载kafka_2.13-3.1.0.tgz这个版本。
- 修改配置文件
vi config/kraft/server.properties
主要配置内容如下:
- 配置角色
- 节点id
- 配置controller列表
- 配置log目录(数据存储目录)
- 本节点监听器地址
- 对客户端公布的地址
节点1的配置
# 节点角色
process.roles=broker,controller
# 节点ID,和节点所承担的角色想关联
node.id=1
# controller列表
controller.quorum.voters=1@192.168.56.200:9093,2@192.168.56.202:9093,3@192.168.56.203:9093
# 本节点监听器地址
listeners=PLAINTEXT://192.168.56.200:9092,CONTROLLER://192.168.56.200:9093
# 对客户端公布的地址
advertised.listeners=PLAINTEXT://192.168.56.200:9092
# 日志文件的路径
log.dirs=/home/xiangguo/kafka_2.13-3.1.0/datas
节点2配置
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@192.168.56.200:9093,2@192.168.56.202:9093,3@192.168.56.203:9093
listeners=PLAINTEXT://192.168.56.202:9092,CONTROLLER://192.168.56.202:9093
advertised.listeners=PLAINTEXT://192.168.56.202:9092
log.dirs=/home/xiangguo/kafka_2.13-3.1.0/datas
节点3配置
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@192.168.56.200:9093,2@192.168.56.202:9093,3@192.168.56.203:9093
listeners=PLAINTEXT://192.168.56.203:9092,CONTROLLER://192.168.56.203:9093
advertised.listeners=PLAINTEXT://192.168.56.203:9092
log.dirs=/home/xiangguo/kafka_2.13-3.1.0/datas
最后在每个节点创建数据目录:/home/xiangguo/kafka_2.13-3.1.0/datas
- 格式化存储目录
生成唯一id
bin/kafka-storage.sh random-uuid
当然,我们需要安装jdk,不然执行上面这个脚本会报错。
# 查看Java相关套件
rpm -qa | grep java
# 显示如下
javapackages-tools-3.4.1-11.el7.noarch
python-javapackages-3.4.1-11.el7.noarch
tzdata-java-2024a-1.el7.noarch
java-1.8.0-openjdk-headless-1.8.0.412.b08-1.el7_9.x86_64
# 卸载
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.412.b08-1.el7_9.x86_64
# 系统自带版本已经被卸载
[root@xg-200 kafka_2.13-3.1.0]# java -version
-bash: /usr/bin/java: 没有那个文件或目录
# 下载jdk8然后安装
jdk-8u361-linux-x64.tar.gz
# 解压到目录/usr/local/java/jdk1.8.0_361
配置环境变量
vi /etc/profile
# java环境变量
export JAVA_HOME=/usr/local/java/jdk1.8.0_361
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
使修改生效
source /etc/profile
查看版本
[root@xg-200 xiangguo]# java -version
java version "1.8.0_361"
Java(TM) SE Runtime Environment (build 1.8.0_361-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.361-b09, mixed mode)
再次执行生产唯一id,看到生成了唯一id
[root@xg-200 kafka_2.13-3.1.0]# bin/kafka-storage.sh random-uuid
sQNj3XoUQv6BHx_KCpnx4w
使用此id格式化存储目录(每个节点都要执行)
bin/kafka-storage.sh format -t sQNj3XoUQv6BHx_KCpnx4w -c /home/xiangguo/kafka_2.13-3.1.0/config/kraft/server.properties
# 看到格式化完成
Formatting /home/xiangguo/kafka_2.13-3.1.0/datas
同理,在另外两个节点执行格式化。
- 配置kafka环境变量
vi /etc/profile
# KAFKA环境变量
export KAFKA_HOME=/home/xiangguo/kafka_2.13-3.1.0
export PATH=$PATH:$KAFKA_HOME/bin
使环境变量生效
source /etc/profile
另外两个节点,同理设置环境变量。
- 创建启动/停止脚本
vi /usr/bin/kafka
输入内容如下
#! /bin/bash
if [ $# -lt 1 ]; then
echo "No Args Input..."
exit
fi
case $1 in
"start") {
for i in 192.168.56.200 192.168.56.202 192.168.56.203; do
echo " --------启动 $i Kafka-------"
ssh $i "source /etc/profile;kafka-server-start.sh -daemon /home/xiangguo/kafka_2.13-3.1.0/config/kraft/server.properties"
done
} ;;
"stop") {
for i in 192.168.56.200 192.168.56.202 192.168.56.203; do
echo " --------停止 $i Kafka-------"
ssh $i "source /etc/profile;kafka-server-stop.sh"
done
} ;;
*)
echo "Input Args Error..."
;;
esac
然后添加执行权限
chmod +x /usr/bin/kafka
启动/停止集群
kafka start/stop
启动kafka节点之后,过段时间服务会挂掉,如下9092端口都没有了。
[root@xg-200 ~]# netstat -anptu | grep 9092
tcp6 0 0 192.168.56.200:9092 :::* LISTEN 8211/java
[root@xg-200 ~]# netstat -anptu | grep 9092
查看下kafka日志,排查下问题。我们不以demo方式启动以此来查看日志。
kafka-server-start.sh /home/xiangguo/kafka_2.13-3.1.0/config/kraft/server.properties
果然,我们发现了关键的报错信息。意思是我们的集群id不一致。
Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse
我们把各个节点的datas目录下面的meta.properties文件中的cluster.id都统一设置为节点1中的cluster.id。
# 节点3的配置
#Wed May 01 14:25:08 CST 2024
cluster.id=sQNj3XoUQv6BHx_KCpnx4w
version=1
node.id=3
然后再次启动3个节点,此时我们发现3个节点启动运行正常,也没有中途挂掉的问题。
[2024-05-02 09:35:40,712] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Stopping socket server request processors (kafka.network.SocketServer)
[2024-05-02 09:35:40,728] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Stopped socket server request processors (kafka.network.SocketServer)
[2024-05-02 09:35:40,728] INFO [Controller 1] QuorumController#beginShutdown: shutting down event queue. (org.apache.kafka.queue.KafkaEventQueue)
[2024-05-02 09:35:40,729] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Shutting down socket server (kafka.network.SocketServer)
[2024-05-02 09:35:40,766] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Shutdown completed (kafka.network.SocketServer)
[2024-05-02 09:35:40,767] INFO [data-plane Kafka Request Handler on Broker 1], shutting down (kafka.server.KafkaRequestHandlerPool)
[2024-05-02 09:35:40,770] INFO [data-plane Kafka Request Handler on Broker 1], shut down completely (kafka.server.KafkaRequestHandlerPool)
[2024-05-02 09:35:40,770] INFO [ExpirationReaper-1-AlterAcls]: Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2024-05-02 09:35:40,935] INFO [ExpirationReaper-1-AlterAcls]: Stopped (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2024-05-02 09:35:40,936] INFO [ExpirationReaper-1-AlterAcls]: Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2024-05-02 09:35:40,937] INFO [ThrottledChannelReaper-Fetch]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:41,433] INFO [ThrottledChannelReaper-Fetch]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:41,434] INFO [ThrottledChannelReaper-Fetch]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:41,434] INFO [ThrottledChannelReaper-Produce]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:42,434] INFO [ThrottledChannelReaper-Produce]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:42,434] INFO [ThrottledChannelReaper-Produce]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:42,434] INFO [ThrottledChannelReaper-Request]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:43,435] INFO [ThrottledChannelReaper-Request]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:43,436] INFO [ThrottledChannelReaper-Request]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:43,436] INFO [ThrottledChannelReaper-ControllerMutation]: Shutting down (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:44,436] INFO [ThrottledChannelReaper-ControllerMutation]: Stopped (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:44,437] INFO [ThrottledChannelReaper-ControllerMutation]: Shutdown completed (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2024-05-02 09:35:44,437] INFO [Controller 1] closed event queue. (org.apache.kafka.queue.KafkaEventQueue)
[2024-05-02 09:35:44,445] INFO App info kafka.server for 1 unregistered (org.apache.kafka.common.utils.AppInfoParser)
然后,我们停止掉服务,使用上面的启动脚本一键启动/停止kafka集群服务。
kafka start/stop
启动成功,端口9092已经起来。
[root@xg-200 datas]# kafka start
--------启动 192.168.56.200 Kafka-------
root@192.168.56.200's password:
--------启动 192.168.56.202 Kafka-------
root@192.168.56.202's password:
--------启动 192.168.56.203 Kafka-------
root@192.168.56.203's password:
[root@xg-200 datas]# netstat -anptu | grep 9092
tcp6 0 0 192.168.56.200:9092 :::* LISTEN 4804/java
另外2个节点服务也已经启动成功。
[root@xg-202 datas]# netstat -anptu | grep 9092
tcp6 0 0 192.168.56.202:9092 :::* LISTEN 3421/java
3. 简单测试验证
首先,创建topic
kafka-topics.sh --create --topic test_topic --bootstrap-server 192.168.56.200:9092
查看topic
kafka-topics.sh --describe --topic test_topic --bootstrap-server 192.168.56.200:9092
查看到的信息如下
[root@xg-200 datas]# kafka-topics.sh --describe --topic test_topic --bootstrap-server 192.168.56.200:9092
Topic: test_topic TopicId: dH-NJGShTPKLmrsU7Gw4jQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test_topic Partition: 0 Leader: 3 Replicas: 3 Isr: 3
生产消息
kafka-console-producer.sh --topic test_topic --bootstrap-server 192.168.56.200:9092
>hello
>aaa
>bbb
>ccc
消费消息
kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server 192.168.56.200:9092
hello
aaa
bbb
ccc
4. 与SpringBoot整合以及发布订阅
- 引入spring-kafka依赖
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 配置
spring:
kafka:
# 指定kafka server的地址,集群中间,逗号隔开
bootstrap-servers: 192.168.56.200:9092,192.168.56.202:9092,192.168.56.203:9092
producer:
# 消息重发的次数
retries: 3
# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
- 消息生产
使用异步回调,避免消息发送阻塞等待。
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 发送消息到topic
*/
public static final String TOPIC_TEST = "topic.test";
/**
* 消费者组
*/
public static final String CONSUMER_GROUP = "consumer.group";
/**
* 发送消息
* @param msg
*/
public void send(Object msg) {
String msgString = JSONUtil.toJsonStr(msg);
log.info("发送消息:{}", msgString);
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, msg);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
// 发送失败的处理
log.error("生产者发送消息失败,topic:{}, throwable: {}", TOPIC_TEST, throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
// 成功的处理
log.info("生产者发送消息成功, topic:{}, SendResult:{}",TOPIC_TEST, stringObjectSendResult.toString());
}
});
}
}
- 消息消费
从topic订阅接收、处理消息。
@Component
@Slf4j
public class KafkaConsumer {
/**
* 从topic订阅接收、处理数据
* @param record
* @param ack
* @param topic
*/
@KafkaListener(topics = TOPIC_TEST, groupId = CONSUMER_GROUP)
public void handle(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("消费者组:{} 从topic:{} 订阅接收、处理消息:{}", CONSUMER_GROUP, topic, msg);
ack.acknowledge();
}
}
}
- 发布订阅验证
生产者发送测试数据
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class AppTest {
@Resource
private KafkaProducer kafkaProducer;
/**
* kafka生产者发送消息
*/
@Test
public void sendMsg() throws InterruptedException {
for(int i = 0; i<10; i++) {
kafkaProducer.send("测试数据: " + i);
Thread.sleep(1000);
}
}
}
消费到数据。
2024-05-03 10:34:03.034 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 0
2024-05-03 10:34:04.024 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 1
2024-05-03 10:34:05.030 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 2
2024-05-03 10:34:06.024 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 3
2024-05-03 10:34:07.029 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 4
2024-05-03 10:34:08.025 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 5
2024-05-03 10:34:09.029 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 6
2024-05-03 10:34:10.033 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 7
2024-05-03 10:34:11.030 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 8
2024-05-03 10:34:12.033 INFO 35416 --- [ntainer#0-0-C-1] o.e.r.d.c.service.KafkaConsumer : 消费者组:consumer.group 从topic:topic.test 订阅接收、处理消息:测试数据: 9
原文链接: https://juejin.cn/post/7363828811363221555
文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17976.html