rpc-远程服务调用, 用于分布式系统服务信息通信。

RPC基础

基于g的RPC-C/S实现

  1. service: 服务端功能RPC服务开发
  2. client: 客户端调用RPC服务开发
  3. protocol: C/S-RPC接口规范制定
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import (
    "fmt"
    "log"
    "net"
    "net/rpc"
)

type RpcService struct{}

// RpcServiceFunc rpc服务函数定义
// 即rpc服务函数必须实现rpc的接口,这样就能将一般函数转化成rpc函数
// Func(req string, rep *string) error
func (s *RpcService) RpcServiceFunc(request string, replay *string) error {
    *replay = "service_func:" + request
    return nil
}

// RegisterRpcService 将对象类型中所有满足RPC规则的对象方法注册为RPC函数
// name注册空间的命名空间, 会将rpc服务保存在name标签下的空间内, 用于服务发现和方法调用
func RegisterRpcService(name string, rcvr any) error {
    return rpc.RegisterName(name, rcvr)
}

// StartTcpListenService 启动TCP监听服务
func StartTcpListenService(addr string) net.Conn {
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }
    conn, err := listener.Accept()
    if err != nil {
        log.Fatal("Accept error:", err)
    }
    return conn
}

var addr = "127.0.0.1:8080"

func RunRpcServiceByTcp() {
    err := RegisterRpcService("RpcService", new(RpcService))
    if err != nil {
        log.Fatal("Register Rcp error: ", err)
    }
    tcpConn := StartTcpListenService(addr)
    //rpc连接tcp提供rpc服务
    rpc.ServeConn(tcpConn)
}

func RunRpcClientByTcp() {
    // 客户端拨号
    client, err := rpc.Dial("tcp", addr)
    if err != nil {
        log.Fatal("dialing:", err)
    }

    var reply string
    // RpcService.RpcServiceFunc ==> [RPC服务名字.方法名字]
    // hello rpc ==> 消息体【可序列化】
    // replay ==> call RPC 服务返回内容
    err = client.Call("RpcService.RpcServiceFunc", "hello rpc", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}

测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import "testing"

func TestRpcService(t *testing.T) {
    RunRpcServiceByTcp()
}

func TestRpcClient(t *testing.T) {
    RunRpcClientByTcp()
}
// replay
// service_func:hello rpc

重构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
const RpcServiceName = "zrpc/rpcs.RpcService"

type RpcInterface interface {
    RpcServiceFunc(request string, replay *string) error
}

func RegisterRpcSvr(svrName string, svc RpcInterface) error {
    return rpc.RegisterName(svrName, svc)
}

// ----------- service 重构 ---------------

type RpcService struct{}

func (s *RpcService) RpcServiceFunc(request string, replay *string) error {
    *replay = "service_func:" + request
    return nil
}

func (s *RpcService) RpcServiceName() string {
    return RpcServiceName
}

func RunService() {
    rpcSvr := new(RpcService)
    _ = RegisterRpcSvr(rpcSvr.RpcServiceName(), rpcSvr)

    listener, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal("Accept error:", err)
        }

        go rpc.ServeConn(conn)
    }

}

// ---------- client 重构 ------------------

type RpcClient struct {
    *rpc.Client
}

var _ RpcInterface = (*RpcClient)(nil)

func (c *RpcClient) RpcServiceFunc(request string, replay *string) error {
    return c.Client.Call(RpcServiceName+".RpcServiceFunc", request, replay)
}

func DialRpcClient(network, address string) (*RpcClient, error) {
    client, err := rpc.Dial(network, address)
    if err != nil {
        return nil, err
    }
    return &RpcClient{client}, nil
}

func RunClient() {
    client, err := DialRpcClient("tcp", addr)
    if err != nil {
        log.Fatal("dialing:", err)
    }

    var reply string
    err = client.RpcServiceFunc("hello rpc", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}

RPC的跨语言编码

  • go的rpc数据打包可以通过插件实现自定义编码和解码
  • go的rpc建立在io.REadWriteCloser接口上,可以使用多种通讯协议

服务端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
for {
    conn, err := listener.Accept()
    if err != nil {
        log.Fatal("Accept error:", err)
    }

    //go rpc.ServeConn(conn)
    //rpc.ServeCodec函数实现自定义rpc服务端编解码
    //通过net/rpc/jsonrpc实现json编解码器
    go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
}

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func DialRpcClient(network, address string) (*RpcClient, error) {
    //client, err := rpc.Dial(network, address)
    //if err != nil {
    //	return nil, err
    //}
    //return &RpcClient{client}, nil

    //通过net实现指定协议的网络连接
    conn, err := net.Dial(network, address)
    if err != nil {
        return nil, err
    }
    //创建带有json编解码的rpc客户端
    client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
    return &RpcClient{client}, nil
}

Protobuf编码

Prorobuf是一种结构化数据存储格式,二进制方式存储,体积小,作为接口规范的描述语言,用于跨语言的RPC接口定义。

通过protoc工具,将.ptoto文件转换成多种语言代码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# Linux 环境下安装
# 下载安装包
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.2/protoc-3.11.2-linux-x86_64.zip
# 解压到 /usr/local 目录下
sudo 7z x protoc-3.11.2-linux-x86_64.zip -o/usr/local
# 解压路径bin目录加入环境变量
protoc --version
# libprotoc 3.19.4

# 安装protoc-gen-go插件 用于将.proto文件转换成go工程代码
go get -u github.com/golang/protobuf/protoc-gen-go
# 将$GOPATH/bin下的protoc-gen-go也加入环境变量