写点什么

300 行 Go 代码玩转 RPC

  • 2019-11-14
  • 本文字数:4311 字

    阅读完需:约 14 分钟

300行Go代码玩转RPC

最近,小编一直在研究 RPC 的原理及实现方式。在本篇文章中将通过用 300 行纯 Golang 编写简单的 RPC 框架来解释 RPC。希望能帮助大家梳理 RPC 相关知识点。


我们通过从头开始在 Golang 中构建一个简单的 RPC 框架来学习 RPC 基础构成。

1 什么是 RPC

简单地说,服务 A 想调用服务 B 的函数。但是这两个服务不在同一个内存空间中。所以不能直接调用它。


因此,为了实现这个调用,我们需要表达如何调用以及如何通过网络传递通信的语义。


让我们考虑一下,当我们可以在相同的内存空间(本地调用)中运行时,我们要怎么做。


type User struct {  Name string  Age int}
var userDB = map[int]User{ 1: User{"Ankur", 85}, 9: User{"Anand", 25}, 8: User{"Ankur Anand", 27},}

func QueryUser(id int) (User, error) { if u, ok := userDB[id]; ok { return u, nil }
return User{}, fmt.Errorf("id %d not in user db", id)}

func main() { u , err := QueryUser(8) if err != nil { fmt.Println(err) return }
fmt.Printf("name: %s, age: %d \n", u.Name, u.Age)}
复制代码


现在我们如何在网络上进行相同的函数调用


客户端将通过网络调用 QueryUser(id int) 函数,并且将有一个服务端提供对该函数的调用,并返回响应 User{“Name”, id}, nil。

2 网络传输数据格式

我们将采用 TLV(定长报头+变长消息体)编码方案来规范 tcp 上的数据传输。稍后会详细介绍


在通过网络发送数据之前,我们需要定义如何通过网络发送数据的结构。


这有助于我们定义一个通用协议,客户端和服务端都可以理解这个协议。(protobuf IDL 定义了服务端和客户端都能理解的内容)。


因此,服务端接收到的数据、要调用的函数名和参数列表,或者来自客户端的数据都需要传递这些参数。


另外,让我们约定第二个返回值的类型为 error,表示 RPC 调用结果。


// RPC数据传输格式type RPCdata struct {  Name string        // name of the function  Args []interface{} // request's or response's body expect error.  Err  string        // Error any executing remote server}
复制代码


现在我们有了一个格式,我们需要序列化它以便我们可以通过网络发送它。在本例中,我们将使用 go 默认的二进制序列化协议进行编码和解码。


// be sent over the network.func Encode(data RPCdata) ([]byte, error) {  var buf bytes.Buffer  encoder := gob.NewEncoder(&buf)  if err := encoder.Encode(data); err != nil {    return nil, err  }  return buf.Bytes(), nil}
// Decode the binary data into the Go structfunc Decode(b []byte) (RPCdata, error) { buf := bytes.NewBuffer(b) decoder := gob.NewDecoder(buf) var data RPCdata if err := decoder.Decode(&data); err != nil { return Data{}, err } return data, nil}
复制代码

3 网络传输

选择 TLV 协议的原因是由于其非常容易实现,同时也完成了我们需要识别的数据读取的长度,因为我们需要确定这个请求读取的字节数的传入请求流。发送和接收都执行相同的操作。


// Transport will use TLV protocoltype Transport struct {  conn net.Conn // Conn is a generic stream-oriented network connection.}
// NewTransport creates a Transportfunc NewTransport(conn net.Conn) *Transport { return &Transport{conn}}
// Send TLV data over the networkfunc (t *Transport) Send(data []byte) error { // we will need 4 more byte then the len of data // as TLV header is 4bytes and in this header // we will encode how much byte of data // we are sending for this request. buf := make([]byte, 4+len(data)) binary.BigEndian.PutUint32(buf[:4], uint32(len(data))) copy(buf[4:], data) _, err := t.conn.Write(buf) if err != nil { return err } return nil}
// Read TLV sent over the wirefunc (t *Transport) Read() ([]byte, error) { header := make([]byte, 4) _, err := io.ReadFull(t.conn, header) if err != nil { return nil, err } dataLen := binary.BigEndian.Uint32(header) data := make([]byte, dataLen) _, err = io.ReadFull(t.conn, data) if err != nil { return nil, err } return data, nil}
复制代码


现在我们已经定义了数据格式和传输协议。下面我们还需要 RPC 服务器和 RPC 客户端的实现。

4 RPC 服务器

RPC 服务器将接收具有函数名的 RPCData。因此,我们需要维护和映射包含函数名到实际函数映射的函数


// RPCServer ...type RPCServer struct {  addr string  funcs map[string] reflect.Value}
// Register the name of the function and its entriesfunc (s *RPCServer) Register(fnName string, fFunc interface{}) { if _,ok := s.funcs[fnName]; ok { return }
s.funcs[fnName] = reflect.ValueOf(fFunc)}
复制代码


现在我们已经注册了 func,当我们收到请求时,我们将检查函数执行期间传递的 func 的名称是否存在。然后执行相应的操作


// Execute the given function if presentfunc (s *RPCServer) Execute(req RPCdata) RPCdata {  // get method by name  f, ok := s.funcs[req.Name]  if !ok {    // since method is not present    e := fmt.Sprintf("func %s not Registered", req.Name)    log.Println(e)    return RPCdata{Name: req.Name, Args: nil, Err: e}  }
log.Printf("func %s is called\n", req.Name) // unpackage request arguments inArgs := make([]reflect.Value, len(req.Args)) for i := range req.Args { inArgs[i] = reflect.ValueOf(req.Args[i]) }
// invoke requested method out := f.Call(inArgs) // now since we have followed the function signature style where last argument will be an error // so we will pack the response arguments expect error. resArgs := make([]interface{}, len(out) - 1) for i := 0; i < len(out) - 1; i ++ { // Interface returns the constant value stored in v as an interface{}. resArgs[i] = out[i].Interface() }
// pack error argument var er string if e, ok := out[len(out) - 1].Interface().(error); ok { // convert the error into error string value er = e.Error() } return RPCdata{Name: req.Name, Args: resArgs, Err: er}}
复制代码

5 RPC 客户端

由于函数的具体实现在服务器端,客户端只有函数的原型,所以我们需要调用函数的完整原型,这样我们才能调用它。


func (c *Client) callRPC(rpcName string, fPtr interface{}) {  container := reflect.ValueOf(fPtr).Elem()  f := func(req []reflect.Value) []reflect.Value {    cReqTransport := NewTransport(c.conn)    errorHandler := func(err error) []reflect.Value {      outArgs := make([]reflect.Value, container.Type().NumOut())      for i := 0; i < len(outArgs)-1; i++ {        outArgs[i] = reflect.Zero(container.Type().Out(i))      }      outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()      return outArgs    }
// Process input parameters inArgs := make([]interface{}, 0, len(req)) for _, arg := range req { inArgs = append(inArgs, arg.Interface()) }
// ReqRPC reqRPC := RPCdata{Name: rpcName, Args: inArgs} b, err := Encode(reqRPC) if err != nil { panic(err) } err = cReqTransport.Send(b) if err != nil { return errorHandler(err) } // receive response from server rsp, err := cReqTransport.Read() if err != nil { // local network error or decode error return errorHandler(err) } rspDecode, _ := Decode(rsp) if rspDecode.Err != "" { // remote server error return errorHandler(errors.New(rspDecode.Err)) }
if len(rspDecode.Args) == 0 { rspDecode.Args = make([]interface{}, container.Type().NumOut()) } // unpackage response arguments numOut := container.Type().NumOut() outArgs := make([]reflect.Value, numOut) for i := 0; i < numOut; i++ { if i != numOut-1 { // unpackage arguments (except error) if rspDecode.Args[i] == nil { // if argument is nil (gob will ignore "Zero" in transmission), set "Zero" value outArgs[i] = reflect.Zero(container.Type().Out(i)) } else { outArgs[i] = reflect.ValueOf(rspDecode.Args[i]) } } else { // unpackage error argument outArgs[i] = reflect.Zero(container.Type().Out(i)) } }
return outArgs } container.Set(reflect.MakeFunc(container.Type(), f))}
复制代码

6 测试一下我们的框架

package main
import ( "encoding/gob" "fmt" "net")
type User struct { Name string Age int}
var userDB = map[int]User{ 1: User{"Ankur", 85}, 9: User{"Anand", 25}, 8: User{"Ankur Anand", 27},}
func QueryUser(id int) (User, error) { if u, ok := userDB[id]; ok { return u, nil }
return User{}, fmt.Errorf("id %d not in user db", id)}
func main() { // new Type needs to be registered gob.Register(User{}) addr := "localhost:3212" srv := NewServer(addr)
// start server srv.Register("QueryUser", QueryUser) go srv.Run()
// wait for server to start. time.Sleep(1 * time.Second)
// start client conn, err := net.Dial("tcp", addr) if err != nil { panic(err) } cli := NewClient(conn)
var Query func(int) (User, error) cli.callRPC("QueryUser", &Query)
u, err := Query(1) if err != nil { panic(err) } fmt.Println(u)
u2, err := Query(8) if err != nil { panic(err) } fmt.Println(u2)}
复制代码


执行:go run main.go


输出内容


2019/07/23 20:26:18 func QueryUser is called{Ankur 85}2019/07/23 20:26:18 func QueryUser is called{Ankur Anand 27}
复制代码

总结

致此我们简单的 RPC 框架就实现完成了,旨在帮大家理解 RPC 的原理及上手简单实践。如果大家对这篇文章中所讲内容有异议,或者想进一步讨论,请留言回复。


本文转载自公众号 360 云计算(ID:hulktalk)。


原文链接:


https://mp.weixin.qq.com/s/fxrocOMLX7kqUH9lP94JpA


2019-11-14 17:341342

评论 1 条评论

发布
用户头像
请问有源码吗,少了Run方法
2020-07-28 11:14
回复
没有更多了
发现更多内容

精髓!不愧为京东内部 Spring Boot 全解笔记

程序知音

Java 微服务 后端 springboot Java进阶

亮点预告!金蝶云·苍穹技术开放日第五期AI专场邀你围观!

金蝶云·苍穹

AI RPA 直播 企业云服务 ChatGPT

袋鼠云春季生长大会圆满落幕,带来数实融合下的新产品、新方案、新实践!

袋鼠云数栈

数字化转型

架构误区系列16:不可靠的幂等

agnostic

幂等设计

CorelDRAW2023发布!详解七大新功能

茶色酒

CorelDraw2023

【Python实战】Python对中国500强排行榜数据进行可视化分析

BROKEN

三周年连更

拥抱Serverless释放生产力,探索华为云Serverless车联网最佳实践

华为云开发者联盟

Serverless 车联网 华为云 华为云开发者联盟 企业号 4 月 PK 榜

Cloud Kernel SIG月度动态:发布 Anolis 8.8 镜像、kABI 社区共建流程

OpenAnolis小助手

镜像 龙蜥社区 sig kernel 月报

预训练对话大模型深度解读

轻口味

AI 大模型 三周年连更

各行业常见的业务指标汇总(数据分析常用数据指标)

Data 探险实验室

数据分析 数据分析师 数据指标 指标中台; 数据分析 指标洞察

阿里最新 23版 Java 面试系列手册,竟堪称 GitHub 面试杀手锏

程序知音

Java java面试 后端技术 Java面试题 Java面试八股文

直播预告 | 字节跳动云原生大数据分析引擎 ByConity 与 ClickHouse 有何差异?

墨天轮

大数据 字节跳动 Clickhouse 数仓

iMazing软件最新版有哪些新功能?

茶色酒

imazing

CDR2023最新中文版下载安装详细教程

茶色酒

cdr2023

Tuxera NTFS2024免费版NTFS磁盘读写软件

茶色酒

Tuxera NTFS2024

MobTech MobPush|A/B测试提升运营决策

MobTech袤博科技

基于深度学习框架设计的货运管家(功能总结)

DS小龙哥

三周年连更

活动回顾|微服务x容器开源开发者 Meetup 成都站回放 & PPT 下载

阿里巴巴云原生

阿里云 开源 容器 微服务 云原生

爆肝了!阿里最新版的这份Spring Security源码手册,狂揽GitHub榜首

Java spring spring security

一图读懂|ONES X 中国信通院《中国企业软件研发管理白皮书》

万事ONES

【转载】三十而已,信智依然 | 田溯宁:写在亚信科技30华诞

亚信AntDB数据库

AntDB AntDB数据库 企业号 4 月 PK 榜

2023 最新版 Java 面试八股文大全 PDF 版限时分享,含 700 道高频面试题

三十而立

为什么医疗保健需要MFT来帮助保护EHR文件传输

镭速

基于容器平台 ACK 快速搭建 Stable Diffusion

阿里巴巴云原生

阿里云 云原生 容器服务

Sibelius2023免费版音乐制谱软件

茶色酒

Sibelius2023

CorelDRAW Graphics Suite2023最新中文版下载

茶色酒

cdr2023

引领文旅新体验!3DCAT实时云渲染助力打造“永不落幕”的湾区文采会元宇宙

3DCAT实时渲染

元宇宙 元宇宙线上虚拟展厅 VR虚拟现实

JVM调优-Nacos GC引发的服务批量下线问题

程序员小毕

程序员 微服务 后端 nacos jvm调优

ARB链挖矿dapp系统开发模式定制

开发v-hkkf5566

300行Go代码玩转RPC_文化 & 方法_360云计算_InfoQ精选文章