
OpenMessaging usage examples

open-messaging-api

This article walks through the usage examples that ship with OpenMessaging.

consumer

PullConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java

    public class PullConsumerApp {
        public static void main(String[] args) throws OMSResourceNotExistException {
            //Load and start the vendor implementation from a specific OMS driver URL.
            final MessagingAccessPoint messagingAccessPoint =
                OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
            messagingAccessPoint.startup();

            //Fetch a ResourceManager to create Queue resource.
            ResourceManager resourceManager = messagingAccessPoint.resourceManager();
            resourceManager.createQueue("NS://HELLO_QUEUE", OMS.newKeyValue());

            //Start a PullConsumer to receive messages from the specific queue.
            final PullConsumer pullConsumer = messagingAccessPoint.createPullConsumer();
            pullConsumer.attachQueue("NS://HELLO_QUEUE");
            pullConsumer.startup();

            //Register a shutdown hook to close the opened endpoints.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    pullConsumer.shutdown();
                    messagingAccessPoint.shutdown();
                }
            }));

            //Receive one message from queue.
            Message message = pullConsumer.receive();

            //Acknowledge the consumed message
            pullConsumer.ack(message.sysHeaders().getString(Message.BuiltinKeys.RECEIPT_HANDLE));
        }
    }
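  • First obtain the messagingAccessPoint and start it by calling startup; its shutdown is called from the shutdown hook.
  • Then create the queue through resourceManager, create the pullConsumer, and attach the consumer to the queue.
  • Call startup on the pullConsumer to start it, and shutdown to close it.
  • pullConsumer fetches a message by calling receive; naming this method pull might be more fitting.
  • pullConsumer can ack a message it has consumed.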
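
The receive-then-ack flow can be pictured with a small in-memory model. This is only a sketch under assumptions: PullAckSketch, its methods, and the "handle-N" receipt handles are hypothetical names made up for illustration, and none of them belong to the OpenMessaging API.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of pull-and-ack; not an OpenMessaging type.
final class PullAckSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private final Map<String, String> inFlight = new HashMap<>();
    private int nextHandle = 0;

    // Stores a message body so that it can be pulled later.
    void put(String body) {
        pending.add(body);
    }

    // Returns {receiptHandle, body} for the next message, or null if the queue is empty.
    String[] receive() {
        String body = pending.poll();
        if (body == null) {
            return null;
        }
        String handle = "handle-" + (nextHandle++);
        inFlight.put(handle, body);
        return new String[] {handle, body};
    }

    // Acknowledging by receipt handle marks the message as consumed.
    boolean ack(String receiptHandle) {
        return inFlight.remove(receiptHandle) != null;
    }

    // Number of messages that were received but not yet acknowledged.
    int unacked() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        PullAckSketch queue = new PullAckSketch();
        queue.put("HELLO_BODY");
        String[] received = queue.receive();
        System.out.println("received " + received[1] + " with " + received[0]);
        System.out.println("unacked before ack: " + queue.unacked());
        queue.ack(received[0]);
        System.out.println("unacked after ack: " + queue.unacked());
    }
}
```

The point of the model is that the consumer acknowledges by handle rather than by message, which mirrors how the sample passes RECEIPT_HANDLE to ack.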

PushConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java

    public class PushConsumerApp {
        public static void main(String[] args) throws OMSResourceNotExistException {
            //Load and start the vendor implementation from a specific OMS driver URL.
            final MessagingAccessPoint messagingAccessPoint =
                OMS.getMessagingAccessPoint("oms:rocketmq://localhost:10911/us-east");
            messagingAccessPoint.startup();

            //Fetch a ResourceManager to create Queue resource.
            ResourceManager resourceManager = messagingAccessPoint.resourceManager();
            final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
            consumer.startup();

            //Register a shutdown hook to close the opened endpoints.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    consumer.shutdown();
                    messagingAccessPoint.shutdown();
                }
            }));

            //Consume messages from a simple queue.
            String simpleQueue = "NS://HELLO_QUEUE";
            resourceManager.createQueue(simpleQueue, OMS.newKeyValue());

            //This queue doesn't have a source queue, so only the messages delivered to the queue directly can
            //be consumed by this consumer.
            consumer.attachQueue(simpleQueue, new MessageListener() {
                @Override
                public void onReceived(Message message, Context context) {
                    System.out.println("Received one message: " + message);
                    context.ack();
                }
            });
        }
    }
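  • As before, the messagingAccessPoint is obtained first, and the PushConsumer is created from it.
  • The queue is again created through resourceManager and then attached to the PushConsumer.
  • PushConsumer handles incoming messages in the callback of the MessageListener registered through attachQueue.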
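
The listener-based style can be sketched without any broker. The following is a minimal model, assuming a hypothetical PushListenerSketch class with its own Listener and Ack interfaces; these are stand-ins for illustration and are not the OpenMessaging MessageListener or Context types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of push delivery; not an OpenMessaging type.
final class PushListenerSketch {
    interface Ack {
        void ack();
    }

    interface Listener {
        void onReceived(String message, Ack context);
    }

    private final Map<String, Listener> listeners = new HashMap<>();
    private final List<String> acked = new ArrayList<>();

    // Registers the callback that handles messages of the given queue.
    void attachQueue(String queueName, Listener listener) {
        listeners.put(queueName, listener);
    }

    // The "broker" side: pushes a message to the listener attached to the queue, if any.
    void deliver(String queueName, String message) {
        Listener listener = listeners.get(queueName);
        if (listener != null) {
            listener.onReceived(message, () -> acked.add(message));
        }
    }

    // Messages whose listener called ack().
    List<String> acked() {
        return acked;
    }

    public static void main(String[] args) {
        PushListenerSketch consumer = new PushListenerSketch();
        consumer.attachQueue("NS://HELLO_QUEUE", (message, context) -> {
            System.out.println("Received one message: " + message);
            context.ack();
        });
        consumer.deliver("NS://HELLO_QUEUE", "HELLO_BODY");
        System.out.println("acked: " + consumer.acked());
    }
}
```

Unlike the pull model, the application code never asks for a message here; it only supplies the callback and acknowledges through the context it is handed.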

StreamingConsumer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/StreamingConsumerApp.java

    public class StreamingConsumerApp {
        public static void main(String[] args) throws OMSResourceNotExistException {
            //Load and start the vendor implementation from a specific OMS driver URL.
            final MessagingAccessPoint messagingAccessPoint =
                OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
            messagingAccessPoint.startup();

            //Fetch a ResourceManager to create Queue resource.
            String targetQueue = "NS://HELLO_QUEUE";
            ResourceManager resourceManager = messagingAccessPoint.resourceManager();
            resourceManager.createQueue(targetQueue, OMS.newKeyValue());

            //Fetch the streams of the target queue.
            List<String> streams = resourceManager.listStreams(targetQueue);

            //Start a StreamingConsumer to iterate messages from the specific stream.
            final StreamingConsumer streamingConsumer = messagingAccessPoint.createStreamingConsumer();
            streamingConsumer.startup();

            //Register a shutdown hook to close the opened endpoints.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    streamingConsumer.shutdown();
                    messagingAccessPoint.shutdown();
                }
            }));

            assert streams.size() != 0;
            StreamingIterator streamingIterator = streamingConsumer.seekToBeginning(streams.get(0));
            while (streamingIterator.hasNext()) {
                Message message = streamingIterator.next();
                System.out.println("Received one message: " + message);
            }

            //All the messages in the stream have been consumed.
            //Now consume the messages in reverse order
            while (streamingIterator.hasPrevious()) {
                Message message = streamingIterator.previous();
                System.out.println("Received one message again: " + message);
            }
        }
    }
  • The stream style is somewhat similar to the way Kafka is used.
  • A StreamingIterator is obtained from the StreamingConsumer and then iterated to read the messages.
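
The forward-then-backward traversal uses the same method names as java.util.ListIterator, so the JDK iterator makes a convenient stand-in for seeing the visiting order. This is an analogy only: BidirectionalIterationSketch is a hypothetical helper, and the list of strings replaces a real stream of messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

// Hypothetical helper that mimics the sample's two loops with a JDK ListIterator.
final class BidirectionalIterationSketch {
    // Walks the stream forward to the end, then backward to the start,
    // and returns the messages in the order they were visited.
    static List<String> forwardThenBackward(List<String> stream) {
        List<String> visited = new ArrayList<>();
        ListIterator<String> iterator = stream.listIterator();
        while (iterator.hasNext()) {
            visited.add(iterator.next());
        }
        while (iterator.hasPrevious()) {
            visited.add(iterator.previous());
        }
        return visited;
    }

    public static void main(String[] args) {
        // prints [m1, m2, m3, m3, m2, m1]
        System.out.println(forwardThenBackward(Arrays.asList("m1", "m2", "m3")));
    }
}
```

The cursor sits past the last element once the first loop ends, which is why the second loop starts with the last message again.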

producer

Producer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java

    public class ProducerApp {
        public static void main(String[] args) {
            final MessagingAccessPoint messagingAccessPoint =
                OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
            final Producer producer = messagingAccessPoint.createProducer();
            messagingAccessPoint.startup();
            producer.startup();

            //Register a shutdown hook to close the opened endpoints.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    producer.shutdown();
                    messagingAccessPoint.shutdown();
                }
            }));

            //Sends a message to the specified destination synchronously.
            {
                SendResult sendResult = producer.send(producer.createBytesMessage(
                    "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
                System.out.println("Send sync message OK, message id is: " + sendResult.messageId());
            }

            //Sends a message to the specified destination asynchronously.
            //And get the result through Future
            {
                final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage(
                    "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
                final SendResult sendResult = result.get(3000L);
                System.out.println("Send async message OK, message id is: " + sendResult.messageId());
            }

            //Sends a message to the specified destination asynchronously.
            //And retrieve the result through FutureListener
            {
                final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage(
                    "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
                result.addListener(new FutureListener<SendResult>() {
                    @Override
                    public void operationComplete(Future<SendResult> future) {
                        if (future.isDone() && null == future.getThrowable()) {
                            System.out.println("Send async message OK, message id is: " + future.get().messageId());
                        } else {
                            System.out.println("Send async message Failed, cause is: " + future.getThrowable().getMessage());
                        }
                    }
                });
            }

            //Sends a message to the specific queue in OneWay manner.
            {
                //There is no {@code Future} related or {@code RuntimeException} thrown. The calling thread doesn't
                //care about the send result and also have no context to get the result.
                producer.sendOneway(producer.createBytesMessage(
                    "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
            }
        }
    }
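  • The producer is created through messagingAccessPoint.
  • producer offers send, sendAsync, and sendOneway.
  • send is synchronous; sendAsync is asynchronous, and its result can be read from the returned Future or handled in a listener callback; sendOneway does not care about the send result.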
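
The difference between the three send modes can be modelled with the JDK's CompletableFuture. This is a rough sketch under assumptions: SendModesSketch and its "msg-N" ids are hypothetical, and CompletableFuture stands in for the OpenMessaging Future and FutureListener types, which have a different API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory producer; not an OpenMessaging type.
final class SendModesSketch {
    private final AtomicInteger sent = new AtomicInteger();

    // Synchronous: the caller blocks until a message id is available.
    String send(String body) {
        return "msg-" + sent.incrementAndGet();
    }

    // Asynchronous: the caller gets a future and may block on it or attach a callback.
    CompletableFuture<String> sendAsync(String body) {
        return CompletableFuture.supplyAsync(() -> send(body));
    }

    // One-way: fire and forget, no result is reported back.
    void sendOneway(String body) {
        sent.incrementAndGet();
    }

    // Total number of messages handed to this producer.
    int sentCount() {
        return sent.get();
    }

    public static void main(String[] args) throws Exception {
        SendModesSketch producer = new SendModesSketch();
        System.out.println("sync id: " + producer.send("HELLO_BODY"));

        // Wait for the async result with a timeout, like result.get(3000L) in the sample.
        CompletableFuture<String> future = producer.sendAsync("HELLO_BODY");
        System.out.println("async id via get: " + future.get(3, TimeUnit.SECONDS));

        // Handle the async result in a callback; join() only keeps main alive until it ran.
        producer.sendAsync("HELLO_BODY")
            .whenComplete((id, error) -> System.out.println(
                error == null ? "async id via callback: " + id : "send failed: " + error))
            .join();

        producer.sendOneway("HELLO_BODY");
        System.out.println("total sent: " + producer.sentCount());
    }
}
```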

TransactionProducer

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java

    public class TransactionProducerApp {
        public static void main(String[] args) {
            final MessagingAccessPoint messagingAccessPoint =
                OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
            final Producer producer = messagingAccessPoint.createProducer();
            messagingAccessPoint.startup();
            producer.startup();

            //Register a shutdown hook to close the opened endpoints.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    producer.shutdown();
                    messagingAccessPoint.shutdown();
                }
            }));

            Message message = producer.createBytesMessage(
                "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));

            //Sends a transaction message to the specified destination synchronously.
            SendResult sendResult = producer.send(message, new LocalTransactionExecutor() {
                @Override
                public void execute(final Message message, final ExecutionContext context) {
                    //Do some local transaction
                    //Then commit this transaction and the message will be delivered.
                    context.commit();
                }

                @Override
                public void check(final Message message, final CheckContext context) {
                    //The server may lookup the transaction status forwardly associated the specified message
                    context.commit();
                }
            }, OMS.newKeyValue());
            System.out.println("Send transaction message OK, message id is: " + sendResult.messageId());
        }
    }
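  • The same Producer is used; a transactional message is sent through the send overload that takes a LocalTransactionExecutor.
  • LocalTransactionExecutor defines two methods, execute and check.
  • execute performs the local transaction work; check reports the status of the local transaction when the server asks for it.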
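
How execute and check might cooperate can be sketched with a toy model. Everything here is an assumption made for illustration: TransactionalSendSketch, its Status enum, and the rule "fall back to check when execute gives no clear outcome" are hypothetical, and the callbacks return a status instead of calling commit on a context as the real sample does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a transactional send; not the OpenMessaging API.
final class TransactionalSendSketch {
    enum Status { COMMIT, ROLLBACK, UNKNOWN }

    interface LocalTransaction {
        // Runs the local transaction and reports its outcome.
        Status execute(String message);

        // Answers a later lookup of the transaction status.
        Status check(String message);
    }

    private final List<String> delivered = new ArrayList<>();

    // The message is held back until the outcome of the local transaction is known.
    boolean send(String message, LocalTransaction transaction) {
        Status status;
        try {
            status = transaction.execute(message);
        } catch (RuntimeException e) {
            status = Status.UNKNOWN;
        }
        if (status == Status.UNKNOWN) {
            // No clear outcome from execute, so ask for the status explicitly.
            status = transaction.check(message);
        }
        if (status == Status.COMMIT) {
            delivered.add(message);
            return true;
        }
        return false;
    }

    // Messages that became visible to consumers.
    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        TransactionalSendSketch broker = new TransactionalSendSketch();
        boolean ok = broker.send("HELLO_BODY", new LocalTransaction() {
            @Override
            public Status execute(String message) {
                return Status.COMMIT;
            }

            @Override
            public Status check(String message) {
                return Status.COMMIT;
            }
        });
        System.out.println("delivered=" + ok + " " + broker.delivered());
    }
}
```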

routing

openmessaging-java/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java

    public class RoutingApp {
        public static void main(String[] args) throws OMSResourceNotExistException {
            //Load and start the vendor implementation from a specific OMS driver URL.
            final MessagingAccessPoint messagingAccessPoint =
                OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
            messagingAccessPoint.startup();

            String destinationQueue = "NS://DESTINATION_QUEUE";
            String sourceQueue = "NS://SOURCE_QUEUE";

            //Fetch a ResourceManager to create source Queue, destination Queue, and the Routing instance.
            ResourceManager resourceManager = messagingAccessPoint.resourceManager();

            //Create the destination queue.
            resourceManager.createQueue(destinationQueue, OMS.newKeyValue());
            //Create the source queue.
            resourceManager.createQueue(sourceQueue, OMS.newKeyValue());

            KeyValue routingAttr = OMS.newKeyValue();
            routingAttr.put(OMSBuiltinKeys.ROUTING_SOURCE, sourceQueue)
                .put(OMSBuiltinKeys.ROUTING_DESTINATION, destinationQueue)
                .put(OMSBuiltinKeys.ROUTING_EXPRESSION, "color = 'red'");
            resourceManager.createRouting("NS://HELLO_ROUTING", routingAttr);

            //Send messages to the source queue ahead of the routing
            final Producer producer = messagingAccessPoint.createProducer();
            producer.startup();
            producer.send(producer.createBytesMessage(sourceQueue, "RED_COLOR".getBytes())
                .putUserHeaders("color", "red"));
            producer.send(producer.createBytesMessage(sourceQueue, "GREEN_COLOR".getBytes())
                .putUserHeaders("color", "green"));

            //Consume messages from the queue behind the routing.
            final PushConsumer pushConsumer = messagingAccessPoint.createPushConsumer();
            pushConsumer.startup();
            pushConsumer.attachQueue(destinationQueue, new MessageListener() {
                @Override
                public void onReceived(Message message, Context context) {
                    //The message sent to the sourceQueue will be delivered to anotherConsumer by the routing rule
                    //In this case, the push consumer will only receive the message with red color.
                    System.out.println("Received a red message: " + message);
                    context.ack();
                }
            });

            //Register a shutdown hook to close the opened endpoints.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    producer.shutdown();
                    pushConsumer.shutdown();
                    messagingAccessPoint.shutdown();
                }
            }));
        }
    }
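  • routing forwards messages from a source queue to a destination queue; only messages that match the routing expression are forwarded, so it works as a message filter.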
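
The filtering effect of a routing rule such as color = 'red' can be pictured as a predicate applied between two queues. This is a minimal sketch, assuming messages are plain maps of header values; RoutingFilterSketch and its helpers are hypothetical, and a Java Predicate replaces the routing expression string, which the real implementation evaluates itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory model of a routing rule; not an OpenMessaging type.
final class RoutingFilterSketch {
    // Builds a message as a map holding a body and a "color" user header.
    static Map<String, String> message(String body, String color) {
        Map<String, String> headers = new HashMap<>();
        headers.put("body", body);
        headers.put("color", color);
        return headers;
    }

    // Forwards every message that satisfies the expression from the source to the destination.
    static List<Map<String, String>> route(List<Map<String, String>> sourceQueue,
                                           Predicate<Map<String, String>> expression) {
        List<Map<String, String>> destinationQueue = new ArrayList<>();
        for (Map<String, String> candidate : sourceQueue) {
            if (expression.test(candidate)) {
                destinationQueue.add(candidate);
            }
        }
        return destinationQueue;
    }

    public static void main(String[] args) {
        List<Map<String, String>> sourceQueue = new ArrayList<>();
        sourceQueue.add(message("RED_COLOR", "red"));
        sourceQueue.add(message("GREEN_COLOR", "green"));

        // Stand-in for the expression "color = 'red'".
        List<Map<String, String>> destinationQueue =
            route(sourceQueue, headers -> "red".equals(headers.get("color")));

        for (Map<String, String> received : destinationQueue) {
            System.out.println("Received a red message: " + received.get("body"));
        }
    }
}
```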

Summary

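  • OpenMessaging defines neither a counterpart to Kafka's topic nor a consumer group concept.
  • AMQP hides the queue/topic distinction behind the Exchange, unlike JMS, where the producer has to choose whether to send to a topic or to a queue.
  • OpenMessaging does not define an exchange either, but since it has no topic concept, messages are always sent to a queue.
  • OpenMessaging's routing is somewhat similar to AMQP's routingKey, but it only filters messages and therefore takes effect on the consumer side.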
