赞
踩
本文介绍如何将chatserver设置为分布式服务,并且实现statusserver的负载均衡处理,根据每个chatserver现有的连接数匹配最小的chatserver返回给GateServer并返回给客户端。
为了实现这一系列分布式设计,我们需要先完善chatserver,增加grpc客户端和服务端。这样能实现两个chatserver之间端对端的通信。
visual studio中右键chatserver项目选择添加新文件ChatGrpcClient, 会为我们生成ChatGrpcClient.h和ChatGrpcClient.cpp文件。
先实现ChatConPool连接池
class ChatConPool { public: ChatConPool(size_t poolSize, std::string host, std::string port): poolSize_(poolSize), host_(host),port_(port),b_stop_(false){ for (size_t i = 0; i < poolSize_; ++i) { std::shared_ptr<Channel> channel = grpc::CreateChannel(host + ":" + port, grpc::InsecureChannelCredentials()); connections_.push(ChatService::NewStub(channel)); } } ~ChatConPool() { std::lock_guard<std::mutex> lock(mutex_); Close(); while (!connections_.empty()) { connections_.pop(); } } std::unique_ptr<ChatService::Stub> getConnection() { std::unique_lock<std::mutex> lock(mutex_); cond_.wait(lock, [this] { if (b_stop_) { return true; } return !connections_.empty(); }); //如果停止则直接返回空指针 if (b_stop_) { return nullptr; } auto context = std::move(connections_.front()); connections_.pop(); return context; } void returnConnection(std::unique_ptr<ChatService::Stub> context) { std::lock_guard<std::mutex> lock(mutex_); if (b_stop_) { return; } connections_.push(std::move(context)); cond_.notify_one(); } void Close() { b_stop_ = true; cond_.notify_all(); } private: atomic<bool> b_stop_; size_t poolSize_; std::string host_; std::string port_; std::queue<std::unique_ptr<ChatService::Stub> > connections_; std::mutex mutex_; std::condition_variable cond_; };
然后利用单例模式实现grpc通信的客户端
class ChatGrpcClient: public Singleton<ChatGrpcClient> { friend class Singleton<ChatGrpcClient>; public: ~ChatGrpcClient() { } AddFriendRsp NotifyAddFriend(std::string server_ip, const AddFriendReq& req); AuthFriendRsp NotifyAuthFriend(std::string server_ip, const AuthFriendReq& req); bool GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo); TextChatMsgRsp NotifyTextChatMsg(std::string server_ip, const TextChatMsgReq& req, const Json::Value& rtvalue); private: ChatGrpcClient(); unordered_map<std::string, std::unique_ptr<ChatConPool>> _pools; };
实现具体的ChatGrpcClient
ChatGrpcClient::ChatGrpcClient() { auto& cfg = ConfigMgr::Inst(); auto server_list = cfg["PeerServer"]["Servers"]; std::vector<std::string> words; std::stringstream ss(server_list); std::string word; while (std::getline(ss, word, ',')) { words.push_back(word); } for (auto& word : words) { if (cfg[word]["Name"].empty()) { continue; } _pools[cfg[word]["Name"]] = std::make_unique<ChatConPool>(5, cfg[word]["Host"], cfg[word]["Port"]); } } AddFriendRsp ChatGrpcClient::NotifyAddFriend(std::string server_ip, const AddFriendReq& req) { AddFriendRsp rsp; return rsp; } AuthFriendRsp ChatGrpcClient::NotifyAuthFriend(std::string server_ip, const AuthFriendReq& req) { AuthFriendRsp rsp; return rsp; } bool ChatGrpcClient::GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo) { return true; } TextChatMsgRsp ChatGrpcClient::NotifyTextChatMsg(std::string server_ip, const TextChatMsgReq& req, const Json::Value& rtvalue) { TextChatMsgRsp rsp; return rsp; }
向ChatServer中添加ChatServiceImpl类,自动生成头文件和源文件
class ChatServiceImpl final : public ChatService::Service { public: ChatServiceImpl(); Status NotifyAddFriend(ServerContext* context, const AddFriendReq* request, AddFriendRsp* reply) override; Status NotifyAuthFriend(ServerContext* context, const AuthFriendReq* request, AuthFriendRsp* response) override; Status NotifyTextChatMsg(::grpc::ServerContext* context, const TextChatMsgReq* request, TextChatMsgRsp* response) override; bool GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo); private: };
实现服务逻辑,先简单写成不处理直接返回。
ChatServiceImpl::ChatServiceImpl() { } Status ChatServiceImpl::NotifyAddFriend(ServerContext* context, const AddFriendReq* request, AddFriendRsp* reply) { return Status::OK; } Status ChatServiceImpl::NotifyAuthFriend(ServerContext* context, const AuthFriendReq* request, AuthFriendRsp* response) { return Status::OK; } Status ChatServiceImpl::NotifyTextChatMsg(::grpc::ServerContext* context, const TextChatMsgReq* request, TextChatMsgRsp* response) { return Status::OK; } bool ChatServiceImpl::GetBaseInfo(std::string base_key, int uid, std::shared_ptr<UserInfo>& userinfo) { return true; }
并且完善chatserver配置
增加了PeerServer字段,存储对端server列表,通过逗号分隔,可以通过逗号切割对端服务器名字,再根据名字去配置里查找对应字段。
对应的chatserver复制一份,改名为chatserver2,然后修改config.ini配置。要和server1配置不同,实现端对端的配置。具体详见服务器代码。
每当服务器chatserver启动后,都要重新设置一下用户连接数管理,并且我们每个chatserver既要有tcp服务监听也要有grpc服务监听
using namespace std; bool bstop = false; std::condition_variable cond_quit; std::mutex mutex_quit; int main() { auto& cfg = ConfigMgr::Inst(); auto server_name = cfg["SelfServer"]["Name"]; try { auto pool = AsioIOServicePool::GetInstance(); //将登录数设置为0 RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name, "0"); //定义一个GrpcServer std::string server_address(cfg["SelfServer"]["Host"] + ":" + cfg["SelfServer"]["RPCPort"]); ChatServiceImpl service; grpc::ServerBuilder builder; // 监听端口和添加服务 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service); // 构建并启动gRPC服务器 std::unique_ptr<grpc::Server> server(builder.BuildAndStart()); std::cout << "RPC Server listening on " << server_address << std::endl; //单独启动一个线程处理grpc服务 std::thread grpc_server_thread([&server]() { server->Wait(); }); boost::asio::io_context io_context; boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&io_context, pool, &server](auto, auto) { io_context.stop(); pool->Stop(); server->Shutdown(); }); auto port_str = cfg["SelfServer"]["Port"]; CServer s(io_context, atoi(port_str.c_str())); io_context.run(); RedisMgr::GetInstance()->HDel(LOGIN_COUNT, server_name); RedisMgr::GetInstance()->Close(); grpc_server_thread.join(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << endl; } }
我们在服务器启动后将本服务器的登录数量设置为0.
同样的道理,我们将服务器关闭后,也要删除对应key。
因为我们用户登录后,要将连接(session)和用户uid绑定。为以后登陆踢人做准备。所以新增UserMgr管理类.
其声明如下
class CSession;
class UserMgr : public Singleton<UserMgr>
{
friend class Singleton<UserMgr>;
public:
~UserMgr();
std::shared_ptr<CSession> GetSession(int uid);
void SetUserSession(int uid, std::shared_ptr<CSession> session);
void RmvUserSession(int uid);
private:
UserMgr();
std::mutex _session_mtx;
std::unordered_map<int, std::shared_ptr<CSession>> _uid_to_session;
};
其实现如下
UserMgr:: ~UserMgr() { _uid_to_session.clear(); } std::shared_ptr<CSession> UserMgr::GetSession(int uid) { std::lock_guard<std::mutex> lock(_session_mtx); auto iter = _uid_to_session.find(uid); if (iter == _uid_to_session.end()) { return nullptr; } return iter->second; } void UserMgr::SetUserSession(int uid, std::shared_ptr<CSession> session) { std::lock_guard<std::mutex> lock(_session_mtx); _uid_to_session[uid] = session; } void UserMgr::RmvUserSession(int uid) { auto uid_str = std::to_string(uid); //因为再次登录可能是其他服务器,所以会造成本服务器删除key,其他服务器注册key的情况 // 有可能其他服务登录,本服删除key造成找不到key的情况 //RedisMgr::GetInstance()->Del(USERIPPREFIX + uid_str); { std::lock_guard<std::mutex> lock(_session_mtx); _uid_to_session.erase(uid); } } UserMgr::UserMgr() { }
RmvUserSession 暂时屏蔽,以后做登录踢人后能保证有序移除用户ip操作。
当有连接异常时,可以调用移除用户Session的接口
void CServer::ClearSession(std::string session_id) {
if (_sessions.find(session_id) != _sessions.end()) {
//移除用户和session的关联
UserMgr::GetInstance()->RmvUserSession(_sessions[session_id]->GetUserId());
}
{
lock_guard<mutex> lock(_mutex);
_sessions.erase(session_id);
}
}
聊天服务完善用户登录,当用户登录后, 设置其uid对应的serverip。以及更新其所在服务器的连接数。
void LogicSystem::LoginHandler(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) { Json::Reader reader; Json::Value root; reader.parse(msg_data, root); auto uid = root["uid"].asInt(); auto token = root["token"].asString(); std::cout << "user login uid is " << uid << " user token is " << token << endl; Json::Value rtvalue; Defer defer([this, &rtvalue, session]() { std::string return_str = rtvalue.toStyledString(); session->Send(return_str, MSG_CHAT_LOGIN_RSP); }); //从redis获取用户token是否正确 std::string uid_str = std::to_string(uid); std::string token_key = USERTOKENPREFIX + uid_str; std::string token_value = ""; bool success = RedisMgr::GetInstance()->Get(token_key, token_value); if (!success) { rtvalue["error"] = ErrorCodes::UidInvalid; return; } if (token_value != token) { rtvalue["error"] = ErrorCodes::TokenInvalid; return; } rtvalue["error"] = ErrorCodes::Success; std::string base_key = USER_BASE_INFO + uid_str; auto user_info = std::make_shared<UserInfo>(); bool b_base = GetBaseInfo(base_key, uid, user_info); if (!b_base) { rtvalue["error"] = ErrorCodes::UidInvalid; return; } rtvalue["uid"] = uid; rtvalue["pwd"] = user_info->pwd; rtvalue["name"] = user_info->name; rtvalue["email"] = user_info->email; rtvalue["nick"] = user_info->nick; rtvalue["desc"] = user_info->desc; rtvalue["sex"] = user_info->sex; rtvalue["icon"] = user_info->icon; //从数据库获取申请列表 //获取好友列表 auto server_name = ConfigMgr::Inst().GetValue("SelfServer", "Name"); //将登录数量增加 auto rd_res = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server_name); int count = 0; if (!rd_res.empty()) { count = std::stoi(rd_res); } count++; auto count_str = std::to_string(count); RedisMgr::GetInstance()->HSet(LOGIN_COUNT, server_name, count_str); //session绑定用户uid session->SetUserId(uid); //为用户设置登录ip server的名字 std::string ipkey = USERIPPREFIX + uid_str; RedisMgr::GetInstance()->Set(ipkey, server_name); //uid和session绑定管理,方便以后踢人操作 UserMgr::GetInstance()->SetUserSession(uid, session); return; }
状态服务更新配置
配置文件同样增加了chatservers列表,用来管理多个服务,接下来实现根据连接数动态返回chatserverip的功能
Status StatusServiceImpl::GetChatServer(ServerContext* context,
const GetChatServerReq* request, GetChatServerRsp* reply)
{
std::string prefix("llfc status server has received : ");
const auto& server = getChatServer();
reply->set_host(server.host);
reply->set_port(server.port);
reply->set_error(ErrorCodes::Success);
reply->set_token(generate_unique_string());
insertToken(request->uid(), reply->token());
return Status::OK;
}
getChatServer用来获取最小连接数的chatserver 名字
ChatServer StatusServiceImpl::getChatServer() { std::lock_guard<std::mutex> guard(_server_mtx); auto minServer = _servers.begin()->second; auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, minServer.name); if (count_str.empty()) { //不存在则默认设置为最大 minServer.con_count = INT_MAX; } else { minServer.con_count = std::stoi(count_str); } // 使用范围基于for循环 for (auto& server : _servers) { if (server.second.name == minServer.name) { continue; } auto count_str = RedisMgr::GetInstance()->HGet(LOGIN_COUNT, server.second.name); if (count_str.empty()) { server.second.con_count = INT_MAX; } else { server.second.con_count = std::stoi(count_str); } if (server.second.con_count < minServer.con_count) { minServer = server.second; } } return minServer; }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。