50万奖金+官方证书,深圳国际金融科技大赛正式启动,点击报名 了解详情
写点什么

浅谈长连接的平滑重启

  • 2020-02-26
  • 本文字数:4626 字

    阅读完需:约 15 分钟

浅谈长连接的平滑重启

最近小编一直在做长连接相关的事情,最大的感触就是发版太痛苦,一个个踢掉连接然后发版,导致发版时长过长,操作繁琐。所以在想能不能实现优雅重启, 发版时客户端无感知。

难点

  • 如何做到不中断接收连接

  • 如何做到已有连接不中断

解决

如何做到不中断接受连接

以下是 linux 源码中 bind 的实现(linux-1.0)


// linux-1.0/net/socket.c 536static intsock_bind(int fd, struct sockaddr *umyaddr, int addrlen){  struct socket *sock;  int i;
DPRINTF((net_debug, "NET: sock_bind: fd = %d\n", fd)); if (fd < 0 || fd >= NR_OPEN || current->filp[fd] == NULL) return(-EBADF); //获取fd对应的socket结构 if (!(sock = sockfd_lookup(fd, NULL))) return(-ENOTSOCK); // 转调用bind指向的函数,下层函数(inet_bind) if ((i = sock->ops->bind(sock, umyaddr, addrlen)) < 0) { DPRINTF((net_debug, "NET: sock_bind: bind failed\n")); return(i); } return(0);}
// linux-1.0/net/inet/sock.c 1012static intinet_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len){ ...outside_loop: for(sk2 = sk->prot->sock_array[snum & (SOCK_ARRAY_SIZE -1)]; sk2 != NULL; sk2 = sk2->next) {#if 1 /* should be below! */ if (sk2->num != snum) continue;/* if (sk2->saddr != sk->saddr) continue; */#endif if (sk2->dead) { destroy_sock(sk2); goto outside_loop; } if (!sk->reuse) { sti(); return(-EADDRINUSE); } if (sk2->num != snum) continue; /* more than one */ if (sk2->saddr != sk->saddr) continue; /* socket per slot ! -FB */ if (!sk2->reuse) { sti(); return(-EADDRINUSE); } } ... }
复制代码


  • sock_array 是一个链式哈希表,保存着各端口号的 sock 结构

  • 通过源码可以看到,bind 的时候会检测要绑定的地址和端口是否合法以及已被绑定, 如果发版时另一个进程和旧进程没有关系,则 bind 会返回错误 Address already in use

  • 若旧进程 fork 出新进程,新进程和旧进程为父子关系,新进程继承旧进程的文件表,本身"本进程"就已经监听这个端口了,则不会出现上面的问题


如何做到已有连接不中断


  • 新进程继承旧进程的用于连接的 fd,并且继续维持与客户端的心跳

  • linux 提供了 unix 域套接字可用于 socket 的传输, 新进程起来后通过 unix socket 通信继承旧进程所维护的连接

  • unix socket 用于*一台*主机的进程间通信,不需要基于网络协议,主要是基于文件系统的。


#include <sys/types.h>#include <sys/socket.h>
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
复制代码


发送端调用 sendmsg 发送文件描述符,接收端调用 revmsg 接收文件描述符。


两进程共享同一打开文件表,这与 fork 之后的父子进程共享打开文件表的情况完全相同。


由此解决了文章开头提出的两个问题


Demo 实现


  • 进程每次启动时必须 check 有无继承 socket(尝试连接本地的 unix server,如果连接失败,说明是第一次启动,否则可能有继承的 socket),如果有,就将 socket 加入到自己的连接池中, 并初始化连接状态

  • 旧进程监听 USR2 信号(通知进程需要重启,使用信号、http 接口等都可),监听后动作:

  • 1.监听 Unix socket, 等待新进程初始化完成,发来开始继承连接的请求

  • 2.使用旧进程启动的命令 fork 一个子进程(发布到线上的新二进制)。

  • 3.accept 到新进程的请求,关闭旧进程 listener(保证旧进程不会再接收新请求,同时所有 connector 不在进行 I/O 操作。

  • 4.旧进程将现有连接的 socket,以及连接状态(读写 buffer,connect session)通过 unix socket 发送到新进程。

  • 5.最后旧进程给新进程发送发送完毕信号,随后退出

  • 以下是简单实现的 demo, demo 中实现较为简单,只实现了文件描述符的传递,没有实现各连接状态的传递。


// server.go
package main
import ( "flag" "fmt" "golang.org/x/sys/unix" "log" "net" "os" "os/signal" "path/filepath" "sync" "syscall" "time")
var ( workSpace string
logger *log.Logger
writeTimeout = time.Second * 5 readTimeout = time.Second * 5
signalChan = make(chan os.Signal)
connFiles sync.Map
serverListener net.Listener
isUpdate = false)
func init() { flag.StringVar(&workSpace, "w", ".", "Usage:\n ./server -w=workspace") flag.Parse()
file, err := os.OpenFile(filepath.Join(workSpace, "server.log"), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0777) if err != nil { panic(err) } logger = log.New(file, "", 11) go beforeStart() go signalHandler()}
func main() { var err error serverListener, err = net.Listen("tcp", ":7000") if err != nil { panic(err) } for { if isUpdate == true { continue } conn, err := serverListener.Accept() if err != nil { logger.Println("conn error") continue } c := conn.(*net.TCPConn) go connectionHandler(c) }}
func connectionHandler(conn *net.TCPConn) { file, _ := conn.File() connFiles.Store(file, true) logger.Printf("conn fd %d\n", file.Fd()) defer func() { connFiles.Delete(file) _ = conn.Close() }() for { if isUpdate == true { continue } err := conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { logger.Println(err.Error()) return } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { logger.Println(err.Error()) return } if string(rBuf) != "ping" { logger.Println("failed to parse the message " + string(rBuf)) return } err = conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { logger.Println(err.Error()) return } _, err = conn.Write([]byte(`pong`)) if err != nil { logger.Println(err.Error()) return } }}
func beforeStart() { connInterface, err := net.Dial("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = connInterface.Close() }()
unixConn := connInterface.(*net.UnixConn)
b := make([]byte, 1) oob := make([]byte, 32) for { err = unixConn.SetWriteDeadline(time.Now().Add(time.Minute * 3)) if err != nil { fmt.Println(err.Error()) return } n, oobn, _, _, err := unixConn.ReadMsgUnix(b, oob) if err != nil { logger.Println(err.Error()) return } if n != 1 || b[0] != 0 { if n != 1 { logger.Printf("recv fd type error: %d\n", n) } else { logger.Println("init finish") } return } scms, err := unix.ParseSocketControlMessage(oob[0:oobn]) if err != nil { logger.Println(err.Error()) return } if len(scms) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(scms)) return } fds, err := unix.ParseUnixRights(&scms[0]) if err != nil { logger.Println(err.Error()) return } if len(fds) != 1 { logger.Printf("recv fd num != 1 : %d\n", len(fds)) return } logger.Printf("recv fd %d\n", fds[0]) file := os.NewFile(uintptr(fds[0]), "fd-from-old") conn, err := net.FileConn(file) if err != nil { logger.Println(err.Error()) return } go connectionHandler(conn.(*net.TCPConn)) }}
func signalHandler() { signal.Notify( signalChan, syscall.SIGUSR2, ) for { sc := <-signalChan switch sc { case syscall.SIGUSR2: gracefulExit() default: continue } }}
func gracefulExit() { var connWait sync.WaitGroup _ = syscall.Unlink(filepath.Join(workSpace, "conn.sock")) listenerInterface, err := net.Listen("unix", filepath.Join(workSpace, "conn.sock")) if err != nil { logger.Println(err.Error()) return } defer func() { _ = listenerInterface.Close() }() unixListener := listenerInterface.(*net.UnixListener) connWait.Add(1) go func() { defer connWait.Done() unixConn, err := unixListener.AcceptUnix() if err != nil { logger.Println(err.Error()) return } defer func() { _ = unixConn.Close() }() connFiles.Range(func(key, value interface{}) bool { if key == nil || value == nil { return false } file := key.(*os.File) defer func() { _ = file.Close() }() buf := make([]byte, 1) buf[0] = 0 rights := syscall.UnixRights(int(file.Fd())) _, _, err := unixConn.WriteMsgUnix(buf, rights, nil) if err != nil { logger.Println(err.Error()) } logger.Printf("send fd %d\n", file.Fd()) return true }) finish := make([]byte, 1) finish[0] = 1 _, _, err = unixConn.WriteMsgUnix(finish, nil, nil) if err != nil { logger.Println(err.Error()) } }()
isUpdate = true execSpec := &syscall.ProcAttr{ Env: os.Environ(), Files: append([]uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}), }
pid, err := syscall.ForkExec(os.Args[0], os.Args, execSpec) if err != nil { logger.Println(err.Error()) return } logger.Printf("old process %d new process %d\n", os.Getpid(), pid) _ = serverListener.Close()
connWait.Wait() os.Exit(0)}// client.gopackage main
import ( "fmt" "net" "time")
var ( writeTimeout = time.Second * 5 readTimeout = time.Second * 5)
func main() { conn, err := net.Dial("tcp", "127.0.0.1:7000") if err != nil { panic(err) } defer func() { conn.Close() }() for { time.Sleep(time.Second) err := conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err != nil { fmt.Println(err.Error()) break } fmt.Println("send ping") _, err = conn.Write([]byte(`ping`)) if err != nil { fmt.Println(err.Error()) break } err = conn.SetReadDeadline(time.Now().Add(readTimeout)) if err != nil { fmt.Println(err.Error()) break } rBuf := make([]byte, 4) _, err = conn.Read(rBuf) if err != nil { fmt.Println(err.Error()) } fmt.Println("recv " + string(rBuf)) }}
复制代码


本文转载自 360 云计算公众号。


原文链接:https://mp.weixin.qq.com/s/be5NYjeqZ-lznXrEWD_ajA


2020-02-26 22:002020

评论

发布
暂无评论
发现更多内容

runtime笔记

Conan

ios

Kalm——基于Kubernetes的部署工具

David

开源 Kubernetes DevOps 运维 运维平台

javascript中的内置对象和数据结构

程序那些事

JavaScript 数据结构 ES6 程序那些事

感性赢了理性那一面——浅谈峰终定律

Justin

心理学 28天写作

字节跳动面试官这样问消息队列:高可用、不重复消费、可靠传输、顺序消费、消息堆积,我整理了下

冰河

面试 分布式 中间件 消息队列 一起进大厂

简单的网站搭建

很甜回忆

网站

【计算机内功修炼】十:线程间到底共享了哪些进程资源

码农的荒岛求生

c c++ 线程 操作系统 进程

【2021海量真实校招】软件测试面试真题,(大数据整理)刷完应对各家企业面试完全没有问题!

程序员阿沐

面试 软件测试 自动化测试 黑盒测试 白盒测试

为您收录的操作系统系列 - 进程管理(下篇)

鲁米

方法论 操作系统 进程

腾讯位置服务开发应用

我是哪吒

28天写作 2月春节不断更 腾讯地图 腾讯位置服务开发应用 腾讯位置

区块链电子合同存证,电子合同区块链服务平台

13530558032

28天瞎写的第二百四十三天:正念冥想可以解决什么问题?

树上

冥想 28天写作 正念

一口气发布十大建网利器,华为打算煲出怎样的5G味道?

脑极体

Elasticsearch 查询结果排序

escray

elastic 七日更 28天写作 死磕Elasticsearch 60天通过Elastic认证考试 2月春节不断更

科大讯飞发布全新一代智能办公本X2

Xue Liang

火山翻译:工业级应用与研究

DataFunTalk

阿里粗排技术体系与最新进展

DataFunTalk

移除数组中的数字,不用额外空间, 实战RxSwift中的Observable, subscribe, dispose, 吴军老师态度读后感 John 易筋 ARTS 打卡 Week 39

John(易筋)

ARTS 打卡计划 吴军的态度 态度读后感

工作多年,如何找到自己更好的职业方向

一笑

28天写作

基于grpc手撸一个RPC框架

cloudcoder

解读云原生技术

xcbeyond

Kubernetes 云原生 服务网格 28天写作

伊卡洛斯象征了什么?「Day 5」

道伟

文化 28天写作

(28DW-S8-Day5) 区块链如何防伪

mtfelix

比特币 区块链 非对称加密 28天写作 防伪技术

别再这么写代码了,这几个方法不香吗?

楼下小黑哥

Java 重构

Linux入门篇 —— Shell详解

若尘

Linux 命令行 linux操作

翻译:《实用的Python编程》02_05_Collections

codists

Python

如何有效改变别人的认知和行为?

数列科技杨德华

28天写作

专治小学生作业拖沓

Ian哥

28天写作

风口上的量子计算机:核聚变一样的赌局,钻石一样的骗局

脑极体

优雅编程 | javascript代码优化的4个小技巧

devpoint

递归 命名空间 闭包 函数绑定

诊所数字化:患者数字档案的价值机遇和风险

boshi

数字化医疗 七日更 28天写作

浅谈长连接的平滑重启_行业深度_360云计算_InfoQ精选文章