在使用接口Channel::Create()连接到rabbitmq时,如果网络中断或者ip端口地址不对的时候,程序就会一直阻塞在这个调用上,没有 返回值没有异常提示,这种情况如果你想提示个错误什么的就无能为力了,Panda工作中也遇到这个问题,我想:如果他能提供一个连接超时异常就好了,毕竟 SimpleAmqpClient只是对另外一个c语言开源项目rabbitmq-c的封装,而且我记得rabbitmq-c是支持我所说的功能的。下面 请跟随我一起一步一步完成这个事情吧。
1
1 int m_nSockfd; 2 int m_nChannelIdSend; 3 int m_nChannelIdReve; 4 int m_nChannelIdResult; 5 amqp_connection_state_t m_Connection; 6 amqp_bytes_t m_stReply_to_queue;
m_Connection = amqp_new_connection(); 2 m_nSockfd = amqp_open_socket(m_strIp.toLocal8Bit().data(), m_nPort); 3 amqp_set_sockfd(m_Connection, m_nSockfd); 4 amqp_login(m_Connection, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,m_strRabbitUser.toLocal8Bit().data(), m_strRabbitPwd.toLocal8Bit().data()); 5 6 //生产者 7 amqp_channel_open(m_Connection, m_nChannelIdSend); 8 amqp_get_rpc_reply(m_Connection); 9 amqp_exchange_declare(m_Connection, m_nChannelIdSend, amqp_cstring_bytes("ping") , Type, 10 0,1,0,0, amqp_empty_table);//绑定交换器 amqp_cstring_bytes("ping") 11 12 m_strExchange = "ping"; 13 m_strRoutingkey = "rpc"; 14 m_pProducer1 = new CMqProducerThread(m_Connection, m_nChannelIdSend, m_strExchange, m_strRoutingkey, this); 15 connect(m_pProducer1, SIGNAL(SendProcess(int, QString)), this, SLOT(SetProcess(int, QString))); 16 m_pProducer1->start(); 17 18 //测试结果上传 19 amqp_channel_open(m_Connection, m_nChannelIdResult); 20 amqp_get_rpc_reply(m_Connection); 21 amqp_exchange_declare(m_Connection, m_nChannelIdResult, amqp_cstring_bytes("testResult") , Type, 22 0,1,0,0, amqp_empty_table); 23 m_strExchange = "testResult"; 24 m_strRoutingkey = "result"; 25 m_pResoultThread = new MQResultThread(m_Connection, m_nChannelIdResult, m_strExchange, m_strRoutingkey, this);
先来看一下Channel::Channel(…)
然后在rabbitmq-c项目头文件amqp.h中找到创建非阻塞socket的函数
代码实现
有方向了,终于可以快乐的写代码o(∩_∩)o 。根据设计模式的开闭原则:我们做的事情更好的是扩展而不是修改现有的功能,所以比较优雅的方案应该是增加一个工厂函数生成创建一个channel,做法如下:
在Channel.h增加两个函数
/** * 以非阻塞的方法创建Channel * author: panxianzhan * @param timeout 最大等待事件,为NULL时采用阻塞方式打开 */ explicit Channel(const std::string &host, int port, const std::string &username, const std::string &password, const std::string &vhost, int frame_max, timeval* ); /** * 工厂方法 * 以非阻塞的方法创建Channel * author: panxianzhan * @param timeout 最大等待事件,为NULL时采用阻塞方式打开 */ static ptr_t CreateNoBlock(const std::string &host = "127.0.0.1", int port = 5672, const std::string &username = "guest", const std::string &password = "guest", const std::string &vhost = "/", int frame_max = 131072, timeval* timeout = NULL) { return boost::make_shared<Channel>(host, port, username, password, vhost, frame_max, timeout); }
然后在Channel.cpp实现
Channel::Channel(const std::string &host, int port, const std::string &username, const std::string &password, const std::string &vhost, int frame_max, timeval* timeout) : m_impl(new Detail::ChannelImpl) { m_impl->m_connection = amqp_new_connection(); if (NULL == m_impl->m_connection) { throw std::bad_alloc(); } try { amqp_socket_t *socket = amqp_tcp_socket_new(m_impl->m_connection); int sock = amqp_socket_open_noblock(socket, host.c_str(), port, timeout); } //如果连接超时,下面这一行就会抛出异常 m_impl->CheckForError(sock); m_impl->DoLogin(username, password, vhost, frame_max); } catch (...) { amqp_destroy_connection(m_impl->m_connection); throw; } m_impl->SetIsConnected(true); }
使用例子如下:
int main() { timeval tv = {0}; tv.tv_usec = 200 * 1000; //等待200毫秒 try { Channel::ptr_t channel = Channel::CreateNoBlock( "127.0.0.1", 5567,"guest", "guest", "/", 131072, &tv); ... ... } catch (AmqpLibraryException& ex) { //提示连接失败; } return 0; }