赞
踩
IPC-CFX有两种主要的通信方式,可以通过RabbitMQ发布和订阅,也可以通过request和response进行点对点的通信,本文主要讲的是点对点的通信方式。
在vscode里建立新的dotnet项目,可以通过终端输入dotnet new console来建立,文件目录为CFXDemo->machine1和CFXDemo->machine2。
通过NuGet插件分别为两个项目安装CFX.CFXSDK、AMQPNetLite.Core和Newtonsoft.Json这三个NuGet包。
我们将machine1作为发送端(SendRequest),machine2作为接收端(Receive),则machine1的代码如下所示:
- using System.Threading;
- using CFX;
- using CFX.Transport;
- using System;
- using System.Security.Principal;
-
namespace machine1
{
    /// <summary>
    /// Sender side of the point-to-point CFX demo. Opens an AMQP CFX endpoint
    /// under <c>sendCFXHandle</c> and issues GetEndpointInformation requests to
    /// the endpoint listening at <c>receivRequestUri</c>.
    /// </summary>
    class Program
    {
        // CFX handles identifying the two demo endpoints.
        static string sendCFXHandle = "a.b.001";
        static string receiveCFXHandle = "a.b.002";

        // Not referenced by this program; kept so both demo programs share the same settings.
        static string sendRequestUri = "amqp://127.0.0.1:1235";

        // Network address the receiving endpoint listens on for requests.
        static string receivRequestUri = "amqp://127.0.0.1:1234";

        /// <summary>
        /// Opens the sending endpoint, waits for the user to press Enter, then
        /// sends five requests two seconds apart and closes the endpoint.
        /// </summary>
        static void Main(string[] args)
        {
            OpenRequest();
            try
            {
                // Wait for Enter so the receiving program can be started first.
                Console.ReadLine();
                for (int i = 0; i < 5; i++)
                {
                    SendRequest();
                    Thread.Sleep(2000);
                }
            }
            finally
            {
                // Release the endpoint even if a request loop iteration throws.
                if (endpointSendRequest != null)
                {
                    endpointSendRequest.Close();
                    endpointSendRequest = null;
                }
            }
        }

        #region send request

        static AmqpCFXEndpoint endpointSendRequest;

        /// <summary>
        /// (Re)creates the sending endpoint and opens it under <c>sendCFXHandle</c>.
        /// Any previously created endpoint is closed first.
        /// </summary>
        static void OpenRequest()
        {
            if (endpointSendRequest != null)
            {
                endpointSendRequest.Close();
                endpointSendRequest = null;
            }

            endpointSendRequest = new AmqpCFXEndpoint();

            if (!endpointSendRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
                // Opening binds the endpoint's CFXHandle to the value of sendCFXHandle.
                endpointSendRequest.Open(sendCFXHandle);
                Console.WriteLine("Request.Source is : {0}", endpointSendRequest.CFXHandle);
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
            }

            // Set a timeout of 20 seconds. If the target endpoint does not
            // respond in this time, the request will time out.
            AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);
        }

        /// <summary>
        /// Builds a GetEndpointInformationRequest addressed to the receiving
        /// endpoint, executes it and prints the response as JSON. Failures are
        /// reported on the console instead of being thrown.
        /// </summary>
        static void SendRequest()
        {
            if (endpointSendRequest == null)
            {
                Console.WriteLine("endpointSendRequest is not open; call OpenRequest() first.");
                return;
            }

            try
            {
                // Build a GetEndpointInformation request.
                CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
                {
                    CFXHandle = receiveCFXHandle
                });
                request.Source = endpointSendRequest.CFXHandle;
                request.Target = receiveCFXHandle;

                CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
                if (response == null)
                {
                    // Report a missing response explicitly rather than via a NullReferenceException.
                    Console.WriteLine("No response received.");
                    return;
                }

                Console.WriteLine($"response:\n{response.ToJson(true)}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        #endregion send request

        #region receive request

        static AmqpCFXEndpoint endpointReceiveRequest;

        /// <summary>
        /// (Re)creates the receiving endpoint, attaches the request handler and
        /// opens it under <c>receiveCFXHandle</c> at <c>receivRequestUri</c>.
        /// Not called by this program's Main.
        /// </summary>
        static void OpenReceive()
        {
            if (endpointReceiveRequest != null)
            {
                endpointReceiveRequest.Close();
                endpointReceiveRequest = null;
            }

            endpointReceiveRequest = new AmqpCFXEndpoint();

            // The endpoint was just created, so the handler is attached exactly once.
            endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;

            if (!endpointReceiveRequest.IsOpen)
            {
                Console.WriteLine("endpointReceiveRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
                endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointReceiveRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
            }
        }

        /// <summary>
        /// Handles an incoming request and builds the matching response envelope.
        /// </summary>
        /// <param name="request">The received request envelope.</param>
        /// <returns>
        /// A response envelope for WhoIsThere and GetEndpointInformation
        /// requests; <c>null</c> for any other message type.
        /// </returns>
        static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
        {
            if (request == null)
            {
                return null;
            }

            Console.WriteLine($"Endpoint_OnRequestReceived: { request.ToString()}");

            // Process request. Return result.
            if (request.MessageBody is WhoIsThereRequest)
            {
                return BuildReply(request, new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
            }

            if (request.MessageBody is GetEndpointInformationRequest)
            {
                // Answer with the response type that pairs with the request
                // (the original replied with a WhoIsThereResponse here).
                return BuildReply(request, new GetEndpointInformationResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "...." });
            }

            return null;
        }

        // Wraps a response message in an envelope addressed back to the requester.
        static CFXEnvelope BuildReply(CFXEnvelope request, CFXMessage body)
        {
            CFXEnvelope result = CFXEnvelope.FromCFXMessage(body);
            result.Source = receiveCFXHandle;
            result.Target = request.Source;
            result.TimeStamp = DateTime.Now;
            return result;
        }

        #endregion receive request
    }
}
作为接收端,machine2的代码如下所示:
- using CFX;
- using CFX.Transport;
- using System;
-
namespace machine2
{
    /// <summary>
    /// Receiver side of the point-to-point CFX demo. Opens an AMQP CFX endpoint
    /// under <c>receiveCFXHandle</c> at <c>receivRequestUri</c> and answers
    /// incoming WhoIsThere and GetEndpointInformation requests.
    /// </summary>
    class Program
    {
        // CFX handles identifying the two demo endpoints.
        static string sendCFXHandle = "a.b.001";
        static string receiveCFXHandle = "a.b.002";

        // Not referenced by this program; kept so both demo programs share the same settings.
        static string sendRequestUri = "amqp://127.0.0.1:1235";

        // Network address this endpoint listens on for requests.
        static string receivRequestUri = "amqp://127.0.0.1:1234";

        /// <summary>
        /// Opens the receiving endpoint and keeps the process alive until the
        /// user presses Enter, then closes the endpoint.
        /// </summary>
        static void Main(string[] args)
        {
            Console.WriteLine("ReceivEndPoint is waiting Request......");
            OpenReceive();
            try
            {
                Console.WriteLine("Press Enter Key to end the App");
                // ReadLine (not ReadKey) so that only Enter ends the app, as the prompt says.
                Console.ReadLine();
            }
            finally
            {
                if (endpointReceiveRequest != null)
                {
                    endpointReceiveRequest.Close();
                    endpointReceiveRequest = null;
                }
            }
        }

        #region send request

        static AmqpCFXEndpoint endpointSendRequest;

        /// <summary>
        /// (Re)creates the sending endpoint and opens it under <c>sendCFXHandle</c>.
        /// Not called by this program's Main.
        /// </summary>
        static void OpenRequest()
        {
            if (endpointSendRequest != null)
            {
                endpointSendRequest.Close();
                endpointSendRequest = null;
            }

            endpointSendRequest = new AmqpCFXEndpoint();

            if (!endpointSendRequest.IsOpen)
            {
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
                // Open under the sender's own handle. The original reused the
                // receiver's handle and URI here, which would clash with the
                // endpoint opened by OpenReceive().
                endpointSendRequest.Open(sendCFXHandle);
                Console.WriteLine("endpointSendRequest.IsOpen:" + endpointSendRequest.IsOpen.ToString());
            }

            // Set a timeout of 20 seconds. If the target endpoint does not
            // respond in this time, the request will time out.
            AmqpCFXEndpoint.RequestTimeout = TimeSpan.FromSeconds(20);
        }

        /// <summary>
        /// Builds a GetEndpointInformationRequest addressed to the receiving
        /// endpoint, executes it and prints the response as JSON. Failures are
        /// reported on the console instead of being thrown.
        /// </summary>
        static void SendRequest()
        {
            if (endpointSendRequest == null)
            {
                Console.WriteLine("endpointSendRequest is not open; call OpenRequest() first.");
                return;
            }

            try
            {
                // Build a GetEndpointInformation request.
                CFXEnvelope request = CFXEnvelope.FromCFXMessage(new GetEndpointInformationRequest()
                {
                    CFXHandle = receiveCFXHandle
                });
                request.Source = endpointSendRequest.CFXHandle;
                request.Target = receiveCFXHandle;

                CFXEnvelope response = endpointSendRequest.ExecuteRequest(receivRequestUri, request);
                if (response == null)
                {
                    // Report a missing response explicitly rather than via a NullReferenceException.
                    Console.WriteLine("No response received.");
                    return;
                }

                Console.WriteLine($"response:\n{response.ToJson(true)}");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        #endregion send request

        #region receive request

        static AmqpCFXEndpoint endpointReceiveRequest;

        /// <summary>
        /// (Re)creates the receiving endpoint, attaches the request handler and
        /// opens it under <c>receiveCFXHandle</c> at <c>receivRequestUri</c>.
        /// </summary>
        static void OpenReceive()
        {
            if (endpointReceiveRequest != null)
            {
                endpointReceiveRequest.Close();
                endpointReceiveRequest = null;
            }

            endpointReceiveRequest = new AmqpCFXEndpoint();

            // The endpoint was just created, so the handler is attached exactly once.
            endpointReceiveRequest.OnRequestReceived += Endpoint_OnRequestReceived;

            if (!endpointReceiveRequest.IsOpen)
            {
                Console.WriteLine("endpointReceiveRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
                endpointReceiveRequest.Open(receiveCFXHandle, new Uri(receivRequestUri));
                Console.WriteLine("endpointReceiveRequest.IsOpen:" + endpointReceiveRequest.IsOpen.ToString());
            }
        }

        /// <summary>
        /// Handles an incoming request: logs it (including its JSON form) and
        /// builds the matching response envelope.
        /// </summary>
        /// <param name="request">The received request envelope.</param>
        /// <returns>
        /// A response envelope for WhoIsThere and GetEndpointInformation
        /// requests; <c>null</c> for any other message type.
        /// </returns>
        static CFXEnvelope Endpoint_OnRequestReceived(CFXEnvelope request)
        {
            if (request == null)
            {
                return null;
            }

            Console.WriteLine($"Endpoint_OnRequestReceived: { request.ToString()}");
            Console.WriteLine($"request:\n{request.ToJson(true)}");

            // Process request. Return result.
            if (request.MessageBody is WhoIsThereRequest)
            {
                return BuildReply(request, new WhoIsThereResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "" });
            }

            if (request.MessageBody is GetEndpointInformationRequest)
            {
                // Answer with the response type that pairs with the request
                // (the original replied with a WhoIsThereResponse here).
                return BuildReply(request, new GetEndpointInformationResponse()
                { CFXHandle = receiveCFXHandle, RequestNetworkUri = receivRequestUri, RequestTargetAddress = "..." });
            }

            return null;
        }

        // Wraps a response message in an envelope addressed back to the requester.
        static CFXEnvelope BuildReply(CFXEnvelope request, CFXMessage body)
        {
            CFXEnvelope result = CFXEnvelope.FromCFXMessage(body);
            result.Source = receiveCFXHandle;
            result.Target = request.Source;
            result.TimeStamp = DateTime.Now;
            return result;
        }

        #endregion receive request
    }
}
运行后,request和response的内容会以JSON格式输出到控制台,可以据此对其内容进行解析。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。