| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- package main
- import (
- "flag"
- "fmt"
- "log"
- proto "standard/etcd/openapi/client/proto"
- "strings"
- "time"
- "go.etcd.io/etcd/api/v3/mvccpb"
- "go.etcd.io/etcd/client/v3"
- "golang.org/x/net/context"
- "google.golang.org/grpc"
- "google.golang.org/grpc/resolver"
- )
- const schema = "avata"
- var (
- ServiceName = flag.String("ServiceName", "wenchangchain-ddc", "service name") //服务名称
- EtcdAddr = flag.String("EtcdAddr", "192.168.150.41:2379", "register etcd address") //etcd的地址
- )
- var cli *clientv3.Client
- //etcd解析器
- type etcdResolver struct {
- etcdAddr string
- clientConn resolver.ClientConn
- }
- //初始化一个etcd解析器
- func newResolver(etcdAddr string) resolver.Builder {
- return &etcdResolver{etcdAddr: etcdAddr}
- }
- func (r *etcdResolver) Scheme() string {
- return schema
- }
- //watch有变化以后会调用
- func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
- log.Println("ResolveNow")
- }
- //解析器关闭时调用
- func (r *etcdResolver) Close() {
- log.Println("Close")
- }
- //构建解析器 grpc.Dial()同步调用
- func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
- var err error
- fmt.Println("2222",target.Endpoint,target.Authority,target.Scheme,target.URL)
- //构建etcd client
- if cli == nil {
- cli, err = clientv3.New(clientv3.Config{
- Endpoints: strings.Split(r.etcdAddr, ";"),
- DialTimeout: 15 * time.Second,
- Username: "root",
- Password: "JmZMbqQ1nc",
- })
- if err != nil {
- fmt.Printf("连接etcd失败:%s\n", err)
- return nil, err
- }
- }
- resolver.Register(r)
- r.clientConn = clientConn
- go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
- return r, nil
- }
- //监听etcd中某个key前缀的服务地址列表的变化
- func (r *etcdResolver) watch(keyPrefix string) {
- //初始化服务地址列表
- var addrList []resolver.Address
- resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
- if err != nil {
- fmt.Println("获取服务地址列表失败:", err)
- } else {
- for i := range resp.Kvs {
- addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)}) //删掉前缀keyPrefix
- }
- }
- r.clientConn.NewAddress(addrList)
- //监听服务地址列表的变化
- rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
- for n := range rch {
- for _, ev := range n.Events {
- addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
- switch ev.Type {
- case mvccpb.PUT:
- if !exists(addrList, addr) {
- addrList = append(addrList, resolver.Address{Addr: addr})
- r.clientConn.NewAddress(addrList)
- }
- case mvccpb.DELETE:
- if s, ok := remove(addrList, addr); ok {
- addrList = s
- r.clientConn.NewAddress(addrList)
- }
- }
- }
- }
- fmt.Println("clientConn",addrList)
- }
- func exists(l []resolver.Address, addr string) bool {
- for i := range l {
- if l[i].Addr == addr {
- return true
- }
- }
- return false
- }
- func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
- for i := range s {
- if s[i].Addr == addr {
- s[i] = s[len(s)-1]
- return s[:len(s)-1], true
- }
- }
- return nil, false
- }
- func main() {
- flag.Parse()
- //注册etcd解析器
- r := newResolver(*EtcdAddr)
- resolver.Register(r)
- //客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
- //fmt.Println(r.Scheme()+":///"+*ServiceName)
- conn, err := grpc.Dial(r.Scheme()+":///"+"services/chains/wenchangchain-native", grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
- if err != nil {
- fmt.Println("连接服务器失败:", err)
- }
- defer conn.Close()
- //获得grpc句柄
- c := proto.NewGreetClient(conn)
- ticker := time.NewTicker(3 * time.Second)
- for range ticker.C {
- fmt.Println("Morning 调用...")
- resp1, err := c.Morning(
- context.Background(),
- &proto.GreetRequest{Name: "JetWu"},
- )
- if err != nil {
- fmt.Println("Morning调用失败:", err)
- return
- }
- fmt.Printf("Morning 响应:%s,来自:%s\n", resp1.Message, resp1.From)
- fmt.Println("Night 调用...")
- resp2, err := c.Night(
- context.Background(),
- &proto.GreetRequest{Name: "JetWu"},
- )
- if err != nil {
- fmt.Println("Night调用失败:", err)
- return
- }
- fmt.Printf("Night 响应:%s,来自:%s\n", resp2.Message, resp2.From)
- }
- }
|