赞
踩
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
现在每次建立Flow和Function等,都需要一系列繁琐的添加操作,不是很方便。接下来,我们通过批量读取配置文件来构建KisFlow中的结构关系,同时也支持将KisFlow的结构导出到本地文件中。目前我们先用文件的形式做配置的持久化,开发者今后也可以改用数据库或者远程配置来做持久化。
首先我们在kis-flow/test/load_conf/
下创建需要加载的kisflow业务配置文件。
在kis-flow/test/load_conf/
下分别创建conn/
、flow/
、func/
三个文件夹分别存放Connector、Flow、Function的配置信息。
├── conn
│ └── conn-ConnName1.yml
├── flow
│ └── flow-FlowName1.yml
└── func
├── func-FuncName1.yml
├── func-FuncName2.yml
└── func-FuncName3.yml
分别创建一些yml
文件。具体内容如下:
kis-flow/test/load_conf/func/func-FuncName1.yml
# Function configuration: funcName1 (mode Verify).
kistype: func
fname: funcName1
fmode: Verify
source:
  name: 公众号抖音商城户订单数据
  must:
    - order_id
    - user_id
kis-flow/test/load_conf/func/func-FuncName2.yml
# Function configuration: funcName2 (mode Save), bound to connector ConnName1.
kistype: func
fname: funcName2
fmode: Save
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
kis-flow/test/load_conf/func/func-FuncName2.yml
# Function configuration: funcName2 (mode Save), bound to connector ConnName1.
kistype: func
fname: funcName2
fmode: Save
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
kis-flow/test/load_conf/func/func-FuncName3.yml
# Function configuration: funcName3 (mode Calculate).
kistype: func
fname: funcName3
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
kis-flow/test/load_conf/conn/conn-ConnName1.yml
# Connector configuration: ConnName1, used by funcName2 in Save mode.
kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2
kis-flow/test/load_conf/flow/flow-FlowName1.yml
# Flow configuration: flowName1.
# "flows" lists the Functions of this flow by name; each name must match the
# "fname" of a loaded Function configuration.
kistype: flow
status: 1
flow_name: flowName1
flows:
- fname: funcName1
- fname: funcName2
- fname: funcName3
创建kis-flow/file/
目录,且创建kis-flow/file/config_import.go
文件。
首先定义一个可以存放全部配置的结构体:
kis-flow/file/config_import.go
// allConfig gathers every configuration object parsed during one import,
// grouped by kind. Each map is keyed by the name of the configured module.
type allConfig struct {
// Flows holds the Flow configurations, keyed by flow name.
Flows map[string]*config.KisFlowConfig
// Funcs holds the Function configurations, keyed by function name.
Funcs map[string]*config.KisFuncConfig
// Conns holds the Connector configurations, keyed by connector name.
Conns map[string]*config.KisConnConfig
}
key作为各个模块的Name名称字段。
然后分别定义解析Flow、Function、Connector配置的方法。yaml的第三方库,我们用"gopkg.in/yaml.v3"
这个库。
kis-flow/go.mod
module kis-flow

go 1.18

require (
	github.com/google/uuid v1.5.0
	gopkg.in/yaml.v3 v3.0.1
)
kis-flow/file/config_import.go
// kisTypeFlowConfigure 解析Flow配置文件,yaml格式 func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { flow := new(config.KisFlowConfig) if ok := yaml.Unmarshal(confData, flow); ok != nil { return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType)) } // 如果FLow状态为关闭,则不做配置加载 if common.KisOnOff(flow.Status) == common.FlowDisable { return nil } if _, ok := all.Flows[flow.FlowName]; ok { return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName)) } // 加入配置集合中 all.Flows[flow.FlowName] = flow return nil }
kisTypeFlowConfigure
会将配置信息解析到allConfig的Flows成员中。
同理,Function和Connector的解析办法如下。
kis-flow/file/config_import.go
// kisTypeFuncConfigure parses one Function configuration file (YAML) and stores
// the result in all.Funcs, keyed by the function name.
//
// confData is the raw file content. fileName and kisType (the value of the
// file's "kistype" field) are used only to build error messages.
//
// An error is returned when confData cannot be decoded into a KisFuncConfig or
// when a function with the same name has already been loaded.
func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
	funcConf := new(config.KisFuncConfig)
	if err := yaml.Unmarshal(confData, funcConf); err != nil {
		// Keep the decoder's error in the chain so the caller can see what is wrong.
		return fmt.Errorf("%s has wrong format kisType = %v: %w", fileName, kisType, err)
	}

	if _, exists := all.Funcs[funcConf.FName]; exists {
		return fmt.Errorf("%s set repeat function_id:%s", fileName, funcConf.FName)
	}

	// Add the function to the collected configuration.
	all.Funcs[funcConf.FName] = funcConf

	return nil
}
kis-flow/file/config_import.go
// kisTypeConnConfigure 解析Connector配置文件,yaml格式 func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error { conn := new(config.KisConnConfig) if ok := yaml.Unmarshal(confData, conn); ok != nil { return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType)) } if _, ok := all.Conns[conn.CName]; ok { return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName)) } // 加入配置集合中 all.Conns[conn.CName] = conn return nil }
下面实现一个遍历一个路径loadPath
下面所有的yml和yaml类型文件,按照kistype类别解析配置信息到allConfig中。
kis-flow/file/config_import.go
// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中 func parseConfigWalkYaml(loadPath string) (*allConfig, error) { all := new(allConfig) all.Flows = make(map[string]*config.KisFlowConfig) all.Funcs = make(map[string]*config.KisFuncConfig) all.Conns = make(map[string]*config.KisConnConfig) err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error { // 校验文件后缀是否合法 if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" { return nil } // 读取文件内容 confData, err := ioutil.ReadFile(filePath) if err != nil { return err } confMap := make(map[string]interface{}) // 校验yaml合法性 if err := yaml.Unmarshal(confData, confMap); err != nil { return err } // 判断kisType是否存在 if kisType, ok := confMap["kistype"]; !ok { return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath)) } else { switch kisType { case common.KisIdTypeFlow: return kisTypeFlowConfigure(all, confData, filePath, kisType) case common.KisIdTypeFunction: return kisTypeFuncConfigure(all, confData, filePath, kisType) case common.KisIdTypeConnnector: return kisTypeConnConfigure(all, confData, filePath, kisType) default: return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType)) } } }) if err != nil { return nil, err } return all, nil }
下面提供一个对外的公开方法ConfigImportYaml
,需要提供一个导入的文件根路径。
kis-flow/file/config_import.go
// ConfigImportYaml 全盘解析配置文件,yaml格式 func ConfigImportYaml(loadPath string) error { all, err := parseConfigWalkYaml(loadPath) if err != nil { return err } for flowName, flowConfig := range all.Flows { // 构建一个Flow newFlow := flow.NewKisFlow(flowConfig) for _, fp := range flowConfig.Flows { if err := buildFlow(all, fp, newFlow, flowName); err != nil { return err } } //将flow添加到FlowPool中 kis.Pool().AddFlow(flowName, newFlow) } return nil }
首先会调用parseConfigWalkYaml()
将全部的配置信息加载到内存中。
其次,遍历所有的Flow,依次去构建Flow,最终将flow添加到Pool当中,具体的构建流程如下:
kis-flow/file/config_import.go
func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error { //加载当前Flow依赖的Function if funcConfig, ok := all.Funcs[fp.FuncName]; !ok { return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName)) } else { //flow add connector if funcConfig.Option.CName != "" { // 加载当前Function依赖的Connector if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok { return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName)) } else { // Function Config 关联 Connector Config _ = funcConfig.AddConnConfig(connConf) } } //flow add function if err := newFlow.Link(funcConfig, fp.Params); err != nil { return err } } return nil }
创建单元测试文件 kis-flow/test/kis_config_import_test.go
。
kis-flow/test/kis_config_import_test.go
package test

import (
	"context"
	"testing"

	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
)

// TestConfigImportYmal registers the demo Function and Connector callbacks,
// imports the YAML configuration, then commits three rows to flowName1 and
// runs it. Failures are reported through t instead of panicking.
func TestConfigImportYmal(t *testing.T) {
	ctx := context.Background()

	// 0. Register the Function callbacks.
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. Register the Connector init and Connector callbacks.
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. Load the configuration files and build the Flow.
	if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
		t.Fatalf("ConfigImportYaml: %v", err)
	}

	// 2. Fetch the Flow.
	flow1 := kis.Pool().GetFlow("flowName1")
	if flow1 == nil {
		t.Fatal("flowName1 not found in pool after import")
	}

	// 3. Commit the raw input rows.
	rows := []string{
		"This is Data1 from Test",
		"This is Data2 from Test",
		"This is Data3 from Test",
	}
	for _, row := range rows {
		if err := flow1.CommitRow(row); err != nil {
			t.Fatalf("CommitRow(%q): %v", row, err)
		}
	}

	// 4. Run flow1.
	if err := flow1.Run(ctx); err != nil {
		t.Fatalf("Run: %v", err)
	}
}
先注册业务方法。然后通过ConfigImportYaml加载配置,之后从Pool中得到flow实例,提交数据,运行。
注意,这里的配置文件路径,写的是绝对路径。
cd 到kis-flow/test/
目录下,执行指令:
go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal
结果如下:
=== RUN TestConfigImportYmal Add KisPool FuncName=funcName1 Add KisPool FuncName=funcName2 Add KisPool FuncName=funcName3 Add KisPool CaaSInit CName=ConnName1 Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save ===> Call Connector InitDemo1 &{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]} Add FlowRouter FlowName=flowName1 context.Background ====> After CommitSrcData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]} ---> Call funcName1Handler ---- In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test context.Background ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b All Level Data = map[FunctionIdFirstVirtual:[This 
is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]} ---> Call funcName2Handler ---- In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0 ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0 In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1 ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1 In FuncName = funcName2, FuncId = 
func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2 ===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save ===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2 context.Background ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b All Level Data = map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from 
funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]} ---> Call funcName3Handler ---- In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0 In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1 In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2 --- PASS: TestConfigImportYmal (0.01s) PASS ok kis-flow/test 0.517s
结果和我们预期的一致,现在我们可以通过配置文件来加载并构建KisFlow了。
kis-flow/file/config_export.go
package file

import (
	"fmt"
	"os"
	"path/filepath"

	"gopkg.in/yaml.v3"

	"kis-flow/common"
	"kis-flow/kis"
)

// ConfigExportYaml writes the configuration of flow, of every Function listed
// in the flow configuration, and of every Connector those Functions name, as
// YAML files into the directory savaPath.
//
// Files are named "<kistype>-<name>.yaml". savaPath is treated as a directory:
// it is created if missing and a trailing path separator is optional. An empty
// savaPath writes into the current working directory.
//
// It returns the first error from creating the directory, marshalling a
// configuration, writing a file, or resolving a Function/Connector config.
func ConfigExportYaml(flow kis.Flow, savaPath string) error {
	if savaPath != "" {
		if err := os.MkdirAll(savaPath, 0755); err != nil {
			return err
		}
	}

	// Flow
	flowConf := flow.GetConfig()
	if err := writeYamlConfig(savaPath, common.KisIdTypeFlow, flow.GetName(), flowConf); err != nil {
		return err
	}

	for _, fp := range flowConf.Flows {
		// Function
		fConf := flow.GetFuncConfigByName(fp.FuncName)
		if fConf == nil {
			return fmt.Errorf("function name = %s config is nil ", fp.FuncName)
		}
		if err := writeYamlConfig(savaPath, common.KisIdTypeFunction, fp.FuncName, fConf); err != nil {
			return err
		}

		// Connector (only for Functions that name one)
		if fConf.Option.CName == "" {
			continue
		}
		cConf, err := fConf.GetConnConfig()
		if err != nil {
			return err
		}
		if err := writeYamlConfig(savaPath, common.KisIdTypeConnnector, cConf.CName, cConf); err != nil {
			return err
		}
	}

	return nil
}

// writeYamlConfig marshals conf to YAML and writes it to
// "<dir>/<kisType>-<name>.yaml" with mode 0644.
func writeYamlConfig(dir, kisType, name string, conf interface{}) error {
	data, err := yaml.Marshal(conf)
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, kisType+"-"+name+".yaml"), data, 0644)
}
这里面需要补充一些接口,如下:
kis-flow/kis/flow.go
package kis

import (
	"context"
	"kis-flow/common"
	"kis-flow/config"
)

// Flow is the interface of a KisFlow: a chain of Functions through which
// committed data is passed.
type Flow interface {
	// Run schedules the Flow, executing its Functions one after another.
	Run(ctx context.Context) error
	// Link connects a Function to the Flow according to the given configuration.
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow submits one row of data to the Function layer about to be executed.
	CommitRow(row interface{}) error
	// Input returns the input data of the Function currently being executed.
	Input() common.KisRowArr
	// GetName returns the name of the Flow.
	GetName() string
	// GetThisFunction returns the Function currently being executed.
	GetThisFunction() Function
	// GetThisFuncConf returns the configuration of the Function currently being executed.
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector returns the Connector of the Function currently being executed.
	GetConnector() (Connector, error)
	// GetConnConf returns the Connector configuration of the Function currently being executed.
	GetConnConf() (*config.KisConnConfig, error)
	// +++++++++++++++++++++++++++++++
	// GetConfig returns the configuration of this Flow.
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName returns the configuration of the Function with the
	// given name in this Flow.
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
	// +++++++++++++++++++++++++++++++
}
flow新增的接口实现如下:
kis-flow/flow/kis_flow.go
// GetConfig returns the configuration of this Flow (the Conf field).
func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
return flow.Conf
}
// GetFuncConfigByName returns the configuration of the Function stored in
// flow.Funcs under funcName. When no such Function exists, an error is logged
// and nil is returned.
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
	fn, found := flow.Funcs[funcName]
	if !found {
		log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)
		return nil
	}

	return fn.GetConfig()
}
这里面之前有个笔误。
kis-flow/flow/kis_flow.go
// KisFlow is the context that runs through an entire stream computation.
type KisFlow struct {
	// Basic information
	Id   string                // Distributed instance ID of the Flow (used inside KisFlow to tell instances apart)
	Name string                // Human-readable name of the Flow
	Conf *config.KisFlowConfig // Flow configuration

	// Function list
	Funcs          map[string]kis.Function // All Function objects managed by this flow, key: FunctionName
	FlowHead       kis.Function            // Head of the Function list owned by this Flow
	FlowTail       kis.Function            // Tail of the Function list owned by this Flow
	flock          sync.RWMutex            // Lock guarding reads and writes of list insertion
	ThisFunction   kis.Function            // The KisFunction object the Flow is currently executing
	ThisFunctionId string                  // ID of the Function currently being executed
	PrevFunctionId string                  // ID of the Function one level before the current one

	// Function parameters
	funcParams map[string]config.FParam // Custom fixed parameters of the flow per Function, key: Function instance KisID, value: FParam
	fplock     sync.RWMutex             // Read/write lock guarding funcParams

	// Data
	buffer common.KisRowArr  // Internal buffer temporarily holding input data; one row is interface{}, several rows are []interface{} (i.e. a KisBatch)
	data   common.KisDataMap // Data source of every level of the stream computation
	inPut  common.KisRowArr  // Input data of the current Function
}
这里的Funcs成员,其key的含义,之前我们定义的是KisID,现在要修正为key的含义是FunctionName。
下面对给Funcs成员赋值的代码做一个简单的修正:
// appendFunc adds a Function to the Flow (a linked-list operation).
// Only the statement changed by this article is shown; the rest is elided.
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
// ... ...
// Record the Function in the flow's Funcs map, keyed by its Function name
// (the FName of its configuration).
flow.Funcs[function.GetConfig().FName] = function
// ... ...
}
kis-flow/kis/pool.go
// GetFlows returns every Flow currently registered in the pool.
//
// The flowRouter is read under the pool's read lock. The result is a new
// slice sized to the number of registered flows; it is empty (not nil) when
// the pool holds no flows.
func (pool *kisPool) GetFlows() []Flow {
	pool.flowLock.RLock() // read lock
	defer pool.flowLock.RUnlock()

	// The final length is known up front, so allocate once.
	flows := make([]Flow, 0, len(pool.flowRouter))
	for _, flow := range pool.flowRouter {
		flows = append(flows, flow)
	}

	return flows
}
KisPool新增 获取全部Flow的方法,以支持导出模块使用。
在kis-flow/test/
创建kis_config_export_test.go
文件。
package test

import (
	"testing"

	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
)

// TestConfigExportYmal registers the demo Function and Connector callbacks,
// imports the YAML configuration, then exports the configuration of every
// Flow in the pool to the export directory. Failures are reported through t
// instead of panicking.
func TestConfigExportYmal(t *testing.T) {
	// 0. Register the Function callbacks.
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. Register the Connector init and Connector callbacks.
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. Load the configuration files and build the Flow.
	if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
		t.Fatalf("ConfigImportYaml: %v", err)
	}

	// 2. Export the in-memory KisFlow configuration to files.
	for _, flow := range kis.Pool().GetFlows() {
		if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export_conf/"); err != nil {
			t.Fatalf("ConfigExportYaml(%s): %v", flow.GetName(), err)
		}
	}
}
cd到kis-flow/test/
下 执行:
go test -test.v -test.paniconexit0 -test.run TestConfigExportYmal
会在kis-flow/test/export_conf/
下得到导出的配置。
├── export_conf
│ ├── conn-ConnName1.yaml
│ ├── flow-flowName1.yaml
│ ├── func-funcName1.yaml
│ ├── func-funcName2.yaml
│ └── func-funcName3.yaml
https://github.com/aceld/kis-flow/releases/tag/v0.5
作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。