go语言使用RocketMQ 5.0示例
====================
Why?
====
网上用 go 语言调用 RocketMQ 的文章、资料目前大部分是使用4.x版本的RocketMQ。4.x版本的 RocketMQ 使用的是 Remoting 协议 SDK,而5.0版本使用的是基于gRPC 协议 SDK。这篇文章演示快速启动一个环境并使用的5.0版本的普通消息。
安装环境
这里使用 docker compose 安装环境。所以需要预先安装 docker。
首先创建一个文件,命名为 docker-compose.yml
。
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.2.0
container_name: rmqnamesrv
ports:
- 9876:9876
networks:
- rocketmq
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.2.0
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
command: sh mqbroker
proxy:
image: apache/rocketmq:5.2.0
container_name: rmqproxy
networks:
- rocketmq
depends_on:
- broker
- namesrv
ports:
- 8080:8080
- 8081:8081
restart: on-failure
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
command: sh mqproxy
dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rocketmq-dashboard
ports:
- 8888:8080
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
networks:
rocketmq:
driver: bridge
在上面那个文件创建的目录下执行命令
$ docker compose up -d
如果你的 docker compose 版本比较旧,那么执行 docker-compose up -d
等待所有容器启动。
使用 docker ps
可以查看已经启动的容器。
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
17cb84bd47e6 apache/rocketmq:5.2.0 "./docker-entrypoint…" 2 days ago Up 2 days 9876/tcp, 10909/tcp, 0.0.0.0:8080-8081->8080-8081/tcp, :::8080-8081->8080-8081/tcp, 10911-10912/tcp rmqproxy
6793ff0c9532 apache/rocketmq:5.2.0 "./docker-entrypoint…" 2 days ago Up 2 days 0.0.0.0:10909->10909/tcp, :::10909->10909/tcp, 9876/tcp, 0.0.0.0:10911-10912->10911-10912/tcp, :::10911-10912->10911-10912/tcp rmqbroker
b2da8ad65981 apacherocketmq/rocketmq-dashboard:latest "sh -c 'java $JAVA_O…" 2 days ago Up 2 days 0.0.0.0:8888->8080/tcp, :::8888->8080/tcp rocketmq-dashboard
45642db36512 apache/rocketmq:5.2.0 "./docker-entrypoint…" 2 days ago Up 2 days 10909/tcp, 0.0.0.0:9876->9876/tcp, :::9876->9876/tcp, 10911-10912/tcp rmqnamesrv
如果你想停止这些服务,在刚才的目录下执行 docker compose down
或docker-compose down
。
接下来你可以访问 http://{换成你的宿主机IP}:8888/ 访问仪表盘。
配置主题
接下来在主题
里新增topic。
点击新增/更新
这里的主题名写完之后在程序里要使用相同的主题名。
创建生产者
创建生产者的代码,以下代码由 官方示例 修改而来。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-clients/golang/v5/credentials"
"log"
"os"
"strconv"
"time"
rmq_client "github.com/apache/rocketmq-clients/golang/v5"
)
const (
Topic = "test-topic" // 主题名称
Endpoint = "10.101.5.11:8081" // grpc proxy address,换成宿主机的ip和上文安装环境的 proxy 的端口。
AccessKey = "xxxxxx" // 没有的话这么写就行
SecretKey = "xxxxxx"
)
func main() {
os.Setenv("mq.consoleAppender.enabled", "true")
rmq_client.ResetLogger()
// In most case, you don't need to create many producers, singleton pattern is more recommended.
producer, err := rmq_client.NewProducer(&rmq_client.Config{
Endpoint: Endpoint,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
rmq_client.WithTopics(Topic),
)
if err != nil {
log.Fatal(err)
}
// start producer
err = producer.Start()
if err != nil {
log.Fatal(err)
}
// graceful stop producer
defer producer.GracefulStop()
for i := 0; i < 10; i++ {
// new a message
msg := &rmq_client.Message{
Topic: Topic,
Body: []byte("this is a message : " + strconv.Itoa(i)),
}
// set keys and tag
msg.SetKeys("a", "b")
msg.SetTag("ab")
// send message in sync
resp, err := producer.Send(context.TODO(), msg)
if err != nil {
log.Fatal(err)
}
for i := 0; i < len(resp); i++ {
fmt.Printf("%#v\n", resp[i])
}
// wait a moment
time.Sleep(time.Second * 1)
}
}
这里要注意一点,RocketMQ 4.0 的go sdk 连接地址是namesrv
,RocketMQ 5.0 的go sdk 的Endpoint是 proxy
的地址。
执行上面这段代码,只要终端没有报错就是正确执行了,成功后消息队列里应当有10条记录。
创建消费者
接下来执行消费者代码。
package main
import (
"context"
"fmt"
"github.com/apache/rocketmq-clients/golang/v5/credentials"
"log"
"os"
"time"
rmq_client "github.com/apache/rocketmq-clients/golang/v5"
)
const (
Topic = "test-topic"
ConsumerGroup = "test-group"
Endpoint = "10.101.5.11:8081" // grpc proxy address
AccessKey = "xxxxxx"
SecretKey = "xxxxxx"
)
var (
// maximum waiting time for receive func
awaitDuration = time.Second * 5
// maximum number of messages received at one time
maxMessageNum int32 = 16
// invisibleDuration should > 20s
invisibleDuration = time.Second * 20
// receive messages in a loop
)
func main() {
// log to console
os.Setenv("mq.consoleAppender.enabled", "true")
rmq_client.ResetLogger()
// In most case, you don't need to create many consumers, singleton pattern is more recommended.
simpleConsumer, err := rmq_client.NewSimpleConsumer(&rmq_client.Config{
Endpoint: Endpoint,
ConsumerGroup: ConsumerGroup,
Credentials: &credentials.SessionCredentials{
AccessKey: AccessKey,
AccessSecret: SecretKey,
},
},
rmq_client.WithAwaitDuration(awaitDuration),
rmq_client.WithSubscriptionExpressions(map[string]*rmq_client.FilterExpression{
Topic: rmq_client.SUB_ALL,
}),
)
if err != nil {
log.Fatal(err)
}
// start simpleConsumer
err = simpleConsumer.Start()
if err != nil {
log.Fatal(err)
}
// graceful stop simpleConsumer
defer simpleConsumer.GracefulStop()
go func() {
for {
fmt.Println("start receive message")
mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
if err != nil {
fmt.Println(err)
}
// ack message
for _, mv := range mvs {
simpleConsumer.Ack(context.TODO(), mv)
fmt.Println(string(mv.GetBody()))
}
fmt.Println("wait a moment")
fmt.Println()
time.Sleep(time.Second * 3)
}
}()
// 阻塞主 goroutine 防止退出,观察子 goroutine 状态
done := make(chan struct{})
<-done
}
可以看到终端输出了生产者投送的消息。
可能遇到的问题
failed to get topic route data result from remote during client startup
找不到主题,有可能是程序里的主题名称与仪表盘上的不一致。如果确认是一致的,那么在仪表盘上发送一条消息,再启动程序,有可能就不会出现这个错误了。
原文链接: https://juejin.cn/post/7376617794426273833
文章收集整理于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除,如若转载,请注明出处:http://www.cxyroad.com/17398.html