Section 10: 实现 Kafka 和异步通信 (04:04:11 - 04:32:06)
将 Kafka 和 Zookeeper 添加到 Docker Compose (04:04:11 - 04:05:25)¶
首先,我们需要将 Kafka 和 Zookeeper 添加到我们的 Docker Compose 文件中。 ⏎
打开 docker-compose.yml 文件,并添加以下服务定义:
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "22181:2181"
networks:
- microservices-net
kafka:
image: confluentinc/cp-kafka:latest
container_name: ms_kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:29092
networks:
- microservices-net
docker-compose.yml 文件中已经定义了 microservices-net 网络。如果没有,请在 docker-compose.yml 文件中添加以下内容:
完成以上配置后,运行 docker-compose up -d 命令来启动 Zookeeper 和 Kafka 容器。 ⏎
Kafka 配置 (04:05:25 - 04:12:05)¶
接下来,我们需要配置 Kafka。首先,在 order 服务和 payment 服务的 pom.xml 文件中添加 Kafka 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
然后,在 config 包下创建一个名为 KafkaOrderConfig 的配置类,用于配置 Kafka Topic:
package com.alibou.ecommerce.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaOrderConfig {
@Bean
public NewTopic orderTopic() {
return TopicBuilder.name("order-topic")
.build();
}
}
在订单服务中实现 Kafka 生产者 (04:12:05 - 04:32:06)¶
现在,我们需要在订单服务中实现 Kafka 生产者。 ⏎
创建一个名为 OrderProducer 的类,并添加以下代码:
package com.alibou.ecommerce.kafka;
import com.alibou.ecommerce.dto.OrderConfirmation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderProducer {
private final KafkaTemplate<String, OrderConfirmation> kafkaTemplate;
public void sendOrderConfirmation(OrderConfirmation orderConfirmation) {
log.info("Sending order confirmation with body {}", orderConfirmation);
Message<OrderConfirmation> message = MessageBuilder
.withPayload(orderConfirmation)
.setHeader(KafkaHeaders.TOPIC, "order-topic")
.build();
kafkaTemplate.send(message);
}
}
在这个类中,我们注入了 KafkaTemplate,并使用它来发送消息到 Kafka 的 order-topic 主题。 ⏎