写点什么

300 行 Go 代码玩转 RPC

  • 2019-11-14
  • 本文字数:4311 字

    阅读完需:约 14 分钟

300行Go代码玩转RPC

最近,小编一直在研究 RPC 的原理及实现方式。在本篇文章中将通过用 300 行纯 Golang 编写简单的 RPC 框架来解释 RPC。希望能帮助大家梳理 RPC 相关知识点。


我们通过从头开始在 Golang 中构建一个简单的 RPC 框架来学习 RPC 基础构成。

1 什么是 RPC

简单地说,服务 A 想调用服务 B 的函数。但是这两个服务不在同一个内存空间中。所以不能直接调用它。


因此,为了实现这个调用,我们需要表达如何调用以及如何通过网络传递通信的语义。


让我们考虑一下,当我们可以在相同的内存空间(本地调用)中运行时,我们要怎么做。


type User struct {  Name string  Age int}
var userDB = map[int]User{ 1: User{"Ankur", 85}, 9: User{"Anand", 25}, 8: User{"Ankur Anand", 27},}

func QueryUser(id int) (User, error) { if u, ok := userDB[id]; ok { return u, nil }
return User{}, fmt.Errorf("id %d not in user db", id)}

func main() { u , err := QueryUser(8) if err != nil { fmt.Println(err) return }
fmt.Printf("name: %s, age: %d \n", u.Name, u.Age)}
复制代码


现在我们如何在网络上进行相同的函数调用


客户端将通过网络调用 QueryUser(id int) 函数,并且将有一个服务端提供对该函数的调用,并返回响应 User{“Name”, id}, nil。

2 网络传输数据格式

我们将采用 TLV(定长报头+变长消息体)编码方案来规范 tcp 上的数据传输。稍后会详细介绍


在通过网络发送数据之前,我们需要定义如何通过网络发送数据的结构。


这有助于我们定义一个通用协议,客户端和服务端都可以理解这个协议。(protobuf IDL 定义了服务端和客户端都能理解的内容)。


因此,服务端接收到的数据、要调用的函数名和参数列表,或者来自客户端的数据都需要传递这些参数。


另外,让我们约定第二个返回值的类型为 error,表示 RPC 调用结果。


// RPC数据传输格式type RPCdata struct {  Name string        // name of the function  Args []interface{} // request's or response's body expect error.  Err  string        // Error any executing remote server}
复制代码


现在我们有了一个格式,我们需要序列化它以便我们可以通过网络发送它。在本例中,我们将使用 go 默认的二进制序列化协议进行编码和解码。


// be sent over the network.func Encode(data RPCdata) ([]byte, error) {  var buf bytes.Buffer  encoder := gob.NewEncoder(&buf)  if err := encoder.Encode(data); err != nil {    return nil, err  }  return buf.Bytes(), nil}
// Decode the binary data into the Go structfunc Decode(b []byte) (RPCdata, error) { buf := bytes.NewBuffer(b) decoder := gob.NewDecoder(buf) var data RPCdata if err := decoder.Decode(&data); err != nil { return Data{}, err } return data, nil}
复制代码

3 网络传输

选择 TLV 协议的原因是由于其非常容易实现,同时也完成了我们需要识别的数据读取的长度,因为我们需要确定这个请求读取的字节数的传入请求流。发送和接收都执行相同的操作。


// Transport will use TLV protocoltype Transport struct {  conn net.Conn // Conn is a generic stream-oriented network connection.}
// NewTransport creates a Transportfunc NewTransport(conn net.Conn) *Transport { return &Transport{conn}}
// Send TLV data over the networkfunc (t *Transport) Send(data []byte) error { // we will need 4 more byte then the len of data // as TLV header is 4bytes and in this header // we will encode how much byte of data // we are sending for this request. buf := make([]byte, 4+len(data)) binary.BigEndian.PutUint32(buf[:4], uint32(len(data))) copy(buf[4:], data) _, err := t.conn.Write(buf) if err != nil { return err } return nil}
// Read TLV sent over the wirefunc (t *Transport) Read() ([]byte, error) { header := make([]byte, 4) _, err := io.ReadFull(t.conn, header) if err != nil { return nil, err } dataLen := binary.BigEndian.Uint32(header) data := make([]byte, dataLen) _, err = io.ReadFull(t.conn, data) if err != nil { return nil, err } return data, nil}
复制代码


现在我们已经定义了数据格式和传输协议。下面我们还需要 RPC 服务器和 RPC 客户端的实现。

4 RPC 服务器

RPC 服务器将接收具有函数名的 RPCData。因此,我们需要维护和映射包含函数名到实际函数映射的函数


// RPCServer ...type RPCServer struct {  addr string  funcs map[string] reflect.Value}
// Register the name of the function and its entriesfunc (s *RPCServer) Register(fnName string, fFunc interface{}) { if _,ok := s.funcs[fnName]; ok { return }
s.funcs[fnName] = reflect.ValueOf(fFunc)}
复制代码


现在我们已经注册了 func,当我们收到请求时,我们将检查函数执行期间传递的 func 的名称是否存在。然后执行相应的操作


// Execute the given function if presentfunc (s *RPCServer) Execute(req RPCdata) RPCdata {  // get method by name  f, ok := s.funcs[req.Name]  if !ok {    // since method is not present    e := fmt.Sprintf("func %s not Registered", req.Name)    log.Println(e)    return RPCdata{Name: req.Name, Args: nil, Err: e}  }
log.Printf("func %s is called\n", req.Name) // unpackage request arguments inArgs := make([]reflect.Value, len(req.Args)) for i := range req.Args { inArgs[i] = reflect.ValueOf(req.Args[i]) }
// invoke requested method out := f.Call(inArgs) // now since we have followed the function signature style where last argument will be an error // so we will pack the response arguments expect error. resArgs := make([]interface{}, len(out) - 1) for i := 0; i < len(out) - 1; i ++ { // Interface returns the constant value stored in v as an interface{}. resArgs[i] = out[i].Interface() }
// pack error argument var er string if e, ok := out[len(out) - 1].Interface().(error); ok { // convert the error into error string value er = e.Error() } return RPCdata{Name: req.Name, Args: resArgs, Err: er}}
复制代码

5 RPC 客户端

由于函数的具体实现在服务器端,客户端只有函数的原型,所以我们需要调用函数的完整原型,这样我们才能调用它。


func (c *Client) callRPC(rpcName string, fPtr interface{}) {  container := reflect.ValueOf(fPtr).Elem()  f := func(req []reflect.Value) []reflect.Value {    cReqTransport := NewTransport(c.conn)    errorHandler := func(err error) []reflect.Value {      outArgs := make([]reflect.Value, container.Type().NumOut())      for i := 0; i < len(outArgs)-1; i++ {        outArgs[i] = reflect.Zero(container.Type().Out(i))      }      outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()      return outArgs    }
// Process input parameters inArgs := make([]interface{}, 0, len(req)) for _, arg := range req { inArgs = append(inArgs, arg.Interface()) }
// ReqRPC reqRPC := RPCdata{Name: rpcName, Args: inArgs} b, err := Encode(reqRPC) if err != nil { panic(err) } err = cReqTransport.Send(b) if err != nil { return errorHandler(err) } // receive response from server rsp, err := cReqTransport.Read() if err != nil { // local network error or decode error return errorHandler(err) } rspDecode, _ := Decode(rsp) if rspDecode.Err != "" { // remote server error return errorHandler(errors.New(rspDecode.Err)) }
if len(rspDecode.Args) == 0 { rspDecode.Args = make([]interface{}, container.Type().NumOut()) } // unpackage response arguments numOut := container.Type().NumOut() outArgs := make([]reflect.Value, numOut) for i := 0; i < numOut; i++ { if i != numOut-1 { // unpackage arguments (except error) if rspDecode.Args[i] == nil { // if argument is nil (gob will ignore "Zero" in transmission), set "Zero" value outArgs[i] = reflect.Zero(container.Type().Out(i)) } else { outArgs[i] = reflect.ValueOf(rspDecode.Args[i]) } } else { // unpackage error argument outArgs[i] = reflect.Zero(container.Type().Out(i)) } }
return outArgs } container.Set(reflect.MakeFunc(container.Type(), f))}
复制代码

6 测试一下我们的框架

package main
import ( "encoding/gob" "fmt" "net")
type User struct { Name string Age int}
var userDB = map[int]User{ 1: User{"Ankur", 85}, 9: User{"Anand", 25}, 8: User{"Ankur Anand", 27},}
func QueryUser(id int) (User, error) { if u, ok := userDB[id]; ok { return u, nil }
return User{}, fmt.Errorf("id %d not in user db", id)}
func main() { // new Type needs to be registered gob.Register(User{}) addr := "localhost:3212" srv := NewServer(addr)
// start server srv.Register("QueryUser", QueryUser) go srv.Run()
// wait for server to start. time.Sleep(1 * time.Second)
// start client conn, err := net.Dial("tcp", addr) if err != nil { panic(err) } cli := NewClient(conn)
var Query func(int) (User, error) cli.callRPC("QueryUser", &Query)
u, err := Query(1) if err != nil { panic(err) } fmt.Println(u)
u2, err := Query(8) if err != nil { panic(err) } fmt.Println(u2)}
复制代码


执行:go run main.go


输出内容


2019/07/23 20:26:18 func QueryUser is called{Ankur 85}2019/07/23 20:26:18 func QueryUser is called{Ankur Anand 27}
复制代码

总结

致此我们简单的 RPC 框架就实现完成了,旨在帮大家理解 RPC 的原理及上手简单实践。如果大家对这篇文章中所讲内容有异议,或者想进一步讨论,请留言回复。


本文转载自公众号 360 云计算(ID:hulktalk)。


原文链接:


https://mp.weixin.qq.com/s/fxrocOMLX7kqUH9lP94JpA


2019-11-14 17:341448

评论 1 条评论

发布
用户头像
请问有源码吗,少了Run方法
2020-07-28 11:14
回复
没有更多了
发现更多内容

MatrixOne混沌测试之道

MatrixOrigin

数据库 分布式 混沌测试

小程序技术能否成为移动应用市场新机遇?

Speedoooo

小程序 小程序容器 App生态

当代人假期这几种行为,你中了几条?

天翼云开发者社区

菲尔兹奖得主小平邦彦:数学是什么?

图灵社区

数学

给传统零售企业穿上“云武装”!

天翼云开发者社区

菲尔兹奖得主小平邦彦:数学是什么?

图灵教育

数学

坐标中国|中国速度,挑战极限驱动发展“快车”

天翼云开发者社区

Linux下文件目录权限操作

DS小龙哥

10月月更

Linux下automake工具使用(自动构建Makefile文件)

DS小龙哥

10月月更

Linux下Shell脚本基础语法

DS小龙哥

10月月更

阿里p8免费公开五份Java架构师学习手册,助力金九银十

小二,上酒上酒

Java 架构 阿里

Github三天点击破亿,四天助力金九银十,精通SpringCloud微服务架构,成就大厂梦

小二,上酒上酒

Java spring 编程 Spring Cloud

一文透彻理解微服务架构及相关组件

程序员小毕

Java 程序员 面试 微服务架构 程序人生

阿里内部力荐Spring生态全家桶,务必每个程序员人手一份

小二,上酒上酒

Java 阿里 大厂

5年大厂开发经验,加上这份Java高性能架构笔记,终于拿到了架构师薪资

小二,上酒上酒

Java 大厂 大厂面试 Java面试题

MASA MAUI Plugin IOS蓝牙低功耗(三)蓝牙扫描

MASA技术团队

MASA MAUI Xamarin MASA Blazor

浅析小程序插件

Speedoooo

小程序 插件 小程序容器

idea多模块启动

拾光师

IDEA 10月月更

八月裸辞,九月疫情在家闭关狂刷面试题,十月成功上岸京东物流

小二,上酒上酒

Java 阿里

再不看就来不及了,腾讯Spring Boot高阶笔记,限时开源48小时

小二,上酒上酒

Java 面试 大厂

阿里内部JVM G1GC纯手写学习笔记,你确定看得完?

小二,上酒上酒

编程 JVM 马士兵

从无到有,一步一步教你搭建微服务电商项目,包含笔记+视频+源码

小二,上酒上酒

微服务

Linux下基础命令(二)

DS小龙哥

10月月更

数据库故障处理优质文章汇总(含Oracle、MySQL、MogDB等)

墨天轮

MySQL 数据库 oracle 故障定位 国产数据库

带你读AI论文丨ACGAN-动漫头像生成

华为云开发者联盟

神经网络 GAN AI论文 ACGAN-动漫头像 企业号十月 PK 榜

玩转云端| 提升边缘应用交付效率,天翼云Serverless边缘容器有妙招

天翼云开发者社区

阿里内部独家Java架构面试题,面试再不过来找我

小二,上酒上酒

MySQL spring JVM 多线程 MQ

Linux系统下基础命令介绍

DS小龙哥

10月月更

百度App性能优化工具篇 - Thor原理及实践

百度Geek说

Java App 企业号十月 PK 榜

大闸蟹套路多?“码”上溯源让你安心吃蟹!

旺链科技

区块链 产业区块链 大闸蟹

300行Go代码玩转RPC_文化 & 方法_360云计算_InfoQ精选文章