赞
踩
1、新建一个连接 amqp_new_connection();
2、建立一个socket通道 amqp_tcp_socket_new(m_clsConnect);
3、打开socket连接 amqp_socket_open(m_mqSocket, strHostIP.c_str(), iHostPort);
4、登录到代理 amqp_rpc_reply_t res = amqp_login(m_clsConnect, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUser.c_str(), strPass.c_str());
5、完成前面四个步骤后就可以进行消息的发送和接收了,详见下面的RecvData和SendData函数。在接收消息之前需要先声明一个队列,然后将队列绑定到交换机与路由键,之后循环调用amqp_consume_message函数就可以不断地接收到消息了。
特别提醒:
RabbitMQ后台默认使用的端口号是5672,即amqp_socket_open函数中传的端口号应为5672;而网页管理后台的端口号是15672,登录http://localhost:15672访问管理页面。如果连接时把端口号传错了(例如传成15672),得到的返回值是库异常(AMQP_RESPONSE_LIBRARY_EXCEPTION)(ps:之前调试时看到这个错误以为是库编译的版本有问题,后来换了几个版本的库还是报这个错,直到后面看到一篇文章才知道是端口号错了…端口号错了为啥是这个错误提示?坑!!!)
#include "stdafx.h"
#include "RabbitMQ.h"

/**
 * Allocates the rabbitmq-c connection state and puts the remaining handles
 * into a known "not connected" state.
 */
CRabbitMQ::CRabbitMQ()
    : m_clsConnect(amqp_new_connection()),
      m_mqSocket(nullptr),
      m_iChannel(0)
{
}

/**
 * Stops the consumer loop and releases the connection.
 *
 * NOTE(review): Disconnect() clears m_bIsConnect before the check below, so
 * the graceful channel/connection close never executes and only
 * amqp_destroy_connection() runs. The branch is deliberately left as it was:
 * the consumer thread started by RecvData() is never joined and may still be
 * blocked inside amqp_consume_message() on this same connection, and
 * rabbitmq-c connections must not be used from two threads at once. Enabling
 * the close handshake requires joining that thread first, which needs a
 * thread-handle member in the class.
 */
CRabbitMQ::~CRabbitMQ()
{
    Disconnect();
    // Presumably gives the consumer thread time to notice the cleared flag.
    Sleep(100);
    //QueueDelete(m_strQueuename, 0);
    if (m_clsConnect != nullptr)
    {
        if (m_bIsConnect)
        {
            amqp_channel_close(m_clsConnect, 1, AMQP_REPLY_SUCCESS);
            amqp_connection_close(m_clsConnect, AMQP_REPLY_SUCCESS);
        }
        amqp_destroy_connection(m_clsConnect);
        m_clsConnect = nullptr;
    }
}

/**
 * Opens a TCP socket to the broker, logs in on vhost "/" and opens a channel.
 *
 * @param strHostIP  broker host name or address
 * @param iHostPort  broker AMQP port
 * @param strUser    user name for SASL PLAIN login
 * @param strPass    password for SASL PLAIN login
 * @param iChannel   channel number to open; remembered in m_iChannel
 * @return true when already connected or when socket, login and channel all
 *         succeeded; false otherwise (the reason is written to stderr).
 */
bool CRabbitMQ::ConnentMQ(string strHostIP, int iHostPort, string strUser, string strPass, int iChannel)
{
    if (m_bIsConnect)
    {
        return true;
    }
    if (nullptr == m_clsConnect)
    {
        fprintf(stderr, "ConnentMQ: connection state was not allocated\n");
        return false;
    }

    m_mqSocket = amqp_tcp_socket_new(m_clsConnect);
    if (!m_mqSocket)
    {
        // Nothing has been opened yet, so there is no connection to close here.
        fprintf(stderr, "ConnentMQ: creating TCP socket failed\n");
        return false;
    }

    const int iOpenStatus = amqp_socket_open(m_mqSocket, strHostIP.c_str(), iHostPort);
    if (iOpenStatus)
    {
        fprintf(stderr, "ConnentMQ: opening socket failed: %s\n", amqp_error_string2(iOpenStatus));
        return false;
    }

    // Log in to the broker; a failed login must not be reported as "connected".
    const amqp_rpc_reply_t loginReply =
        amqp_login(m_clsConnect, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
                   strUser.c_str(), strPass.c_str());
    if (0 != ErrorMsg(loginReply, "login "))
    {
        return false;
    }

    amqp_channel_open(m_clsConnect, iChannel);
    if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "open channel"))
    {
        return false;
    }

    m_iChannel = iChannel;
    m_bIsConnect = true;
    return true;
}

/**
 * Declares a queue, binds it to an exchange/routing key, starts consuming
 * from it and launches the background thread that runs ThreadFun().
 *
 * Every broker reply is checked; on the first failure the reason is written
 * to stderr and no consumer thread is started.
 *
 * @param iChannel     channel to declare/bind/consume on
 * @param strExchange  exchange to bind the queue to
 * @param strBindKey   routing key used for the binding
 * @param strQueuname  queue name; also stored in m_strQueuename
 */
void CRabbitMQ::RecvData(int iChannel, string strExchange, string strBindKey, string strQueuname)
{
    m_strQueuename = strQueuname;

    amqp_bytes_t queueName;
    queueName.len = strQueuname.size();
    queueName.bytes = const_cast<char *>(strQueuname.c_str());

    // Declare the queue: passive=0, durable=0, exclusive=0, auto_delete=1.
    amqp_queue_declare_ok_t *pDeclared = amqp_queue_declare(
        m_clsConnect, iChannel, queueName, 0, 0, 0, 1, amqp_empty_table);
    const amqp_rpc_reply_t declareReply = amqp_get_rpc_reply(m_clsConnect);
    if (pDeclared == nullptr)
    {
        ErrorMsg(declareReply, "queue_declare");
        return;
    }

    amqp_queue_bind(m_clsConnect, iChannel, queueName,
                    amqp_cstring_bytes(strExchange.c_str()),
                    amqp_cstring_bytes(strBindKey.c_str()),
                    amqp_empty_table);
    if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "queue_bind"))
    {
        return;
    }

    // consumer_tag empty, no_local=0, no_ack=1, exclusive=0.
    amqp_basic_consume(m_clsConnect, iChannel, queueName, amqp_empty_bytes,
                       0, 1, 0, amqp_empty_table);
    if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "basic_consume"))
    {
        return;
    }

    const HANDLE hConsumer = reinterpret_cast<HANDLE>(
        _beginthreadex(nullptr, 0, ThreadFun, this, 0, nullptr));
    if (hConsumer == nullptr)
    {
        fprintf(stderr, "RecvData: starting consumer thread failed\n");
        return;
    }
    // The handle is never waited on, so release it now instead of leaking it;
    // closing the handle does not stop the thread.
    CloseHandle(hConsumer);
}

/**
 * Publishes a persistent text/plain message on the channel opened by
 * ConnentMQ().
 *
 * @param strExchange     target exchange
 * @param strBindKey      routing key
 * @param strMessageBody  payload; sent with its full length, so embedded NUL
 *                        bytes are preserved
 */
void CRabbitMQ::SendData(string strExchange, string strBindKey, string strMessageBody)
{
    if (nullptr == m_clsConnect || !m_bIsConnect)
    {
        fprintf(stderr, "SendData: not connected\n");
        return;
    }

    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
    props.content_type = amqp_cstring_bytes("text/plain");
    props.delivery_mode = 2; // 2 = persistent delivery mode

    amqp_bytes_t body;
    body.len = strMessageBody.size();
    body.bytes = const_cast<char *>(strMessageBody.data());

    // Publish on the channel that was actually opened, not a hard-coded 1.
    const int iStatus = amqp_basic_publish(m_clsConnect, m_iChannel,
                                           amqp_cstring_bytes(strExchange.c_str()),
                                           amqp_cstring_bytes(strBindKey.c_str()),
                                           0, 0, &props, body);
    if (iStatus != AMQP_STATUS_OK)
    {
        fprintf(stderr, "SendData: publish failed: %s\n", amqp_error_string2(iStatus));
    }
}

/**
 * Consumer thread entry point: waits for messages and forwards every
 * non-empty body to the window set with SetHwnd() as a WM_TRANSFER message.
 *
 * The WPARAM of the posted message is a NUL-terminated copy allocated with
 * new char[]; the WM_TRANSFER handler is expected to delete[] it (the handler
 * is not part of this file — TODO confirm).
 *
 * @param pParam  the owning CRabbitMQ instance
 * @return always 0
 */
unsigned int __stdcall CRabbitMQ::ThreadFun(PVOID pParam)
{
    CRabbitMQ *pThis = static_cast<CRabbitMQ *>(pParam);

    while (pThis->m_bIsConnect && pThis->m_clsConnect != nullptr)
    {
        // Release memory held by the connection state between messages.
        amqp_maybe_release_buffers(pThis->m_clsConnect);

        // Block until one message has been consumed.
        amqp_envelope_t envelope;
        const amqp_rpc_reply_t reply =
            amqp_consume_message(pThis->m_clsConnect, &envelope, nullptr, 0);
        if (AMQP_RESPONSE_NORMAL != reply.reply_type)
        {
            // No timeout is passed, so a failure here is not a timeout that a
            // retry could clear; retrying would presumably just busy-loop.
            // Report it and stop.
            pThis->ErrorMsg(reply, "consume message");
            break;
        }

        const size_t cbBody = envelope.message.body.len;
        if (nullptr != pThis->m_hWnd && 0 != cbBody)
        {
            // Allocate only when the message will really be posted, so empty
            // bodies no longer leak a buffer.
            char *pszMsg = new char[cbBody + 1];
            memcpy(pszMsg, envelope.message.body.bytes, cbBody);
            pszMsg[cbBody] = '\0';
            if (!PostMessage(pThis->m_hWnd, WM_TRANSFER, reinterpret_cast<WPARAM>(pszMsg), 0))
            {
                // Nobody will receive the buffer, so free it here.
                delete[] pszMsg;
            }
        }

        // Free the memory amqp_consume_message() attached to the envelope.
        amqp_destroy_envelope(&envelope);
    }
    return 0;
}

/**
 * Writes a description of a failed RPC reply to stderr.
 *
 * @param x        reply to inspect
 * @param context  short label printed in front of the message
 * @return 0 when the reply type is AMQP_RESPONSE_NORMAL, -1 otherwise
 */
int CRabbitMQ::ErrorMsg(amqp_rpc_reply_t x, char const *context)
{
    if (AMQP_RESPONSE_NORMAL == x.reply_type)
    {
        return 0;
    }

    switch (x.reply_type)
    {
    case AMQP_RESPONSE_NONE:
        fprintf(stderr, "%s: missing RPC reply type!\n", context);
        break;

    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
        fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
        break;

    case AMQP_RESPONSE_SERVER_EXCEPTION:
        if (AMQP_CONNECTION_CLOSE_METHOD == x.reply.id)
        {
            const amqp_connection_close_t *pClose =
                static_cast<amqp_connection_close_t *>(x.reply.decoded);
            fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
                    context, pClose->reply_code,
                    static_cast<int>(pClose->reply_text.len),
                    static_cast<char *>(pClose->reply_text.bytes));
        }
        else if (AMQP_CHANNEL_CLOSE_METHOD == x.reply.id)
        {
            const amqp_channel_close_t *pClose =
                static_cast<amqp_channel_close_t *>(x.reply.decoded);
            fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
                    context, pClose->reply_code,
                    static_cast<int>(pClose->reply_text.len),
                    static_cast<char *>(pClose->reply_text.bytes));
        }
        else
        {
            fprintf(stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id);
        }
        break;

    default:
        break;
    }
    return -1;
}

/**
 * Sets the window that receives WM_TRANSFER messages from the consumer thread.
 *
 * @param hWnd  target window, or NULL to stop forwarding
 */
void CRabbitMQ::SetHwnd(HWND hWnd)
{
    m_hWnd = hWnd;
}

/**
 * Deletes a queue using the channel opened by ConnentMQ().
 *
 * The channel is no longer re-opened and closed around the call: m_iChannel
 * is only ever set by ConnentMQ(), which has already opened it, and AMQP
 * 0-9-1 forbids opening an already-open channel.
 *
 * @param strQueueName  queue to delete
 * @param iIfUnused     passed through as the if_unused flag
 * @return 0 on success, -1 when there is no connection state, -2 when the
 *         channel is not open, -3 when the broker rejected the delete
 */
int CRabbitMQ::QueueDelete(const string &strQueueName, int iIfUnused)
{
    if (nullptr == m_clsConnect)
    {
        fprintf(stderr, "QueueDelete m_pConn is null\n");
        return -1;
    }
    if (!m_bIsConnect)
    {
        fprintf(stderr, "QueueDelete: channel is not open\n");
        return -2;
    }

    amqp_queue_delete(m_clsConnect, m_iChannel,
                      amqp_cstring_bytes(strQueueName.c_str()), iIfUnused, 0);
    if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "delete queue"))
    {
        return -3;
    }
    return 0;
}

/**
 * Clears the connected flag, which makes the consumer loop in ThreadFun()
 * exit at its next check. Does not touch the socket or the connection.
 *
 * @return always 0
 */
int CRabbitMQ::Disconnect()
{
    m_bIsConnect = false;
    return 0;
}

/**
 * Declares a durable, non-exclusive, non-auto-delete queue on the channel
 * opened by ConnentMQ().
 *
 * As in QueueDelete(), the already-open channel is used as is instead of
 * being opened a second time and closed afterwards.
 *
 * @param strQueueName  queue to declare
 * @return 0 on success, -1 on any failure (reason is written to stderr)
 */
int CRabbitMQ::QueueDeclare(const string &strQueueName)
{
    if (nullptr == m_clsConnect)
    {
        fprintf(stderr, "QueueDeclare m_pConn is null\n");
        return -1;
    }
    if (!m_bIsConnect)
    {
        fprintf(stderr, "QueueDeclare: channel is not open\n");
        return -1;
    }

    const amqp_boolean_t bPassive = 0;
    const amqp_boolean_t bDurable = 1;    // 1 = durable, 0 = not durable
    const amqp_boolean_t bExclusive = 0;
    const amqp_boolean_t bAutoDelete = 0; // 1 = auto-delete the queue, 0 = keep it

    amqp_queue_declare(m_clsConnect, m_iChannel,
                       amqp_cstring_bytes(strQueueName.c_str()),
                       bPassive, bDurable, bExclusive, bAutoDelete,
                       amqp_empty_table);
    if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "queue_declare"))
    {
        return -1;
    }
    return 0;
}
#pragma once #include "amqp.h" #include "amqp_tcp_socket.h" #include <process.h> class CRabbitMQ { public: CRabbitMQ(); ~CRabbitMQ(); void SetHwnd(HWND hWnd); bool ConnentMQ(string strHostIP, int iHostPort, string strUser, string strPass, int iChannel); void RecvData(int iChannel, string strExchange, string strBindKey, string strQueuname); void SendData(string strExchange, string strBindKey, string strMessageBody); int Disconnect(); int QueueDeclare(const string &strQueueName); private: static unsigned int __stdcall ThreadFun(PVOID pParam); int ErrorMsg(amqp_rpc_reply_t x, char const *context); int QueueDelete(const string &strQueueName, int iIfUnused); private: amqp_connection_state_t m_clsConnect; amqp_socket_t *m_mqSocket; HWND m_hWnd = NULL; bool m_bIsConnect = false; int m_iChannel; string m_strQueuename; };
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。