AICon日程100%就绪,9折倒计时最后一周 了解详情
写点什么

300 行 Go 代码玩转 RPC

  • 2019-11-14
  • 本文字数:4311 字

    阅读完需:约 14 分钟

300行Go代码玩转RPC

最近,小编一直在研究 RPC 的原理及实现方式。在本篇文章中将通过用 300 行纯 Golang 编写简单的 RPC 框架来解释 RPC。希望能帮助大家梳理 RPC 相关知识点。


我们通过从头开始在 Golang 中构建一个简单的 RPC 框架来学习 RPC 基础构成。

1 什么是 RPC

简单地说,服务 A 想调用服务 B 的函数。但是这两个服务不在同一个内存空间中。所以不能直接调用它。


因此,为了实现这个调用,我们需要表达如何调用以及如何通过网络传递通信的语义。


让我们考虑一下,当我们可以在相同的内存空间(本地调用)中运行时,我们要怎么做。


type User struct {  Name string  Age int}
var userDB = map[int]User{ 1: User{"Ankur", 85}, 9: User{"Anand", 25}, 8: User{"Ankur Anand", 27},}

func QueryUser(id int) (User, error) { if u, ok := userDB[id]; ok { return u, nil }
return User{}, fmt.Errorf("id %d not in user db", id)}

func main() { u , err := QueryUser(8) if err != nil { fmt.Println(err) return }
fmt.Printf("name: %s, age: %d \n", u.Name, u.Age)}
复制代码


现在我们如何在网络上进行相同的函数调用


客户端将通过网络调用 QueryUser(id int) 函数,并且将有一个服务端提供对该函数的调用,并返回响应 User{“Name”, id}, nil。

2 网络传输数据格式

我们将采用 TLV(定长报头+变长消息体)编码方案来规范 tcp 上的数据传输。稍后会详细介绍


在通过网络发送数据之前,我们需要定义如何通过网络发送数据的结构。


这有助于我们定义一个通用协议,客户端和服务端都可以理解这个协议。(protobuf IDL 定义了服务端和客户端都能理解的内容)。


因此,服务端接收到的数据、要调用的函数名和参数列表,或者来自客户端的数据都需要传递这些参数。


另外,让我们约定第二个返回值的类型为 error,表示 RPC 调用结果。


// RPC数据传输格式type RPCdata struct {  Name string        // name of the function  Args []interface{} // request's or response's body expect error.  Err  string        // Error any executing remote server}
复制代码


现在我们有了一个格式,我们需要序列化它以便我们可以通过网络发送它。在本例中,我们将使用 go 默认的二进制序列化协议进行编码和解码。


// be sent over the network.func Encode(data RPCdata) ([]byte, error) {  var buf bytes.Buffer  encoder := gob.NewEncoder(&buf)  if err := encoder.Encode(data); err != nil {    return nil, err  }  return buf.Bytes(), nil}
// Decode the binary data into the Go structfunc Decode(b []byte) (RPCdata, error) { buf := bytes.NewBuffer(b) decoder := gob.NewDecoder(buf) var data RPCdata if err := decoder.Decode(&data); err != nil { return Data{}, err } return data, nil}
复制代码

3 网络传输

选择 TLV 协议的原因是由于其非常容易实现,同时也完成了我们需要识别的数据读取的长度,因为我们需要确定这个请求读取的字节数的传入请求流。发送和接收都执行相同的操作。


// Transport will use TLV protocoltype Transport struct {  conn net.Conn // Conn is a generic stream-oriented network connection.}
// NewTransport creates a Transportfunc NewTransport(conn net.Conn) *Transport { return &Transport{conn}}
// Send TLV data over the networkfunc (t *Transport) Send(data []byte) error { // we will need 4 more byte then the len of data // as TLV header is 4bytes and in this header // we will encode how much byte of data // we are sending for this request. buf := make([]byte, 4+len(data)) binary.BigEndian.PutUint32(buf[:4], uint32(len(data))) copy(buf[4:], data) _, err := t.conn.Write(buf) if err != nil { return err } return nil}
// Read TLV sent over the wirefunc (t *Transport) Read() ([]byte, error) { header := make([]byte, 4) _, err := io.ReadFull(t.conn, header) if err != nil { return nil, err } dataLen := binary.BigEndian.Uint32(header) data := make([]byte, dataLen) _, err = io.ReadFull(t.conn, data) if err != nil { return nil, err } return data, nil}
复制代码


现在我们已经定义了数据格式和传输协议。下面我们还需要 RPC 服务器和 RPC 客户端的实现。

4 RPC 服务器

RPC 服务器将接收具有函数名的 RPCData。因此,我们需要维护和映射包含函数名到实际函数映射的函数


// RPCServer ...type RPCServer struct {  addr string  funcs map[string] reflect.Value}
// Register the name of the function and its entriesfunc (s *RPCServer) Register(fnName string, fFunc interface{}) { if _,ok := s.funcs[fnName]; ok { return }
s.funcs[fnName] = reflect.ValueOf(fFunc)}
复制代码


现在我们已经注册了 func,当我们收到请求时,我们将检查函数执行期间传递的 func 的名称是否存在。然后执行相应的操作


// Execute the given function if presentfunc (s *RPCServer) Execute(req RPCdata) RPCdata {  // get method by name  f, ok := s.funcs[req.Name]  if !ok {    // since method is not present    e := fmt.Sprintf("func %s not Registered", req.Name)    log.Println(e)    return RPCdata{Name: req.Name, Args: nil, Err: e}  }
log.Printf("func %s is called\n", req.Name) // unpackage request arguments inArgs := make([]reflect.Value, len(req.Args)) for i := range req.Args { inArgs[i] = reflect.ValueOf(req.Args[i]) }
// invoke requested method out := f.Call(inArgs) // now since we have followed the function signature style where last argument will be an error // so we will pack the response arguments expect error. resArgs := make([]interface{}, len(out) - 1) for i := 0; i < len(out) - 1; i ++ { // Interface returns the constant value stored in v as an interface{}. resArgs[i] = out[i].Interface() }
// pack error argument var er string if e, ok := out[len(out) - 1].Interface().(error); ok { // convert the error into error string value er = e.Error() } return RPCdata{Name: req.Name, Args: resArgs, Err: er}}
复制代码

5 RPC 客户端

由于函数的具体实现在服务器端,客户端只有函数的原型,所以我们需要调用函数的完整原型,这样我们才能调用它。


func (c *Client) callRPC(rpcName string, fPtr interface{}) {  container := reflect.ValueOf(fPtr).Elem()  f := func(req []reflect.Value) []reflect.Value {    cReqTransport := NewTransport(c.conn)    errorHandler := func(err error) []reflect.Value {      outArgs := make([]reflect.Value, container.Type().NumOut())      for i := 0; i < len(outArgs)-1; i++ {        outArgs[i] = reflect.Zero(container.Type().Out(i))      }      outArgs[len(outArgs)-1] = reflect.ValueOf(&err).Elem()      return outArgs    }
// Process input parameters inArgs := make([]interface{}, 0, len(req)) for _, arg := range req { inArgs = append(inArgs, arg.Interface()) }
// ReqRPC reqRPC := RPCdata{Name: rpcName, Args: inArgs} b, err := Encode(reqRPC) if err != nil { panic(err) } err = cReqTransport.Send(b) if err != nil { return errorHandler(err) } // receive response from server rsp, err := cReqTransport.Read() if err != nil { // local network error or decode error return errorHandler(err) } rspDecode, _ := Decode(rsp) if rspDecode.Err != "" { // remote server error return errorHandler(errors.New(rspDecode.Err)) }
if len(rspDecode.Args) == 0 { rspDecode.Args = make([]interface{}, container.Type().NumOut()) } // unpackage response arguments numOut := container.Type().NumOut() outArgs := make([]reflect.Value, numOut) for i := 0; i < numOut; i++ { if i != numOut-1 { // unpackage arguments (except error) if rspDecode.Args[i] == nil { // if argument is nil (gob will ignore "Zero" in transmission), set "Zero" value outArgs[i] = reflect.Zero(container.Type().Out(i)) } else { outArgs[i] = reflect.ValueOf(rspDecode.Args[i]) } } else { // unpackage error argument outArgs[i] = reflect.Zero(container.Type().Out(i)) } }
return outArgs } container.Set(reflect.MakeFunc(container.Type(), f))}
复制代码

6 测试一下我们的框架

package main
import ( "encoding/gob" "fmt" "net")
type User struct { Name string Age int}
var userDB = map[int]User{ 1: User{"Ankur", 85}, 9: User{"Anand", 25}, 8: User{"Ankur Anand", 27},}
func QueryUser(id int) (User, error) { if u, ok := userDB[id]; ok { return u, nil }
return User{}, fmt.Errorf("id %d not in user db", id)}
func main() { // new Type needs to be registered gob.Register(User{}) addr := "localhost:3212" srv := NewServer(addr)
// start server srv.Register("QueryUser", QueryUser) go srv.Run()
// wait for server to start. time.Sleep(1 * time.Second)
// start client conn, err := net.Dial("tcp", addr) if err != nil { panic(err) } cli := NewClient(conn)
var Query func(int) (User, error) cli.callRPC("QueryUser", &Query)
u, err := Query(1) if err != nil { panic(err) } fmt.Println(u)
u2, err := Query(8) if err != nil { panic(err) } fmt.Println(u2)}
复制代码


执行:go run main.go


输出内容


2019/07/23 20:26:18 func QueryUser is called{Ankur 85}2019/07/23 20:26:18 func QueryUser is called{Ankur Anand 27}
复制代码

总结

致此我们简单的 RPC 框架就实现完成了,旨在帮大家理解 RPC 的原理及上手简单实践。如果大家对这篇文章中所讲内容有异议,或者想进一步讨论,请留言回复。


本文转载自公众号 360 云计算(ID:hulktalk)。


原文链接:


https://mp.weixin.qq.com/s/fxrocOMLX7kqUH9lP94JpA


2019-11-14 17:341270

评论 1 条评论

发布
用户头像
请问有源码吗,少了Run方法
2020-07-28 11:14
回复
没有更多了
发现更多内容

力扣每日一练之二维数组上篇Day4

京与旧铺

6月月更

字符串

Jason199

js 字符串 6月月更

gogs使用webhook部署react单页应用

Nick

ci 持续集成 React 6月月更 gogs

CentOS环境基于nginx搭建负载均衡

乌龟哥哥

6月月更

InfoQ 极客传媒 15 周年庆征文|漫谈公网网络延迟

耳东@Erdong

运维 6月月更 InfoQ极客传媒15周年庆 网络延迟

读《Software Systems Architecture》(23)—— Archiving Consistency Across Views

术子米德

架构师成长笔记

读《Software Systems Architecture》(25)—— The Security Perspective

术子米德

架构师成长笔记

中台的细节

卢卡多多

中台 6月月更

Paper Reading 预告 | Volcano-An Extensible and Parallel Query Evaluation System(众神推荐的 Paper 下载合集)

TiDB 社区干货传送门

TiDB Paper Reading

实战 | Kibana面板使用

写程序的小王叔叔

Kibana ELK Stack 6月月更

读《Software Systems Architecture》(26)—— The Performance and Scalability Perspective

术子米德

架构师成长笔记

【协程】LifecycleScope源码解析

yechaoa

android 协程 6月月更 LifecycleScope

读《Software Systems Architecture》(24)—— Introduction to the Perspective Catalog

术子米德

架构师成长笔记

python停车时间计算,时分秒计算(split()函数)

写代码两年半

Python 6月月更

几个非常有用的 Flutter 技巧,你可以立即使用!

坚果

6月月更

面试突击57:聚簇索引=主键索引吗?

王磊

Java MySQL 面试

[数据分析实践]-文本分析-U.S. Patent Phrase-1

浩波的笔记

数据分析

JVM调优简要思想及简单案例-JVM是什么?

zarmnosaj

6月月更

InfoQ 极客传媒 15 周年庆征文| 手把手带你入门 API 开发

宇宙之一粟

flask-restful 6月月更 InfoQ极客传媒15周年庆 API开发

读《Software Systems Architecture》(27)—— The Availability and Resilience Perspective

术子米德

架构师成长笔记

测试开发【Mock平台】04实战:前后端项目初始化与登录鉴权实现

MegaQi

测试平台开发教程 测试干货 6月月更

莫把功能当能力!从企业架构视角看警察在火锅店站岗

涛哥 数字产品和业务架构

企业架构

Java Core 「8」字节码增强技术

Samson

学习笔记 Java core 6月月更

远程办公-如何提高开会效率?| 社区征文

石云升

远程办公 开会 会议 6月月更 初夏征文

【愚公系列】2022年06月 通用职责分配原则(三)-低耦合原则

愚公搬代码

6月月更

spring4.1.8扩展实战之五:改变bean的定义(BeanFactoryPostProcessor接口)

程序员欣宸

Java spring Spring Framework 6月月更

测试流程如何落地?

老张

软件测试 质量保障

读《Software Systems Architecture》(28)—— The Evolution Perspective

术子米德

架构师成长笔记

flutter系列之:Material中的3D组件Card

程序那些事

flutter 程序那些事 6月月更

c语言选择,循环语句概述

工程师日月

6月月更

C#入门系列(十六) -- 类及其成员介绍

陈言必行

C# 6月月更

300行Go代码玩转RPC_文化 & 方法_360云计算_InfoQ精选文章