当前位置:   article > 正文

基于IPC-CFX的点对点通信C#_ipc cfx

ipc cfx

        IPC-CFX有两种主要的通信方式,可以通过RabbitMQ发布和订阅,也可以通过request和response进行点对点的通信,本文主要讲的是点对点的通信方式。

        在vscode里建立新的dotnet项目,可以通过终端输入dotnet new console来建立,文件目录为CFXDemo->machine1和CFXDemo->machine2。

        通过nuget插件分别为两个项目都安装CFX.CFXSDK、AMQPNetLite.Core和Newtonsoft.Json这几个metapackage。

         我们将machine1作为发送端(sendRequest),machine2作为接收端(Receive),则Machine1的代码如下所示:

  1. using System.Threading;
  2. using CFX;
  3. using CFX.Transport;
  4. using System;
  5. using System.Security.Principal;
  6. namespace machine1
  7. {
  8. class Program
  9. {
  10. static void Main(string[] args)
  11. {
  12. OpenRequest();
  13. Console.ReadLine();
  14. for(int i = 0;i<5;i++){
  15. SendRequest();
  16. Thread.Sleep(2000);
  17. }
  18. }
  19. static string sendCFXHandle = "a.b.001";
  20. static string receiveCFXHandle = "a.b.002";
  21. static string sendRequestUri = string.Format("amqp://127.0.0.1:1235");
  22. static string receivRequestUri = string.Format("amqp://127.0.0.1:1234");
  23. #region send request
  24. static AmqpCFXEndpoint endpointSendRequest;
  25. static void OpenRequest()
  26. {
  27. if (endpointSendRequest != null)
  28. {
  29. endpointSendRequest.Close();
  30. endpointSendRequest = null;
  31. }
  32. endpointSendRequest = new AmqpCFXEndpoint();
  33. if (!endpointSendRequest.IsOpen)
  34. {
  35. Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
  36. endpointSendRequest.Open(sendCFXHandle); //这一步会绑定endpointSendRequest里的CFXHandle,即sendCFXHandle的值
  37. Console.WriteLine("Request.Source is : {0}",endpointSendRequest.CFXHandle);
  38. Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
  39. }
  40. // Set a timeout of 20 seconds. If the target endpoint does not
  41. // respond in this time, the request will time out.
  42. AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);
  43. }
  44. static void SendRequest()
  45. {
  46. // Build a GetEndpointInfomation Request
  47. CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
  48. {
  49. CFXHandle = receiveCFXHandle
  50. });
  51. request.Source = endpointSendRequest.CFXHandle;
  52. request.Target = receiveCFXHandle;
  53. try
  54. {
  55. CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
  56. Console.WriteLine($"response:\n{response.ToJson(true)}");
  57. }
  58. catch (Exception ex)
  59. {
  60. Console.WriteLine(ex.Message);
  61. }
  62. }
  63. #endregion send request
  64. #region receive request
  65. static AmqpCFXEndpoint endpointReceiveRequest;
  66. static void OpenReceive()
  67. {
  68. if (endpointReceiveRequest != null)
  69. {
  70. endpointReceiveRequest.Close();
  71. endpointReceiveRequest = null;
  72. }
  73. endpointReceiveRequest = new AmqpCFXEndpoint();
  74. endpointReceiveRequest.OnRequestReceived -= Endpoint_OnRequestReceived;
  75. endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;
  76. if (!endpointReceiveRequest.IsOpen)
  77. {
  78. Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
  79. endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
  80. Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
  81. }
  82. }
  83. static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
  84. {
  85. Console.WriteLine($"Endpoint_OnRequestReceived: { request.ToString()}");
  86. // Process request. Return Result.
  87. if (request.MessageBody is WhoIsThereRequest)
  88. {
  89. CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
  90. { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
  91. result.Source = receiveCFXHandle;
  92. result.Target = request.Source;
  93. result.TimeStamp = DateTime.Now;
  94. return result;
  95. }
  96. if (request.MessageBody is GetEndpointInformationRequest)
  97. {
  98. CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
  99. { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "...." });
  100. result.Source = receiveCFXHandle;
  101. result.Target = request.Source;
  102. result.TimeStamp = DateTime.Now;
  103. return result;
  104. }
  105. return null;
  106. }
  107. #endregion receive request
  108. }
  109. }

        作为接收端,machine2的代码如下所示:

  1. using CFX;
  2. using CFX.Transport;
  3. using System;
  4. namespace machine2
  5. {
  6. class Program
  7. {
  8. static void Main(string[] args)
  9. {
  10. Console.WriteLine("ReceivEndPoint is waiting Request......");
  11. OpenReceive();
  12. Console.WriteLine("Press Enter Key to end the App");
  13. Console.ReadKey();
  14. }
  15. static string sendCFXHandle = "a.b.001";
  16. static string receiveCFXHandle = "a.b.002";
  17. static string sendRequestUri = string.Format("amqp://127.0.0.1:1235");
  18. static string receivRequestUri = string.Format("amqp://127.0.0.1:1234");
  19. #region send request
  20. static AmqpCFXEndpoint endpointSendRequest;
  21. static void OpenRequest()
  22. {
  23. if (endpointSendRequest != null)
  24. {
  25. endpointSendRequest.Close();
  26. endpointSendRequest = null;
  27. }
  28. endpointSendRequest = new AmqpCFXEndpoint();
  29. if (!endpointSendRequest.IsOpen)
  30. {
  31. Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
  32. endpointSendRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
  33. Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
  34. }
  35. // Set a timeout of 20 seconds. If the target endpoint does not
  36. // respond in this time, the request will time out.
  37. AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);
  38. }
  39. static void SendRequest()
  40. {
  41. // Build a GetEndpointInfomation Request
  42. CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
  43. {
  44. CFXHandle = receiveCFXHandle
  45. });
  46. request.Source = endpointSendRequest.CFXHandle;
  47. request.Target = receiveCFXHandle;
  48. try
  49. {
  50. CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
  51. Console.WriteLine($"response:\n{response.ToJson(true)}");
  52. }
  53. catch (Exception ex)
  54. {
  55. Console.WriteLine(ex.Message);
  56. }
  57. }
  58. #endregion send request
  59. #region receive request
  60. static AmqpCFXEndpoint endpointReceiveRequest;
  61. static void OpenReceive()
  62. {
  63. if (endpointReceiveRequest != null)
  64. {
  65. endpointReceiveRequest.Close();
  66. endpointReceiveRequest = null;
  67. }
  68. endpointReceiveRequest = new AmqpCFXEndpoint();
  69. endpointReceiveRequest.OnRequestReceived -= Endpoint_OnRequestReceived;
  70. endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;
  71. if (!endpointReceiveRequest.IsOpen)
  72. {
  73. Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
  74. endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
  75. //endpointReceiveRequest.Open(receiveCFXHandle);
  76. Console.WriteLine("endpointSendRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
  77. }
  78. }
  79. static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
  80. {
  81. Console.WriteLine($"Endpoint_OnRequestReceived: { request.ToString()}");
  82. Console.WriteLine($"request:\n{request.ToJson(true)}");
  83. // Process request. Return Result.
  84. if (request.MessageBody is WhoIsThereRequest)
  85. {
  86. CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
  87. { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
  88. result.Source = receiveCFXHandle;
  89. result.Target = request.Source;
  90. result.TimeStamp = DateTime.Now;
  91. return result;
  92. }
  93. if (request.MessageBody is GetEndpointInformationRequest)
  94. {
  95. CFXEnvelope result = CFXEnvelope.FromCFXMessage(new WhoIsThereResponse()
  96. { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "..." });
  97. result.Source = receiveCFXHandle;
  98. result.Target = request.Source;
  99. result.TimeStamp = DateTime.Now;
  100. return result;
  101. }
  102. return null;
  103. }
  104. #endregion receive request
  105. }
  106. }

         运行结果,可以用json格式对response和request的内容进行解析。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/847780
推荐阅读
相关标签
  

闽ICP备14008679号