笔记篇-GRPC学习笔记

Grpc介绍

在gRPC中,客户端应用程序可以直接在其他计算机上的服务器应用程序上调用方法,就好像它是本地对象一样,从而使您更轻松地创建分布式应用程序和服务。在许多RPC系统中,gRPC都基于定义服务的思想,即指定可以使用其参数和返回类型远程调用的方法。在服务器端,服务器实现此接口并运行gRPC服务器以处理客户端调用。在客户端,客户端具有一个存根(在某些语言中简称为客户端),提供与服务器相同的方法。

从Google内部的服务器到您自己的台式机,gRPC客户端和服务器可以在各种环境中运行并相互通信,并且可以使用gRPC支持的任何语言编写。因此,例如,您可以使用Go,Python或Ruby的客户端轻松地用Java创建gRPC服务器。此外,最新的Google API的接口将具有gRPC版本,可让您轻松地在应用程序中内置Google功能。

protobuf

Protobuf是Protocol Buffers的简称,它是Google公司开发的一种数据描述语言,用于描述一种轻便高效的结构化数据存储格式,并于2008年对外开源。Protobuf可以用于结构化数据串行化,或者说序列化。它的设计非常适用于在网络通讯中的数据载体,很适合做数据存储或 RPC 数据交换格式,它序列化出来的数据量少再加上以 K-V 的方式来存储数据,对消息的版本兼容性非常强,可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。开发者可以通过Protobuf附带的工具生成代码并实现将结构化数据序列化的功能。

安装

  1. 下载对应版本编译器
1
https://github.com/protocolbuffers/protobuf/releases/tag/v21.12
  1. 移动可执行的二进制文件
1
sudo mv protoc /usr/local/bin
  1. 安装go插件
1
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
  1. 查看版本信息
1
protoc --version
  1. 测试生成
1
protoc -I=. --go-grpc_out=.  --go_out=. demo.proto

语法

指定语法

syntax:指定proto的语法版本

1
syntax = "proto3";

message

message表示rpc进行交互过程中的数据信息,可以理解为是结构体,里面的字段每个要对应一个自己的id号

1
2
3
message name {
[repeated] fieldType fieldName = uniqueId
}

modification

  • repeated:数组
  • onef:如果您有一条包含许多字段的消息,并且最多同时设置一个字段,您可以使用其中oneof功能来强制执行此行为并节省内存

支持的数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
message name {
map<string, Bar> foo = 1;//map
enum Corpus {//枚举类型
UNIVERSAL = 0;
WEB = 1;
IMAGES = 2;
LOCAL = 3;
NEWS = 4;
PRODUCTS = 5;
VIDEO = 6;
}
Corpus corpus = 1;
}

service

service:服务方法

1
2
3
4
5
6
7
8
9
10
service HiService {
rpc SayHi (HiRequest) returns (HiReply) {}
}
message HiRequest {
string name = 1;
}

message HiReply {
string message = 1;
}
  • import: 导入其他proto

参考:https://developers.google.com/protocol-buffers/docs/reference/go-generated

快速开始

简单案例

  1. 编写proto文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
syntax = "proto3";

option go_package = "examples/demo/proto";

service HiService {
rpc SayHi (HiRequest) returns (HiReply) {}
}

message HiRequest {
string name = 1;
}

message HiReply {
string message = 1;
}
  1. 生成代码
1
protoc -I=. --go-grpc_out=.  --go_out=. demo.proto
  1. 编写client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "google.golang.org/grpc/examples/demo/proto"
"log"
"time"
)

var addr = "localhost:65535"

func main() {
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewHiServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHi(ctx, &pb.HiRequest{Name: "hi grpc"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("message: %s", r.GetMessage())
}

  1. 编写server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"context"
"google.golang.org/grpc"
pb "google.golang.org/grpc/examples/demo/proto"
"log"
"net"
)

var addr = "localhost:65535"

type server struct {
pb.UnimplementedHiServiceServer
}

func (s *server) SayHi(ctx context.Context, in *pb.HiRequest) (*pb.HiReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HiReply{Message: "Hi " + in.GetName()}, nil
}

func main() {
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterHiServiceServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

官方案例

The example code is part of the grpc-go repo.

  1. Download the repo as a zip file and unzip it, or clone the repo:
1
$ git clone -b v1.34.0 https://github.com/grpc/grpc-go
  1. Change to the quick start example directory:
1
$ cd grpc-go/examples/helloworld
  1. Compile and execute the server code:
1
$ go run greeter_server/main.go
  1. From a different terminal, compile and execute the client code to see the client output:
1
2
$ go run greeter_client/main.go
Greeting: Hello world

服务发现

grpc本身不提供服务发现的功能,是直接采用c/s直连进行rpc通信,当我们在集群环境或者想动态上下线服务的时候使用注册中心就能实现我们的效果,目前比较常用的注册中心有基于redis,mysql,zookeeper,etcd,eureka,consul等中间件来实现。

基于etcd注册中心

分析需求

  1. 注册服务
  2. 注销服务(主动和被动)
  3. 获取服务(获取后进行服务调用)

详细设计

画板

  1. 注册服务

服务注册主要是在服务端启动时将当前服务的ip+端口保存到etcd中以group+version的方式作为键存储,且进行心跳检测,超时将其移除

  1. 注销服务(主动和被动)

主动注销将其存储在etcd的值进行删除,被动注销通过设置其超时自动

  1. 获取服务(获取后进行服务调用)

通过获取etcd前缀的值列表,通过一定的负载均衡算法获取一个实例的信息进行实例化并调用,下次调用会继续使用,除非发生更新或者服务下线

编码实现

  1. 构建器主要用于创建注册器和etcd实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package builder

import (
clientv3 "go.etcd.io/etcd/client/v3"
"gsrde/register"
"log"
"time"
)

type Config struct {
Endpoints []string
Timeout time.Duration
}

func NewRegister(cfg Config) register.DefaultRegister {
register := register.DefaultRegister{}
var err error
register.Cli, err = NewEtcdClient(cfg)
if err != nil {
log.Fatalf("error : %v", err)
}
return register
}

func NewEtcdClient(cfg Config) (*clientv3.Client, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: cfg.Endpoints,
DialTimeout: cfg.Timeout,
})
if err != nil {
log.Fatalf("error : %v", err)
}
return cli, err
}
  1. 注册器用于注册服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package register

import (
"context"
"fmt"
clientv3 "go.etcd.io/etcd/client/v3"
"log"
)

type Register interface {
Register(service, addr string) error
UnRegister(service string) error
}

type DefaultRegister struct {
Cli *clientv3.Client
}

// Register 注册一个rpc服务
func (r *DefaultRegister) Register(serviceName, addr string) (err error) {
key := fmt.Sprintf("%s/%s", serviceName, addr)
//创建租约
leaseResp, err := r.Cli.Grant(context.Background(), 10)
if err != nil {
log.Fatalf("Create Lease error :%s\n", err)
return err
}
//注册到etcd
log.Printf("Register to etcd : key : %s\n", key)
_, err = r.Cli.Put(context.Background(), key, key, clientv3.WithLease(leaseResp.ID))
if err != nil {
log.Fatalf("Register to etcd error : %s\n", err)
}
//开启心跳检查
ch, err := r.Cli.KeepAlive(context.Background(), leaseResp.ID)
if err != nil {
log.Fatalf("KeepAlive to etcd error : %s\n", err)
}
//清空ch的数据
go func() {
for {
<-ch
}
}()
return
}

// UnRegister 取消注册一个rpc服务
func (r *DefaultRegister) UnRegister(service string) error {
resp, err := r.Cli.Delete(context.Background(), service)
if err != nil {
log.Fatalf("UnRegister error : %s, service : %v, resp : %v\n", err, service, resp)
}
return err
}
  1. 服务发现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package discover

import (
"context"
"fmt"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
"gsrde/builder"
"log"
"strings"
)

// EtcdResolver etcd解析器
type EtcdResolver struct {
cli *clientv3.Client
conn resolver.ClientConn
}

// NewResolver 初始化一个etcd解析器
func NewResolver(cfg builder.Config) resolver.Builder {
cli, err := builder.NewEtcdClient(cfg)
if err != nil {
log.Fatalf("error : %v", err)
}
return &EtcdResolver{
cli: cli,
}
}

func (r *EtcdResolver) Scheme() string {
return "etcd"
}

func (r *EtcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
}

func (r *EtcdResolver) Close() {
}

// Build 构建解析器
func (r *EtcdResolver) Build(target resolver.Target, ClientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
r.conn = ClientConn
// 监听key的变化
go r.watch(target.Endpoint + "/")

return r, nil
}

// 监听etcd中某个key前缀的服务地址列表的变化
func (r *EtcdResolver) watch(keyPrefix string) {
//初始化服务地址列表
var addresses []resolver.Address
resp, err := r.cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Println("Get service error :", err)
} else {
for i := range resp.Kvs {
addresses = append(addresses,
resolver.Address{
Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix),
},
)
}
}
status := resolver.State{
Addresses: addresses,
}
r.conn.UpdateState(status)
//监听服务地址列表的变化
rch := r.cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for n := range rch {
for _, event := range n.Events {
addr := strings.TrimPrefix(string(event.Kv.Key), keyPrefix)
switch event.Type {
case mvccpb.PUT:
if !exists(addresses, addr) {
addresses = append(addresses, resolver.Address{Addr: addr})
status.Addresses = addresses
r.conn.UpdateState(status)
}
log.Printf("service register :%s", addr)
case mvccpb.DELETE:
if s, ok := remove(addresses, addr); ok {
status.Addresses = s
r.conn.UpdateState(status)
}
log.Printf("service destroy :%s", addr)
}
}
}
}

func exists(addresses []resolver.Address, addr string) bool {
for i := range addresses {
if addresses[i].Addr == addr {
return true
}
}
return false
}

func remove(addresses []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range addresses {
if addresses[i].Addr == addr {
addresses[i] = addresses[len(addresses)-1]
return addresses[:len(addresses)-1], true
}
}
return nil, false
}

测试结果

  1. 启动服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package test

import (
"google.golang.org/grpc"
"gsrde/builder"
pb "gsrde/test/hello"
"log"
"net"
"testing"
)

func TestRegisterServiceA(t *testing.T) {
var addr = "localhost:65535"
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterHiServiceServer(s, &pb.Server{})
log.Printf("server listening at %v", lis.Addr())
//注册到etcd
cfg := builder.Config{
Endpoints: []string{"192.168.100.24:2379"},
}
reg := builder.NewRegister(cfg)
reg.Register(pb.HiService_ServiceDesc.ServiceName, addr)
defer reg.UnRegister(pb.HiService_ServiceDesc.ServiceName)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

func TestRegisterServiceB(t *testing.T) {
var addr = "localhost:65533"
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterHiServiceServer(s, &pb.Server{})
log.Printf("server listening at %v", lis.Addr())
//注册到etcd
cfg := builder.Config{
Endpoints: []string{"192.168.100.24:2379"},
}
reg := builder.NewRegister(cfg)
reg.Register(pb.HiService_ServiceDesc.ServiceName, addr)
defer reg.UnRegister(pb.HiService_ServiceDesc.ServiceName)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
  1. 启动客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package test

import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/resolver"
"gsrde/builder"
"gsrde/discover"
pb "gsrde/test/hello"
"log"
"strconv"
"testing"
"time"
)

func TestConsumer(t *testing.T) {
cfg := builder.Config{
Endpoints: []string{"192.168.100.24:2379"},
}
r := discover.NewResolver(cfg)
resolver.Register(r)

conn, err := grpc.Dial(r.Scheme()+"://author/"+pb.HiService_ServiceDesc.ServiceName,
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewHiServiceClient(conn)
ticker := time.NewTicker(5 * time.Second)
i := 1
for range ticker.C {
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
resp, err := c.SayHi(ctx, &pb.HiRequest{Name: "hi ,I am " + strconv.Itoa(i) + " message"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("message: %s", resp.GetMessage())
i++
}
}

通信模式

Interceptor拦截器

Metadata元数据

Retry失败重试

HealthCheck健康检查

Auth安全认证

BinaryLog

Backoff

Balance负载均衡

Resolver名称解析

KeepAlived

Http2

Trace

第三方组件

Encode编码

Compressor

相关资料

  1. 官方文档 https://grpc.io/docs/
  2. https://github.com/grpc/grpc
  3. https://github.com/protocolbuffers/protobuf

笔记篇-GRPC学习笔记
https://mikeygithub.github.io/2022/05/10/yuque/笔记篇-GRPC学习笔记/
作者
Mikey
发布于
2022年5月10日
许可协议