当前位置:   article > 正文

Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出_golang dag 流式计算框架

golang dag 流式计算框架

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出


6.1 配置的导入

现在每次建立Flow和Function等,都需要一系列繁琐的添加,不是很方便,接下来,我们可以通过批量读写配置文件,构建KisFlow中的结构关系,并且也可以将KisFlow的结构导出到本地文件中。目前我们先用文件的形式做配置的持久化,开发者也可以今后做数据库或者远程配置的持久化均可。

6.1 配置的导入

6.1.1 创建配置文件

首先我们在kis-flow/test/load_conf/下创建需要加载的kisflow业务配置文件。
kis-flow/test/load_conf/下分别创建conn/flow/func/三个文件夹分别存放Connector、Flow、Funciton的配置信息。

├── conn
│   └── conn-ConnName1.yml
├── flow
│   └── flow-FlowName1.yml
└── func
    ├── func-FuncName1.yml
    ├── func-FuncName2.yml
    └── func-FuncName3.yml

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

分别创建一些yml文件。具体内容如下:

A.Function

kis-flow/test/load_conf/func/func-FuncNam1.yml

kistype: func
fname: funcName1
fmode: Verify
source:
  name: 公众号抖音商城户订单数据
  must:
    - order_id
    - user_id
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

kis-flow/test/load_conf/func/func-FuncNam2.yml

kistype: func
fname: funcName2
fmode: Save
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

kis-flow/test/load_conf/func/func-FuncNam2.yml

kistype: func
fname: funcName2
fmode: Save
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  cname: ConnName1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

kis-flow/test/load_conf/func/func-FuncNam3.yml

kistype: func
fname: funcName3
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
B.Connector

kis-flow/test/load_conf/func/func-ConnName1.yml

kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
C.Flow

kis-flow/test/load_conf/func/func-FlowName1.yml

kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: funcName3

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

6.1.2 配置解析

创建kis-flow/file/目录,且创建kis-flow/file/config_import.go文件。

首先定义一个可以存放全部配置的接口:

kis-flow/file/config_import.go

type allConfig struct {
	Flows map[string]*config.KisFlowConfig
	Funcs map[string]*config.KisFuncConfig
	Conns map[string]*config.KisConnConfig
}
  • 1
  • 2
  • 3
  • 4
  • 5

key作为各个模块的Name名称字段。
然后分别定义解析Flow、Function、Connector配置的方法。yaml的第三方库,我们用"gopkg.in/yaml.v3"这个库。

kis-flow/go.mod

module kis-flow

go 1.18

require github.com/google/uuid v1.5.0
require gopkg.in/yaml.v3 v3.0.1 // indirect

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
A. Flow 配置解析

kis-flow/file/config_import.go

// kisTypeFlowConfigure 解析Flow配置文件,yaml格式
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
	flow := new(config.KisFlowConfig)
	if ok := yaml.Unmarshal(confData, flow); ok != nil {
		return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
	}

	// 如果FLow状态为关闭,则不做配置加载
	if common.KisOnOff(flow.Status) == common.FlowDisable {
		return nil
	}

	if _, ok := all.Flows[flow.FlowName]; ok {
		return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName))
	}

	// 加入配置集合中
	all.Flows[flow.FlowName] = flow

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • confData:是文件二进制数据
  • fileName:是文件路径
  • kistype: 为配置文件类别

kisTypeFlowConfigure 会将配置信息解析到allConfig的Flows成员中。
同理,Function和Connector的解析办法如下。

B. Functioin配置解析

kis-flow/file/config_import.go

// kisTypeFuncConfigure 解析Function配置文件,yaml格式
func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
	function := new(config.KisFuncConfig)
	if ok := yaml.Unmarshal(confData, function); ok != nil {
		return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
	}
	if _, ok := all.Funcs[function.FName]; ok {
		return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName))
	}

	// 加入配置集合中
	all.Funcs[function.FName] = function

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
C. Connector配置解析

kis-flow/file/config_import.go

// kisTypeConnConfigure 解析Connector配置文件,yaml格式
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
	conn := new(config.KisConnConfig)
	if ok := yaml.Unmarshal(confData, conn); ok != nil {
		return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))
	}

	if _, ok := all.Conns[conn.CName]; ok {
		return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName))
	}

	// 加入配置集合中
	all.Conns[conn.CName] = conn

	return nil
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

6.1.3 遍历文件

下面实现一个遍历一个路径loadPath下面所有的yml和yaml类型文件,按照kistype类别解析配置信息到allConfig中。

kis-flow/file/config_import.go

// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {

	all := new(allConfig)

	all.Flows = make(map[string]*config.KisFlowConfig)
	all.Funcs = make(map[string]*config.KisFuncConfig)
	all.Conns = make(map[string]*config.KisConnConfig)

	err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
		// 校验文件后缀是否合法
		if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {
			return nil
		}

		// 读取文件内容
		confData, err := ioutil.ReadFile(filePath)
		if err != nil {
			return err
		}

		confMap := make(map[string]interface{})

		// 校验yaml合法性
		if err := yaml.Unmarshal(confData, confMap); err != nil {
			return err
		}

		// 判断kisType是否存在
		if kisType, ok := confMap["kistype"]; !ok {
			return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))
		} else {
			switch kisType {
			case common.KisIdTypeFlow:
				return kisTypeFlowConfigure(all, confData, filePath, kisType)

			case common.KisIdTypeFunction:
				return kisTypeFuncConfigure(all, confData, filePath, kisType)

			case common.KisIdTypeConnnector:
				return kisTypeConnConfigure(all, confData, filePath, kisType)

			default:
				return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
			}
		}
	})

	if err != nil {
		return nil, err
	}

	return all, nil
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

6.1.4 导入方法

下面提供一个对外的公开方法ConfigImportYaml,需要提供一个导入的文件根路径。

kis-flow/file/config_import.go

// ConfigImportYaml 全盘解析配置文件,yaml格式
func ConfigImportYaml(loadPath string) error {

	all, err := parseConfigWalkYaml(loadPath)
	if err != nil {
		return err
	}

	for flowName, flowConfig := range all.Flows {

		// 构建一个Flow
		newFlow := flow.NewKisFlow(flowConfig)

		for _, fp := range flowConfig.Flows {
			if err := buildFlow(all, fp, newFlow, flowName); err != nil {
				return err
			}
		}

		//将flow添加到FlowPool中
		kis.Pool().AddFlow(flowName, newFlow)
	}

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

首先会调用parseConfigWalkYaml()将全部的配置信息加载到内存中。
其次,遍历所有的Flow,依次去构建Flow,最终将flow添加到Pool当中,具体的构建流程如下:

kis-flow/file/config_import.go

func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {
	//加载当前Flow依赖的Function
	if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {
		return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))
	} else {
		//flow add connector
		if funcConfig.Option.CName != "" {
			// 加载当前Function依赖的Connector
			if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {
				return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName))
			} else {
				// Function Config 关联 Connector Config
				_ = funcConfig.AddConnConfig(connConf)
			}
		}

		//flow add function
		if err := newFlow.Link(funcConfig, fp.Params); err != nil {
			return err
		}
	}

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

6.2 配置导入单元测试

创建单元测试文件 kis-flow/test/kis_config_import_test.go

kis-flow/test/kis_config_import_test.go

package test

import (
	"context"
	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
)

func TestConfigImportYmal(t *testing.T) {
	ctx := context.Background()

	// 0. 注册Function 回调业务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. 注册ConnectorInit 和 Connector 回调业务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. 加载配置文件并构建Flow
	if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}

	// 2. 获取Flow
	flow1 := kis.Pool().GetFlow("flowName1")

	// 3. 提交原始数据
	_ = flow1.CommitRow("This is Data1 from Test")
	_ = flow1.CommitRow("This is Data2 from Test")
	_ = flow1.CommitRow("This is Data3 from Test")

	// 4. 执行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

先注册业务方法。然后通过ConfigImportYaml加载配置,之后从Pool中得到flow实例,提交数据,运行。

注意,这里的配置文件路径,写的是绝对路径。

cd 到kis-flow/test/目录下,执行指令:

go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal    
  • 1

结果如下:

=== RUN   TestConfigImportYmal
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1

context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]

KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2
--- PASS: TestConfigImportYmal (0.01s)
PASS
ok      kis-flow/test   0.517s

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

预期的结果和我们一致,现在我们可以通过配置文件进行加载且构建KisFlow了。

6.3 配置的导出

6.3.1 导出实现

kis-flow/file/config_export.go

package file

import (
	"errors"
	"fmt"
	"gopkg.in/yaml.v3"
	"io/ioutil"
	"kis-flow/common"
	"kis-flow/kis"
)

// ConfigExportYaml 将flow配置输出,且存储本地
func ConfigExportYaml(flow kis.Flow, savaPath string) error {

	if data, err := yaml.Marshal(flow.GetConfig()); err != nil {
		return err
	} else {
		//flow
		err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
		if err != nil {
			return err
		}

		//function
		for _, fp := range flow.GetConfig().Flows {
			fConf := flow.GetFuncConfigByName(fp.FuncName)
			if fConf == nil {
				return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName))
			}

			if fdata, err := yaml.Marshal(fConf); err != nil {
				return err
			} else {
				if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
					return err
				}
			}

			// Connector
			if fConf.Option.CName != "" {
				cConf, err := fConf.GetConnConfig()
				if err != nil {
					return err
				}
				if cdata, err := yaml.Marshal(cConf); err != nil {
					return err
				} else {
					if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
						return err
					}
				}
			}
		}
	}

	return nil
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

这里面需要补充一些接口,如下:

6.3.2 Flow新增接口

kis-flow/kis/flow.go

package kis

import (
	"context"
	"kis-flow/common"
	"kis-flow/config"
)

type Flow interface {
	// Run 调度Flow,依次调度Flow中的Function并且执行
	Run(ctx context.Context) error
	// Link 将Flow中的Function按照配置文件中的配置进行连接
	Link(fConf *config.KisFuncConfig, fParams config.FParam) error
	// CommitRow 提交Flow数据到即将执行的Function层
	CommitRow(row interface{}) error
	// Input 得到flow当前执行Function的输入源数据
	Input() common.KisRowArr
	// GetName 得到Flow的名称
	GetName() string
	// GetThisFunction 得到当前正在执行的Function
	GetThisFunction() Function
	// GetThisFuncConf 得到当前正在执行的Function的配置
	GetThisFuncConf() *config.KisFuncConfig
	// GetConnector 得到当前正在执行的Function的Connector
	GetConnector() (Connector, error)
	// GetConnConf 得到当前正在执行的Function的Connector的配置
	GetConnConf() (*config.KisConnConfig, error)
    
    // +++++++++++++++++++++++++++++++
	// GetConfig 得到当前Flow的配置
	GetConfig() *config.KisFlowConfig
	// GetFuncConfigByName 得到当前Flow的配置
	GetFuncConfigByName(funcName string) *config.KisFuncConfig
    // +++++++++++++++++++++++++++++++
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

flow新增的接口实现如下:

kis-flow/flow/kis_flow.go

func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
	return flow.Conf
}

// GetFuncConfigByName 得到当前Flow的配置
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
	if f, ok := flow.Funcs[funcName]; ok {
		return f.GetConfig()
	} else {
		log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)
		return nil
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

6.3.3 KisFlow中Funcs修复

这里面之前有个笔误。

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
	// 基础信息
	Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
	Name string                // Flow的可读名称
	Conf *config.KisFlowConfig // Flow配置策略

	// Function列表
	Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
	FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头
	FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾
	flock          sync.RWMutex            // 管理链表插入读写的锁
	ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象
	ThisFunctionId string                  // 当前执行到的Function ID
	PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID

	// Function列表参数
	funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
	fplock     sync.RWMutex             // 管理funcParams的读写锁

	// 数据
	buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
	data   common.KisDataMap // 流式计算各个层级的数据源
	inPut  common.KisRowArr  // 当前Function的计算输入数据
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

这里的Funcs成员,其key的含义,之前我们定义的是KisID,现在要修正为key的含义是FunctionName。

下面想Funcs成员赋值的代码做一个简单的修正

// appendFunc 将Function添加到Flow中, 链表操作
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
   	// ... ... 


	//将Function Name 详细Hash对应关系添加到flow对象中
	flow.Funcs[function.GetConfig().FName] = function

	// ... ... 
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

6.3.4 KisPool新增方法

kis-flow/kis/pool.go

// GetFlows 得到全部的Flow
func (pool *kisPool) GetFlows() []Flow {
	pool.flowLock.RLock() // 读锁
	defer pool.flowLock.RUnlock()

	var flows []Flow

	for _, flow := range pool.flowRouter {
		flows = append(flows, flow)
	}

	return flows
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

KisPool新增 获取全部Flow的方法,以支持导出模块使用。

6.4 配置导出单元测试

kis-flow/test/创建kis_config_export_test.go文件。

package test

import (
	"kis-flow/common"
	"kis-flow/file"
	"kis-flow/kis"
	"kis-flow/test/caas"
	"kis-flow/test/faas"
	"testing"
)

func TestConfigExportYmal(t *testing.T) {
	// 0. 注册Function 回调业务
	kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
	kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
	kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

	// 0. 注册ConnectorInit 和 Connector 回调业务
	kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
	kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

	// 1. 加载配置文件并构建Flow
	if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
		panic(err)
	}

	// 2. 讲构建的内存KisFlow结构配置导出的文件当中
	flows := kis.Pool().GetFlows()
	for _, flow := range flows {
		if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export_conf/"); err != nil {
			panic(err)
		}
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

cd到kis-flow/test/下 执行:

go test -test.v -test.paniconexit0 -test.run  TestConfigExportYmal 
  • 1

会在kis-flow/test/export_conf/下得到导出的配置。

├── export_conf
│   ├── conn-ConnName1.yaml
│   ├── flow-flowName1.yaml
│   ├── func-funcName1.yaml
│   ├── func-funcName2.yaml
│   └── func-funcName3.yaml
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

6.5 【V0.5】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.5


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow


Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/746804
推荐阅读
  

闽ICP备14008679号