赞
踩
目录
1. 一种自动反射消息类型的 Google Protobuf 网络传输方案
2. 在 muduo 中实现 protobuf 编解码器与消息分发器
Google Protocol Buffers (Protobuf) 是一款非常优秀的库,它定义了一种紧凑的可扩展二进制消息格式,特别适合网络数据传输。它为多种语言提供 binding,大大方便了分布式程序的开发,让系统不再局限于用某一种语言来编写。
根据 type name 反射自动创建 Message 对象
Protobuf 本身具有很强的反射(reflection)功能,可以根据 type name 创建具体类型的 Message 对象。Protobuf class diagram 如下:
大家在使用的时候,通常关心和使用的是图的左半部分:MessageLite、Message、Generated Message Types (Person, AddressBook) 等,而较少注意到图的右半部分:Descriptor, DescriptorPool, MessageFactory。
上图中,其关键作用的是 Descriptor class,每个具体 Message Type 对应一个 Descriptor 对象。尽管我们没有直接调用它的函数,但是Descriptor在“根据 type name 创建具体类型的 Message 对象”中扮演了重要的角色,起了桥梁作用。上图的红色箭头描述了根据 type name 创建具体 Message 对象的过程,后文会详细介绍。
Protobuf Message class 采用了 prototype pattern,Message class 定义了 New() 虚函数,用以返回本对象的一份新实体,类型与本对象的真实类型相同。也就是说,拿到 Message* 指针,不用知道它的具体类型,就能创建和它类型一样的具体 Message Type 的对象。
每个具体 Message Type 都有一个 default instance,可以通过 ConcreteMessage::default_instance() 获得,也可以通过 MessageFactory::GetPrototype(const Descriptor*) 来获得。所以,现在问题转变为 1. 如何拿到 MessageFactory;2. 如何拿到 Descriptor*。
当然,ConcreteMessage::descriptor() 返回了我们想要的 Descriptor*,但是,在不知道 ConcreteMessage 的时候,如何调用它的静态成员函数呢?我们的英雄是 DescriptorPool,它可以根据 type name 查到 Descriptor*,只要找到合适的 DescriptorPool,再调用 DescriptorPool::FindMessageTypeByName(const string& type_name) 即可。
根据 type name 自动创建 Message 的关键代码
- //codec.cc中函数
- google::protobuf::Message* ProtobufCodec::createMessage(const std::string& typeName)
- {
- google::protobuf::Message* message = NULL;
- const google::protobuf::Descriptor* descriptor =
- google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);
- if (descriptor)
- {
- const google::protobuf::Message* prototype =
- google::protobuf::MessageFactory::generated_factory()->GetPrototype(descriptor);
- if (prototype)
- {
- message = prototype->New();
- }
- }
- return message;
- }
调用方式:
- google::protobuf::Message* newQuery = createMessage("muduo.Query");
- assert(newQuery != NULL);
- assert(typeid(*newQuery) == typeid(muduo::Query::default_instance()));
- cout << "createMessage(/"muduo.Query/") = " << newQuery << endl;
createMessage() 返回的是动态创建的对象的指针,调用方有责任释放它,不然就会内存泄露。在 muduo 里,我用 shared_ptr 来自动管理 Message 对象的生命期。拿到Message *后,如何调用这个具体消息类型的处理函数,就需要依赖于消息分发器(dispatcher)。
Protobuf 是经过深思熟虑的消息打包方案,它的默认序列化格式没有包含消息的长度与类型,自然有其道理。如果 protobuf 无条件地把长度和类型放到序列化的字节串中,只会浪费网络带宽和存储。用户在使用时,根据应用场景确认是否加入长度和类型到序列化字节中。
什么是编解码器 codec?
Codec 是 encoder 和 decoder 的缩写,这是一个到软硬件都在使用的术语,这里我借指“把网络数据和业务消息之间互相转换”的代码。
在最简单的网络编程中,没有消息 message 只有字节流数据,这时候是用不到 codec 的。比如我们前面讲过的 echo server,它只需要把收到的数据原封不动地发送回去,它不必关心消息的边界(也没有“消息”的概念),收多少就发多少,这种情况下它干脆直接使用 muduo::net::Buffer,取到数据再交给 TcpConnection 发送回去,见下图。
non-trivial 的网络服务程序通常会以消息为单位来通信,每条消息有明确的长度与界限。程序每次收到一个完整的消息的时候才开始处理,发送的时候也是把一个完整的消息交给网络库。比如我们前面讲过的 asio chat 服务,它的一条聊天记录就是一条消息,我们设计了一个简单的消息格式,即在聊天记录前面加上 4 字节的 length header,LengthHeaderCodec 代码及解说不详述。
codec 的基本功能之一是做 TCP 分包:确定每条消息的长度,为消息划分界限。在 non-blocking 网络编程中,codec 几乎是必不可少的。如果只收到了半条消息,那么不会触发消息回调,数据会停留在 Buffer 里(数据已经读到 Buffer 中了),等待收到一个完整的消息再通知处理函数。既然这个任务太常见,我们干脆做一个 utility class,避免服务端和客户端程序都要自己处理分包,这就有了 LengthHeaderCodec。这个 codec 的使用有点奇怪,不需要继承,它也没有基类,只要把它当成普通 data member 来用,把 TcpConnection 的数据喂给它,然后向它注册 onXXXMessage() 回调,代码见 asio chat 示例。muduo 里的 codec 都是这样的风格,通过 boost::function 粘合到一起。
codec 是一层间接性,它位于 TcpConnection 和 ChatServer 之间,拦截处理收到的数据,在收到完整的消息之后再调用 CharServer 对应的处理函数,注意 CharServer::onStringMessage() 的参数是 std::string,不再是 muduo::net::Buffer,也就是说 LengthHeaderCodec 把 Buffer 解码成了 string。另外,在发送消息的时候,ChatServer 通过 LengthHeaderCodec::send() 来发送 string,LengthHeaderCodec 负责把它编码成 Buffer。这正是“编解码器”名字的由来。消息流程如图所示:
Protobuf codec 与此非常类似,只不过消息类型从 std::string 变成了 protobuf::Message。对于只接收处理 Query 消息的 QueryServer 来说,用 ProtobufCodec 非常方便,收到 protobuf::Message 之后 down cast 成 Query 来用就行。
如果要接收处理不止一种消息,ProtobufCodec 恐怕还不能单独完成工作,请继续阅读下文。
Protobuf传输格式
在muduo库中设计了一个简单的打包格式,包含Protobuf data和其对应的长度和类型信息,消息的末尾还有一个check sum。格式如下图所示,图中方块的宽度是32-bit。
将该格式用C代码描述:
- struct ProtobufTransportFormat __attribute__ ((__packed__))
- {
- int32_t len;
- int32_t nameLen;
- char typeName[nameLen];
- char protobufData[len-nameLen-8];
- int32_t checkSum; // adler32 of nameLen, typeName and protobufData
- }
例子,用这个格式打包一个muduo.Query对象的结果如下:
以下作者在设计这个传输格式时的考虑:
实现 ProtobufCodec
编码算法很直截了当,按照前文定义的消息格式一路打包下来,最后更新一下首部的长度即可。函数ProtobufCodec::fillEmptyBuffer实现如下:
- void ProtobufCodec::fillEmptyBuffer(Buffer* buf, const google::protobuf::Message& message)
- {
- // buf->retrieveAll();
- assert(buf->readableBytes() == 0);
-
- const std::string& typeName = message.GetTypeName();
- int32_t nameLen = static_cast<int32_t>(typeName.size()+1);
- buf->appendInt32(nameLen);
- buf->append(typeName.c_str(), nameLen);
-
- // code copied from MessageLite::SerializeToArray() and MessageLite::SerializePartialToArray().
- GOOGLE_DCHECK(message.IsInitialized()) << InitializationErrorMessage("serialize", message);
-
- #if GOOGLE_PROTOBUF_VERSION > 3009002
- int byte_size = google::protobuf::internal::ToIntSize(message.ByteSizeLong());
- #else
- int byte_size = message.ByteSize();
- #endif
- buf->ensureWritableBytes(byte_size);
-
- uint8_t* start = reinterpret_cast<uint8_t*>(buf->beginWrite());
- uint8_t* end = message.SerializeWithCachedSizesToArray(start);
- if (end - start != byte_size)
- {
- #if GOOGLE_PROTOBUF_VERSION > 3009002
- ByteSizeConsistencyError(byte_size, google::protobuf::internal::ToIntSize(message.ByteSizeLong()), static_cast<int>(end - start));
- #else
- ByteSizeConsistencyError(byte_size, message.ByteSize(), static_cast<int>(end - start));
- #endif
- }
- buf->hasWritten(byte_size);
-
- int32_t checkSum = static_cast<int32_t>(
- ::adler32(1,
- reinterpret_cast<const Bytef*>(buf->peek()),
- static_cast<int>(buf->readableBytes())));
- buf->appendInt32(checkSum);
- assert(buf->readableBytes() == sizeof nameLen + nameLen + byte_size + sizeof checkSum);
- int32_t len = sockets::hostToNetwork32(static_cast<int32_t>(buf->readableBytes()));
- buf->prepend(&len, sizeof len);
- }
解码算法ProtobufCodec::onMessage函数有几个要点:
- void ProtobufCodec::onMessage(const TcpConnectionPtr& conn,
- Buffer* buf,
- Timestamp receiveTime)
- {
- while (buf->readableBytes() >= kMinMessageLen + kHeaderLen)
- {
- const int32_t len = buf->peekInt32();
- if (len > kMaxMessageLen || len < kMinMessageLen)
- {
- errorCallback_(conn, buf, receiveTime, kInvalidLength);
- break;
- }
- else if (buf->readableBytes() >= implicit_cast<size_t>(len + kHeaderLen))
- {
- ErrorCode errorCode = kNoError;
- MessagePtr message = parse(buf->peek()+kHeaderLen, len, &errorCode);
- //先检查校验码,再根据type name创建相应的prototype,再调用MessageLite::ParseFormArray,解析二进制格式的数据。
- if (errorCode == kNoError && message)
- {
- messageCallback_(conn, message, receiveTime);
- buf->retrieve(kHeaderLen+len);
- }
- else
- {
- errorCallback_(conn, buf, receiveTime, errorCode);
- break;
- }
- }
- else
- {
- break;
- }
- }
- }
ProtobufCodec 在实际使用中有明显的不足:它只负责把 muduo::net::Buffer 转换为具体类型的 protobuf::Message,应用程序拿到 Message 之后还有再根据其具体类型做一次分发。我们可以考虑做一个简单通用的分发器 dispatcher,以简化客户代码。
消息分发器 dispatcher 有什么用?
前面提到,在使用 TCP 长连接,且在一个连接上传递不止一种 protobuf 消息的情况下,客户代码需要对收到的消息按类型做分发。比方说,收到 Logon 消息就交给 QueryServer::onLogon() 去处理,收到 Query 消息就交给 QueryServer::onQuery() 去处理。这个消息分派机制可以做得稍微有点通用性,让所有 muduo+protobuf 程序收益,而且不增加复杂性。
换句话说,又是一层间接性,ProtobufCodec 拦截了 TcpConnection 的数据,把它转换为 Message,ProtobufDispatcher 拦截了 ProtobufCodec 的 callback,按消息具体类型把它分派给多个 callbacks。处理过程如下所示:
ProtobufDispatcher的两种实现
要完成消息分发,其实就是对消息做type-switch,这样做的扩展性不太好,但Protobuf Message的Descriptor没有留下定制点(比如暴露一个boost::any成员),只能这么做了。
ProtobufDispatcherLite的结构很简单,它有一个map<const Descriptor*, ProtobufMessageCallback>
成员,客户代码可以以Descriptor*为键注册回调(每个具体消息类型都有一个全局的Descriptor对象,其地址是不变的)。在收到Protobuf Message之后,在map中找到相应的ProtobufMessageCallback,然后调用它。如果找不到,就调用defaultCallback。
不过,这个设计有小小的缺陷,因为ProtobufMessageCallback的定义为:
- typedef std::function<void (const muduo::net::TcpConnectionPtr&,
- const MessagePtr&,
- muduo::Timestamp)> ProtobufMessageCallback;
这样就限制了客户代码只能接受基类Message,那么客户代码在处理消息时,需要自己做down cast,如下图所示:
如果说我们希望down cast交给dispatcher处理,客户代码拿到的就已经是具体类型,其接口如下图所示:
也就是说,我们需要如何将ProtobufDispatcher与多个未知的消息类型合作?可以将多态与模板结合,利用templated derived class来实现。
ProtobufDispatcher 有一个模板成员函数,可以接受注册任意消息类型 T 的回调,然后它创建一个模板化的派生类 CallbackT,这样消息的类新信息就保存在了 CallbackT 中,做 down casting 就简单了。比方说,我们有两个具体消息类型 Query 和 Answer 继承自Message,注册回调如下:
- dispatcher_.registerMessageCallback(
- boost::bind(&QueryServer::onQuery, this, _1, _2, _3));
- dispatcher_.registerMessageCallback(
- boost::bind(&QueryServer::onAnswer, this, _1, _2, _3));
这样会具现化 (instantiation) 出两个 CallbackT 实体,如下:
ProtobufDispatcherLite和ProtobufDispatcher代码比较
ProtobufDispatcherLite主要代码:
- typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
-
- class ProtobufDispatcherLite : muduo::noncopyable
- {
- public:
- typedef std::function<void (const muduo::net::TcpConnectionPtr&,
- const MessagePtr&,
- muduo::Timestamp)> ProtobufMessageCallback;
-
- explicit ProtobufDispatcherLite(const ProtobufMessageCallback& defaultCb)
- : defaultCallback_(defaultCb)
- {
- }
-
- void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
- const MessagePtr& message,
- muduo::Timestamp receiveTime) const
- {
- CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());
- if (it != callbacks_.end())
- {
- it->second(conn, message, receiveTime);
- }
- else
- {
- defaultCallback_(conn, message, receiveTime);
- }
- }
-
- void registerMessageCallback(const google::protobuf::Descriptor* desc,
- const ProtobufMessageCallback& callback)
- {
- callbacks_[desc] = callback;
- }
-
- private:
-
- typedef std::map<const google::protobuf::Descriptor*, ProtobufMessageCallback> CallbackMap;
- CallbackMap callbacks_;
- ProtobufMessageCallback defaultCallback_;
- };
ProtobufDispatcher主要代码:
- typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
-
- class Callback : muduo::noncopyable
- {
- public:
- virtual ~Callback() = default;
- virtual void onMessage(const muduo::net::TcpConnectionPtr&,
- const MessagePtr& message,
- muduo::Timestamp) const = 0;
- };
-
- template <typename T>
- class CallbackT : public Callback
- {
- static_assert(std::is_base_of<google::protobuf::Message, T>::value,
- "T must be derived from gpb::Message.");
- public:
- typedef std::function<void (const muduo::net::TcpConnectionPtr&,
- const std::shared_ptr<T>& message,
- muduo::Timestamp)> ProtobufMessageTCallback;
-
- CallbackT(const ProtobufMessageTCallback& callback)
- : callback_(callback)
- {
- }
-
- void onMessage(const muduo::net::TcpConnectionPtr& conn,
- const MessagePtr& message,
- muduo::Timestamp receiveTime) const override
- {
- std::shared_ptr<T> concrete = muduo::down_pointer_cast<T>(message);
- assert(concrete != NULL);
- callback_(conn, concrete, receiveTime);
- }
-
- private:
- ProtobufMessageTCallback callback_;
- };
-
- class ProtobufDispatcher
- {
- public:
- typedef std::function<void (const muduo::net::TcpConnectionPtr&,
- const MessagePtr& message,
- muduo::Timestamp)> ProtobufMessageCallback;
-
- explicit ProtobufDispatcher(const ProtobufMessageCallback& defaultCb)
- : defaultCallback_(defaultCb)
- {
- }
-
- void onProtobufMessage(const muduo::net::TcpConnectionPtr& conn,
- const MessagePtr& message,
- muduo::Timestamp receiveTime) const
- {
- CallbackMap::const_iterator it = callbacks_.find(message->GetDescriptor());
- if (it != callbacks_.end())
- {
- it->second->onMessage(conn, message, receiveTime);
- }
- else
- {
- defaultCallback_(conn, message, receiveTime);
- }
- }
-
- template<typename T>
- void registerMessageCallback(const typename CallbackT<T>::ProtobufMessageTCallback& callback)
- {
- std::shared_ptr<CallbackT<T> > pd(new CallbackT<T>(callback));
- callbacks_[T::descriptor()] = pd;
- }
-
- private:
- typedef std::map<const google::protobuf::Descriptor*, std::shared_ptr<Callback> > CallbackMap;
-
- CallbackMap callbacks_;
- ProtobufMessageCallback defaultCallback_;
- };
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。