Section 10: 实现 Kafka 和异步通信 (04:04:11 - 04:32:06)

将 Kafka 和 Zookeeper 添加到 Docker Compose (04:04:11 - 04:05:25)

首先,我们需要将 Kafka 和 Zookeeper 添加到我们的 Docker Compose 文件中。

打开 docker-compose.yml 文件,并添加以下服务定义:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "22181:2181"
    networks:
      - microservices-net

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: ms_kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:29092
    networks:
      - microservices-net
确保你的 docker-compose.yml 文件中已经定义了 microservices-net 网络。如果没有,请在 docker-compose.yml 文件中添加以下内容:

networks:
  microservices-net:
    driver: bridge
完成以上配置后,运行 docker-compose up -d 命令来启动 Zookeeper 和 Kafka 容器。

Kafka 配置 (04:05:25 - 04:12:05)

接下来,我们需要配置 Kafka。首先,在 order 服务和 payment 服务的 pom.xml 文件中添加 Kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

然后,在 config 包下创建一个名为 KafkaOrderConfig 的配置类,用于配置 Kafka Topic:

package com.alibou.ecommerce.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaOrderConfig {

    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order-topic")
                .build();
    }
}

在订单服务中实现 Kafka 生产者 (04:12:05 - 04:32:06)

现在,我们需要在订单服务中实现 Kafka 生产者。 创建一个名为 OrderProducer 的类,并添加以下代码:

package com.alibou.ecommerce.kafka;

import com.alibou.ecommerce.dto.OrderConfirmation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class OrderProducer {

    private final KafkaTemplate<String, OrderConfirmation> kafkaTemplate;

    public void sendOrderConfirmation(OrderConfirmation orderConfirmation) {
        log.info("Sending order confirmation with body {}", orderConfirmation);

        Message<OrderConfirmation> message = MessageBuilder
                .withPayload(orderConfirmation)
                .setHeader(KafkaHeaders.TOPIC, "order-topic")
                .build();

        kafkaTemplate.send(message);
    }
}

在这个类中,我们注入了 KafkaTemplate,并使用它来发送消息到 Kafka 的 order-topic 主题。