grpc和protobuf

什么是rpc

RPC(Remote Procedure Call),翻译成中文叫远程过程调用。其设计思路是程序A可以像支持本地函数一样调用远程程序B的一个函数。程序A和程序B很可能不在一台机器上,这中间就需要网络通讯,一般都是基于tcp或者http协议。函数有入参和返回值这里就需要约定一致的序列化方式,比较常见的有binary,json,xml。函数的调用过程可以同步和异步。随着分布式程序近几年的大行其道,rpc作为其主要的通讯方式也越来越进入我们的视野,比如grpc,thrift。 ## go标准库的rpc golang的标准库就支持rpc。

server端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"errors"
"log"
"net"
"net/rpc"
)
type Param struct {
A,B int
}
type Server struct {}

func (t *Server) Multiply(args *Param, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Server) Divide(args *Param, reply *int) error {
if args.B == 0 {
return errors.New("divide by zero")
}
*reply = args.A / args.B
return nil
}

func main() {
rpc.RegisterName("test", new(Server))
listener, err := net.Listen("tcp", ":9700")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}

go rpc.ServeConn(conn)
}
}

client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main
import (
"fmt"
"log"
"net/rwida/micro-server
"time"wida/micro-server
)
type Param struct {
A,B int
}
func main() {
client, err := rpc.Dial("tcp", "localhost:9700")
if err != nil {
log.Fatal("dialing:", err)
}
//同步调用
var reply int
err = client.Call("test.Multiply", Param{34,35}, &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
//异步调用
done := make(chan *rpc.Call, 1)
client.Go("test.Divide", Param{34,17}, &reply,done)
select {
case d := <-done:
fmt.Println(* d.Reply.(*int))

case <-time.After(3e9):
fmt.Println("time out")
}
}

1
2
3
4
$ go run server/main.go &
$ go run client/main.go
1190
2

程序中我们没有看到参数的序列化和反序列化过程,实际上go标准库rpc使用encoding/gob做了序列化和反序列化操作,但是encoding/gob只支持go语言内部做交互,如果需要夸语言的话就不能用encoding/gob了。我们还可以使用标准库中的jsonrpc.

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"errors"
"log"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)

type Param struct {
A,B int
}
type Server struct {}

func (t *Server) Multiply(args *Param, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Server) Divide(args *Param, reply *int) error {
if args.B == 0 {
return errors.New("divide by zero")
}
*reply = args.A / args.B
return nil
}

func main() {
rpc.RegisterName("test", new(Server))
listener, err := net.Listen("tcp", ":9700")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}

go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}

1
2
3
4
5
$ go run server/main.go &
$ echo -e '{"method":"test.Multiply","params":[{"A":34,"B":35}],"id":0}' | nc localhost 9700
{"id":0,"result":1190,"error":null}
$ echo -e '{"method":"test.Divide","params":[{"A":34,"B":17}],"id":1}' | nc localhost 9700
{"id":0,"result":1190,"error":null}

这个例子中我们用go写了服务端,客户端我们使用linux 命令行工具nc(natcat可能需要单独安装)直接和服务端交互。可以看到交互的序列化方式是json,因此其它语言也很容易实现和该服务端的交互。

当然我们上面演示的是基于tcp的rpc方式,标准库同时也支持http协议的rpc,感兴趣的同学可以去了解下,这边就不做展开介绍。

GRPC

上面我们发了一些篇幅介绍了标准库的rpc,主要的目的是了解rpc是什么。实际的开发中我们反而比较少用标准库的rpc,我们通常会选择grpc,grpc不仅仅在go生态里头非常流行,在其他语言生态里头同样也非常流行。

grpc是google公司开发的基于http2协议设计和protobuf开发的,高性能,跨语言的rpc框架。这边着重介绍下http2的一个重要特性当tcp多路复用功能,说得直白点就是一个在tcp链接执行多个请求,所以Service A提供N多个服务,Client B和A的所有交互都只用一个链接,这样子可以省很多的链接资源。对tcp连接复用( connection multiplexing)感兴趣的同学可以阅读下yamux的源码。

protobuf

protobuf是google开发的一种平台中立,语言中立,可拓展的数据描述语言。类似json,xml等数据描述语言类似,proto也实现了自己的序列化和反序列化方式,对相对于json来说,protobuf序列化效率更高,体积更小。官方地址protobuf有兴趣的同学可以看看。

protobuf安装

我们可以从protobuf/releases里头下载到相应的二进制版本安装protobuf。比如

1
2
protoc-3.10.0-win32.zip
protoc-3.10.0-linux-x86_64.zip
解压文件到文件夹,然后将该文件目录添加到环境变量PATH中。
1
2
$ protoc --version
libprotoc 3.10.0
ok protobuf就安装成功了。

安装protobuf go插件

1
2
go get -u -v github.com/golang/protobuf/proto
go get -u -v github.com/golang/protobuf/protoc-gen-go

hello world

新建一个go项目

1
2
3
4
5
6
7
├── client
│ └── main.go
├── pb
│ ├── gen.sh
│ └── search.proto
└── server
└── main.go

protobuf 消息定义文件为 pb/search.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
syntax = "proto3";

package pb;

service Searcher {
rpc Search (SearchRequest) returns (SearchReply) {}
}

message SearchRequest {
bool name = 1;
}

message SearchReply {
string name = 1;
}
这个文件我定义了rpc的服务Searcher,里头定了一个接口Search,同时定义了入参和返回值类型。

我们写一个shell脚本gen.sh来生成go程序文件

1
2
3
#!/bin/bash
PROTOC=`which protoc`
$PROTOC --go_out=plugins=grpc:. search.proto

cd 到pb文件夹执行 gen.sh 脚本

1
2
3
4
5
6
7
8
9
10
$ ./gen.sh
$ cd ../ && tree
├── client
│ └── main.go
├── pb
│ ├── gen.sh
│ ├── search.pb.go
│ └── search.proto
└── server
└── main.go
可以看到我们多了一个 search.pb.go的文件。 我们在server文件下写我们服务main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main
import (
"log"
"net"
"context"
"github.com/widaT/gorpc_demo/grpc/pb"
"google.golang.org/grpc"
)

const (
port = ":50051"
)

type server struct{}

func (s *server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchReply, error) {
return &pb.SearchReply{Name:"hello " + in.GetName()}, nil
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterSearcherServer(s, &server{})
s.Serve(lis)
}

我们在client文件下写我们的客户端main.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"context"
"fmt"
"github.com/widaT/gorpc_demo/grpc/pb"
"google.golang.org/grpc"
"log"
"time"
)

const (
address = "localhost:50051"
)

func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewSearcherClient(conn)
s := time.Now()
r, err := c.Search(context.Background(), &pb.SearchRequest{Name: "world"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
fmt.Println(r , time.Now().Sub(s))
}

1
2
3
$ go run server/main.go &
$ go run client/main.go &
name:"hello world" 14.767013ms

grpc stream

上面的hello world程序演示了grpc的一般用法,这种方式能满足大部分的场景。grpc还提供了双向或者单向流的方式我们成为grpc stream,stream的方式一般用在有大量的数据交互或者长时间交互。

我们修改的grpc服务定义文件pb/search.proto,在service增加 rpc Search2 (stream SearchRequest) returns (stream SearchReply) {}

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
package pb;
service Searcher {
rpc Search (SearchRequest) returns (SearchReply) {}
rpc Search2 (stream SearchRequest) returns (stream SearchReply) {}
}
message SearchRequest {
string name = 1;
}
message SearchReply {
string name = 1;
}

服务端我们修改代码,添加一个func (s *server) Search2(pb.Searcher_Search2Server) error的一个实现方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"io"
"log"
"net"

"context"
"github.com/widaT/gorpc_demo/grpc/pb"
"google.golang.org/grpc"
)

const (
port = ":50051"
)

type server struct{}

func (s *server) Search(ctx context.Context, in *pb.SearchRequest) (*pb.SearchReply, error) {
return &pb.SearchReply{Name:"hello " + in.GetName()}, nil
}
func (s *server) Search2(stream pb.Searcher_Search2Server) error {
for {
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
reply := &pb.SearchReply{ Name:"hello:" + args.GetName()}
err = stream.Send(reply)
if err != nil {
return err
}
}
return nil
}

func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterSearcherServer(s, &server{})
s.Serve(lis)
}

为了方便阅读代码,client代码我们重写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package main

import (
"context"
"fmt"
"github.com/widaT/gorpc_demo/grpc/pb"
"google.golang.org/grpc"
"io"
"log"
"time"
)

const (
address = "localhost:50051"
)

func main() {
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewSearcherClient(conn)

stream, err := c.Search2(context.Background())

if err != nil {
log.Fatal(err)
}

go func() { //这边启动一个goroutine 发送请求
for {
if err := stream.Send(&pb.SearchRequest{Name: "world"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()

for { //主goroutine 一直接收结果
rep, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(rep.GetName())
}
}

运行代码

1
2
3
4
5
6
$ go run server/main.go &
$ go run client/main.go
hello:world
hello:world
hello:world
...

总结

本文只是对golang使用grpc做了简要的介绍,grpc的使用范围很广,需要我们持续了解和学习。 下面推荐个资料 - grpc go官方examples - 《Go语言高级编程》RPC和Protobuf

参考文档

作者

ฅ´ω`ฅ

发布于

2022-04-06

更新于

2022-04-06

许可协议


评论