赞
踩
Kindling collector项目作为Go端分析器,使用类似opentelmetry的pinpeline进行数据分析。其中涉及5个组件:
其中协议解析流程主要在NetworkAnalyzer组件中进行,将接收的请求/响应事件成对匹配后,交由parseProtocols()函数解析出协议指标。
NetworkAnnalyzer.parseProtocols()方法定义了整体解析流程,根据协议解析器分别解析请求和响应,当最终都成功时输出指标。
正常的协议解析只负责逐帧解析指标功能。
现已支持5种协议解析,当协议越来越多时,遍历引起的解析会越来越耗时,那么需引入fastfail快速识别协议
对于复杂的多报文协议,如Kafka有不同的API报文,而相同API也有不同的版本报文。将所有报文解析逻辑都写在一起会使整个类过于臃肿且不易维护。为此引入树形多报文结构用于快速且低耦合地实现开发。
在树形报文解析过程中,有如下2个场景需要考虑
定义PayloadMessage,封装报文内容、读取偏移量和指标存储的Map。
type PayloadMessage struct {
Data []byte
Offset int
attributeMap *model.AttributeMap
}
由于引入协议树,协议解析过程parse() (ok bool)将不再适用。协议树中的个协议的解析成功不表示整个协议解析成功,需解析整颗树的协议是否成功,将API扩展为parse() (ok bool, complete bool)。
基于以上几点需求,设计树形结构的报文解析器PkgParser。PkgParser定义了fastFail(快速识别失败) 和parser(解析单个报文)函数;每个协议只需注册自身的PkgParser即可接入整套流程。
*fastFail(message PayloadMessage) (fail bool)
*parser(message PayloadMessage) (ok bool, complete bool)
ProtocolParser定义了请求和响应的解析器,并提供ParseRequest()和ParseResponse() API用于解析请求和响应
其中response的message携带了request解析出的attributes指标,用于匹配。eg. kafka的correlationId request和response报文需一致,且response报文解析用到了request的key。
const (
HTTP = "http"
...
XX = "xx" // 此处替换具体协议名
...
)
analyzer/network/protocol目录下创建文件夹xx,xx替换为具体协议,并创建3个文件xx_parser.go、xx_request.go 和 xx_response.go
analyzer/network/protocol/xx
├── xx_parser.go 协议解析器
├── xx_request.go 实现请求解析流程
└── xx_response.go 实现响应解析流程
实现fastfail()和parser()函数
func fastfailXXRequest() protocol.FastFailFn { return func(message *protocol.PayloadMessage) bool { // 根据报文实现具体的fastFail()函数 return false } } func parseXXRequest() protocol.ParsePkgFn { return func(message *protocol.PayloadMessage) (bool, bool) { // 解析报文内容 contentKey := getContentKey(message.Data) if contentKey == "" { // 第一个参数false 表示解析失败,第二个参数表示报文解析完成 return false, true } // 通过AddStringAttribute() 或 AttIntAttribute() 存储解析出的属性 message.AddStringAttribute(constlabels.ContentKey, contentKey) // 解析成功 return true, true } }
实现fastfail()和parser()函数
func fastfailXXResponse() protocol.FastFailFn { return func(message *protocol.PayloadMessage) bool { // 根据报文实现具体的fastFail()函数 return false } } func parseXXResponse() protocol.ParsePkgFn { return func(message *protocol.PayloadMessage) (bool, bool) { // 通过GetStringAttribute() 或 GetIntAttribute() 读取request解析后的参数 contentKey := message.GetStringAttribute(constlabels.ContentKey) // 解析响应报文 errorCode := getErrorCode(message) if errorCode > 20 { // 有errorCode或errorMsg等常见,需定义IsError为true用于后续processor生成Metric message.
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。