1. 首页
  2. 后端

go语言使用RocketMQ 5.0示例

  go语言使用RocketMQ 5.0示例

====================

Why?

====

网上用 go 语言调用 RocketMQ 的文章、资料目前大部分是使用4.x版本的RocketMQ。4.x版本的 RocketMQ 使用的是 Remoting 协议 SDK,而5.0版本使用的是基于gRPC 协议 SDK。这篇文章演示快速启动一个环境并使用的5.0版本的普通消息。

安装环境

这里使用 docker compose 安装环境。所以需要预先安装 docker。

首先创建一个文件,命名为 docker-compose.yml

version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:5.2.0
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      - rocketmq
    command: sh mqnamesrv
  broker:
    image: apache/rocketmq:5.2.0
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
      - 10912:10912
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq
    command: sh mqbroker
  proxy:
    image: apache/rocketmq:5.2.0
    container_name: rmqproxy
    networks:
      - rocketmq
    depends_on:
      - broker
      - namesrv
    ports:
      - 8080:8080
      - 8081:8081
    restart: on-failure
    environment:
      - NAMESRV_ADDR=rmqnamesrv:9876
    command: sh mqproxy
  dashboard:
    image: apacherocketmq/rocketmq-dashboard:latest
    container_name: rocketmq-dashboard
    ports:
      - 8888:8080
    environment:
      - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
    depends_on:
      - namesrv
    networks:
      - rocketmq
networks:
  rocketmq:
    driver: bridge

在上面那个文件创建的目录下执行命令

$ docker compose up -d

如果你的 docker compose 版本比较旧,那么执行 docker-compose up -d

等待所有容器启动。

使用 docker ps 可以查看已经启动的容器。

CONTAINER ID   IMAGE                                      COMMAND                  CREATED       STATUS       PORTS                                                                                                                            NAMES
17cb84bd47e6   apache/rocketmq:5.2.0                      "./docker-entrypoint…"   2 days ago    Up 2 days    9876/tcp, 10909/tcp, 0.0.0.0:8080-8081->8080-8081/tcp, :::8080-8081->8080-8081/tcp, 10911-10912/tcp                              rmqproxy
6793ff0c9532   apache/rocketmq:5.2.0                      "./docker-entrypoint…"   2 days ago    Up 2 days    0.0.0.0:10909->10909/tcp, :::10909->10909/tcp, 9876/tcp, 0.0.0.0:10911-10912->10911-10912/tcp, :::10911-10912->10911-10912/tcp   rmqbroker
b2da8ad65981   apacherocketmq/rocketmq-dashboard:latest   "sh -c 'java $JAVA_O…"   2 days ago    Up 2 days    0.0.0.0:8888->8080/tcp, :::8888->8080/tcp                                                                                        rocketmq-dashboard
45642db36512   apache/rocketmq:5.2.0                      "./docker-entrypoint…"   2 days ago    Up 2 days    10909/tcp, 0.0.0.0:9876->9876/tcp, :::9876->9876/tcp, 10911-10912/tcp                                                            rmqnamesrv

如果你想停止这些服务,在刚才的目录下执行 docker compose downdocker-compose down

接下来你可以访问 http://{换成你的宿主机IP}:8888/ 访问仪表盘。

image.png

配置主题

接下来在主题里新增topic。
点击新增/更新

image.png

这里的主题名写完之后在程序里要使用相同的主题名。

创建生产者

创建生产者的代码,以下代码由 官方示例 修改而来。

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-clients/golang/v5/credentials"
    "log"
    "os"
    "strconv"
    "time"

    rmq_client "github.com/apache/rocketmq-clients/golang/v5"
)

const (
    Topic     = "test-topic"       // 主题名称
    Endpoint  = "10.101.5.11:8081" // grpc proxy address,换成宿主机的ip和上文安装环境的 proxy 的端口。
    AccessKey = "xxxxxx"           // 没有的话这么写就行
    SecretKey = "xxxxxx"
)

func main() {
    os.Setenv("mq.consoleAppender.enabled", "true")
    rmq_client.ResetLogger()
    // In most case, you don't need to create many producers, singleton pattern is more recommended.
    producer, err := rmq_client.NewProducer(&rmq_client.Config{
       Endpoint: Endpoint,
       Credentials: &credentials.SessionCredentials{
          AccessKey:    AccessKey,
          AccessSecret: SecretKey,
       },
    },
       rmq_client.WithTopics(Topic),
    )
    if err != nil {
       log.Fatal(err)
    }
    // start producer
    err = producer.Start()
    if err != nil {
       log.Fatal(err)
    }
    // graceful stop producer
    defer producer.GracefulStop()

    for i := 0; i < 10; i++ {
       // new a message
       msg := &rmq_client.Message{
          Topic: Topic,
          Body:  []byte("this is a message : " + strconv.Itoa(i)),
       }
       // set keys and tag
       msg.SetKeys("a", "b")
       msg.SetTag("ab")
       // send message in sync
       resp, err := producer.Send(context.TODO(), msg)
       if err != nil {
          log.Fatal(err)
       }
       for i := 0; i < len(resp); i++ {
          fmt.Printf("%#v\n", resp[i])
       }
       // wait a moment
       time.Sleep(time.Second * 1)
    }
}

这里要注意一点,RocketMQ 4.0 的go sdk 连接地址是namesrv,RocketMQ 5.0 的go sdk 的Endpoint是 proxy 的地址。

执行上面这段代码,只要终端没有报错就是正确执行了,成功后消息队列里应当有10条记录。

创建消费者

接下来执行消费者代码。

package main

import (
    "context"
    "fmt"
    "github.com/apache/rocketmq-clients/golang/v5/credentials"
    "log"
    "os"
    "time"

    rmq_client "github.com/apache/rocketmq-clients/golang/v5"
)

const (
    Topic         = "test-topic"
    ConsumerGroup = "test-group"
    Endpoint      = "10.101.5.11:8081" // grpc proxy address
    AccessKey     = "xxxxxx"
    SecretKey     = "xxxxxx"
)

var (
    // maximum waiting time for receive func
    awaitDuration = time.Second * 5
    // maximum number of messages received at one time
    maxMessageNum int32 = 16
    // invisibleDuration should > 20s
    invisibleDuration = time.Second * 20
    // receive messages in a loop
)

func main() {
    // log to console
    os.Setenv("mq.consoleAppender.enabled", "true")
    rmq_client.ResetLogger()
    // In most case, you don't need to create many consumers, singleton pattern is more recommended.
    simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
       Endpoint:      Endpoint,
       ConsumerGroup: ConsumerGroup,
       Credentials: &credentials.SessionCredentials{
          AccessKey:    AccessKey,
          AccessSecret: SecretKey,
       },
    },
       rmq_client.WithAwaitDuration(awaitDuration),
       rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
          Topic: rmq_client.SUB_ALL,
       }),
    )
    if err != nil {
       log.Fatal(err)
    }
    // start simpleConsumer
    err = simpleConsumer.Start()
    if err != nil {
       log.Fatal(err)
    }
    // graceful stop simpleConsumer
    defer simpleConsumer.GracefulStop()

    go func() {
       for {
          fmt.Println("start receive message")
          mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
          if err != nil {
             fmt.Println(err)
          }
          // ack message
          for _, mv := range mvs {
             simpleConsumer.Ack(context.TODO(), mv)
             fmt.Println(string(mv.GetBody()))
          }
          fmt.Println("wait a moment")
          fmt.Println()
          time.Sleep(time.Second * 3)
       }
    }()
    // 阻塞主 goroutine 防止退出,观察子 goroutine 状态
    done := make(chan struct{})
    <-done
}

可以看到终端输出了生产者投送的消息。

可能遇到的问题

failed to get topic route data result from remote during client startup

找不到主题,有可能是程序里的主题名称与仪表盘上的不一致。如果确认是一致的,那么在仪表盘上发送一条消息,再启动程序,有可能就不会出现这个错误了。

原文链接: https://juejin.cn/post/7376617794426273833

文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17398.html

QR code