1. 首页
  2. 后端

2024 Q2 OKR 完结 RocketMQ优化Webhook处理等业务场景 (已上线)

  2024 Q2 OKR 完结 RocketMQ优化Webhook处理等业务场景 (已上线)

=============================================

1. 背景/历程/收获/展望

今天2024/05/23

背景: 最近组内okr 需要使用MQ优化多数据中心用户数据同步 / webhook处理等业务场景 / App推送提醒,我这边接手了这个优化任务,去年已经优化了一个场景 第三方adjust调用使用异步线程池接收回调,导致系统某个时刻大量的线程thread堆积,出现告警,影响服务的可用性,后面就使用MQ 异步发送 慢慢消费处理 达到削峰填谷的效果,后续问题就没有出现过了,但是监控方面没有做。

Q2季度主要对核心业务进行优化,同步多数据中心现在是使用python脚本(千万级别表 慢SQL),Webhook/推送/站内信 都是使用异步线程池 (危险就是 消息数据不是持久化,服务重启就没/线程池任务堆积 拒绝策略主线程执行 主线程出现阻塞现象/没有调用失败重试机制/没有消息数量,执行状态的监控……..)

  • 时间线: 2024/04-01- 2024/05/23.
  • 历程:

    1. 最近主要觉得写业务成长已经到瓶颈了,一个db+redis就够完成版本迭代的业务了,这段时间也想了很多,怎么提升自己的能力,并且能够作为okr去推动,自己就去当前业务里面发掘可以进行优化的点,找了一点并发量大一点的业务进行优化,当然我们还有短信业务(用户使用量较小 优化带来的收益不大),我们这边商城订单/购物车/发货都是db梭哈的,优化的空间非常大,主要业务不是我写的,现在能跑就行 如果你强行去优化,可能对自我的提升非常大,如果过程中出现bug,带来的损失可不是我能够承担的,风险太大不考虑商城这块了,这个季度主要对三个点Webhook处理等业务场景 / App推送提醒/多数据中心用户数据同步 进行代码重构优化。
    2. 工作两年来,虽然头脑中存储了很多MQ相关的知识点,却没有机会落地,这不机会来了 自己创造。整个的过程中,都是自测,预发自测/修复,再自测上线 观察线上功能是否正常,MQ数据的状态,各项指标是否正常。还有一个点我们这边RocketMQ搭建的单机版本的,可用性得不到保证,并且机器的配置很低,现在暂时能扛住当前用户流量并发操作。
    3. 其实我们有接入Amazon的SQS进行转发消息到多数据中国中心,leader那边想让我使用这个对部分业务进行改造,部分业务使用RocketMQ, SQS看了一眼设计和代码,就是一坨屎💩,那个sqs 感觉没啥用 首先restApi请求 又转发到其他数据中心的restApi请求,并且学习成本太大了,没必要用这个,两层api接口请求 完全保证不了消息发送成功/失败,leader说SQS有死信队列,在面板上可以直接一键重新消费,当时脑子比较懵的,当时对RocketMQ的死信队列没有什么概念,后面我去看了一眼RocketMQ的死信队列,没毛病啊,都能实现,刚才那个点不是能说服我的理由,后面说服leader,全部使用RocketMQ进行优化改造。
    4. 收获: 做完了之后 感觉没什么挑战性,主要个人技能提升,深入理解MQ及源码, 各种使用场景。
    5. 未来展望: RocketMQ 每个服务需要使用都要集成相关配置/日志监控困难等等,后续进行封装rocketmq-starter,简化代码/流程/简单使用,参与RocketMQ开源社区项目建设。

从这个季度上线到今天05/23 已经接收了160ws数据了

image.png

观察一下 每天MQ消息的数量

image.png

Gitlab 分支记录

image.png

2. MQ业务场景优化

2.1 MQ版本

Springboot-rocketmq 2.2.2 版本 内置rocketmq 版本是4.9.3

  • 看rocketmq 5.0.0+ 版本有很多新特性 比如自定义消息延迟时间(原来最大是2个小时)
<!-- rocketmq -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

024

2.2 生产者

  • 生产者这边采用 asyncSend 异步发送,接收回调消息是否成功,依赖参数配置 retry-times-when-send-async-failed: 3# 异步消息发送失败重试次数 (其实这个步骤 发送失败重试的流程我不确定他是否会生效,消息发送到broker,我没办法模拟这个环境进行重试,可能这个参数没有起作用,后续再去看看网上博客/源码研究一下)

2.3 消费者

  • 消费者这边采用集群模式 messageModel = MessageModel.CLUSTERING,设置消费者重试5次 进入死信队列 consumer.setMaxReconsumeTimes(5); 每次提取的最大消息数 consumer.setPullBatchSize(32); 使用并发模式pull消息 consumeMode = ConsumeMode.CONCURRENTLY, 这个重试消费失败5次进入死信队列我测试过了 没问题,我这消费者业务逻辑都是try catch 然后打印错误,抛出throw new RuntimeException(e); 进行重试,但是这个流程其实有很大问题,你消费的业务需要保证事物 要么全部成功/要么全部失败,假如你的消费者有涉及到分布式接口调用场景,可能前面的请求都成功了,到最后一个请求失败了,那算消费失败需要重试呢 还是消费成功,是不是需要上本地消息表或者别的分布式事务来解决相关问题,问题的复杂性又提升了一个档次,我们这边的业务并没有涉及关心事物操作,失败就失败吧,重试就完事了,保证消费者幂等性操作就行。

消费失败加上重试操作 测试

image.png

  • 重试消息
    024

2.4. App推送提醒

  • app端顶端消息提醒大家应该都见过吧,我们推送接入的是第三方极光 www.jiguang.cn/accounts/lo… 的推送,并不是自研的推送,成本太大/维护成本太高,直接买就完事了,当大量推送消息到达时,线程池也有顶不住的时候,比如后台对50w用户进行推送活动相关的消息,线程池肯定扛不住啊,到时候不仅推送业务顶不住,还拖垮了这个服务中其他的业务,业务出现瓶颈,优化机会来了,下面都是改造过程中的一些配置(已脱敏)。

RocketMQConfig 配置

@Configuration
@Import({RocketMQAutoConfiguration.class})
public class RocketMQConfig {
}
# rocketmq配置
rocketmq:
  name-server: ${rocketmq.server.host:127.0.0.1}:${rocketmq.server.port:9876}
  # 生产者
  producer:
    group: producer_xxxxx
    # 消息发送失败重试次数
    retry-times-when-send-failed: 3
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 3

生产者

@Slf4j
@Component
public class PushMessageProducerService {

  private static final Gson GSON = new Gson();

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  public void asyncPushToUser(PushContents pushContents, int userId,
      int appFlag, int activityId, boolean passThough, int pushType) {
    PushToUserDto pushMessage = new PushToUserDto();
    pushMessage.setPushContents(pushContents);
    pushMessage.setUserId(userId);
    pushMessage.setAppFlag(appFlag);
    pushMessage.setActivityId(activityId);
    pushMessage.setPassThough(passThough);
    pushMessage.setPushType(pushType);

    String requestId = ThreadContext.get("requestId");
    pushMessage.setRequestId(requestId);
    // 异步发送消息
    log.info("push message producer send message {} {}", userId,
        GSON.toJson(pushMessage));
    rocketMQTemplate.asyncSend(BusinessConstants.TOPIC_PUSH_MESSAGE,
        pushMessage, new SendCallback() {
          @Override
          public void onSuccess(SendResult sendResult) {
            log.info("push message async message succeeded sendResult:{}",
                sendResult);
          }

          @Override
          public void onException(Throwable throwable) {
            log.error("push message async message failed: {}",
                throwable.getMessage());
          }
        });
  }
}

消费者

@Slf4j
@Component
@RocketMQMessageListener(topic = BusinessConstants.TOPIC_PUSH_MESSAGE,
    consumerGroup = BusinessConstants.GROUP_CONSUMER_TOOLS)
public class PushToUserConsumerService implements
    RocketMQListener<PushToUserDto>, RocketMQPushConsumerLifecycleListener {

  @Override
  public void onMessage(PushToUserDto pushToUserDto) {
    String requestId = pushToUserDto.getRequestId();
    if (!Strings.isNullOrEmpty(requestId)) {
      ThreadContext.put("requestId", requestId);
    }
    log.info("push message consumer receipt message {} {}",
        pushToUserDto.getUserId(),
        new Gson().toJson(pushToUserDto));
    try {
      //xxx 业务处理
    } catch (Exception e) {
      log.error("push message consumer receipt message fail {} {}",
          pushToUserDto.getUserId(), e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }

  @Override
  public void prepareStart(DefaultMQPushConsumer consumer) {
    // 设置消费者重试次数
    consumer.setMaxReconsumeTimes(3);
    // 每次提取的最大消息数
    consumer.setPullBatchSize(32);
  }

2.5 Webhook处理等业务场景

  • webhook 回调的场景我们这边特别多,比如appstore购买订阅回调,google/paypal/stripe 购买订阅的回调等等,由于我们这边有多个数据中心,但是上游平台只能配置一个地址,我们也不知道这次的回调数据要回调到哪个数据中心,只能回调到其中一个,再进行转发到其他数据中心,现在转发的策略是通过new Thread() 进行异步转发,不考虑消息是否成功/失败,监听不到消息是否请求,对我们上层来说无感/危险⚠️性极大,这不改造机会来了,下面都是改造过程中的一些代码(已脱敏)。

生产者

@Slf4j
@Component
public class CallBackProducerService {

  private static final Gson GSON = new Gson();

  @Autowired
  private RocketMQTemplate rocketMQTemplate;


  /**
   * 异步发送 callback消息.
   *
   * @param appStoreCallBackDto
   */
  public void asyncSendAppStoreCallBackMessage(
      AppStoreCallBackDto appStoreCallBackDto) {
    log.info("appstore callback producer send message {}",
        GSON.toJson(appStoreCallBackDto));
    rocketMQTemplate.asyncSend(BusinessConstants.TOPIC_APPSTORE_CALLBACK,
        appStoreCallBackDto, new SendCallback() {
          @Override
          public void onSuccess(SendResult sendResult) {
            log.info("callback async message succeeded sendResult:{}",
                sendResult);
          }

          @Override
          public void onException(Throwable throwable) {
            log.error("callback async message failed: {}",
                throwable.getMessage());
          }
        });
  }
}

消费者

@Slf4j
@Component
@RocketMQMessageListener(
    topic = BusinessConstants.TOPIC_APPSTORE_CALLBACK,
    consumerGroup = BusinessConstants.GROUP_APPSTORE_CALLBACK,
    consumeMode = ConsumeMode.CONCURRENTLY,
    messageModel = MessageModel.CLUSTERING)
public class AppStoreCallBackConsumerService implements
    RocketMQListener<AppStoreCallBackDto>,
    RocketMQPushConsumerLifecycleListener {

  @Autowired
  WebClient webClient;

  @Override
  public void onMessage(AppStoreCallBackDto appStoreCallBackDto) {
    String requestId = appStoreCallBackDto.getRequestId();
    if (!Strings.isNullOrEmpty(requestId)) {
      ThreadContext.put("requestId", requestId);
    }

    try {
        // 业务处理xxxxxxxxxxxxx
      } catch (Exception e) {
        log.error("appstore callback consumer message fail {}",
            e.getMessage(), e);
        // 500 抛出异常 进行重试消息
        throw new RuntimeException(e);
      }
    }
  }


  @Override
  public void prepareStart(DefaultMQPushConsumer consumer) {
    // 设置消费者重试5次 进入死信队列
    consumer.setMaxReconsumeTimes(5);
    // 每次提取的最大消息数
    consumer.setPullBatchSize(32);
  }

2.6 多数据中心用户数据同步

  • 同步多数据中心现在是使用python脚本查从库MySQL将符合条件的用户查询出来,再直接连接其他数据中心的Redis 将数据写入进去,现在出现了慢SQL查询的瓶颈,同步的用户数据出现延迟,数据只会越来越多,带来的危害越来越大,所以才用业务优化的方案,放弃python脚本的同步,在用户登陆注册的时候/用户修改了xxxx将数据同步到其他数据中心,使用MQ异步写入,消费者慢慢消费 转发到其他数据中心,将数据同步到其他数据中心的Redis中。也是成功解决的现在存在的大表问题,主要这个同步方案是2021年设计的,当时的用户量并没有那么多 加上查询的是从库 不影响主库,所有这个设计方案也是成功运行了3年多了,现在也暴露出该有的问题了。下面都是改造过程中的一些代码(已脱敏)。

读写队列的大小都是默认值4/4 记住消费者的数量并不是越多越好 而是看队列的大小来设置的。

image.png

生产者

@Slf4j
@Component
public class AccountUpdateProducerService {

  private static final Gson GSON = new Gson();

  @Autowired
  private RocketMQTemplate rocketMQTemplate;

  /**
   * 异步发送同步到其他数据中心消息.
   *
   * @param accountUpdateDto
   * @param source
   */
  public void asyncSendAccountUpdateMessage(AccountUpdateDto accountUpdateDto,
      int source) {
    log.info("account update producer send message {} {}", source,
        GSON.toJson(accountUpdateDto));
    if (accountUpdateDto != null && accountUpdateDto.getUserId() == 0) {
      return;
    }
    rocketMQTemplate.asyncSend(BusinessConstants.TOPIC_ACCOUNT_UPDATE,
        accountUpdateDto, new SendCallback() {
          @Override
          public void onSuccess(SendResult sendResult) {
            log.info("account update producer async message succeeded " +
                "sendResult:{}", sendResult);
          }

          @Override
          public void onException(Throwable throwable) {
            log.error("account update producer async message failed: {}",
                throwable.getMessage());
          }
        });
  }
}
@Slf4j
@Component
@RocketMQMessageListener(
    topic = BusinessConstants.TOPIC_ACCOUNT_UPDATE,
    consumerGroup = BusinessConstants.GROUP_ACCOUNT_UPDATE,
    consumeMode = ConsumeMode.CONCURRENTLY,
    messageModel = MessageModel.CLUSTERING)
public class AccountUpdateConsumerService implements
    RocketMQListener<AccountUpdateDto>,
    RocketMQPushConsumerLifecycleListener {

  @Autowired
  WebClient webClient;

  @Override
  public void onMessage(AccountUpdateDto accountUpdateDto) {
    String requestId = accountUpdateDto.getRequestId();
    if (!Strings.isNullOrEmpty(requestId)) {
      ThreadContext.put("requestId", requestId);
    }
    log.info("account update consumer message {}",
        accountUpdateDto);
    try {
        // 业务处理xxxxxxxxxxx
      } catch (Exception e) {
        log.error("account update consumer message fail {}",
            e.getMessage(), e);
        // 500 抛出异常 进行重试消息
        throw new RuntimeException(e);
      }
    }
  }

  @Override
  public void prepareStart(DefaultMQPushConsumer consumer) {
    // 设置消费者重试5次 进入死信队列
    consumer.setMaxReconsumeTimes(5);
    // 每次提取的最大消息数
    consumer.setPullBatchSize(32);
  }
}

3.告警监控 RocketMQ Exporter+HertzBeat

  • 当我们加了一层MQ的话 架构的复杂性就增大了,我们需要保证中间件的高可用,以及宕机的时候能够直接告警,自动重启修复,至人工处理,所以我们需要监控的数据越来越多了,我这边主要是监控的数据有以下:

    • commitlog 磁盘使用大小 rocketmq_brokeruntime_commitlog_disk_ratio rocketmq_brokeruntime_commitlogdir_capacity_free
    • 消息延迟堆积大小 rocketmq_group_diff
    • 重试消息堆积大小 rocketmq_group_retrydiff
    • 死信队列堆积大小 rocketmq_group_dlqdiff
    • …………等等。
    • RocketMQ Exporter 安装 使用docker 参考这个作者的安装 juejin.cn/post/726253…
    • rocketmq.apache.org/zh/docs/4.x… 监控的指标选择
      image.png
    • 启动完 访问 http://127.0.0.1:9121/metrics 查看相关数据

hertzBeat MQ性能指标面板观看 绑定暴露出来的 /metrics接口

image.png

监控阈值设置 告警/恢复发送到飞书

image.png

image.png

4. 思考

  • (设计) 现在场景是回调都是打到mall服务上面 在mall服务继承rocketmq依赖 添加配置文件,但是推送/站内信/邮件是 打到tools服务上面,tools服务也继承rokcetmq依赖 添加配置文件,后续如果想在别的服务进行mq操作 又需要走这一套流程,好的方法就是mq抽成一个项目,别的服务器调用mq项目工具类进行生产消息,消费者消费信息的话也是在mq项目里面, 这样的话全都要走rpc调用了(有些直接方法调用),违背了服务拆分按照业务来。
  • (个人)MQ现在只是掌握一点毛皮,需要深入学习相关原理和配置合理的参数,考虑各种异常场景解决的方案,对于出现的异常能够独立有效解决, 继续深入学习吧,熟悉运用到各个场景里面。
  • 单机RocketMQ 有点危险,现在的业务量 还能够保证运行,以后就不确定了,业务并发量太小了,时常和朋友交流,他们那么的业务消息堆积起来都是百万级别的,在下游处理的消费者你需要TPS千级别+,听起来就刺激这种业务,可惜没机会遇到 进行发挥一下。

贴一张雀魂麻将 2024/05/22 悸动之夏-八木唯(动态)皮肤返场

image.png

5. 开源项目阅读

参考

文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17039.html

QR code