赞
踩
通过利用IDL,对每个DDS所使用的数据类型进行定义。OpenDDS使用#pragma指令,识别DDS传输以及处理的数据类型。这些数据由TAO IDL编译程序以及OpenDDS IDL编译程序进行处理,用于生成需要的代码,以便于利用OpenDDS传输这些类型的数据。示例如下:
module Supermarket { #pragma DCPS_DATA_TYPE "Supermarket::UpdateInf" #pragma DCPS_DATA_KEY "Supermarket::UpdateInf UpdateInfID" struct UpdateInf { long UpdateInfID; string UpdateInfTopic; string UpdateInfMessage; string PublisherName; string MTime; long ClockTime; }; #pragma DCPS_DATA_TYPE "Supermarket::DefaultInf" #pragma DCPS_DATA_KEY "Supermarket::DefaultInf DefaultInfID" struct DefaultInf { long DefaultInfID; string DefaultInfTopic; string DefaultInfMessage; string PublisherName; string MTime; long ClockTime; }; };
DCPS_DATA_TYPE
标记一个供OpenDDS使用的数据类型;DCPS_DATA_KEY
标记识别DCPS数据类型的字段,该字段被用作针对于此类型的键。在上述示例中,把Supermarket::UpdateInf 的成员UpdateInfID看作一个键。利用不同的UpdateInfID值所发布的每个样本,将会被定义为在相同主题内不同的实例。
该步骤可省略,之后可以通过MPC(The Makefile, Project, And Workspace
Creator)自动进行编译。
//初始化域参数工厂
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
//创建域参数
DDS::DomainParticipant_var participant =
dpf->create_participant(42, // domain ID
PARTICIPANT_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
//域参数创建异常处理
if (!participant) {
std::cerr << "create_participant failed." << std::endl;
return 1;
}
//注册Supermarket::DefaultInf类型的类型支持类 Supermarket::DefaultInfTypeSupport_var ts = new Supermarket::DefaultInfTypeSupportImpl; //注册异常处理 if (DDS::RETCODE_OK != ts->register_type(participant, "")) { std::cerr << "register_type failed." << std::endl; return 1; } //创建主题(DefaultInf Supermarket) CORBA::String_var default_name = ts->get_type_name(); DDS::Topic_var default_topic = participant->create_topic("DefaultInf Supermarket", default_name, TOPIC_QOS_DEFAULT, 0, // No listener required OpenDDS::DCPS::DEFAULT_STATUS_MASK); //主题创建异常处理 if (!default_topic) { std::cerr << "create_topic failed." << std::endl; return 1; }
//创建发布者
DDS::Publisher_var pub =
participant->create_publisher(PUBLISHER_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
//发布者创建异常处理
if (!pub) {
std::cerr << "create_publisher failed." << std::endl;
return 1;
}
//创建数据写者
DDS::DataWriter_var DefaultWriter =
pub->create_datawriter(default_topic,
DATAWRITER_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
//数据写者创建异常
if (!DefaultWriter) {
std::cerr << "create_dataDefaultWriter failed." << std::endl;
return 1;
}
//写入数据样本 Supermarket::DefaultInf DefaultInf; Supermarket::DefaultInf DefaultInf2; DefaultInf.DefaultInfID= 0; DefaultInf.DefaultInfMessage = "您有新的饿了么外卖订单,请及时处理!"; DefaultInf.DefaultInfTopic= "饿了么"; DefaultInf.PublisherName = "饿了么(我才不饿)的发布者"; DefaultInf2.DefaultInfID = 0; DefaultInf2.DefaultInfMessage = "您有新的美团外卖订单,请及时处理!"; DefaultInf2.DefaultInfTopic = "美团"; DefaultInf2.PublisherName = "美团外卖送啥都快"; //开始发布了,发布default //开始发布了,发布update DDS::Duration_t timeout = { 30, 0 }; for (int i = 0; i < 1000; ++i) { time_t nowTime; time(&nowTime); struct timeb tb; ftime(&tb); double rand_num = rand() / (RAND_MAX + 1.0); if (rand_num>0.5) { DefaultInf.ClockTime = clock(); CORBA::String_var s_Time = CORBA::string_dup(ctime(&nowTime)); DefaultInf.MTime = s_Time; DDS::ReturnCode_t error = DefaultInf_DefaultWriter->write(DefaultInf, DDS::HANDLE_NIL); ++DefaultInf.DefaultInfID; if (error != DDS::RETCODE_OK) { std::cerr << "default write failed." << std::endl; return 1; } if (DefaultWriter->wait_for_acknowledgments(timeout) != DDS::RETCODE_OK) { std::cerr << "wait_for_acknowledgments failed." << std::endl; return 1; } cout << "[S01]发送了一条饿了么主题消息,发送时间是" << ctime(&nowTime) << endl; } else { DefaultInf2.ClockTime = clock(); CORBA::String_var s_Time = CORBA::string_dup(ctime(&nowTime)); DefaultInf2.MTime = s_Time; DDS::ReturnCode_t error = UpdateInf_DefaultWriter->write(DefaultInf2, DDS::HANDLE_NIL); ++DefaultInf2.DefaultInfID; if (error != DDS::RETCODE_OK) { std::cerr << "update write failed." << std::endl; return 1; } if (UpdateWriter->wait_for_acknowledgments(timeout) != DDS::RETCODE_OK) { std::cerr << "wait_for_acknowledgments failed." << std::endl; return 1; } cout << "[S01]发送了一条美团主题消息,发送时间是" << ctime(&nowTime) << endl; } Sleep(2500); }
//初始化域参数工厂
DDS::DomainParticipantFactory_var dpf =
TheParticipantFactoryWithArgs(argc, argv);
//创建域参数
DDS::DomainParticipant_var participant =
dpf->create_participant(42, // Domain ID
PARTICIPANT_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
//域参数创建异常处理
if (!participant) {
std::cerr << "create_participant failed." << std::endl;
return 1;
}
//注册类型支持类 Supermarket::DefaultInfTypeSupport_var mts = new Supermarket::DefaultInfTypeSupportImpl; //支持类异常处理 if (DDS::RETCODE_OK != mts->register_type(participant, "")) { std::cerr << "Failed to register the DefaultInfTypeSupport." << std::endl; return 1; } //创建主题2(同样也是DefaultInf Supermarket) CORBA::String_var type_name = mts->get_type_name(); DDS::Topic_var topic2 = participant->create_topic("DefaultInf Supermarket", type_name, TOPIC_QOS_DEFAULT, 0, // No listener required OpenDDS::DCPS::DEFAULT_STATUS_MASK); //主题异常处理 if (!topic2) { std::cerr << "Failed to create_topic." << std::endl; return 1; } //创建主题(同样也是DefaultInf Supermarket) DDS::Topic_var topic = participant->create_topic("UpdateInf Supermarket", type_name, TOPIC_QOS_DEFAULT, 0, // No listener required OpenDDS::DCPS::DEFAULT_STATUS_MASK); //主题异常处理 if (!topic) { std::cerr << "Failed to create_topic." << std::endl; return 1; }
//创建订阅者
DDS::Subscriber_var sub =
participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
0, // No listener required
OpenDDS::DCPS::DEFAULT_STATUS_MASK);
//异常处理
if (!sub) {
std::cerr << "Failed to create_subscriber." << std::endl;
return 1;
}
利用所创建的Listener对象,就可以检测数据什么时候可用。
// 创建监听 DDS::DataReaderListener_var listener(new DataReaderListenerImpl); // 创建数据读取者 DDS::DataReader_var dr = sub->create_datareader(topic, DATAREADER_QOS_DEFAULT, listener, OpenDDS::DCPS::DEFAULT_STATUS_MASK); //读者创建异常处理 if (!dr) { std::cerr << "create_datareader failed." << std::endl; return 1; } Supermarket::DefaultInfDataReader_var reader_i = Supermarket::DefaultInfDataReader::_narrow(dr); if (!reader_i) { std::cerr << "_narrow failed." << std::endl; }
void DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) { try { Supermarket::DefaultInfDataReader_var reader_i = Supermarket::DefaultInfDataReader::_narrow(reader); if (!reader_i) { std::cerr << "read: _narrow failed." << std::endl; return; } Supermarket::DefaultInf Supermarket_DefaultInf; DDS::SampleInfo si; DDS::ReturnCode_t status = reader_i->take_next_sample(Supermarket_DefaultInf, si); if (status == DDS::RETCODE_OK) { printf("\n"); //std::cout << "【冷漠到不想吃饭小组开心提示您】请查看以下信息:" << std::endl; //std::cout << "SampleInfo.sample_rank = " << si.sample_rank << std::endl; //std::cout << "SampleInfo.instance_state = " << si.instance_state << std::endl; if (si.valid_data) { time_t nowTime; time(&nowTime); struct timeb tb; ftime(&tb); std::cout << "[S01]发送者消息编号=" << Supermarket_DefaultInf.DefaultInfID << std::endl << "[S01]发送者毫秒级计时:" << Supermarket_DefaultInf.ClockTime<< std::endl << " 消息标题: " << Supermarket_DefaultInf.DefaultInfTopic << std::endl << "====================================================================="<<std::endl << Supermarket_DefaultInf.DefaultInfMessage << std::endl << "=====================================================================" << std::endl << "消息来源:"<< Supermarket_DefaultInf.PublisherName<< std::endl << "[S01]发送者时间:" << Supermarket_DefaultInf.MTime << "[S01]本地接收时间:" << ctime(&nowTime) << "[S01]本地接收毫秒级计时:" << clock() << std::endl; } else if (si.instance_state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) { std::cout << "当前实例句柄" << std::endl; } else if (si.instance_state == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) { std::cout << "instance is unregistered" << std::endl; } else { std::cerr << "ERROR: received unknown instance state " << si.instance_state << std::endl; } } else if (status == DDS::RETCODE_NO_DATA) { std::cerr << "ERROR: reader received DDS::RETCODE_NO_DATA!" << std::endl; } else { std::cerr << "ERROR: read DefaultInf: Error: " << status << std::endl; } } catch (CORBA::Exception& e) { std::cerr << "Exception caught in main.cpp:" << std::endl << e << std::endl; ACE_OS::exit(1); } } void DataReaderListenerImpl::on_requested_deadline_missed( DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus &) { std::cerr << "DataReaderListenerImpl::on_requested_deadline_missed" << std::endl; } void DataReaderListenerImpl::on_requested_incompatible_qos( DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus &) { std::cerr << "DataReaderListenerImpl::on_requested_incompatible_qos" << std::endl; } void DataReaderListenerImpl::on_liveliness_changed( DDS::DataReader_ptr, const DDS::LivelinessChangedStatus &) { std::cout << "接收者生成或者死亡" << std::endl; } void DataReaderListenerImpl::on_subscription_matched( DDS::DataReader_ptr, const DDS::SubscriptionMatchedStatus &) { std::cout << "匹配成功或匹配结束" << std::endl; } void DataReaderListenerImpl::on_sample_rejected( DDS::DataReader_ptr, const DDS::SampleRejectedStatus&) { std::cerr << "DataReaderListenerImpl::on_sample_rejected" << std::endl; } void DataReaderListenerImpl::on_sample_lost( DDS::DataReader_ptr, const DDS::SampleLostStatus&) { std::cerr << "DataReaderListenerImpl::on_sample_lost" << std::endl; }
on_data_available
为监听的事件。
客户端指的就是发布者和订阅者,在publisher.cpp
和subscriber.cpp
文件内添加代码,清理OpenDDS相关的对象。
//实例清理
participant->delete_contained_entities();
dpf->delete_participant(participant);
TheServiceParticipant->shutdown();
1. 启动DCPSInfoRepo服务,用于集中式发现
%DDS_ROOT%/bin/DCPSInfoRepo -ORBEndpoint iiop://localhost:12345
2. 双击启动subscriber.exe和publisher.exe
利用DCPSInfoRepo服务可以实现集中式发现,另外RTPS服务可以实现对等发现。当OpenDDS应用程序需要与DDS规范的非OpenDDS实现进行交互操作,后者如果不希望再OpenDDS的部署中使用集中式发现时,RTPS对等发现的方式就比较重要了。
[1]. 《分布式系统实时发布订阅数据分发技术》
[2]. https://opendds.org/
[3]. https://download.objectcomputing.com/OpenDDS/OpenDDS-latest.pdf
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。