1. 首页
  2. 后端

CQRS实践:使用SpringBoot构建强大的微服务

  CQRS实践:使用SpringBoot构建强大的微服务

===========================

简介:

CQRS代表Command Query Responsibility Segregation,是一种设计模式,它从根本上改变了我们对构建软件应用程序中的数据模型和数据库交互的架构的看法。传统上,系统使用相同的数据模型来读取和写入数据。然而,CQRS通过分离阅读和写的数据模型打破了这一规范。这种分离提供了许多优势,特别是在微服务等复杂系统的背景下。

在微服务架构中,不同的服务被设计为执行特定的功能并独立运行,CQRS可以发挥关键作用。通过划分读写操作,CQRS允许每个微服务以最有效的方式与数据库交互。例如,读取模型可以针对查询性能进行优化,利用反规范化和其他技术快速有效地获取数据。另一方面,写模型可以专注于事务完整性和业务规则实施,这对于维护数据一致性和可靠性至关重要。

典型的微服务架构:

CQRS实践:使用SpringBoot构建强大的微服务

图中描述的上述架构代表了微服务应用程序的典型分层结构,专门设计用于处理HTTP请求并与数据库交互。这种模型在许多Web框架中很常见,比如Spring靴子。以下是每一层的细分:

  1. Web客户端:这是应用程序面向用户的部分。它表示可以向服务器发送HTTP请求的任何客户端。这可以是Web浏览器、移动的应用程序或其他服务。客户端可以从服务器请求数据或发送数据进行处理。
  2. 控制器:控制器充当Web客户端和服务层之间的中间人。在Spring靴子应用程序中,这通常是一个REST控制器,它处理各种HTTP请求,如GET、POST、PUT和POST。控制器解释客户端的请求,调用必要的服务层操作,并返回适当的响应。
  3. 服务:这一层包含应用程序的业务逻辑。它负责处理数据、执行业务规则和执行计算。服务层在控制器和存储库层之间运行,确保业务逻辑与Web层和数据访问层保持分离。
  4. Repository:在Spring靴子中,repository层通常是一个抽象数据层的接口。它提供了一个类似集合的接口来访问域对象(例如,数据库中的实体)通常使用Spring Data。存储库负责从数据库中持久化和检索数据,对其他层隐藏数据访问代码的细节。
  5. 数据库:这是保存数据的存储系统。它可以是任何类型的数据库,例如关系数据库(例如,MySQL、PostgreSQL)或NoSQL数据库(例如,MongoDB,Cassandra).存储库层与数据库通信以执行CRUD(创建、读取、更新、删除)操作。

现在很清楚写和读在同一个微服务中的位置,尽管操作的负载很大,但是如果它们很难在单个微服务中编码,那么很坚韧单独处理操作强度(读与写)。这是将CQRS变为现实的实际触发器。

让我们开始使用Spring靴子微服务实现CQRS。在本文中,我们将假设银行场景,其中我们有一个存款(写服务/命令服务)和视图帐户(读服务/查询服务),我们还将考虑mongodb作为持久存储,redis pub/sub作为事件处理程序。

实施:

让我们看看命令和查询API的项目脚手架,它们看起来像下面这样。

CQRS实践:使用SpringBoot构建强大的微服务

现在让我们深入研究我们的第一个API,即命令API。让我们看看我们的pom.xml依赖管理。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.vaslabs</groupId>
    <artifactId>deposit</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>deposit</name>
    <description>deposit</description>
    <properties>
        <java.version>21</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

现在让我们看看我们的application.yaml管理我们的财产。

server:
  port: 9090
spring:
  data:
    mongodb:
      database: account_deposits
      host: localhost
      port: 27017
logging:
  level:
    org:
      springframework:
        data:
          mongodb:
            core:
              MongoTemplate: DEBUG
redis:
  pubsub:
    topic: deposit_event

现在是我们创建实体来将数据持久化到数据库的时候了。

package org.vaslabs.deposit.entity;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Data
@Document(collection = "accounts")
@Getter
@Setter
public class Deposit {
    @Id
    private String id;
    private String accountNumber;
    private String firstName;
    private String lastName;
    private double amount;
}

现在是我们创建writemodel的时候了,它将作为我们的命令对象。

package org.vaslabs.deposit.writemodel;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;

@Data
@Getter
@Setter
public class DepositCommand {
    private String accountNumber;
    private String firstName;
    private String lastName;
    private double amount;
}

让我们从事件处理的角度来看配置,其中服务将能够将事件传播到redis pub/sub。

package org.vaslabs.deposit.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;

@Configuration
public class EventHandlerConfig {

    @Value("${redis.pubsub.topic}")
    private String topic;

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return redisTemplate;
    }

    @Bean
    public ChannelTopic channelTopic(){
        return new ChannelTopic(topic);
    }
}

让我们创建一个存储库来处理包装为存储库的数据库逻辑。

package org.vaslabs.deposit.respositories;

import org.springframework.data.mongodb.repository.MongoRepository;
import org.vaslabs.deposit.entity.Deposit;

public interface DepositRepository extends MongoRepository<Deposit, String> {
}

让我们创建一个控制器来处理来自客户端的请求。

package org.vaslabs.deposit.controller;

import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import org.vaslabs.deposit.command.handler.DepositCommandHandler;
import org.vaslabs.deposit.entity.Deposit;
import org.vaslabs.deposit.writemodel.DepositCommand;

@RestController
public class DepositController {

    private final DepositCommandHandler depositCommandHandler;

    public DepositController(DepositCommandHandler depositCommandHandler) {
        this.depositCommandHandler = depositCommandHandler;
    }

    @PostMapping(value = "/deposit", consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<Deposit> saveDeposit(@RequestBody DepositCommand depositCommand){
        return ResponseEntity.ok().body(this.depositCommandHandler.handle(depositCommand));
    }

}

现在是我们编写处理程序的时候了,它将处理传入的命令。

package org.vaslabs.deposit.command.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.stereotype.Service;
import org.vaslabs.deposit.entity.Deposit;
import org.vaslabs.deposit.respositories.DepositRepository;
import org.vaslabs.deposit.writemodel.DepositCommand;

@Service
public class DepositCommandHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(DepositCommandHandler.class);
    private final DepositRepository depositRepository;
    private final RedisTemplate<String, Object> redisTemplate;
    private final ChannelTopic channelTopic;

    public DepositCommandHandler(DepositRepository depositRepository,
                                 RedisTemplate<String, Object> redisTemplate,
                                 ChannelTopic channelTopic) {
        this.depositRepository = depositRepository;
        this.redisTemplate = redisTemplate;
        this.channelTopic = channelTopic;
    }

    public Deposit handle(DepositCommand depositCommand) {
        Deposit deposit = mapCommandToEntity(depositCommand);
        deposit = this.depositRepository.save(deposit);
        publishEvent(deposit);
        return deposit;
    }

    private void publishEvent(Deposit deposit) {
        LOGGER.info("Publishing event: {} to channel: {}", deposit, channelTopic.getTopic());
        Long id = redisTemplate.convertAndSend(channelTopic.getTopic(), deposit);
        if (id != null) {
            LOGGER.info("event published to channel: {}", channelTopic.getTopic());
        }
    }

    private Deposit mapCommandToEntity(DepositCommand depositCommand) {
        Deposit deposit = new Deposit();
        deposit.setAccountNumber(depositCommand.getAccountNumber());
        deposit.setFirstName(depositCommand.getFirstName());
        deposit.setLastName(depositCommand.getLastName());
        deposit.setAmount(depositCommand.getAmount());
        return deposit;
    }
}

现在,一旦你运行代码,如果一切顺利,引导没有任何错误,下面应该是输出。

CQRS实践:使用SpringBoot构建强大的微服务

命令处理程序API的输出和事件传播到事件处理程序。

现在让我们深入研究第二个API,即查询API。让我们看看我们的pom.xml依赖管理。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.vaslabs</groupId>
    <artifactId>view_account</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>view_account</name>
    <description>view_account</description>
    <properties>
        <java.version>21</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

现在我们来看看application.yaml不同的物业管理。

server:
  port: 9091
spring:
  data:
    mongodb:
      database: account_views
      host: localhost
      port: 27017
logging:
  level:
    org:
      springframework:
        data:
          mongodb:
            core:
              MongoTemplate: DEBUG
redis:
  pubsub:
    topic: deposit_event

现在读取模型和实体与writemodel和实体保持相同,但没有规则要求它们必须相同,事实上这是CQRS模式的优点,它们可以不同。让我们看看我们的Event configuration

package org.vaslabs.view_account.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.vaslabs.view_account.event.handler.RedisEventListener;

@Configuration
public class EventHandlerConfig {


    @Value("${redis.pubsub.topic}")
    private String topic;

    private final RedisEventListener redisEventListener;

    public EventHandlerConfig(RedisEventListener redisEventListener) {
        this.redisEventListener = redisEventListener;
    }

    @Bean
    public ChannelTopic channelTopic() {
        return new ChannelTopic(topic);
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(redisEventListener, "listen");
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(ChannelTopic channelTopic,
                                                                       RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter(), channelTopic);
        return redisMessageListenerContainer;
    }
}

让我们看看上面的配置的EventListener。这个事件监听器的功能是在接收到来自redis pub/sub的消息后,将事件持久化到read数据库中。

package org.vaslabs.view_account.event.handler;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.vaslabs.view_account.entity.Deposit;
import org.vaslabs.view_account.respositories.DepositRepository;

import java.io.IOException;
@Service
public class RedisEventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisEventListener.class);
    private ObjectMapper objectMapper = new ObjectMapper();

    private final DepositRepository depositRepository;

    public RedisEventListener(DepositRepository depositRepository) {
        this.depositRepository = depositRepository;
    }

    public void listen(String message) throws JsonProcessingException {
        try {
            LOGGER.info("New event received: {}", message);
            Deposit depositEvent = objectMapper.readValue(message, Deposit.class);
            LOGGER.info("parsed event : {}", depositEvent);
            this.depositRepository.save(depositEvent);
        } catch (IOException e) {
            LOGGER.error("error while parsing message");
        }
    }
}

让我们看看ViewAccountRepository,它将与实际的数据库进行交互以进行不同的查询。

package org.vaslabs.view_account.respositories;

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;
import org.vaslabs.view_account.entity.Deposit;

public interface ViewAccountRepository extends MongoRepository<Deposit, String> {
    @Query("{'accountNumber': ?0}")
    Deposit findByAccountNumber(String accountNumber);
}

让我们看看我们的ViewAccountController,它将处理传入的请求。

package org.vaslabs.view_account.controller;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import org.vaslabs.view_account.entity.Deposit;
import org.vaslabs.view_account.query.handler.DepositQueryHandler;

import java.util.List;

@RestController
public class ViewAccountController {

    private final DepositQueryHandler depositQueryHandler;

    public ViewAccountController(DepositQueryHandler depositQueryHandler) {
        this.depositQueryHandler = depositQueryHandler;
    }

    @GetMapping(value = "/deposit/{accountNumber}")
    public ResponseEntity<Deposit> findbyAccountNumber(@PathVariable("accountNumber") String accountNumber){
        return ResponseEntity.ok().body(this.depositQueryHandler.handleByAccountNumber(accountNumber));
    }

    @GetMapping(value = "/deposit")
    public ResponseEntity<List<Deposit>> findAll(){
        return ResponseEntity.ok().body(this.depositQueryHandler.handleAll());
    }

}

现在我们来看看ViewAccountQueryHandler,它将处理来自用户的查询/项目。

package org.vaslabs.view_account.query.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.vaslabs.view_account.entity.Deposit;
import org.vaslabs.view_account.respositories.ViewAccountRepository;

import java.util.List;


@Service
public class ViewAccountQueryHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(ViewAccountQueryHandler.class);

    private final ViewAccountRepository viewAccountRepository;

    public ViewAccountQueryHandler(ViewAccountRepository viewAccountRepository) {
        this.viewAccountRepository = viewAccountRepository;
    }

    public Deposit handleByAccountNumber(String accountNumber) {
        LOGGER.info("before querying the database, accountNumber: {}", accountNumber);
        return this.viewAccountRepository.findByAccountNumber(accountNumber);
    }

    public List<Deposit> handleAll(){
        return this.viewAccountRepository.findAll();
    }
}

如果一切顺利,您应该可以看到读取数据库中的数据,因为它是由事件异步填充的。观察下面的输出,它清楚地表明,我们已经取得了API只是通过提供一些readall查询,它也收到了事件,并成功地解析和持久化,在读存储。

CQRS实践:使用SpringBoot构建强大的微服务

qery API的输出,显示事件接收和qery处理。

使用CQRS的优点和缺点:

命令查询责任分离(CQRS)模式也不例外,它有自己的优点和缺点。它是在软件设计中处理数据操作的独特方法,提供了几个优点。首先,它通过分离读取和写入操作来增强性能,允许每个操作针对其特定用例进行优化。在具有各种读写模式的复杂系统中,这可以显著提高性能。对于读取,数据可以以视图模型的形式进行非规范化,这些视图模型针对用户界面进行了优化,从而减少了昂贵的连接数量,否则如果将单个数据模型用于读取和写入,则可能需要昂贵的连接。对于写入,系统可以关注数据的事务完整性和一致性。

尽管CQRS有其优点,但也并非没有缺点。随着CQRS的引入,系统的复杂性会显著增加。开发人员必须管理和同步两个独立的模型,这可能会使代码库和架构复杂化。它需要对领域有透彻的理解,并且对于简单的CRUD应用程序来说通常是多余的,因为在这些应用程序中,开销并不能证明其好处是合理的。

CQRS的另一个挑战是数据一致性。使用单独的阅读和写数据模型,存在陈旧读取的风险-其中读取模型尚未更新以反映最新的写入。在需要高一致性的系统中尤其如此。开发人员需要仔细设计系统,以处理最终的一致性,并确保应用程序可以容忍一定程度的数据陈旧。

结论:

命令查询责任隔离(Command Query Responsibility Segregation,CQRS)模式在优化和扩展复杂的软件系统方面具有明显的优势,特别是在微服务架构中。它允许通过分离读和写操作来有针对性地提高性能。然而,它的采用带来了数据一致性方面的复杂性和挑战。当业务逻辑和数据模型的复杂性证明开销合理时,应该考虑CQRS,而不是将其作为简单应用程序的默认方法。明智地实施CQRS可以产生强大的和可维护的系统,这些系统能够很好地适应不断变化的需求。

原文链接: https://juejin.cn/post/7360961068166856745

文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17207.html

QR code