赞
踩
go 写起起来挺爽,在轻量级协程方面,处理起来更灵活,能够快速实现自己想要的多线程任务,下面的任务为了多线程采集服务器端口,并最终阻塞完成可用端口拼接,脚本方面比较简单,这里不给出,完成map转json输出。
下面是线程池的整个实现,为了复用线程池,这里定义了一个任务代理函数代理实际执行任务的Handler函数,利用chan来接受任务,和发送任务结果,并使用通用的类型Interface定义参数,这样我们可以完成很多自定义的任务输入和输出,函数的定义如下:
- //handler 为函数, jobs为定义的任务输入通道,res为输出通道,wg用来管理协程
- func worker(jobs chan interface{}, res chan interface{}, handler interface{}, wg *sync.WaitGroup) {
- wg.Add(1)
- for item := range jobs {
- result := handler.(func(arg interface{}) interface{})(item)
- res <- result
- }
- wg.Done()
- }
定义了线任务中转站后,可以定义任务输入chan、任务输出chan以及任务处理函数,并启动响应的协程
任务输入,启动线程池
- //contrlo workers stop
- wg := sync.WaitGroup{}
- //define how many worker to start
- jobs := make(chan interface{}, 100)
- //define the received res
- results := make(chan interface{}, 100)
-
- for w := 1; w <= 8; w++ {
- go worker(jobs, results, getAvailablePorts, &wg)
- }
阻塞任务,直到所有任务完成,主要关闭chan,防止线程死锁
- //close job
- close(jobs)
- wg.Wait()
- close(results)
- //get the res
- var resMap map[string]map[string]interface{}
- resMap = make(map[string]map[string]interface{})
最后执行任务完成函数,收集所有任务结果,整合所有线程池的输出
- for resValue := range results {
-
- //..todo
- }
下面是完整的例子和程序输出结果
- package main
-
- import (
- "encoding/json"
- "fmt"
- "strings"
- "sync"
- )
-
- func main() {
- //contrlo workers stop
- wg := sync.WaitGroup{}
- //define how many worker to start
- jobs := make(chan interface{}, 100)
- //define the received res
- results := make(chan interface{}, 100)
-
- for w := 1; w <= 8; w++ {
- go worker(jobs, results, getAvailablePorts, &wg)
- }
- for key, value := range GoldenDbPortRange {
- if len(value.ListenPortRange) != 0 {
- value.componentName = key
- value.componentPortType = "ListenPortRange"
- jobs <- value
- }
- if len(value.srcPortRange) != 0 {
- value.componentName = key
- value.componentPortType = "srcPortRange"
- jobs <- value
- }
- }
-
- //close job
- close(jobs)
- wg.Wait()
- close(results)
- //get the res
- var resMap map[string]map[string]interface{}
- resMap = make(map[string]map[string]interface{})
-
- for resValue := range results {
- temp := resValue.(ComponentCheckRes)
- if _, ok := resMap[temp.componentName]; ok {
- //exit
- value := resMap[temp.componentName]
- if temp.componentPortType == "ListenPortRange" {
- value["listenPortRange"] = temp.ListenPortRange
- resMap[temp.componentName] = value
- } else {
- value["srcPortRange"] = temp.srcPortRange
- resMap[temp.componentName] = value
- }
- } else {
- var tempMap map[string]interface{}
- tempMap = make(map[string]interface{})
- if temp.componentPortType == "ListenPortRange" {
- tempMap["listenPortRange"] = temp.ListenPortRange
- resMap[temp.componentName] = tempMap
- } else {
- tempMap["srcPortRange"] = temp.srcPortRange
- resMap[temp.componentName] = tempMap
- }
- }
- }
- //循环遍历Map
- for key, value := range resMap {
- fmt.Printf("%s=>%v\n", key, value)
- }
-
- mjson, _ := json.Marshal(resMap)
- mString := string(mjson)
- fmt.Printf("print mString:%s", mString)
-
- }
-
- type ComponentCheckRes struct {
- srcPortRange []string `json:"srcPortRange"`
- ListenPortRange []string `json:"listenPortRange"`
- componentName string
- componentPortType string
- }
-
- type Component struct {
- srcPortRange string
- ListenPortRange string
- componentName string
- componentPortType string
- }
-
- //to config the goldendb component port range, put as contants if not read from config file
- var GoldenDbPortRange = map[string]Component{
- //host resource
- "1": {srcPortRange: "5003,6401-6405", ListenPortRange: ""},
- "2": {srcPortRange: "", ListenPortRange: "5004,6406-6410"},
- "3": {srcPortRange: "5007,6001-6005", ListenPortRange: "5006,6006-6010"},
- "4": {srcPortRange: "5009,6011-6015", ListenPortRange: "5008,6016-6020"},
- "5": {srcPortRange: "5011,6021-6025", ListenPortRange: "5010,6026-6030"},
- "6": {srcPortRange: "5019,6101-6150", ListenPortRange: "5018,6151-6200"},
- "7": {srcPortRange: "5017,6031-6100", ListenPortRange: "5016,6451-6470"},
- "8": {srcPortRange: "5013,6201-6250", ListenPortRange: "5012,6251-6300"},
- "9": {srcPortRange: "6421-6425", ListenPortRange: "6426-6430"},
- "10": {srcPortRange: "6471-6480", ListenPortRange: ""},
- }
-
- //func worker(jobs <-chan interface{}, res chan<- interface{}, handler interface{}, args ...interface{}) {
- func worker(jobs chan interface{}, res chan interface{}, handler interface{}, wg *sync.WaitGroup) {
- wg.Add(1)
- for item := range jobs {
- result := handler.(func(arg interface{}) interface{})(item)
- res <- result
- }
- wg.Done()
- }
-
- func getAvailablePorts(arg interface{}) interface{} {
- var ports string
- name := arg.(Component).componentName
- portType := arg.(Component).componentPortType
- if portType == "ListenPortRange" {
- ports = arg.(Component).ListenPortRange
- } else {
- ports = arg.(Component).srcPortRange
- }
- //execute sh res
- var portList []string
- var res ComponentCheckRes
- res.componentName = name
- res.componentPortType = portType
- if strings.Contains(ports, ",") {
- //exe sh param1 represent single port
- //exe sh param2 represent list port for first one
- //exe sh param3 represent list port for last one
- str1 := strings.Split(ports, ",")
- param1 := str1[0]
- str2 := strings.Split(str1[1], "-")
- param2 := str2[0]
- param3 := str2[1]
- portList = append(portList, param1)
- portList = append(portList, param2)
- portList = append(portList, param3)
- } else {
- str2 := strings.Split(ports, "-")
- param1 := str2[0]
- param2 := str2[1]
- portList = append(portList, param1)
- portList = append(portList, param2)
- }
-
- if portType == "ListenPortRange" {
- res.ListenPortRange = portList
- } else {
- res.srcPortRange = portList
- }
- return res
- }
- GOROOT=D:\go-sdk #gosetup
- GOPATH=D:\goWorkSpace #gosetup
- D:\go-sdk\bin\go.exe build -o C:\Users\10284791\AppData\Local\Temp\___go_build_hello2_go.exe D:\goWorkSpace\src\hello2.go #gosetup
- C:\Users\10284791\AppData\Local\Temp\___go_build_hello2_go.exe #gosetup
- 5=>map[listenPortRange:[5010 6026 6030] srcPortRange:[5011 6021 6025]]
- 6=>map[listenPortRange:[5018 6151 6200] srcPortRange:[5019 6101 6150]]
- 10=>map[srcPortRange:[6471 6480]]
- 1=>map[srcPortRange:[5003 6401 6405]]
- 2=>map[listenPortRange:[5004 6406 6410]]
- 7=>map[listenPortRange:[5016 6451 6470] srcPortRange:[5017 6031 6100]]
- 4=>map[listenPortRange:[5008 6016 6020] srcPortRange:[5009 6011 6015]]
- 8=>map[listenPortRange:[5012 6251 6300] srcPortRange:[5013 6201 6250]]
- 9=>map[listenPortRange:[6426 6430] srcPortRange:[6421 6425]]
- 3=>map[listenPortRange:[5006 6006 6010] srcPortRange:[5007 6001 6005]]
- print mString:{"1":{"srcPortRange":["5003","6401","6405"]},"10":{"srcPortRange":["6471","6480"]},"2":{"listenPortRange":["5004","6406","6410"]},"3":{"listenPortRange":["5006","6006","6010"],"srcPortRange":["5007","6001","6005"]},"4":{"listenPortRange":["5008","6016","6020"],"srcPortRange":["5009","6011","6015"]},"5":{"listenPortRange":["5010","6026","6030"],"srcPortRange":["5011","6021","6025"]},"6":{"listenPortRange":["5018","6151","6200"],"srcPortRange":["5019","6101","6150"]},"7":{"listenPortRange":["5016","6451","6470"],"srcPortRange":["5017","6031","6100"]},"8":{"listenPortRange":["5012","6251","6300"],"srcPortRange":["5013","6201","6250"]},"9":{"listenPortRange":["6426","6430"],"srcPortRange":["6421","6425"]}}
总结:
go的协程用起来很灵活,可以轻易的启动多个协程干很多事,其中关键的是对协程的控制和任务输入和输出的规划
上面的程序可以完全封装起来,定义任务的输入函数和输出函数,并给定协程的数量,这样就可以正在实现一个阻塞线程池的调用,和java类似
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。