当前位置:   article > 正文

RabbitMQ的使用_rabbitmq后台端口是多少

rabbitmq后台端口是多少

RabbitMQ的使用流程

1、新建一个连接 amqp_new_connection();
2、建立一个socket通道 amqp_tcp_socket_new(m_clsConnect);
3、打开socket连接 amqp_socket_open(m_mqSocket, strHostIP.c_str(), iHostPort);
4、登录到代理 amqp_rpc_reply_t_ res = amqp_login(m_clsConnect, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUser.c_str(), strPass.c_str());
5、完成前面四个步骤后就可以进行消息的发送和接收了,详见下面的RecvData和SendData函数。在接收消息之前需要先声明一个队列,然后将队列绑定到交换机与路由键,循环调用amqp_consume_message函数就可以不断地接收到消息了。
特别提醒:
RabbitMQ后台默认使用的端口号是5672,即amqp_socket_open函数中传的端口号为5672;而网页管理后台的端口号是15672,登录http://localhost:15672访问管理页面。如果amqp_socket_open函数的端口号传错了会返回库异常(AMQP_RESPONSE_LIBRARY_EXCEPTION)这个值给你(ps:之前调试时看到这个错误以为是库编译的版本有问题,后来换了几个版本的库还是报这个错,直到后面看到一篇文章才知道端口号错了…端口号错了为啥是这个错误提示?坑!!!)

#include "stdafx.h"
#include "RabbitMQ.h"

// Allocates the rabbitmq-c connection state that every other method works on.
// The TCP socket itself is created and opened later, in ConnentMQ().
CRabbitMQ::CRabbitMQ()
	: m_clsConnect(amqp_new_connection())
{
}

// Stops the consumer loop, closes the channel and connection if they were
// opened, and releases the connection state.
CRabbitMQ::~CRabbitMQ()
{
	// Disconnect() clears m_bIsConnect, so the state has to be captured first;
	// testing the flag afterwards would make the graceful close unreachable.
	const bool bWasConnected = m_bIsConnect;

	Disconnect();
	// Give the loop in ThreadFun a moment to notice the cleared flag.
	// NOTE(review): the consumer thread is not joined, and it may still be
	// blocked inside amqp_consume_message() when the connection is torn down
	// below — confirm whether a real join is needed here.
	Sleep(100);
	//QueueDelete(m_strQueuename, 0);
	if (m_clsConnect != NULL) {
		if (bWasConnected) {
			// Close the channel ConnentMQ() actually opened, not a fixed id.
			amqp_channel_close(m_clsConnect, m_iChannel, AMQP_REPLY_SUCCESS);
			amqp_connection_close(m_clsConnect, AMQP_REPLY_SUCCESS);
		}
		amqp_destroy_connection(m_clsConnect);
		m_clsConnect = NULL;
	}
}


// Opens a TCP socket to the broker, logs in on vhost "/" and opens one channel.
//
// strHostIP / iHostPort : broker address (AMQP port, not the management UI port).
// strUser / strPass     : credentials for SASL PLAIN authentication.
// iChannel              : channel number to open; remembered in m_iChannel.
//
// Returns true when the object is connected (including when it already was),
// false when any step fails. m_bIsConnect is only set after every step succeeded.
bool CRabbitMQ::ConnentMQ(string strHostIP, int iHostPort, string strUser, string strPass, int iChannel)
{
	if (m_bIsConnect) {
		return true;
	}
	if (m_clsConnect == NULL) {
		fprintf(stderr, "ConnentMQ: connection state was not allocated\n");
		return false;
	}

	m_mqSocket = amqp_tcp_socket_new(m_clsConnect);
	if (!m_mqSocket) {
		// Nothing has been opened yet, so there is no connection to close here.
		return false;
	}

	const int iRes = amqp_socket_open(m_mqSocket, strHostIP.c_str(), iHostPort);
	if (iRes) {
		fprintf(stderr, "open socket: %s\n", amqp_error_string2(iRes));
		return false;
	}

	// Log in to the broker: vhost "/", no channel limit, 128 KiB frames, no heartbeat.
	amqp_rpc_reply_t res = amqp_login(m_clsConnect, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUser.c_str(), strPass.c_str());
	if (0 != ErrorMsg(res, "login ")) {
		return false;
	}

	amqp_channel_open(m_clsConnect, iChannel);
	if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "open channel")) {
		return false;
	}

	m_iChannel = iChannel;
	m_bIsConnect = true;
	return true;
}

void CRabbitMQ::RecvData(int iChannel, string strExchange, string strBindKey, string strQueuname)
{
	amqp_bytes_t queuename;
	amqp_get_rpc_reply(m_clsConnect);
	
	queuename.bytes = (char *)strQueuname.c_str();
	queuename.len = strQueuname.size();
	m_strQueuename = strQueuname;
	
	//声明一个队列  倒数第二个参数,自动删除(amqp_boolean_t auto_delete)
	amqp_queue_declare_ok_t *r = amqp_queue_declare(
		m_clsConnect, iChannel, queuename, 0, 0, 0, 1, amqp_empty_table);
	amqp_get_rpc_reply(m_clsConnect);

	if (r == NULL) {
		return;
	}

	amqp_queue_bind(m_clsConnect, iChannel, queuename, amqp_cstring_bytes(strExchange.c_str()),
		amqp_cstring_bytes(strBindKey.c_str()), amqp_empty_table);
	amqp_get_rpc_reply(m_clsConnect);

	amqp_basic_consume(m_clsConnect, iChannel, queuename, amqp_empty_bytes, 0, 1, 0,
		amqp_empty_table);
	amqp_get_rpc_reply(m_clsConnect);

	HANDLE m_hThread = (HANDLE)_beginthreadex(NULL, 0, ThreadFun, this, 0, NULL);
}

// Publishes one persistent "text/plain" message.
//
// strExchange    : exchange to publish to.
// strBindKey     : routing key of the message.
// strMessageBody : payload; sent with its full length, so embedded NUL bytes
//                  are preserved.
//
// The message goes out on the channel opened by ConnentMQ() (m_iChannel).
// Failures are reported on stderr; nothing is returned to the caller.
// NOTE(review): this uses the same connection as the consumer thread started
// by RecvData() — confirm callers never publish while that thread is running.
void CRabbitMQ::SendData(string strExchange, string strBindKey, string strMessageBody)
{
	if (m_clsConnect == NULL || !m_bIsConnect) {
		fprintf(stderr, "SendData: not connected\n");
		return;
	}

	// Value-initialise so the fields not selected by _flags are not garbage.
	amqp_basic_properties_t props = amqp_basic_properties_t();
	props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
	props.content_type = amqp_cstring_bytes("text/plain");
	props.delivery_mode = 2;	// 2 = persistent delivery mode

	// Build the body from size(), not strlen(), to avoid truncation at a NUL.
	amqp_bytes_t body;
	body.len = strMessageBody.size();
	body.bytes = const_cast<char *>(strMessageBody.data());

	const int iStatus = amqp_basic_publish(m_clsConnect, m_iChannel,
		amqp_cstring_bytes(strExchange.c_str()),
		amqp_cstring_bytes(strBindKey.c_str()), 0, 0,
		&props, body);
	if (0 != iStatus) {
		fprintf(stderr, "publish: %s\n", amqp_error_string2(iStatus));
	}
}

// Consumer thread entry point, started by RecvData() with `this` as pParam.
//
// Loops while the owner is flagged as connected, blocking in
// amqp_consume_message() for the next delivery. Each non-empty body is copied
// into a new[]-allocated, NUL-terminated buffer and posted to the owner's
// window as WM_TRANSFER with the buffer pointer in WPARAM.
// NOTE(review): presumably the WM_TRANSFER handler frees that buffer with
// delete[] — confirm against the window procedure.
//
// Always returns 0.
unsigned int __stdcall CRabbitMQ::ThreadFun(PVOID pParam)
{
	CRabbitMQ *pThis = static_cast<CRabbitMQ *>(pParam);
	if (pThis == NULL) {
		return 0;
	}

	while (pThis->m_bIsConnect && pThis->m_clsConnect != NULL)
	{
		/* Release buffers held by the connection state when possible. */
		amqp_maybe_release_buffers(pThis->m_clsConnect);

		/* Wait for and consume one message (no timeout: blocks until one arrives). */
		amqp_envelope_t envelope;
		const amqp_rpc_reply_t res = amqp_consume_message(pThis->m_clsConnect, &envelope, NULL, 0);

		if (AMQP_RESPONSE_NORMAL != res.reply_type) {
			// A dead connection fails immediately on every call; retrying would
			// spin at full CPU, so leave the loop instead.
			if (AMQP_RESPONSE_LIBRARY_EXCEPTION == res.reply_type &&
				(AMQP_STATUS_CONNECTION_CLOSED == res.library_error ||
				 AMQP_STATUS_SOCKET_ERROR == res.library_error)) {
				fprintf(stderr, "consume: %s\n", amqp_error_string2(res.library_error));
				break;
			}
			continue;
		}

		const size_t nBodyLen = envelope.message.body.len;
		// Only allocate when the message will really be posted, so an empty
		// body or a missing window no longer leaks the buffer.
		if (NULL != pThis->m_hWnd && 0 != nBodyLen) {
			char *pszMsg = new char[nBodyLen + 1];
			memcpy(pszMsg, envelope.message.body.bytes, nBodyLen);
			pszMsg[nBodyLen] = '\0';
			if (!PostMessage(pThis->m_hWnd, WM_TRANSFER, reinterpret_cast<WPARAM>(pszMsg), 0)) {
				// The message never reached the queue, so nobody else will free it.
				delete[] pszMsg;
			}
		}

		/* Free the memory amqp_consume_message() attached to the envelope. */
		amqp_destroy_envelope(&envelope);
	}
	return 0;
}

// Translates an RPC reply into a status code and a diagnostic on stderr.
//
// x       : reply returned by a rabbitmq-c call or by amqp_get_rpc_reply().
// context : short label printed in front of the diagnostic.
//
// Returns 0 for AMQP_RESPONSE_NORMAL and -1 for every other reply type.
int CRabbitMQ::ErrorMsg(amqp_rpc_reply_t x, char const *context) {
	if (x.reply_type == AMQP_RESPONSE_NORMAL) {
		return 0;
	}

	if (x.reply_type == AMQP_RESPONSE_NONE) {
		fprintf(stderr, "%s: missing RPC reply type!\n", context);
	}
	else if (x.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
		// Client-side failure: let the library describe its own error code.
		fprintf(stderr, "%s: %s\n", context, amqp_error_string2(x.library_error));
	}
	else if (x.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
		// The broker closed the connection or the channel; the decoded method
		// carries the reply code and text.
		if (x.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
			amqp_connection_close_t *pClose = static_cast<amqp_connection_close_t *>(x.reply.decoded);
			fprintf(stderr, "%s: server connection error %uh, message: %.*s\n",
				context, pClose->reply_code, static_cast<int>(pClose->reply_text.len),
				static_cast<char *>(pClose->reply_text.bytes));
		}
		else if (x.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
			amqp_channel_close_t *pClose = static_cast<amqp_channel_close_t *>(x.reply.decoded);
			fprintf(stderr, "%s: server channel error %uh, message: %.*s\n",
				context, pClose->reply_code, static_cast<int>(pClose->reply_text.len),
				static_cast<char *>(pClose->reply_text.bytes));
		}
		else {
			fprintf(stderr, "%s: unknown server error, method id 0x%08X\n",
				context, x.reply.id);
		}
	}

	return -1;
}

// Stores the window that ThreadFun posts WM_TRANSFER messages to.
// While the stored handle is NULL, ThreadFun posts nothing.
void CRabbitMQ::SetHwnd(HWND hWnd)
{
	this->m_hWnd = hWnd;
}


int CRabbitMQ::QueueDelete(const string &strQueueName, int iIfUnused) {
	if (NULL == m_clsConnect) {
		fprintf(stderr, "QueueDelete m_pConn is null\n");
		return -1;
	}

	amqp_channel_open(m_clsConnect, m_iChannel);
	if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "open channel")) {
		amqp_channel_close(m_clsConnect, m_iChannel, AMQP_REPLY_SUCCESS);
		return -2;
	}

	amqp_queue_delete(m_clsConnect, m_iChannel, amqp_cstring_bytes(strQueueName.c_str()), iIfUnused, 0);
	if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "delete queue")) {
		amqp_channel_close(m_clsConnect, m_iChannel, AMQP_REPLY_SUCCESS);
		return -3;
	}

	amqp_channel_close(m_clsConnect, m_iChannel, AMQP_REPLY_SUCCESS);
	return 0;
}

int CRabbitMQ::Disconnect() {
	m_bIsConnect = false;
	return 0;
}


int CRabbitMQ::QueueDeclare(const string &strQueueName) {
	if (NULL == m_clsConnect) {
		fprintf(stderr, "QueueDeclare m_pConn is null\n");
		return -1;
	}

	amqp_channel_open(m_clsConnect, m_iChannel);
	amqp_bytes_t _queue = amqp_cstring_bytes(strQueueName.c_str());
	int32_t _passive = 0;
	int32_t _durable = 1;   //1-消息持久化    0-非持久化
	int32_t _exclusive = 0;
	int32_t _auto_delete = 0;  //1-没有消息后删除队列    0-不删除
	amqp_queue_declare(m_clsConnect, m_iChannel, _queue, _passive, _durable, _exclusive, _auto_delete, amqp_empty_table);
	if (0 != ErrorMsg(amqp_get_rpc_reply(m_clsConnect), "queue_declare")) {
		amqp_channel_close(m_clsConnect, m_iChannel, AMQP_REPLY_SUCCESS);
		return -1;
	}

	amqp_channel_close(m_clsConnect, m_iChannel, AMQP_REPLY_SUCCESS);
	return 0;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
#pragma once
#include "amqp.h"
#include "amqp_tcp_socket.h"
#include <process.h>

// Thin wrapper around one rabbitmq-c connection: connect, publish, and consume
// on a background thread that forwards message bodies to a window.
//
// NOTE(review): the class owns m_clsConnect and destroys it in the destructor,
// but copying is not disabled — a copied object would destroy the same
// connection twice. Confirm no caller copies it.
class CRabbitMQ
{
public:
	CRabbitMQ();
	~CRabbitMQ();

	// Sets the window that receives WM_TRANSFER for every consumed message.
	void SetHwnd(HWND hWnd);

	// Opens the socket, logs in and opens channel iChannel. Returns true on success.
	bool ConnentMQ(string strHostIP, int iHostPort, string strUser, string strPass, int iChannel);

	// Declares and binds the queue, then starts the consumer thread.
	void RecvData(int iChannel, string strExchange, string strBindKey, string strQueuname);

	// Publishes one message to strExchange with routing key strBindKey.
	void SendData(string strExchange, string strBindKey, string strMessageBody);

	// Clears the connected flag so the consumer loop stops; returns 0.
	int Disconnect();

	// Declares a durable queue; returns 0 on success, negative on failure.
	int QueueDeclare(const string &strQueueName);
private:
	// Consumer thread entry point; pParam is the owning CRabbitMQ.
	static unsigned int __stdcall ThreadFun(PVOID pParam);

	// Prints a diagnostic for a non-normal reply; returns 0 for normal, else -1.
	int ErrorMsg(amqp_rpc_reply_t x, char const *context);

	// Deletes a queue; returns 0 on success, negative on failure.
	int QueueDelete(const string &strQueueName, int iIfUnused);
private:
	amqp_connection_state_t m_clsConnect = NULL;	// connection state, allocated in the constructor
	amqp_socket_t *m_mqSocket = NULL;		// TCP socket created in ConnentMQ()
	HWND m_hWnd = NULL;				// target window for WM_TRANSFER
	bool m_bIsConnect = false;			// set by ConnentMQ(), cleared by Disconnect()
	int m_iChannel = 1;				// channel in use; overwritten by ConnentMQ()
	string m_strQueuename;				// queue name last passed to RecvData()
};


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/994068
推荐阅读
相关标签
  

闽ICP备14008679号