赞
踩
原文作者:smallnest
上一篇epoll方式减少资源占用 介绍了测试环境以及epoll方式实现百万连接的TCP服务器。这篇文章介绍百万连接服务器的几种实现方式,以及它们的吞吐率和延迟。
这几种服务器的实现包括:epoll
、multiple epoller
、prefork
和 workerpool
。
第一篇 百万 Go TCP 连接的思考: epoll方式减少资源占用
第二篇 百万 Go TCP 连接的思考2: 百万连接的吞吐率和延迟
第三篇 百万 Go TCP 连接的思考: 正常连接下的吞吐率和延迟
相关代码已发布到github上: 1m-go-tcp-server。
上一篇已经介绍了epoll方式的实现,为了测试吞吐率,我们需要通过传递特殊的数据来计算。
客户端将它发送数据时的时间戳传给服务器,这个时间戳只需要8个字节,服务器不需要任何改动,只需要原封不动的将数据回传给客户端:
1...... 2var ( 3 opsRate = metrics.NewRegisteredMeter("ops", nil) 4) 5func start() { 6 for { 7 connections, err := epoller.Wait() 8 if err != nil { 9 log.Printf("failed to epoll wait %v", err)10 continue11 }12 for _, conn := range connections {13 if conn == nil {14 break15 }16 // 将消息(时间戳)原封不动的写回17 _, err = io.CopyN(conn, conn, 8)18 if err != nil {19 if err := epoller.Remove(conn); err != nil {20 log.Printf("failed to remove %v", err)21 }22 conn.Close()23 }24 opsRate.Mark(1)25 }26 }27} 2var ( 3 opsRate = metrics.NewRegisteredMeter("ops", nil) 4) 5func start() { 6 for { 7 connections, err := epoller.Wait() 8 if err != nil { 9 log.Printf("failed to epoll wait %v", err)10 continue11 }12 for _, conn := range connections {13 if conn == nil {14 break15 }16 // 将消息(时间戳)原封不动的写回17 _, err = io.CopyN(conn, conn, 8)18 if err != nil {19 if err := epoller.Remove(conn); err != nil {20 log.Printf("failed to remove %v", err)21 }22 conn.Close()23 }24 opsRate.Mark(1)25 }26 }27}
这里epoll我们并没有注册为边缘触发的方式,默认是水平触发的方式。
每次读取8个字节(时间戳),然后返回给客户端。同时metric记录一次。
metric库使用的是rcrowley/go-metrics。
客户端不再发送hello world
数据,而是当前的时间戳,收到服务器的返回后,就可以计算出一次请求的总共的花费(延迟,latency),然后发送下一个请求。
所以客户端的测试并不是pipeline的方式,以下所有的测试都不是pipeline的方式,而是收到返回再发下一个请求。
客户端也需要改成epoll的方式,原先一个goroutine轮训所有的连接的方式性能比较底下,所以改成epoll的方式:
1package main 2import ( 3 "encoding/binary" 4 "flag" 5 "fmt" 6 "log" 7 "net" 8 "os" 9 "syscall" 10 "time" 11 "github.com/rcrowley/go-metrics" 12) 13var ( 14 ip = flag.String("ip", "127.0.0.1", "server IP") 15 connections = flag.Int("conn", 1, "number of tcp connections") 16 startMetric = flag.String("sm", time.Now().Format("2006-01-02T15:04:05 -0700"), "start time point of all clients") 17) 18var ( 19 opsRate = metrics.NewRegisteredTimer("ops", nil) 20) 21var epoller *epoll 22// client改造成epoll方式, 处理epoll消息是单线程的 23func main() { 24 flag.Parse() 25 go func() { 26 startPoint, err := time.Parse("2006-01-02T15:04:05 -0700", *startMetric) 27 if err != nil { 28 panic(err) 29 } 30 time.Sleep(startPoint.Sub(time.Now())) 31 metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds)) 32 }() 33 var err error 34 epoller, err = MkEpoll() 35 if err != nil { 36 panic(err) 37 } 38 addr := *ip + ":8972" 39 log.Printf("连接到 %s", addr) 40 var conns []net.Conn 41 for i := 0; i < *connections; i++ { 42 c, err := net.DialTimeout("tcp", addr, 10*time.Second) 43 if err != nil { 44 fmt.Println("failed to connect", i, err) 45 i-- 46 continue 47 } 48 if err := epoller.Add(c); err != nil { 49 log.Printf("failed to add connection %v", err) 50 c.Close() 51 } 52 conns = append(conns, c) 53 } 54 log.Printf("完成初始化 %d 连接", len(conns)) 55 tts := time.Second 56 if *connections > 100 { 57 tts = time.Millisecond * 5 58 } 59 go start() 60 for i := 0; i < len(conns); i++ { 61 time.Sleep(tts) 62 conn := conns[i] 63 err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano()) 64 if err != nil { 65 log.Printf("failed to write timestamp %v", err) 66 if err := epoller.Remove(conn); err != nil { 67 if err := epoller.Remove(conn); err != nil { 68 log.Printf("failed to remove %v", err) 69 } 70 } 71 } 72 } 73 select {} 74} 75func start() { 76 var nano int64 77 for { 78 connections, err := epoller.Wait() 79 if err != nil { 80 log.Printf("failed to epoll wait %v", err) 81 continue 82 } 83 for _, conn := range connections { 84 if conn == nil { 85 break 86 } 87 if err := binary.Read(conn, binary.BigEndian, &nano); err != nil { 88 log.Printf("failed to read %v", err) 89 if err := epoller.Remove(conn); err != nil { 90 log.Printf("failed to remove %v", err) 91 } 92 conn.Close() 93 continue 94 } else { 95 opsRate.Update(time.Duration(time.Now().UnixNano() - nano)) 96 } 97 err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano()) 98 if err != nil { 99 log.Printf("failed to write %v", err)100 if err := epoller.Remove(conn); err != nil {101 log.Printf("failed to remove %v", err)102 }103 conn.Close()104 }105 }106 }107}package main 2import ( 3 "encoding/binary" 4 "flag" 5 "fmt" 6 "log" 7 "net" 8 "os" 9 "syscall" 10 "time" 11 "github.com/rcrowley/go-metrics" 12) 13var ( 14 ip = flag.String("ip", "127.0.0.1", "server IP") 15 connections = flag.Int("conn", 1, "number of tcp connections") 16 startMetric = flag.String("sm", time.Now().Format("2006-01-02T15:04:05 -0700"), "start time point of all clients") 17) 18var ( 19 opsRate = metrics.NewRegisteredTimer("ops", nil) 20) 21var epoller *epoll 22// client改造成epoll方式, 处理epoll消息是单线程的 23func main() { 24 flag.Parse() 25 go func() { 26 startPoint, err := time.Parse("2006-01-02T15:04:05 -0700", *startMetric) 27 if err != nil { 28 panic(err) 29 } 30 time.Sleep(startPoint.Sub(time.Now())) 31 metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds)) 32 }() 33 var err error 34 epoller, err = MkEpoll() 35 if err != nil { 36 panic(err) 37 } 38 addr := *ip + ":8972" 39 log.Printf("连接到 %s", addr) 40 var conns []net.Conn 41 for i := 0; i < *connections; i++ { 42 c, err := net.DialTimeout("tcp", addr, 10*time.Second) 43 if err != nil { 44 fmt.Println("failed to connect", i, err) 45 i-- 46 continue 47 } 48 if err := epoller.Add(c); err != nil { 49 log.Printf("failed to add connection %v", err) 50 c.Close() 51 } 52 conns = append(conns, c) 53 } 54 log.Printf("完成初始化 %d 连接", len(conns)) 55 tts := time.Second 56 if *connections > 100 { 57 tts = time.Millisecond * 5 58 } 59 go start() 60 for i := 0; i < len(conns); i++ { 61 time.Sleep(tts) 62 conn := conns[i] 63 err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano()) 64 if err != nil { 65 log.Printf("failed to write timestamp %v", err) 66 if err := epoller.Remove(conn); err != nil { 67 if err := epoller.Remove(conn); err != nil { 68 log.Printf("failed to remove %v", err) 69 } 70 } 71 } 72 } 73 select {} 74} 75func start() { 76 var nano int64 77 for { 78 connections, err := epoller.Wait() 79 if err != nil { 80 log.Printf("failed to epoll wait %v", err) 81 continue 82 } 83 for _, conn := range connections { 84 if conn == nil { 85 break 86 } 87 if err := binary.Read(conn, binary.BigEndian, &nano); err != nil { 88 log.Printf("failed to read %v", err) 89 if err := epoller.Remove(conn); err != nil { 90 log.Printf("failed to remove %v", err) 91 } 92 conn.Close() 93 continue 94 } else { 95 opsRate.Update(time.Duration(time.Now().UnixNano() - nano)) 96 } 97 err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano()) 98 if err != nil { 99 log.Printf("failed to write %v", err)100 if err := epoller.Remove(conn); err != nil {101 log.Printf("failed to remove %v", err)102 }103 conn.Close()104 }105 }106 }107}
使用的epoll实现代码和服务器端是一样的。
客户端的统计会遇到一个问题,因为我们会启动50个docker容器,计算客户端的吞吐率的时候我们需要统计同一个时间段内这50个容器所有的请求和延迟。这里我们用了一个小小的技巧,让metrics库再同一个时间打印出它们的统计数据,基本可以保证统计的是这50个容器的同一个时间段内的指标。
这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 42495, 延迟(latency)为 23秒。
在上面的实现中,我们的客户端使用一个epoller处理所有的请求, 在事件监听的处理中,使用一个goroutine处理接收的所有的事件,如果处理事件比较慢,这个单一的goroutine将会是严重的瓶颈。
所以我们要把它改成多goroutine的方式去处理。一种方式是启动一个线程池,采用多event loop的方式处理事件,另外一种方式是使用多个epoller, 每个epoller处理一批连接,每个epoller独自占用一个goroutine。 我们的客户端采用第二种方式,实现起来比较简单。
Linux的Accept和epoller都曾有惊群的现象,也就是一个一个事件到来后会唤醒所有的监听的线程,目前这个问题应该已经不存在了。
1func main() { 2 flag.Parse() 3 setLimit() 4 go func() { 5 startPoint, _ := time.Parse("2006-01-02T15:04:05 -0700", *startMetric) 6 time.Sleep(startPoint.Sub(time.Now())) 7 metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds)) 8 }() 9 addr := *ip + ":8972"10 log.Printf("连接到 %s", addr)11 for i := 0; i < *c; i++ {12 go mkClient(addr, *connections/(*c))13 }14 select {}15}16func mkClient(addr string, connections int) {17 epoller, err := MkEpoll()18 if err != nil {19 panic(err)20 }21 var conns []net.Conn22 for i := 0; i < connections; i++ {23 c, err := net.DialTimeout("tcp", addr, 10*time.Second)24 if err != nil {25 fmt.Println("failed to connect", i, err)26 i--27 continue28 }29 if err := epoller.Add(c); err != nil {30 log.Printf("failed to add connection %v", err)31 c.Close()32 }33 conns = append(conns, c)34 }35 log.Printf("完成初始化 %d 连接", len(conns))36 go start(epoller)37 tts := time.Second38 if *c > 100 {39 tts = time.Millisecond * 540 }41 for i := 0; i < len(conns); i++ {42 time.Sleep(tts)43 conn := conns[i]44 err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano())45 if err != nil {46 log.Printf("failed to write timestamp %v", err)47 if err := epoller.Remove(conn); err != nil {48 if err := epoller.Remove(conn); err != nil {49 log.Printf("failed to remove %v", err)50 }51 }52 }53 }54 select {}55}56func start(epoller *epoll) {57 ...... //同上58}func main() { 2 flag.Parse() 3 setLimit() 4 go func() { 5 startPoint, _ := time.Parse("2006-01-02T15:04:05 -0700", *startMetric) 6 time.Sleep(startPoint.Sub(time.Now())) 7 metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds)) 8 }() 9 addr := *ip + ":8972"10 log.Printf("连接到 %s", addr)11 for i := 0; i < *c; i++ {12 go mkClient(addr, *connections/(*c))13 }14 select {}15}16func mkClient(addr string, connections int) {17 epoller, err := MkEpoll()18 if err != nil {19 panic(err)20 }21 var conns []net.Conn22 for i := 0; i < connections; i++ {23 c, err := net.DialTimeout("tcp", addr, 10*time.Second)24 if err != nil {25 fmt.Println("failed to connect", i, err)26 i--27 continue28 }29 if err := epoller.Add(c); err != nil {30 log.Printf("failed to add connection %v", err)31 c.Close()32 }33 conns = append(conns, c)34 }35 log.Printf("完成初始化 %d 连接", len(conns))36 go start(epoller)37 tts := time.Second38 if *c > 100 {39 tts = time.Millisecond * 540 }41 for i := 0; i < len(conns); i++ {42 time.Sleep(tts)43 conn := conns[i]44 err = binary.Write(conn, binary.BigEndian, time.Now().UnixNano())45 if err != nil {46 log.Printf("failed to write timestamp %v", err)47 if err := epoller.Remove(conn); err != nil {48 if err := epoller.Remove(conn); err != nil {49 log.Printf("failed to remove %v", err)50 }51 }52 }53 }54 select {}55}56func start(epoller *epoll) {57 ...... //同上58}
测试脚本稍微一下,增加一个epoller数量的控制:
1CONNECTIONS=$1 2REPLICAS=$2 3IP=$3 4CONCURRENCY=$4 5DATE=`date -d "+2 minutes" +"%FT%T %z"` 6for (( c=0; c<${REPLICAS}; c++ )) 7do 8 docker run -v $(pwd)/mclient:/client --name 1mclient_$c -d alpine /client \ 9 -conn=${CONNECTIONS} -ip=${IP} -c=${CONCURRENCY} -sm "${DATE}"10done$1 2REPLICAS=$2 3IP=$3 4CONCURRENCY=$4 5DATE=`date -d "+2 minutes" +"%FT%T %z"` 6for (( c=0; c<${REPLICAS}; c++ )) 7do 8 docker run -v $(pwd)/mclient:/client --name 1mclient_$c -d alpine /client \ 9 -conn=${CONNECTIONS} -ip=${IP} -c=${CONCURRENCY} -sm "${DATE}"10done
这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 42402, 延迟(latency)为 0.8秒。
吞吐率并没有增加,但是得益于我们客户端可以并发的处理消息,可以大大减小事务的延迟,将相关的延迟可以降低到一秒以下。
基于我们上面客户端使用多个epoller的启发,我们可以修改服务器端也采用多个epoller的方式,看看是否能增加吞吐率或者降低延迟。
1package main 2import ( 3 "flag" 4 "io" 5 "log" 6 "net" 7 "net/http" 8 _ "net/http/pprof" 9 "os"10 "syscall"11 "time"12 "github.com/libp2p/go-reuseport"13 "github.com/rcrowley/go-metrics"14)15var (16 c = flag.Int("c", 10, "concurrency")17)18var (19 opsRate = metrics.NewRegisteredMeter("ops", nil)20)21func main() {22 flag.Parse()23 go metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))24 go func() {25 if err := http.ListenAndServe(":6060", nil); err != nil {26 log.Fatalf("pprof failed: %v", err)27 }28 }()29 for i := 0; i < *c; i++ {30 go startEpoll()31 }32 select {}33}34func startEpoll() {35 ln, err := reuseport.Listen("tcp", ":8972")36 if err != nil {37 panic(err)38 }39 epoller, err := MkEpoll()40 if err != nil {41 panic(err)42 }43 go start(epoller)44 for {45 conn, e := ln.Accept()46 if e != nil {47 if ne, ok := e.(net.Error); ok && ne.Temporary() {48 log.Printf("accept temp err: %v", ne)49 continue50 }51 log.Printf("accept err: %v", e)52 return53 }54 if err := epoller.Add(conn); err != nil {55 log.Printf("failed to add connection %v", err)56 conn.Close()57 }58 }59}60func start(epoller *epoll) {61 for {62 connections, err := epoller.Wait()63 if err != nil {64 log.Printf("failed to epoll wait %v", err)65 continue66 }67 for _, conn := range connections {68 if conn == nil {69 break70 }71 io.CopyN(conn, conn, 8)72 if err != nil {73 if err := epoller.Remove(conn); err != nil {74 log.Printf("failed to remove %v", err)75 }76 conn.Close()77 }78 opsRate.Mark(1)79 }80 }81}package main 2import ( 3 "flag" 4 "io" 5 "log" 6 "net" 7 "net/http" 8 _ "net/http/pprof" 9 "os"10 "syscall"11 "time"12 "github.com/libp2p/go-reuseport"13 "github.com/rcrowley/go-metrics"14)15var (16 c = flag.Int("c", 10, "concurrency")17)18var (19 opsRate = metrics.NewRegisteredMeter("ops", nil)20)21func main() {22 flag.Parse()23 go metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds))24 go func() {25 if err := http.ListenAndServe(":6060", nil); err != nil {26 log.Fatalf("pprof failed: %v", err)27 }28 }()29 for i := 0; i < *c; i++ {30 go startEpoll()31 }32 select {}33}34func startEpoll() {35 ln, err := reuseport.Listen("tcp", ":8972")36 if err != nil {37 panic(err)38 }39 epoller, err := MkEpoll()40 if err != nil {41 panic(err)42 }43 go start(epoller)44 for {45 conn, e := ln.Accept()46 if e != nil {47 if ne, ok := e.(net.Error); ok && ne.Temporary() {48 log.Printf("accept temp err: %v", ne)49 continue50 }51 log.Printf("accept err: %v", e)52 return53 }54 if err := epoller.Add(conn); err != nil {55 log.Printf("failed to add connection %v", err)56 conn.Close()57 }58 }59}60func start(epoller *epoll) {61 for {62 connections, err := epoller.Wait()63 if err != nil {64 log.Printf("failed to epoll wait %v", err)65 continue66 }67 for _, conn := range connections {68 if conn == nil {69 break70 }71 io.CopyN(conn, conn, 8)72 if err != nil {73 if err := epoller.Remove(conn); err != nil {74 log.Printf("failed to remove %v", err)75 }76 conn.Close()77 }78 opsRate.Mark(1)79 }80 }81}
和客户端的类似,我们启动了多个epoller。这里我们使用reuseport
库启动多个goroutine监听同一个端口,这个特性应该在较新的Linux内核上已经支持, 内核会负责负载均衡。
当然我们也可以启动一个goroutine进行监听,接收到客户端的请求后在交给某个epoller进行处理(随机或者轮询),我们就负责连接的负载均衡。
再或者,多个goroutine可以同时调用同一个listener.Accept
方法,对Accept
进行竞争。
后面的处理逻辑和单个的epoller的方式是一样的,只不过我们使用多个goroutine进行处理。
这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 197814, 延迟(latency)为 0.9秒。
以下所有的测试都使用多epoller的客户端,下面的比较也是针对多epoller的客户端的测试:
和单poller的服务器实现相比较,多epoller的服务器客户端吞吐率大幅增加,而延迟略微增加。
Prefork 是Apache实现的一种服务方式。一个单一的控制进程启动的时候负责启动多个子进程,每个子进程都是独立的,使用单一的goroutine处理消息事件。
这是一个有趣的实现方式,子进程可以共享父进程打开的文件,这样我们就可以把net.Listener传给子进程,让所有的子进程共同监听这个端口。
传递给子进程的文件是通过exec.Cmd.ExtraFiles
字段进行传递的:
1type Cmd struct { 2 ...... 3 // ExtraFiles specifies additional open files to be inherited by the 4 // new process. It does not include standard input, standard output, or 5 // standard error. If non-nil, entry i becomes file descriptor 3+i. 6 // 7 // ExtraFiles is not supported on Windows. 8 ExtraFiles []*os.File 9 ......10}type Cmd struct { 2 ...... 3 // ExtraFiles specifies additional open files to be inherited by the 4 // new process. It does not include standard input, standard output, or 5 // standard error. If non-nil, entry i becomes file descriptor 3+i. 6 // 7 // ExtraFiles is not supported on Windows. 8 ExtraFiles []*os.File 9 ......10}
正如注释中所指出的,传递的第i个文件在子进程中的文件描述符为 3+i,所以如果父进程中启动子进程的命令如下的话:
1a_file_descriptor, _ := tcplistener.File()2children[i] = exec.Command(os.Args[0], "-prefork", "-child")3children[i].Stdout = os.Stdout4children[i].Stderr = os.Stderr5children[i].ExtraFiles = []*os.File{a_file_descriptor}2children[i] = exec.Command(os.Args[0], "-prefork", "-child")3children[i].Stdout = os.Stdout4children[i].Stderr = os.Stderr5children[i].ExtraFiles = []*os.File{a_file_descriptor}
子进程你可以这样得到这个父进程的文件:
1listener, err = net.FileListener(os.NewFile(3, ""))os.NewFile(3, ""))
我们实现的是父进程和子进程共享同一个listener的方式, 如果你使用reuseport在每个子进程打开同一个端口应该也是可以的,这样就父子之间不需要共享同一个文件了。
完整的服务器实现如下:
1package main 2import ( 3 "flag" 4 "io" 5 "log" 6 "net" 7 "os" 8 "os/exec" 9 "syscall" 10) 11var ( 12 c = flag.Int("c", 10, "concurrency") 13 prefork = flag.Bool("prefork", false, "use prefork") 14 child = flag.Bool("child", false, "is child proc") 15) 16func main() { 17 flag.Parse() 18 19 var ln net.Listener 20 var err error 21 if *prefork { 22 ln = doPrefork(*c) 23 } else { 24 ln, err = net.Listen("tcp", ":8972") 25 if err != nil { 26 panic(err) 27 } 28 } 29 startEpoll(ln) 30 select {} 31} 32func startEpoll(ln net.Listener) { 33 epoller, err := MkEpoll() 34 if err != nil { 35 panic(err) 36 } 37 go start(epoller) 38 for { 39 conn, e := ln.Accept() 40 if e != nil { 41 if ne, ok := e.(net.Error); ok && ne.Temporary() { 42 log.Printf("accept temp err: %v", ne) 43 continue 44 } 45 log.Printf("accept err: %v", e) 46 return 47 } 48 if err := epoller.Add(conn); err != nil { 49 log.Printf("failed to add connection %v", err) 50 conn.Close() 51 } 52 } 53} 54func doPrefork(c int) net.Listener { 55 var listener net.Listener 56 if !*child { 57 addr, err := net.ResolveTCPAddr("tcp", ":8972") 58 if err != nil { 59 log.Fatal(err) 60 } 61 tcplistener, err := net.ListenTCP("tcp", addr) 62 if err != nil { 63 log.Fatal(err) 64 } 65 fl, err := tcplistener.File() 66 if err != nil { 67 log.Fatal(err) 68 } 69 children := make([]*exec.Cmd, c) 70 for i := range children { 71 children[i] = exec.Command(os.Args[0], "-prefork", "-child") 72 children[i].Stdout = os.Stdout 73 children[i].Stderr = os.Stderr 74 children[i].ExtraFiles = []*os.File{fl} 75 err = children[i].Start() 76 if err != nil { 77 log.Fatalf("failed to start child: %v", err) 78 } 79 } 80 for _, ch := range children { 81 if err := ch.Wait(); err != nil { 82 log.Printf("failed to wait child's starting: %v", err) 83 } 84 } 85 os.Exit(0) 86 } else { 87 var err error 88 listener, err = net.FileListener(os.NewFile(3, "")) 89 if err != nil { 90 log.Fatal(err) 91 } 92 } 93 return listener 94} 95func start(epoller *epoll) { 96 for { 97 connections, err := epoller.Wait() 98 if err != nil { 99 log.Printf("failed to epoll wait %v", err)100 continue101 }102 for _, conn := range connections {103 if conn == nil {104 break105 }106 io.CopyN(conn, conn, 8)107 if err != nil {108 if err := epoller.Remove(conn); err != nil {109 log.Printf("failed to remove %v", err)110 }111 conn.Close()112 }113 }114 }115}package main 2import ( 3 "flag" 4 "io" 5 "log" 6 "net" 7 "os" 8 "os/exec" 9 "syscall" 10) 11var ( 12 c = flag.Int("c", 10, "concurrency") 13 prefork = flag.Bool("prefork", false, "use prefork") 14 child = flag.Bool("child", false, "is child proc") 15) 16func main() { 17 flag.Parse() 18 19 var ln net.Listener 20 var err error 21 if *prefork { 22 ln = doPrefork(*c) 23 } else { 24 ln, err = net.Listen("tcp", ":8972") 25 if err != nil { 26 panic(err) 27 } 28 } 29 startEpoll(ln) 30 select {} 31} 32func startEpoll(ln net.Listener) { 33 epoller, err := MkEpoll() 34 if err != nil { 35 panic(err) 36 } 37 go start(epoller) 38 for { 39 conn, e := ln.Accept() 40 if e != nil { 41 if ne, ok := e.(net.Error); ok && ne.Temporary() { 42 log.Printf("accept temp err: %v", ne) 43 continue 44 } 45 log.Printf("accept err: %v", e) 46 return 47 } 48 if err := epoller.Add(conn); err != nil { 49 log.Printf("failed to add connection %v", err) 50 conn.Close() 51 } 52 } 53} 54func doPrefork(c int) net.Listener { 55 var listener net.Listener 56 if !*child { 57 addr, err := net.ResolveTCPAddr("tcp", ":8972") 58 if err != nil { 59 log.Fatal(err) 60 } 61 tcplistener, err := net.ListenTCP("tcp", addr) 62 if err != nil { 63 log.Fatal(err) 64 } 65 fl, err := tcplistener.File() 66 if err != nil { 67 log.Fatal(err) 68 } 69 children := make([]*exec.Cmd, c) 70 for i := range children { 71 children[i] = exec.Command(os.Args[0], "-prefork", "-child") 72 children[i].Stdout = os.Stdout 73 children[i].Stderr = os.Stderr 74 children[i].ExtraFiles = []*os.File{fl} 75 err = children[i].Start() 76 if err != nil { 77 log.Fatalf("failed to start child: %v", err) 78 } 79 } 80 for _, ch := range children { 81 if err := ch.Wait(); err != nil { 82 log.Printf("failed to wait child's starting: %v", err) 83 } 84 } 85 os.Exit(0) 86 } else { 87 var err error 88 listener, err = net.FileListener(os.NewFile(3, "")) 89 if err != nil { 90 log.Fatal(err) 91 } 92 } 93 return listener 94} 95func start(epoller *epoll) { 96 for { 97 connections, err := epoller.Wait() 98 if err != nil { 99 log.Printf("failed to epoll wait %v", err)100 continue101 }102 for _, conn := range connections {103 if conn == nil {104 break105 }106 io.CopyN(conn, conn, 8)107 if err != nil {108 if err := epoller.Remove(conn); err != nil {109 log.Printf("failed to remove %v", err)110 }111 conn.Close()112 }113 }114 }115}
服务器启动50个子进程: ./server -c 50 -prefork
客户端还是一样: ./setupm.sh 20000 50 172.17.0.1 10
。
这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 444415, 延迟(latency)为 1.5秒。
和多poller的服务器实现相比较,prefork的服务器客户端吞吐率又大大幅增加,而延迟相对长一些了,比多poller的实现延迟翻倍。
从单个poller的代码分析可知,单goroutine处理消息到来的事件可能会有瓶颈,尤其是并发量比较大的情况下,无法使用多核的优势,因为我们采用多poller、prefork的方式可以并发地处理到来的消息,这里还有一种Reactor的方式,将I/O goroutine和业务goroutine分离, I/O goroutine采用单goroutine的方式,监听的消息交给一个goroutine池 (workerpool)去处理,这样可以并行的处理业务消息,而不会阻塞I/O goroutine。
这里实现的消息读取也是在 workerpool 中实现的, 一般更通用的方式是I/O goroutine解析出消息, 将解析好的消息再交给workerpool去处理。我们这里的例子比较简单,所以读取消息也在workerpool中实现。
worker pool的实现如下:
1package main 2import ( 3 "io" 4 "log" 5 "net" 6 "sync" 7) 8type pool struct { 9 workers int10 maxTasks int11 taskQueue chan net.Conn12 mu sync.Mutex13 closed bool14 done chan struct{}15}16func newPool(w int, t int) *pool {17 return &pool{18 workers: w,19 maxTasks: t,20 taskQueue: make(chan net.Conn, t),21 done: make(chan struct{}),22 }23}24func (p *pool) Close() {25 p.mu.Lock()26 p.closed = true27 close(p.done)28 close(p.taskQueue)29 p.mu.Unlock()30}31func (p *pool) addTask(conn net.Conn) {32 p.mu.Lock()33 if p.closed {34 p.mu.Unlock()35 return36 }37 p.mu.Unlock()38 p.taskQueue <- conn39}40func (p *pool) start() {41 for i := 0; i < p.workers; i++ {42 go p.startWorker()43 }44}45func (p *pool) startWorker() {46 for {47 select {48 case <-p.done:49 return50 case conn := <-p.taskQueue:51 if conn != nil {52 handleConn(conn)53 }54 }55 }56}57func handleConn(conn net.Conn) {58 _, err := io.CopyN(conn, conn, 8)59 if err != nil {60 if err := epoller.Remove(conn); err != nil {61 log.Printf("failed to remove %v", err)62 }63 conn.Close()64 }65 opsRate.Mark(1)66}package main 2import ( 3 "io" 4 "log" 5 "net" 6 "sync" 7) 8type pool struct { 9 workers int10 maxTasks int11 taskQueue chan net.Conn12 mu sync.Mutex13 closed bool14 done chan struct{}15}16func newPool(w int, t int) *pool {17 return &pool{18 workers: w,19 maxTasks: t,20 taskQueue: make(chan net.Conn, t),21 done: make(chan struct{}),22 }23}24func (p *pool) Close() {25 p.mu.Lock()26 p.closed = true27 close(p.done)28 close(p.taskQueue)29 p.mu.Unlock()30}31func (p *pool) addTask(conn net.Conn) {32 p.mu.Lock()33 if p.closed {34 p.mu.Unlock()35 return36 }37 p.mu.Unlock()38 p.taskQueue <- conn39}40func (p *pool) start() {41 for i := 0; i < p.workers; i++ {42 go p.startWorker()43 }44}45func (p *pool) startWorker() {46 for {47 select {48 case <-p.done:49 return50 case conn := <-p.taskQueue:51 if conn != nil {52 handleConn(conn)53 }54 }55 }56}57func handleConn(conn net.Conn) {58 _, err := io.CopyN(conn, conn, 8)59 if err != nil {60 if err := epoller.Remove(conn); err != nil {61 log.Printf("failed to remove %v", err)62 }63 conn.Close()64 }65 opsRate.Mark(1)66}
服务器端代码改造:
1var epoller *epoll 2var workerPool *pool 3func main() { 4 flag.Parse() 5 go metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds)) 6 ln, err := net.Listen("tcp", ":8972") 7 if err != nil { 8 panic(err) 9 }10 go func() {11 if err := http.ListenAndServe(":6060", nil); err != nil {12 log.Fatalf("pprof failed: %v", err)13 }14 }()15 workerPool = newPool(*c, 1000000)16 workerPool.start()17 epoller, err = MkEpoll()18 if err != nil {19 panic(err)20 }21 go start()22 for {23 conn, e := ln.Accept()24 if e != nil {25 if ne, ok := e.(net.Error); ok && ne.Temporary() {26 log.Printf("accept temp err: %v", ne)27 continue28 }29 log.Printf("accept err: %v", e)30 return31 }32 if err := epoller.Add(conn); err != nil {33 log.Printf("failed to add connection %v", err)34 conn.Close()35 }36 }37 workerPool.Close()38}39func start() {40 for {41 connections, err := epoller.Wait()42 if err != nil {43 log.Printf("failed to epoll wait %v", err)44 continue45 }46 for _, conn := range connections {47 if conn == nil {48 break49 }50 workerPool.addTask(conn)51 }52 }53}var epoller *epoll 2var workerPool *pool 3func main() { 4 flag.Parse() 5 go metrics.Log(metrics.DefaultRegistry, 5*time.Second, log.New(os.Stderr, "metrics: ", log.Lmicroseconds)) 6 ln, err := net.Listen("tcp", ":8972") 7 if err != nil { 8 panic(err) 9 }10 go func() {11 if err := http.ListenAndServe(":6060", nil); err != nil {12 log.Fatalf("pprof failed: %v", err)13 }14 }()15 workerPool = newPool(*c, 1000000)16 workerPool.start()17 epoller, err = MkEpoll()18 if err != nil {19 panic(err)20 }21 go start()22 for {23 conn, e := ln.Accept()24 if e != nil {25 if ne, ok := e.(net.Error); ok && ne.Temporary() {26 log.Printf("accept temp err: %v", ne)27 continue28 }29 log.Printf("accept err: %v", e)30 return31 }32 if err := epoller.Add(conn); err != nil {33 log.Printf("failed to add connection %v", err)34 conn.Close()35 }36 }37 workerPool.Close()38}39func start() {40 for {41 connections, err := epoller.Wait()42 if err != nil {43 log.Printf("failed to epoll wait %v", err)44 continue45 }46 for _, conn := range connections {47 if conn == nil {48 break49 }50 workerPool.addTask(conn)51 }52 }53}
服务器启动50个子进程: ./server -c 50 -prefork
客户端还是一样: ./setupm.sh 20000 50 172.17.0.1 10
。
这里我们对50个容器的日志进行统计, 汇总吞吐率进行相加,可以得到吞吐率(TPS)为 190022, 延迟(latency)为 0.3秒。
总结
吞吐率 (tps) | 延迟 (latency) | |
---|---|---|
单epoller(单epoller client) | 42495 | 23s |
单epoller | 42402 | 0.8s |
多epoller | 197814 | 0.9s |
prefork | 444415 | 1.5s |
workerpool | 190022 | 0.3s |
从上表可以看出,客户端的实现对测试结果影响也是巨大的,不过实际我们的客户端分布在不同的节点上,而不像我们的测试不得不使用同一台机器启动百万个节点,所以下面的测试都是通过多epoller client进行测试的,尽量让客户端能并发的处理消息。
从测试结果来看, 在百万并发的情况下, workerpool的实现还是不错的, 既能达到很高的吞吐率(19万), 还能取得 0.3秒的延迟, 而且使用小量的goroutine的worker pool也不会占用太多的系统资源。prefork可以大幅提高吞吐率,但是延迟要稍微长一些。
以上是在巨量连接情况下的各种实现的吞吐率和延迟的测试,这是一类的应用场景, 还有一类很大的应用场景, 比如企业内的服务通讯, 连接数并不会很多,我们将介绍这类场景下几种实现方案的吞吐率和延迟。
版权申明:内容来源网络,版权归原创者所有。除非无法确认,我们都会标明作者及出处,如有侵权烦请告知,我们会立即删除并表示歉意。谢谢。
Golang语言社区
ID:Golangweb
www.GolangWeb.com
游戏服务器架构丨分布式技术丨大数据丨游戏算法学习
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。