赞
踩
生产者消息类型:
延迟队列的生产者
- package main
-
- import (
- "context"
- "fmt"
- "github.com/apache/rocketmq-clients/golang/v5"
- "github.com/apache/rocketmq-clients/golang/v5/credentials"
- errgroup2 "golang.org/x/sync/errgroup"
- "log"
- "os"
- "strconv"
- "time"
- )
-
// Connection settings for the delayed-message producer example.
const (
	// Topic is the topic the producer publishes to. The article configures it
	// on the broker with message.type=DELAY.
	Topic = "DelayTopic"
	// GroupName is not referenced by the producer code in this listing.
	GroupName = "testG"
	// Endpoint is the address handed to the client configuration.
	Endpoint = "localhost:8081"
	// Region, AccessKey and SecretKey are placeholders; the producer in this
	// listing passes empty session credentials and does not read them.
	Region    = "xxxxxx"
	AccessKey = "xxxxxx"
	SecretKey = "xxxxxx"
)
-
- func main() {
- os.Setenv("mq.consoleAppender.enabled", "true")
- golang.ResetLogger()
- // new producer instance
-
- producer, err := golang.NewProducer(&golang.Config{
- Endpoint: Endpoint,
- Credentials: &credentials.SessionCredentials{},
- },
- golang.WithTopics(Topic),
- )
- if err != nil {
- log.Fatal(err)
- }
- // start producer
- err = producer.Start()
- if err != nil {
- log.Fatal(err)
- }
- // gracefule stop producer
- defer producer.GracefulStop()
- var wg = errgroup2.Group{}
- wg.SetLimit(10)
- for i := 0; i < 1000; i++ {
- wg.Go(func() error {
- msg := &golang.Message{
- Topic: Topic,
- Body: []byte("this is a message : " + strconv.Itoa(i) + time.Now().Format(time.DateTime)),
- }
- // set keys and tag
- msg.SetKeys("a", "b")
- msg.SetTag("ab")
- msg.SetDelayTimestamp(time.Now().Add(time.Second * 10))
-
- // send message in sync
- resp, err := producer.Send(context.TODO(), msg)
- if err != nil {
- log.Fatal(err)
- }
-
- for i := 0; i < len(resp); i++ {
- fmt.Printf("%#v\n", resp[i])
- }
- return nil
- })
-
- // wait a moment
- time.Sleep(time.Second * 1)
- }
- wg.Wait()
- time.Sleep(time.Minute * 10)
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
设置 topic 的 message.type（先进入 NameServer 容器）：docker exec -it rmqnamesrv /bin/bash
- sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY
-
-
- sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY
-
消费者
- package main
-
- import (
- "context"
- "fmt"
- "log"
- "os"
- "time"
-
- "github.com/apache/rocketmq-clients/golang"
- "github.com/apache/rocketmq-clients/golang/credentials"
- )
-
// Connection settings for the delayed-message consumer example.
const (
	// Topic is the topic the consumer subscribes to.
	Topic = "DelayTopic"
	// GroupName is the consumer group name declared for this example.
	GroupName = "testG"
	// Endpoint is the address handed to the client configuration.
	Endpoint = "localhost:8081"
)
-
// Tuning values passed to the simple consumer.
var (
	// awaitDuration is the maximum waiting time for the receive call.
	awaitDuration = time.Second * 5
	// maxMessageNum is the maximum number of messages received at one time.
	maxMessageNum int32 = 16
	// invisibleDuration is the invisible duration passed to each receive call.
	// NOTE(review): the original note says this "should > 20s", yet the value
	// is exactly 20s — confirm the required bound against the client docs.
	invisibleDuration = time.Second * 20
)
-
- func main() {
- // log to console
- os.Setenv("mq.consoleAppender.enabled", "true")
- golang.ResetLogger()
- // new simpleConsumer instance
- simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
- Endpoint: Endpoint,
- Credentials: &credentials.SessionCredentials{},
- ConsumerGroup: "string",
- },
- golang.WithAwaitDuration(awaitDuration),
- golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
- Topic: golang.SUB_ALL,
- }),
- )
- if err != nil {
- log.Fatal(err)
- }
- // start simpleConsumer
- err = simpleConsumer.Start()
- if err != nil {
- log.Fatal(err)
- }
- // gracefule stop simpleConsumer
- defer simpleConsumer.GracefulStop()
-
- go func() {
- defer func() {
- if err := recover(); err != nil {
- fmt.Println(err)
- }
- }()
- for {
- fmt.Println("start recevie message")
- mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
- if err != nil {
- fmt.Println(err)
- }
- // ack message
- for _, mv := range mvs {
- simpleConsumer.Ack(context.TODO(), mv)
- fmt.Println(string(mv.GetBody()) + " " + time.Now().Format(time.DateTime))
- }
- fmt.Println("wait a moment")
- fmt.Println()
- time.Sleep(time.Second * 3)
- }
- }()
- // run for a while
- time.Sleep(time.Minute * 20)
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。