当前位置:   article > 正文

go 简单实现Java线程池阻塞任务_goroot=d:\pojie-go\go-sdk #gosetup gopath=d:\gowor

goroot=d:\pojie-go\go-sdk #gosetup gopath=d:\goworks #gosetup d:\pojie-go\go

go 写起起来挺爽,在轻量级协程方面,处理起来更灵活,能够快速实现自己想要的多线程任务,下面的任务为了多线程采集服务器端口,并最终阻塞完成可用端口拼接,脚本方面比较简单,这里不给出,完成map转json输出。

下面是线程池的整个实现,为了复用线程池,这里定义了一个任务代理函数代理实际执行任务的Handler函数,利用chan来接受任务,和发送任务结果,并使用通用的类型Interface定义参数,这样我们可以完成很多自定义的任务输入和输出,函数的定义如下:

  1. //handler 为函数, jobs为定义的任务输入通道,res为输出通道,wg用来管理协程
  2. func worker(jobs chan interface{}, res chan interface{}, handler interface{}, wg *sync.WaitGroup) {
  3. wg.Add(1)
  4. for item := range jobs {
  5. result := handler.(func(arg interface{}) interface{})(item)
  6. res <- result
  7. }
  8. wg.Done()
  9. }

定义了线任务中转站后,可以定义任务输入chan、任务输出chan以及任务处理函数,并启动响应的协程

任务输入,启动线程池

  1. //contrlo workers stop
  2. wg := sync.WaitGroup{}
  3. //define how many worker to start
  4. jobs := make(chan interface{}, 100)
  5. //define the received res
  6. results := make(chan interface{}, 100)
  7. for w := 1; w <= 8; w++ {
  8. go worker(jobs, results, getAvailablePorts, &wg)
  9. }

阻塞任务,直到所有任务完成,主要关闭chan,防止线程死锁

  1. //close job
  2. close(jobs)
  3. wg.Wait()
  4. close(results)
  5. //get the res
  6. var resMap map[string]map[string]interface{}
  7. resMap = make(map[string]map[string]interface{})

最后执行任务完成函数,收集所有任务结果,整合所有线程池的输出

  1. for resValue := range results {
  2. //..todo
  3. }

下面是完整的例子和程序输出结果

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. )
  8. func main() {
  9. //contrlo workers stop
  10. wg := sync.WaitGroup{}
  11. //define how many worker to start
  12. jobs := make(chan interface{}, 100)
  13. //define the received res
  14. results := make(chan interface{}, 100)
  15. for w := 1; w <= 8; w++ {
  16. go worker(jobs, results, getAvailablePorts, &wg)
  17. }
  18. for key, value := range GoldenDbPortRange {
  19. if len(value.ListenPortRange) != 0 {
  20. value.componentName = key
  21. value.componentPortType = "ListenPortRange"
  22. jobs <- value
  23. }
  24. if len(value.srcPortRange) != 0 {
  25. value.componentName = key
  26. value.componentPortType = "srcPortRange"
  27. jobs <- value
  28. }
  29. }
  30. //close job
  31. close(jobs)
  32. wg.Wait()
  33. close(results)
  34. //get the res
  35. var resMap map[string]map[string]interface{}
  36. resMap = make(map[string]map[string]interface{})
  37. for resValue := range results {
  38. temp := resValue.(ComponentCheckRes)
  39. if _, ok := resMap[temp.componentName]; ok {
  40. //exit
  41. value := resMap[temp.componentName]
  42. if temp.componentPortType == "ListenPortRange" {
  43. value["listenPortRange"] = temp.ListenPortRange
  44. resMap[temp.componentName] = value
  45. } else {
  46. value["srcPortRange"] = temp.srcPortRange
  47. resMap[temp.componentName] = value
  48. }
  49. } else {
  50. var tempMap map[string]interface{}
  51. tempMap = make(map[string]interface{})
  52. if temp.componentPortType == "ListenPortRange" {
  53. tempMap["listenPortRange"] = temp.ListenPortRange
  54. resMap[temp.componentName] = tempMap
  55. } else {
  56. tempMap["srcPortRange"] = temp.srcPortRange
  57. resMap[temp.componentName] = tempMap
  58. }
  59. }
  60. }
  61. //循环遍历Map
  62. for key, value := range resMap {
  63. fmt.Printf("%s=>%v\n", key, value)
  64. }
  65. mjson, _ := json.Marshal(resMap)
  66. mString := string(mjson)
  67. fmt.Printf("print mString:%s", mString)
  68. }
  69. type ComponentCheckRes struct {
  70. srcPortRange []string `json:"srcPortRange"`
  71. ListenPortRange []string `json:"listenPortRange"`
  72. componentName string
  73. componentPortType string
  74. }
  75. type Component struct {
  76. srcPortRange string
  77. ListenPortRange string
  78. componentName string
  79. componentPortType string
  80. }
  81. //to config the goldendb component port range, put as contants if not read from config file
  82. var GoldenDbPortRange = map[string]Component{
  83. //host resource
  84. "1": {srcPortRange: "5003,6401-6405", ListenPortRange: ""},
  85. "2": {srcPortRange: "", ListenPortRange: "5004,6406-6410"},
  86. "3": {srcPortRange: "5007,6001-6005", ListenPortRange: "5006,6006-6010"},
  87. "4": {srcPortRange: "5009,6011-6015", ListenPortRange: "5008,6016-6020"},
  88. "5": {srcPortRange: "5011,6021-6025", ListenPortRange: "5010,6026-6030"},
  89. "6": {srcPortRange: "5019,6101-6150", ListenPortRange: "5018,6151-6200"},
  90. "7": {srcPortRange: "5017,6031-6100", ListenPortRange: "5016,6451-6470"},
  91. "8": {srcPortRange: "5013,6201-6250", ListenPortRange: "5012,6251-6300"},
  92. "9": {srcPortRange: "6421-6425", ListenPortRange: "6426-6430"},
  93. "10": {srcPortRange: "6471-6480", ListenPortRange: ""},
  94. }
  95. //func worker(jobs <-chan interface{}, res chan<- interface{}, handler interface{}, args ...interface{}) {
  96. func worker(jobs chan interface{}, res chan interface{}, handler interface{}, wg *sync.WaitGroup) {
  97. wg.Add(1)
  98. for item := range jobs {
  99. result := handler.(func(arg interface{}) interface{})(item)
  100. res <- result
  101. }
  102. wg.Done()
  103. }
  104. func getAvailablePorts(arg interface{}) interface{} {
  105. var ports string
  106. name := arg.(Component).componentName
  107. portType := arg.(Component).componentPortType
  108. if portType == "ListenPortRange" {
  109. ports = arg.(Component).ListenPortRange
  110. } else {
  111. ports = arg.(Component).srcPortRange
  112. }
  113. //execute sh res
  114. var portList []string
  115. var res ComponentCheckRes
  116. res.componentName = name
  117. res.componentPortType = portType
  118. if strings.Contains(ports, ",") {
  119. //exe sh param1 represent single port
  120. //exe sh param2 represent list port for first one
  121. //exe sh param3 represent list port for last one
  122. str1 := strings.Split(ports, ",")
  123. param1 := str1[0]
  124. str2 := strings.Split(str1[1], "-")
  125. param2 := str2[0]
  126. param3 := str2[1]
  127. portList = append(portList, param1)
  128. portList = append(portList, param2)
  129. portList = append(portList, param3)
  130. } else {
  131. str2 := strings.Split(ports, "-")
  132. param1 := str2[0]
  133. param2 := str2[1]
  134. portList = append(portList, param1)
  135. portList = append(portList, param2)
  136. }
  137. if portType == "ListenPortRange" {
  138. res.ListenPortRange = portList
  139. } else {
  140. res.srcPortRange = portList
  141. }
  142. return res
  143. }
  1. GOROOT=D:\go-sdk #gosetup
  2. GOPATH=D:\goWorkSpace #gosetup
  3. D:\go-sdk\bin\go.exe build -o C:\Users\10284791\AppData\Local\Temp\___go_build_hello2_go.exe D:\goWorkSpace\src\hello2.go #gosetup
  4. C:\Users\10284791\AppData\Local\Temp\___go_build_hello2_go.exe #gosetup
  5. 5=>map[listenPortRange:[5010 6026 6030] srcPortRange:[5011 6021 6025]]
  6. 6=>map[listenPortRange:[5018 6151 6200] srcPortRange:[5019 6101 6150]]
  7. 10=>map[srcPortRange:[6471 6480]]
  8. 1=>map[srcPortRange:[5003 6401 6405]]
  9. 2=>map[listenPortRange:[5004 6406 6410]]
  10. 7=>map[listenPortRange:[5016 6451 6470] srcPortRange:[5017 6031 6100]]
  11. 4=>map[listenPortRange:[5008 6016 6020] srcPortRange:[5009 6011 6015]]
  12. 8=>map[listenPortRange:[5012 6251 6300] srcPortRange:[5013 6201 6250]]
  13. 9=>map[listenPortRange:[6426 6430] srcPortRange:[6421 6425]]
  14. 3=>map[listenPortRange:[5006 6006 6010] srcPortRange:[5007 6001 6005]]
  15. print mString:{"1":{"srcPortRange":["5003","6401","6405"]},"10":{"srcPortRange":["6471","6480"]},"2":{"listenPortRange":["5004","6406","6410"]},"3":{"listenPortRange":["5006","6006","6010"],"srcPortRange":["5007","6001","6005"]},"4":{"listenPortRange":["5008","6016","6020"],"srcPortRange":["5009","6011","6015"]},"5":{"listenPortRange":["5010","6026","6030"],"srcPortRange":["5011","6021","6025"]},"6":{"listenPortRange":["5018","6151","6200"],"srcPortRange":["5019","6101","6150"]},"7":{"listenPortRange":["5016","6451","6470"],"srcPortRange":["5017","6031","6100"]},"8":{"listenPortRange":["5012","6251","6300"],"srcPortRange":["5013","6201","6250"]},"9":{"listenPortRange":["6426","6430"],"srcPortRange":["6421","6425"]}}

总结:

go的协程用起来很灵活,可以轻易的启动多个协程干很多事,其中关键的是对协程的控制和任务输入和输出的规划

上面的程序可以完全封装起来,定义任务的输入函数和输出函数,并给定协程的数量,这样就可以正在实现一个阻塞线程池的调用,和java类似

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/487764
推荐阅读
相关标签
  

闽ICP备14008679号