// client.go
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. proto "standard/etcd/openapi/client/proto"
  7. "strings"
  8. "time"
  9. "go.etcd.io/etcd/api/v3/mvccpb"
  10. "go.etcd.io/etcd/client/v3"
  11. "golang.org/x/net/context"
  12. "google.golang.org/grpc"
  13. "google.golang.org/grpc/resolver"
  14. )
  // schema is the URL scheme reported by etcdResolver.Scheme and used by main
// to build the dial target.
const schema = "avata"

// Command-line flags.
var (
	// ServiceName is the service name.
	ServiceName = flag.String("ServiceName", "wenchangchain-ddc", "service name")

	// EtcdAddr is the etcd address to connect to. Build splits the value on
	// ";", so several endpoints may be supplied.
	EtcdAddr = flag.String("EtcdAddr", "192.168.150.41:2379", "register etcd address")
)
  20. var cli *clientv3.Client
  21. //etcd解析器
  22. type etcdResolver struct {
  23. etcdAddr string
  24. clientConn resolver.ClientConn
  25. }
  26. //初始化一个etcd解析器
  27. func newResolver(etcdAddr string) resolver.Builder {
  28. return &etcdResolver{etcdAddr: etcdAddr}
  29. }
  30. func (r *etcdResolver) Scheme() string {
  31. return schema
  32. }
  33. //watch有变化以后会调用
  34. func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
  35. log.Println("ResolveNow")
  36. }
  37. //解析器关闭时调用
  38. func (r *etcdResolver) Close() {
  39. log.Println("Close")
  40. }
  41. //构建解析器 grpc.Dial()同步调用
  42. func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  43. var err error
  44. fmt.Println("2222",target.Endpoint,target.Authority,target.Scheme,target.URL)
  45. //构建etcd client
  46. if cli == nil {
  47. cli, err = clientv3.New(clientv3.Config{
  48. Endpoints: strings.Split(r.etcdAddr, ";"),
  49. DialTimeout: 15 * time.Second,
  50. Username: "root",
  51. Password: "JmZMbqQ1nc",
  52. })
  53. if err != nil {
  54. fmt.Printf("连接etcd失败:%s\n", err)
  55. return nil, err
  56. }
  57. }
  58. resolver.Register(r)
  59. r.clientConn = clientConn
  60. go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
  61. return r, nil
  62. }
  63. //监听etcd中某个key前缀的服务地址列表的变化
  64. func (r *etcdResolver) watch(keyPrefix string) {
  65. //初始化服务地址列表
  66. var addrList []resolver.Address
  67. resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
  68. if err != nil {
  69. fmt.Println("获取服务地址列表失败:", err)
  70. } else {
  71. for i := range resp.Kvs {
  72. addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)}) //删掉前缀keyPrefix
  73. }
  74. }
  75. r.clientConn.NewAddress(addrList)
  76. //监听服务地址列表的变化
  77. rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
  78. for n := range rch {
  79. for _, ev := range n.Events {
  80. addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
  81. switch ev.Type {
  82. case mvccpb.PUT:
  83. if !exists(addrList, addr) {
  84. addrList = append(addrList, resolver.Address{Addr: addr})
  85. r.clientConn.NewAddress(addrList)
  86. }
  87. case mvccpb.DELETE:
  88. if s, ok := remove(addrList, addr); ok {
  89. addrList = s
  90. r.clientConn.NewAddress(addrList)
  91. }
  92. }
  93. }
  94. }
  95. fmt.Println("clientConn",addrList)
  96. }
  97. func exists(l []resolver.Address, addr string) bool {
  98. for i := range l {
  99. if l[i].Addr == addr {
  100. return true
  101. }
  102. }
  103. return false
  104. }
  105. func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
  106. for i := range s {
  107. if s[i].Addr == addr {
  108. s[i] = s[len(s)-1]
  109. return s[:len(s)-1], true
  110. }
  111. }
  112. return nil, false
  113. }
  114. func main() {
  115. flag.Parse()
  116. //注册etcd解析器
  117. r := newResolver(*EtcdAddr)
  118. resolver.Register(r)
  119. //客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
  120. //fmt.Println(r.Scheme()+":///"+*ServiceName)
  121. conn, err := grpc.Dial(r.Scheme()+":///"+"services/chains/wenchangchain-native", grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
  122. if err != nil {
  123. fmt.Println("连接服务器失败:", err)
  124. }
  125. defer conn.Close()
  126. //获得grpc句柄
  127. c := proto.NewGreetClient(conn)
  128. ticker := time.NewTicker(3 * time.Second)
  129. for range ticker.C {
  130. fmt.Println("Morning 调用...")
  131. resp1, err := c.Morning(
  132. context.Background(),
  133. &proto.GreetRequest{Name: "JetWu"},
  134. )
  135. if err != nil {
  136. fmt.Println("Morning调用失败:", err)
  137. return
  138. }
  139. fmt.Printf("Morning 响应:%s,来自:%s\n", resp1.Message, resp1.From)
  140. fmt.Println("Night 调用...")
  141. resp2, err := c.Night(
  142. context.Background(),
  143. &proto.GreetRequest{Name: "JetWu"},
  144. )
  145. if err != nil {
  146. fmt.Println("Night调用失败:", err)
  147. return
  148. }
  149. fmt.Printf("Night 响应:%s,来自:%s\n", resp2.Message, resp2.From)
  150. }
  151. }