server.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "net"
  6. "os"
  7. "os/signal"
  8. proto "standard/etcd/openapi/server/proto"
  9. "strings"
  10. "syscall"
  11. "time"
  12. "go.etcd.io/etcd/client/v3"
  13. "golang.org/x/net/context"
  14. "google.golang.org/grpc"
  15. )
  // schema is the first path segment of every etcd key written by this
  // server; keys have the form /<schema>/<serviceName>/<serverAddr>.
  const (
  	schema = "openapi"
  )

  // Server configuration. The exported values are command-line flags and are
  // only meaningful after flag.Parse has run.
  var (
  	// host is the server host used when building the server address.
  	host = "127.0.0.1"

  	// Port is the port the server listens on.
  	Port = flag.Int("Port", 3000, "listening port")

  	// ServiceName is the name under which the server is registered.
  	ServiceName = flag.String("ServiceName", "greet_service", "service name")

  	// EtcdAddr is the address of the etcd instance used for registration.
  	EtcdAddr = flag.String("EtcdAddr", "192.168.150.41:2379", "register etcd address")
  )
  23. var cli *clientv3.Client
  24. //rpc服务接口
  25. type greetServer struct{}
  26. func (gs *greetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
  27. fmt.Printf("Morning 调用: %s\n", req.Name)
  28. return &proto.GreetResponse{
  29. Message: "Good morning, " + req.Name,
  30. From: fmt.Sprintf("127.0.0.1:%d", *Port),
  31. }, nil
  32. }
  33. func (gs *greetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
  34. fmt.Printf("Night 调用: %s\n", req.Name)
  35. return &proto.GreetResponse{
  36. Message: "Good night, " + req.Name,
  37. From: fmt.Sprintf("127.0.0.1:%d", *Port),
  38. }, nil
  39. }
  40. //将服务地址注册到etcd中
  41. func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
  42. var err error
  43. if cli == nil {
  44. //构建etcd client
  45. cli, err = clientv3.New(clientv3.Config{
  46. Endpoints: strings.Split(etcdAddr, ";"),
  47. DialTimeout: 15 * time.Second,
  48. Username: "root",
  49. Password: "JmZMbqQ1nc",
  50. })
  51. if err != nil {
  52. fmt.Printf("连接etcd失败:%s\n", err)
  53. return err
  54. }
  55. }
  56. //与etcd建立长连接,并保证连接不断(心跳检测)
  57. ticker := time.NewTicker(time.Second * time.Duration(ttl))
  58. go func() {
  59. key :="/" + schema + "/" + serviceName + "/" + serverAddr
  60. for {
  61. resp, err := cli.Get(context.Background(), key)
  62. //fmt.Printf("resp:%+v\n", resp)
  63. if err != nil {
  64. fmt.Printf("获取服务地址失败:%s", err)
  65. } else if resp.Count == 0 { //尚未注册
  66. err = keepAlive(serviceName, serverAddr, ttl)
  67. if err != nil {
  68. fmt.Printf("保持连接失败:%s", err)
  69. }
  70. }
  71. <-ticker.C
  72. }
  73. }()
  74. return nil
  75. }
  76. //保持服务器与etcd的长连接
  77. func keepAlive(serviceName, serverAddr string, ttl int64) error {
  78. //创建租约
  79. leaseResp, err := cli.Grant(context.Background(), ttl)
  80. if err != nil {
  81. fmt.Printf("创建租期失败:%s\n", err)
  82. return err
  83. }
  84. //将服务地址注册到etcd中
  85. key := "/" + schema + "/" + serviceName + "/" + serverAddr
  86. _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
  87. if err != nil {
  88. fmt.Printf("注册服务失败:%s", err)
  89. return err
  90. }
  91. //建立长连接
  92. ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
  93. if err != nil {
  94. fmt.Printf("建立长连接失败:%s\n", err)
  95. return err
  96. }
  97. //清空keepAlive返回的channel
  98. go func() {
  99. for {
  100. <-ch
  101. }
  102. }()
  103. return nil
  104. }
  105. //取消注册
  106. func unRegister(serviceName, serverAddr string) {
  107. if cli != nil {
  108. key :="/" + schema + "/" + serviceName + "/" + serverAddr
  109. cli.Delete(context.Background(), key)
  110. }
  111. }
  112. func main() {
  113. flag.Parse()
  114. //监听网络
  115. listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
  116. if err != nil {
  117. fmt.Println("监听网络失败:", err)
  118. return
  119. }
  120. defer listener.Close()
  121. //创建grpc句柄
  122. srv := grpc.NewServer()
  123. defer srv.GracefulStop()
  124. //将greetServer结构体注册到grpc服务中
  125. proto.RegisterGreetServer(srv, &greetServer{})
  126. //将服务地址注册到etcd中
  127. serverAddr := fmt.Sprintf("%s:%d", host, *Port)
  128. fmt.Printf("greeting server address: %s\n", serverAddr)
  129. register(*EtcdAddr, *ServiceName, serverAddr, 5)
  130. //关闭信号处理
  131. ch := make(chan os.Signal, 1)
  132. signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
  133. go func() {
  134. s := <-ch
  135. unRegister(*ServiceName, serverAddr)
  136. if i, ok := s.(syscall.Signal); ok {
  137. os.Exit(int(i))
  138. } else {
  139. os.Exit(0)
  140. }
  141. }()
  142. //监听服务
  143. err = srv.Serve(listener)
  144. if err != nil {
  145. fmt.Println("监听异常:", err)
  146. return
  147. }
  148. }