1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| package discover
import ( "context" "fmt" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/resolver" "gsrde/builder" "log" "strings" )
type EtcdResolver struct { cli *clientv3.Client conn resolver.ClientConn }
func NewResolver(cfg builder.Config) resolver.Builder { cli, err := builder.NewEtcdClient(cfg) if err != nil { log.Fatalf("error : %v", err) } return &EtcdResolver{ cli: cli, } }
func (r *EtcdResolver) Scheme() string { return "etcd" }
func (r *EtcdResolver) ResolveNow(rn resolver.ResolveNowOptions) { }
func (r *EtcdResolver) Close() { }
func (r *EtcdResolver) Build(target resolver.Target, ClientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { r.conn = ClientConn go r.watch(target.Endpoint + "/")
return r, nil }
func (r *EtcdResolver) watch(keyPrefix string) { var addresses []resolver.Address resp, err := r.cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { fmt.Println("Get service error :", err) } else { for i := range resp.Kvs { addresses = append(addresses, resolver.Address{ Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix), }, ) } } status := resolver.State{ Addresses: addresses, } r.conn.UpdateState(status) rch := r.cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for n := range rch { for _, event := range n.Events { addr := strings.TrimPrefix(string(event.Kv.Key), keyPrefix) switch event.Type { case mvccpb.PUT: if !exists(addresses, addr) { addresses = append(addresses, resolver.Address{Addr: addr}) status.Addresses = addresses r.conn.UpdateState(status) } log.Printf("service register :%s", addr) case mvccpb.DELETE: if s, ok := remove(addresses, addr); ok { status.Addresses = s r.conn.UpdateState(status) } log.Printf("service destroy :%s", addr) } } } }
func exists(addresses []resolver.Address, addr string) bool { for i := range addresses { if addresses[i].Addr == addr { return true } } return false }
func remove(addresses []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range addresses { if addresses[i].Addr == addr { addresses[i] = addresses[len(addresses)-1] return addresses[:len(addresses)-1], true } } return nil, false }
|