当前位置:   article > 正文

RocketMQ5.0 生产者

RocketMQ5.0 生产者

生产者消息类型:

延迟队列的生产者

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/apache/rocketmq-clients/golang/v5"
  6. "github.com/apache/rocketmq-clients/golang/v5/credentials"
  7. errgroup2 "golang.org/x/sync/errgroup"
  8. "log"
  9. "os"
  10. "strconv"
  11. "time"
  12. )
  13. const (
  14. Topic = "DelayTopic"
  15. GroupName = "testG"
  16. Endpoint = "localhost:8081"
  17. Region = "xxxxxx"
  18. AccessKey = "xxxxxx"
  19. SecretKey = "xxxxxx"
  20. )
  21. func main() {
  22. os.Setenv("mq.consoleAppender.enabled", "true")
  23. golang.ResetLogger()
  24. // new producer instance
  25. producer, err := golang.NewProducer(&golang.Config{
  26. Endpoint: Endpoint,
  27. Credentials: &credentials.SessionCredentials{},
  28. },
  29. golang.WithTopics(Topic),
  30. )
  31. if err != nil {
  32. log.Fatal(err)
  33. }
  34. // start producer
  35. err = producer.Start()
  36. if err != nil {
  37. log.Fatal(err)
  38. }
  39. // gracefule stop producer
  40. defer producer.GracefulStop()
  41. var wg = errgroup2.Group{}
  42. wg.SetLimit(10)
  43. for i := 0; i < 1000; i++ {
  44. wg.Go(func() error {
  45. msg := &golang.Message{
  46. Topic: Topic,
  47. Body: []byte("this is a message : " + strconv.Itoa(i) + time.Now().Format(time.DateTime)),
  48. }
  49. // set keys and tag
  50. msg.SetKeys("a", "b")
  51. msg.SetTag("ab")
  52. msg.SetDelayTimestamp(time.Now().Add(time.Second * 10))
  53. // send message in sync
  54. resp, err := producer.Send(context.TODO(), msg)
  55. if err != nil {
  56. log.Fatal(err)
  57. }
  58. for i := 0; i < len(resp); i++ {
  59. fmt.Printf("%#v\n", resp[i])
  60. }
  61. return nil
  62. })
  63. // wait a moment
  64. time.Sleep(time.Second * 1)
  65. }
  66. wg.Wait()
  67. time.Sleep(time.Minute * 10)
  68. }

设置 topic 的 message.type。先进入 NameServer 容器:docker exec -it rmqnamesrv /bin/bash,然后执行以下命令:

  1. sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY
  2. sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY

   消费者

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "os"
  7. "time"
  8. "github.com/apache/rocketmq-clients/golang"
  9. "github.com/apache/rocketmq-clients/golang/credentials"
  10. )
  11. const (
  12. Topic = "DelayTopic"
  13. GroupName = "testG"
  14. Endpoint = "localhost:8081"
  15. )
  16. var (
  17. // maximum waiting time for receive func
  18. awaitDuration = time.Second * 5
  19. // maximum number of messages received at one time
  20. maxMessageNum int32 = 16
  21. // invisibleDuration should > 20s
  22. invisibleDuration = time.Second * 20
  23. // receive messages in a loop
  24. )
  25. func main() {
  26. // log to console
  27. os.Setenv("mq.consoleAppender.enabled", "true")
  28. golang.ResetLogger()
  29. // new simpleConsumer instance
  30. simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
  31. Endpoint: Endpoint,
  32. Credentials: &credentials.SessionCredentials{},
  33. ConsumerGroup: "string",
  34. },
  35. golang.WithAwaitDuration(awaitDuration),
  36. golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
  37. Topic: golang.SUB_ALL,
  38. }),
  39. )
  40. if err != nil {
  41. log.Fatal(err)
  42. }
  43. // start simpleConsumer
  44. err = simpleConsumer.Start()
  45. if err != nil {
  46. log.Fatal(err)
  47. }
  48. // gracefule stop simpleConsumer
  49. defer simpleConsumer.GracefulStop()
  50. go func() {
  51. defer func() {
  52. if err := recover(); err != nil {
  53. fmt.Println(err)
  54. }
  55. }()
  56. for {
  57. fmt.Println("start recevie message")
  58. mvs, err := simpleConsumer.Receive(context.TODO(), maxMessageNum, invisibleDuration)
  59. if err != nil {
  60. fmt.Println(err)
  61. }
  62. // ack message
  63. for _, mv := range mvs {
  64. simpleConsumer.Ack(context.TODO(), mv)
  65. fmt.Println(string(mv.GetBody()) + " " + time.Now().Format(time.DateTime))
  66. }
  67. fmt.Println("wait a moment")
  68. fmt.Println()
  69. time.Sleep(time.Second * 3)
  70. }
  71. }()
  72. // run for a while
  73. time.Sleep(time.Minute * 20)
  74. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/919993
推荐阅读
相关标签
  

闽ICP备14008679号