Connect-轻量级的RPC库

image.png

背景

最近在写一个ci/cd的项目,分为server端和runner端,类似于github的actions方式进行构建、打包应用的功能,在进行构建时runner需要不断的将构建日志和状态上报传输给server,那么上报方式的选择有很多选择,最开始我们使用的是grpc的双向流方式,主要是因为server可能会发送终止的请求给runner端,双向流的消息最为及时且效率最高。但是双向流是长连接(有状态),在升级和高可用方面的表现就不尽人意。

其次我们尝试了使用普通的grpc方式进行轮询上报并检查server端是否取消当前任务,这种方式虽然是没有双向流那么及时,但是也还在可接受范围内。唯一的问题就是对外暴露rpc接口需要在istio中独立配置其网关,且鉴权grpc强制需要开启TLS,那么证书配置就比较麻烦,runner本身运行在不同主机上(win、mac、linux),如果证书过期就需要重新每台机器去配置,不是很理想的做法。如果不开启TLS那么数据相当于在裸奔(尽管我们对一些秘钥做了加密处理)。

再后退一步,我们查阅了grpc-web,但是其套娃的方式还需要配置 envoy proxy,这无疑使得整个项目的链路增长,变得更加复杂。

最后我们了解到了 connect-go 这个比较新的库,比较符合我们的应用场景,尽管他的性能可能不如原生grpc效率那么高,但是还是在可接受范围内。使用该库我们可以直接进行http的方式调用我们的grpc,可以无缝的通过我们旧的proto文件来生成对应的代码,改动的范围很小。

简介

Connect是一个用于构建浏览器和gRPC兼容的HTTP API的小型库。可以通过编写一个简短的Protocol Buffer模式并实现您的应用程序逻辑,Connect生成代码来处理编组、路由、压缩和内容类型协商。它还生成一个惯用的、类型安全的客户端。处理程序和客户端支持三种协议:gRPC、gRPC-Web和Connect自己的协议。

应用

安装必要工具

1
2
3
4
go install github.com/bufbuild/buf/cmd/buf@latest
go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install connectrpc.com/connect/cmd/protoc-gen-connect-go@latest

编写配置文件

1
2
3
4
5
6
7
version: v2
lint:
use:
- DEFAULT
breaking:
use:
- FILE
1
2
3
4
5
6
7
8
9
10
11
12
version: v1
managed:
enabled: true
go_package_prefix:
default: pipeline
plugins:
- name: go
out: .
opt: paths=source_relative
- name: connect-go
out: .
opt: paths=source_relative
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
syntax = "proto3";
option go_package = "pipeline/pkg/proto/runner/v1";
package pkg.proto.runner.v1;

enum RunnerState {
RUNNER_STATE_UNSPECIFIED = 0;
RUNNER_STATE_IDLE = 1;
RUNNER_STATE_ACTIVE = 2;
RUNNER_STATE_OFFLINE = 3;
}

message PingRequest {
string data = 1;
}

message PingResponse {
string data = 1;
}

message UpdateTaskRequest {
string tx_uuid = 2;
uint32 stage_idx = 3;
uint32 step_row_idx = 4;
uint32 step_idx = 5;
StepState state = 6;
string outputs = 7;
bool do_next = 8;
}

message StepState {
string state = 1;
int64 start_time = 3;
int64 end_time = 4;
repeated ActionState actions = 5;
string message = 6;
}

message ActionState {
string name = 1;
uint32 index = 2;
string state = 3;
int64 start_time = 4;
int64 end_time = 5;
}

message UpdateTaskResponse {
string pipeline_state = 1;
string step_state = 2;
bool success = 4;
}

message FetchTaskRequest {
string token = 1;
repeated string labels = 2;
}

message FetchTaskResponse {
bool success = 1;
string message = 2;
string content = 3;
string clean = 4;
}

message RegisterRequest {
string name = 1;
string token = 2;
string version = 3;
repeated string labels = 4;
}

message RunnerResponse {
int64 id = 1;
string uuid = 2;
string token = 3;
string name = 4;
string version = 8;
repeated string labels = 9;
}

message RegisterResponse {
RunnerResponse runner = 1;
}

message DeclareRequest {
string token = 1;
string uuid = 2;
string version = 3;
repeated string labels = 4;
}

message DeclareResponse {
RunnerResponse runner = 1;
}

message UpdateLogRequest {
string tx_uuid = 1;
uint32 stage_idx = 2;
uint32 step_row_idx = 3;
uint32 step_idx = 4;
uint32 action_idx = 5;
int64 index = 6;
repeated string rows = 7;
bool no_more = 8;
}

message UpdateLogResponse {
int64 ack_index = 1;
}

message CleanRequest {
string uuid = 1;
}

message CleanResponse {
string data = 1;
}

service RunnerService {
// 健康检查
rpc Ping (PingRequest) returns (PingResponse) {}
// 注册runner
rpc Register (RegisterRequest) returns (RegisterResponse) {}
// 声明runner
rpc Declare(DeclareRequest) returns (DeclareResponse) {}
// 拉取任务
rpc FetchTask (FetchTaskRequest) returns (FetchTaskResponse) {}
// 运行step
rpc UpdateTask (UpdateTaskRequest) returns (UpdateTaskResponse) {}
// 更新日志
rpc UpdateLog (UpdateLogRequest) returns (UpdateLogResponse) {}
}

生成代码

1
buf generate
1
2
3
4
5
6
7
8
pkg
├── proto
│   └── runner
│   └── v1
│   ├── runner.pb.go
│   ├── runner.proto
│   └── runnerv1connect
│   └── runner.connect.go

调试接口

以gin为示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// RegisterRouter RegisterRouter
func RegisterRouter(router *gin.Engine) {

prefix := "/api/v1/pipeline"
cAPI := router.Group(prefix)

// runner 接口
path, handler := NewRunnerServiceHandler()
cAPI.POST(path+"*method", func(c *gin.Context) {
http.StripPrefix(prefix, handler).ServeHTTP(c.Writer, c.Request)
})

// server 流水线相关接口
{
cAPI.POST("/state", State) // pipeline状态
cAPI.POST("/action", Action) // action状态
cAPI.POST("/logs", Logs) // 查询日志
// operation
cAPI.Use(account.AuthUToken(), middleware.Handler())
cAPI.POST("/manual", Run) // 手动运行pipeline
cAPI.POST("/skip", Skip) // 跳过step
cAPI.POST("/cancel", Cancel) // 取消step
cAPI.POST("/terminate", Terminate) // 终止pipeline
cAPI.POST("/stage", Stage) // 启动stage
cAPI.POST("/validate", Validate) // 启动stage
cAPI.POST("/download", DownloadLogger) // 下载日志
}
}

资料

  1. https://github.com/connectrpc/connect-go
  2. https://connectrpc.com/

Connect-轻量级的RPC库
https://mikeygithub.github.io/2024/05/09/yuque/Connect-轻量级的RPC库/
作者
Mikey
发布于
2024年5月9日
许可协议