赞
踩
package main

import (
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/streadway/amqp"
)

// declareQueue declares queueName on ch with every boolean option switched
// off (durable, delete-when-unused, exclusive, no-wait) and no extra
// arguments. It returns whatever error QueueDeclare reports.
func declareQueue(ch *amqp.Channel, queueName string) error {
	_, err := ch.QueueDeclare(
		queueName, // name
		false,     // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	return err
}

// producer publishes "Message 1" through "Message 5" as text/plain bodies to
// the "" exchange with queueName as the routing key, printing each message
// and sleeping one second after every publish. wg.Done is called when the
// function returns. A publish error terminates the process via log.Fatalf.
func producer(ch *amqp.Channel, wg *sync.WaitGroup, queueName string) {
	defer wg.Done()

	for i := 1; i <= 5; i++ {
		message := fmt.Sprintf("Message %d", i)
		err := ch.Publish(
			"",        // exchange
			queueName, // routing key
			false,     // mandatory
			false,     // immediate
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(message),
			})
		if err != nil {
			log.Fatalf("Failed to publish a message: %v", err)
		}
		fmt.Printf("Sent: %s\n", message)
		time.Sleep(time.Second)
	}
}

// consumer registers an auto-ack consumer on queueName and prints the body of
// every delivery it receives. It returns, calling wg.Done, only once the
// delivery channel returned by Consume has been closed. A failure to register
// the consumer terminates the process via log.Fatalf.
func consumer(ch *amqp.Channel, wg *sync.WaitGroup, queueName string) {
	defer wg.Done()

	msgs, err := ch.Consume(
		queueName, // queue
		"",        // consumer
		true,      // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		log.Fatalf("Failed to register a consumer: %v", err)
	}

	for msg := range msgs {
		fmt.Printf("Received: %s\n", msg.Body)
	}
}

// main connects to the local RabbitMQ broker, declares the queue, runs one
// producer and one consumer on the same AMQP channel, and exits once the
// producer has sent all of its messages and the consumer has stopped.
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %v", err)
	}
	// Kept as a safety net; the channel is also closed explicitly below, and
	// the amqp documentation states Close may be called multiple times.
	defer ch.Close()

	queueName := "your_queue_name"

	// Declare the queue before using it
	err = declareQueue(ch, queueName)
	if err != nil {
		log.Fatalf("Failed to declare queue: %v", err)
	}

	// The two goroutines are tracked separately: the producer finishes on its
	// own after five messages, whereas the consumer only returns once its
	// delivery channel is closed. Waiting on both with a single WaitGroup
	// would block forever.
	var producerWG, consumerWG sync.WaitGroup

	consumerWG.Add(1)
	go consumer(ch, &consumerWG, queueName)

	producerWG.Add(1)
	go producer(ch, &producerWG, queueName)

	producerWG.Wait()

	// Closing the AMQP channel closes the consumer's delivery channel, which
	// ends its range loop. The producer sleeps for a second after its final
	// publish, so the consumer has had time to receive the last message.
	if err := ch.Close(); err != nil {
		log.Printf("Failed to close the channel: %v", err)
	}

	consumerWG.Wait()
}
Sent: Message 1
Received: Message 1
Sent: Message 2
Received: Message 2
Sent: Message 3
Received: Message 3
Sent: Message 4
Received: Message 4
Sent: Message 5
Received: Message 5
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。