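Golang Framework in Practice - KisFlow Stream Computing Framework (1) - Overview
Golang Framework in Practice - KisFlow Stream Computing Framework (2) - Project Setup / Basic Modules (Part 1)
Golang Framework in Practice - KisFlow Stream Computing Framework (3) - Project Setup / Basic Modules (Part 2)
Golang Framework in Practice - KisFlow Stream Computing Framework (4) - Data Stream
Golang Framework in Practice - KisFlow Stream Computing Framework (5) - Function Scheduling
Golang Framework in Practice - KisFlow Stream Computing Framework (6) - Connector
Golang Framework in Practice - KisFlow Stream Computing Framework (7) - Configuration Import and Export
Golang Framework in Practice - KisFlow Stream Computing Framework (8) - KisFlow Action
Golang Framework in Practice - KisFlow Stream Computing Framework (9) - Cache/Params: Data Caching and Data Parameters
Golang Framework in Practice - KisFlow Stream Computing Framework (10) - Multiple Flow Copies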
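When a KisFlow needs to be used concurrently by multiple goroutines while it is executing, it may be necessary to create several Flows from the same configuration, one for each concurrent computation stream. Flow therefore needs the ability to create copies of itself. This chapter implements that ability.
First, add a new method, Fork(), to the Flow abstraction layer. Its prototype is as follows: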
kis-flow/kis/flow.go
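type Flow interface {
	// Run schedules the Flow, executing the Functions in the Flow in order
	Run(ctx context.Context) error
	// Link connects the Functions in the Flow according to the configuration file
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow submits Flow data to the Function layer that is about to execute
	CommitRow(row interface{}) error
	// Input returns the input source data of the Function the Flow is currently executing
	Input() common.KisRowArr
	// GetName returns the name of the Flow
	GetName() string
	// GetThisFunction returns the Function that is currently executing
	GetThisFunction() Function
	// GetThisFuncConf returns the configuration of the Function that is currently executing
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector returns the Connector of the Function that is currently executing
	GetConnector() (Connector, error)
	// GetConnConf returns the Connector configuration of the Function that is currently executing
	GetConnConf() (*config.KisConnConfig, error)
	// GetConfig returns the configuration of the current Flow
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName returns the configuration of the Function with the given name in the current Flow
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// Next sets the Action carried along when the current Function moves on to the next Function
	Next(acts ...ActionFunc) error
	// GetCacheData returns cached data of the current Flow
	GetCacheData(key string) interface{}
	// SetCacheData sets cached data of the current Flow
	SetCacheData(key string, value interface{}, Exp time.Duration)
	// GetMetaData returns temporary data of the current Flow
	GetMetaData(key string) interface{}
	// SetMetaData sets temporary data of the current Flow
	SetMetaData(key string, value interface{})
	// GetFuncParam returns one key-value pair from the default parameters configured for the Function that is currently executing
	GetFuncParam(key string) string
	// GetFuncParamAll returns all key-value pairs from the default parameters configured for the Function that is currently executing
	GetFuncParamAll() config.FParam

	// +++++++++++++++++++++++++
	// Fork returns a copy of the Flow (deep copy)
	Fork(ctx context.Context) Flow
}
Given an existing KisFlow instance, Fork() clones a new KisFlow instance that has the same configuration but whose resources are isolated from the original.
The implementation is as follows: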
kis-flow/flow/kis_flow.go
// Fork returns a copy of the Flow (deep copy)
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {

	config := flow.Conf

	// Build a new Flow from the existing configuration
	newFlow := NewKisFlow(config)

	// Note: the error returned by Link is ignored in this loop
	for _, fp := range flow.Conf.Flows {
		if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
			// The current Function has no Params configured
			newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil)
		} else {
			// The current Function has Params configured
			newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
		}
	}

	log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams)
	log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs())

	return newFlow
}
Fork() first creates a new KisFlow instance from the flow's configuration, then walks the configured Functions and calls Link() for each one, so that the newly created Functions, together with any Params configured for them, are attached to the new Flow.
For debugging, the code above adds to Flow a method that returns the FuncParams of every Function, GetFuncParamsAllFuncs(). It is implemented as follows:
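To make the "rebuild from configuration, then re-link" idea concrete, here is a minimal, self-contained sketch. It does not use the kis-flow packages: miniFlow, newMiniFlow, link, commitRow and fork are hypothetical stand-ins for KisFlow, NewKisFlow, Link, CommitRow and Fork, and the real types hold considerably more state. It only illustrates why a fork built this way does not share parameters or committed rows with the original.

```go
package main

import "fmt"

// FParam mirrors the key-value parameter map used in the article.
type FParam map[string]string

// miniFlow is a hypothetical, stripped-down stand-in for KisFlow.
type miniFlow struct {
	name       string
	funcNames  []string          // ordered Function names, as listed in the config
	funcParams map[string]FParam // per-Function parameters
	buffer     []string          // rows committed to this instance only
}

// newMiniFlow builds an empty flow, playing the role of NewKisFlow.
func newMiniFlow(name string) *miniFlow {
	return &miniFlow{name: name, funcParams: make(map[string]FParam)}
}

// link attaches a Function and stores a private copy of its parameters.
func (f *miniFlow) link(funcName string, params FParam) {
	f.funcNames = append(f.funcNames, funcName)
	copied := make(FParam, len(params))
	for k, v := range params {
		copied[k] = v
	}
	f.funcParams[funcName] = copied
}

// commitRow appends a row to this instance's private buffer.
func (f *miniFlow) commitRow(row string) {
	f.buffer = append(f.buffer, row)
}

// fork creates a fresh flow with the same name and re-links every Function.
func (f *miniFlow) fork() *miniFlow {
	clone := newMiniFlow(f.name)
	for _, fn := range f.funcNames {
		clone.link(fn, f.funcParams[fn])
	}
	return clone
}

func main() {
	origin := newMiniFlow("flowName1")
	origin.link("funcName1", FParam{"myKey1": "flowValue1-1"})

	clone := origin.fork()
	clone.commitRow("This is Data1 from Test")
	clone.funcParams["funcName1"]["myKey1"] = "changed-in-clone"

	// The original keeps its parameter value and an empty buffer.
	fmt.Println(origin.funcParams["funcName1"]["myKey1"], len(origin.buffer))
	fmt.Println(clone.funcParams["funcName1"]["myKey1"], len(clone.buffer))
}
```

In this sketch the isolation comes from link copying the parameter map and from the buffer being created per instance. Whether the real Link copies or shares the FParam it receives is not shown in this chapter, so treat that part as an assumption.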
kis-flow/kis/flow.go
type Flow interface {
	// ... ...
	// ... ...
	// GetFuncParamsAllFuncs returns the FuncParams of every Function in the Flow, with all key-value pairs
	GetFuncParamsAllFuncs() map[string]config.FParam
	// ... ...
}
kis-flow/flow/kis_flow_data.go
// GetFuncParamsAllFuncs returns the FuncParams of every Function in the Flow, with all key-value pairs
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
	flow.fplock.RLock()
	defer flow.fplock.RUnlock()

	return flow.funcParams
}
Now let's test the Fork capability. The unit test is as follows:
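One detail worth noting about the getter above: it returns the internal map itself, so the read lock only covers the moment of the return, and a caller that keeps the map can still observe or modify the Flow's state afterwards. That is harmless for the debug printing it is used for here. If the result were ever handed to other goroutines, a defensive copy is one possible alternative. The sketch below is only an illustration under that assumption: paramStore and snapshot are hypothetical names and are not part of kis-flow.

```go
package main

import (
	"fmt"
	"sync"
)

// FParam mirrors the key-value parameter map used in the article.
type FParam map[string]string

// paramStore is a hypothetical stand-in for the part of KisFlow that holds funcParams.
type paramStore struct {
	fplock     sync.RWMutex
	funcParams map[string]FParam
}

// snapshot returns a deep copy of all parameters, built while holding the read lock.
func (s *paramStore) snapshot() map[string]FParam {
	s.fplock.RLock()
	defer s.fplock.RUnlock()

	out := make(map[string]FParam, len(s.funcParams))
	for id, params := range s.funcParams {
		cp := make(FParam, len(params))
		for k, v := range params {
			cp[k] = v
		}
		out[id] = cp
	}
	return out
}

func main() {
	s := &paramStore{funcParams: map[string]FParam{
		"func-1": {"default1": "funcName1_param1"},
	}}

	snap := s.snapshot()
	snap["func-1"]["default1"] = "edited" // only the copy changes

	fmt.Println(s.funcParams["func-1"]["default1"])
}
```

The trade-off is an allocation per call, which is why a plain return of the internal map can be a reasonable choice for a debug-only helper.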
kis-flow/test/kis_fork_test.go
func TestForkFlow(t *testing.T) {
	ctx := context.Background()

	// 0. Register the Function callbacks
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. Register the ConnectorInit and Connector callbacks
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. Load the configuration files and build the Flow
	if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}

	// 2. Get the Flow and fork it
	flow1 := kis.Pool().GetFlow("flowName1")

	flow1Clone1 := flow1.Fork(ctx)

	// 3. Commit the raw data
	_ = flow1Clone1.CommitRow("This is Data1 from Test")
	_ = flow1Clone1.CommitRow("This is Data2 from Test")
	_ = flow1Clone1.CommitRow("This is Data3 from Test")

	// 4. Run flow1Clone1
	if err := flow1Clone1.Run(ctx); err != nil {
		panic(err)
	}
}
We first get the flow instance for flowName1 from the pool, then call Fork() on it to obtain flow1Clone1, and finally run the scheduling process of flow1Clone1.
Change into kis-flow/test/ and run:
go test -test.v -test.paniconexit0 -test.run TestForkFlow
The result is as follows:
=== RUN TestForkFlow
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
context.Background
=====>Flow Fork, oldFlow.funcParams = map[func-6b00f430fe494302a384c2ae09eb019c:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-bf9df5fc16684200b78f32985d073012:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] func-c0f1ae9850174f81b994a2e98fb34109:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]]
context.Background
=====>Flow Fork, newFlow.funcParams = map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]]
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d180 ThisFunctionId:func-9406285e2fa94bd582dab4a875771a97 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d200 ThisFunctionId:func-de7c4e4175b74a898cb43863e53b3215 PrevFunctionId:func-9406285e2fa94bd582dab4a875771a97 funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-de7c4e4175b74a898cb43863e53b3215:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]
KisFunctionC, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d300 ThisFunctionId:func-614511f5142e4023b80373517f3ea0a7 PrevFunctionId:func-de7c4e4175b74a898cb43863e53b3215 funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-de7c4e4175b74a898cb43863e53b3215:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 2
--- PASS: TestForkFlow (0.03s)
PASS
ok kis-flow/test 0.996s
The output shows that flow1Clone1 and flowName1 have the same configuration: the oldFlow.funcParams and newFlow.funcParams maps carry the same key-value pairs for each Function, while the Function IDs differ, because the fork created its own Function instances.
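The test above runs a single fork on one goroutine. The motivating case from the start of the chapter, several goroutines each driving its own copy, can be sketched as follows. This is a self-contained illustration and does not import kis-flow: worker, fork, commitRow, run and runForks are hypothetical names standing in for a Flow and its Fork, CommitRow and Run methods. It also says nothing about whether the real Fork may itself be called from several goroutines at once.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a hypothetical stand-in for a Flow instance with private row storage.
type worker struct {
	name string
	rows []string
}

// fork returns a new instance with the same name and no shared row storage.
func (w *worker) fork() *worker { return &worker{name: w.name} }

// commitRow stores a row in this instance only.
func (w *worker) commitRow(row string) { w.rows = append(w.rows, row) }

// run pretends to process the committed rows and returns how many were handled.
func (w *worker) run() int { return len(w.rows) }

// runForks creates one fork per goroutine and returns the rows handled by each fork.
func runForks(origin *worker, goroutines, rowsEach int) []int {
	results := make([]int, goroutines)
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			clone := origin.fork() // each goroutine owns its fork
			for r := 0; r < rowsEach; r++ {
				clone.commitRow(fmt.Sprintf("row %d from goroutine %d", r, idx))
			}
			results[idx] = clone.run() // each goroutine writes a distinct index
		}(i)
	}
	wg.Wait()
	return results
}

func main() {
	origin := &worker{name: "flowName1"}
	// Every fork handles only its own rows; the original receives none.
	fmt.Println(runForks(origin, 4, 3), len(origin.rows))
}
```

Because no goroutine touches another goroutine's fork, the per-row work needs no locking. That is the property the resource isolation of Fork() is meant to provide.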
https://github.com/aceld/kis-flow/releases/tag/v0.8
Author: 刘丹冰 (Aceld), GitHub: https://github.com/aceld
KisFlow open-source project: https://github.com/aceld/kis-flow