AI 年度盘点与2025发展趋势展望,50+案例解析亮相AICon 了解详情
写点什么

SRS 5.0:如何实现 SRT 协程化

  • 2022-07-04
  • 本文字数:6071 字

    阅读完需:约 20 分钟

SRS 5.0:如何实现 SRT 协程化

协程是现代服务器的核心技术,能极大简化逻辑和提升维护性;SRT 是逐渐在取代 RTMP 推流的新协议,但它有自己的 IO 框架;只有实现了 SRT 的协程化,才能成为 SRS 的核心的成熟的协议,这是 SRS 5.0 迈出的第一步,也是至关重要的一步。


SRS 5.0 从 2022 年初启动以来,经过摸索和探讨,确定了以媒体网关作为核心方向,详细请看SRS 5.0核心问题定义和解法


SRT 作为主播/广播推流领域广泛采用的协议,而 Web 浏览器却不支持播放 SRT 流,这恰恰是媒体网关的核心价值,可以将 SRT 转成 RTMP/HLS/WebRTC 后,实现广播领域的 Web 超低延迟方案,还可以把 SRT 强大的跨国传输能力用起来。


而这些美好愿景的基础,就是这次要介绍的:改造 SRT 支持协程化(Coroutine Native SRT)。这是 SRS 5.0 至关重要的一步,也是具备深远影响的一步,详细代码请参考PR#3010


我们先了解下详细的背景介绍。

Introduction


在直播推流领域,RTMP 是事实上的工业标准,广泛使用,也是直播源站之间兼容性最好的协议。


随着场景的丰富和直播的发展, 几个比较严重的问题逐渐暴露出来:


  1. TCP 推流在长距离传输下,受丢包以及 RTT 抖动影响非常大,效果很差。

  2. RTMP 协议不支持多音轨,以及 H265、AV1 等一系列新的编解码。

  3. Adobe 已经放弃 RTMP 协议,很多年没有更新了,未来也不会更新。


为了解决这些问题,2018 年左右,广播电视领域开始广泛应用 SRT 协议推流,越来越多的推流设备和平台都支持了 SRT 协议。


SRS 在 2019 年底,SRS 4.0 支持了 SRT 推流,目前存在以下的问题:


  1. SRT 在 SRS 上使用多线程+异步实现,某些异常导致的程序 Crash,难以排查。

  2. SRT 在 SRS 实现是异步方式,代码复杂,维护难度高。

  3. HTTP 回调,SRT 播放不生效;SRT 推流依赖转 RTMP 后,RTMP 触发的回调。

  4. SRT 无法直接转 WebRTC,而是先转 RTMP 再转 WebRTC,导致延迟高。


这些问题的核心原因,是由于 SRT 使用了独立的异步 IO 和多线程,无法和 SRS 已有的 ST 协程结合起来。


要彻底解决这个问题,必须将 SRT 协程化,和 SRS 使用同一套 ST 协程框架。SRS 5.0 已经完成,详细代码请参考PR#3010,这是非常重要的一个功能。


在介绍 SRT 协程化之前,先介绍下什么是协程化,我们看下 ST 的 TCP 协程化(Coroutine Native),这是最好的例子。

Coroutine Native TCP

首先,非协程化的代码,也就是 epoll 驱动的异步代码,大概逻辑是这样:


int fd = accept(listen_fd); // Got a TCP connection.
int n = read(fd, buf, sizeof(buf));if (n == -1) { if (errno == EAGAIN) { // Not ready return epoll_ctl(fd, EPOLLIN); // Wait for fd to be ready. } return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


Note: 为了方便表达关键逻辑,我们使用示意代码,具体代码可以参考 epoll 的示例代码。


一般 read 是业务逻辑,读出来的数据也是业务数据,但是这里却需要在 EAGAIN 时调用 epoll 这个底层框架处理 fd。这就是把底层逻辑和业务逻辑混合在一起,导致难以维护。


Note: 尽管 NGINX 包装了一层框架,但是本质上并不能解决这个异步回调问题,当 fd 没有准备好必须返回当前函数,所以导致很多状态需要保存和恢复,在复杂的逻辑中状态机也变得非常复杂。


下面我们看协程化的逻辑会是怎样的,同样以上面代码为例:


st_netfd_t fd = st_accept(listen_fd); // Got a TCP connection
int n = st_read(fd, buf, sizeof(buf));if (n == -1) { return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


简单的看,就是没有 EAGAIN,看起来要么读取到了数据,要么就是出现了错误,不会出现 fd 没有准备好的情况。这就给整个业务逻辑带来了非常好的体验,因为不需要保存状态了,不会反复的尝试读取。


同样用 epoll,为何st_read就没有 EAGAIN 呢?这就是协程化,不是没有,只是在底下处理了这个事件。我们看st_readv这个函数:


ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte) {    while ((n = read(fd->osfd, buf, nbyte)) < 0) {        if (errno == EINTR)            continue;        if (!_IO_NOT_READY_ERROR) // Error, if not EAGAIN.            return -1;
/* Wait until the socket becomes readable */ if (st_netfd_poll(fd, POLLIN) < 0) // EAGAIN return -1; } return n;}
复制代码


很明显在EAGAIN时,会调用st_netfd_poll。在st_netfd_poll函数里,会将当前协程切换出让,调度线程执行下一个协程。并且在未来某个时刻,会因为 IO 事件到达或者超时错误,而将当前协程恢复。


Note: 由于协程切换和恢复,都是在这个函数中实现的,对于上层调用的代码,看起来没有发生什么,所以就不仅没有 EAGAIN 这个错误消息,也不会返回上一层函数,当然也不需要保存状态和恢复状态。


我们可以总结下,如何协程化任何协议的思路:


  1. 直接对 API 进行一次调用,如果成功,那么直接返回。

  2. 如果 API 返回失败,检查错误,非 IO 等待的错误直接返回。

  3. 将当前协程出让,调度器运行其他协程,直到该 FD 上的事件返回或者超时;如果超时,则返回错误;如果事件到达,则重复上面的步骤。


我们可以按照这个思路将 SRT 进行协程化(Coroutine Native)。

Coroutine Native SRT

我们以srt_recvmsg函数的协程化为例,这个函数类似 TCP 的read函数,定义如下:


SRT_API int srt_recvmsg (SRTSOCKET u, char* buf, int len);
复制代码


我们同样,提供一个SrsSrtSocket::recvmsg的函数,类似st_read函数,实现如下:


srs_error_t SrsSrtSocket::recvmsg(void* buf, size_t size, ssize_t* nread) {  while (true) {    int ret = srt_recvmsg(srt_fd_, (char*)buf, size);    if (ret >= 0) { // Receive message ok.      recv_bytes_ += ret;       *nread = ret;      return err;    }        // Got something error, return immediately.    if (srt_getlasterror(NULL) != SRT_EASYNCRCV) {      return srs_error_new(ERROR_SRT_IO, "srt_recvmsg");    }        // Wait for the fd ready or error, switch to other coroutines.    if ((err = wait_readable()) != srs_success) { // EAGAIN.      return srs_error_wrap(err, "wait readable");    }  }    return err;}
复制代码


可以看到和st_read非常类似,在wait_readable中也会实现协程的切换和恢复,只是我们使用st_cond_t条件变量来实现:


srs_error_t SrsSrtSocket::wait_readable() {  srt_poller_->mod_socket(this, SRT_EPOLL_IN);  srs_cond_timedwait(read_cond_);}
复制代码


Note: 这里先修改了 epoll 侦听的事件SRT_EPOLL_IN,等待 fd 可读后,再等待条件变量触发。


而触发这个条件变量的函数,是在SrsSrtPoller::wait,实现如下:


srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds) {  int ret = srt_epoll_uwait(srt_epoller_fd_, events_.data(), events_.size());  for (int i = 0; i < ret; ++i) {    if (event.events & SRT_EPOLL_IN) {      srt_skt->notify_readable();    }  }}
void SrsSrtSocket::notify_readable() { srs_cond_signal(read_cond_);}
复制代码


这样就完全做到了将 SRT API 协程化,其他的 API 比如 srt_sendmsg, srt_connnect, srt_accept 也是类似的操作。


下面我们对比下,协程化(Coroutine Native)之后,和原始的回调(Callback)的区别。

Coroutine Native PK Callback

将 SRT 协程化以后, 业务逻辑和底层代码分离,上层的代码逻辑清晰明了。


先看看 accept 这个逻辑,之前也是由 epoll 触发的事件处理,创建srt_conn这个数据结构:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_connection(status, read_fds[index], "read fd");  }}
void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd) { if (status == SRTS_LISTENING) { conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen); _handle_ptr->add_newconn(conn_fd, SRT_EPOLL_IN); }}
void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { _push_conn_map.insert(std::make_pair(conn_ptr->get_path(), conn_ptr)); _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);}
复制代码


Note: 创建的srt_conn本身就是保存在全局数据结构之中,在后续的回调事件中持续修改和变更这个数据结构。


我们对比下协程化之后的业务逻辑,收到会话之后启动处理协程:


srs_error_t SrsSrtListener::cycle() {  while (true) {    srs_srt_t client_srt_fd = srs_srt_socket_invalid();    srt_skt_->accept(&client_srt_fd);        srt_server_->accept_srt_client(srt_fd);  }}
srs_error_t SrsSrtServer::accept_srt_client(srs_srt_t srt_fd) { fd_to_resource(srt_fd, &srt_conn); conn_manager_->add(srt_conn); srt_conn->start();}
复制代码


Note: 虽然有全局变量维护srt_conn,但这里不会关注到 epoll 的处理,而是由协程主导的执行逻辑,而不是由回调主导的逻辑。


回调主导的逻辑,维护和了解代码时,必须要从 epoll 回调事件开始看,而且不同事件都在修改srt_conn这个对象的状态,要了解对象生命周期是很有难度的。而协程主导的逻辑,它的生命周期是在协程中,收到srt_conn就启动协程处理它,后续的读写也在协程中。


我们继续看srt_conn的读处理逻辑,之前直接使用原生 SRT 的 read 函数,同样是由 epoll 的事件触发回调:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_data(status, read_fds[index], "read fd");  }}
void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { auto conn_ptr = get_srt_conn(conn_fd); int mode = conn_ptr->get_mode(); if (mode == PUSH_SRT_MODE && status == SRTS_CONNECTED) { handle_push_data(status, path, subpath, conn_fd); }}
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { srt_conn_ptr = get_srt_conn(conn_fd); if (status != SRTS_CONNECTED) { // Error. close_push_conn(conn_fd); return; }
ret = srt_conn_ptr->read_async(data, DEF_DATA_SIZE); if (ret <= 0) { // Error. if (srt_getlasterror(NULL) != SRT_EASYNCRCV) { return; } close_push_conn(conn_fd); return; }
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);}
复制代码


Note: 在回调中我们需要处理各种状态,而这个srt_conn的状态变化,是由各种回调决定的,很难一次了解到这个会话的主要处理逻辑。


我们看看 SRT 协程化之后,这个业务逻辑是怎样写的:


srs_error_t SrsMpegtsSrtConn::do_publishing() {  while (true) {    ssize_t nb = 0;    if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) {      return srs_error_wrap(err, "srt: recvmsg");    }        if ((err = on_srt_packet(buf, nb)) != srs_success) {      return srs_error_wrap(err, "srt: process packet");    }  }}
复制代码


Note: 这里srt_conn的生命周期非常明确,它的状态就是直接在这里返回错误,对于这个会话来说,这就是它的主循环,不会因为read而导致进入 SRT 的 epoll 大循环,我们在维护时也不用关注这个异步事件触发和处理。


再次强调一次,维护代码时,我们需要了解的信息量是非常不同的。在基于异步回调的逻辑中,我们在回调函数中,是需要关注目前对象有哪些状态,修改了哪些状态,其他异步事件又有哪些影响。而基于协程的逻辑中,没有这些状态,协程的创建和执行,就是线性的,或者说这些状态就是在协程的函数调用中。


Note: 为何异步回调的状态就不能在函数调用中呢?因为异步回调的堆栈中不能保存srt_conn的状态,它本质上就是一个协程,保存的是 epoll 的循环的状态。而协程是根据每个srt_conn所创建的,它的堆栈中保存的都是这个对应的srt_conn的状态。


这本质上,是由于异步回调的状态,只能保存在全局数据结构之中。而协程的状态,是可以保存在各个局部变量之中,每个函数的局部变量,都是这个协程所独有的,协程没有结束前都可以使用。

What is the Next

SRS 虽然完成了 SRT 协程化,并没有解决所有的问题。后续的计划包括:


  1. SRT 直接转 WebRTC,低延迟直播的另外一种方式。

  2. 某些服务器之间的长链路可以将 TCP 替换为 SRT 传输, 比如跨国的 RTMP 转发。

  3. SRT 工具链的完善,比如srs-bench,支持压测 SRT 流。


欢迎加入 SRS 开源社区,一起做好一个流媒体服务器,让全世界都来白嫖。

One More Thing

有些朋友也很好奇,真正商用的视频云的 SRT,和开源的 SRT 的服务器,有什么区别,都做过哪些优化。


Note: 由于开源服务器侧重标准协议和兼容性,有些优化并不适合在开源项目中实现,所以在云计算的商业化服务器和开源服务器,一定是存在很大差异的。就算是 Linux 系统,其实云计算的 Linux 内核,和开源的 Linux 内核,也有很大的差异。


腾讯云在 SRT 的实战中积累很多经验,请参考之前分享的文章:



其中,最为严重的是 SRT 重传率过高、限带宽下表现不如 TCP/QUIC 等,腾讯云针对这些问题,做了几个优化:


  1. SRT 重传乱序度自适应:当接收到乱序报文,首次发起重传时,会根据当前的乱序度,等待 N 个包之后才发起重传。原生 SRT 这个乱序度是固定值,我们修改成为根据网络乱序情况自适应。

  2. SRT 传输参数优化:通过对参数优化,减少了一半的重传率。

  3. 加入了 BBR 拥塞控制算法:原生 SRT 拥塞控制非常弱,评估的带宽波动也非常大。我们加入了 BBR 拥塞控制算法,针对性的解决了这个问题。

  4. 强化了 SRT 多链路传输,增加了带宽聚合的模式:原生 SRT 只有 backup,broadcast 两种多链路传输模式,我们针对直播场景增加了 auto 模式,能够做到讲多个网卡的带宽聚合后进行直播,并智能动态选择链路。


Note: 腾讯云音视频在音视频领域已有超过 21 年的技术积累,持续支持国内 90%的音视频客户实现云上创新,独家具备 RT-ONE™ 全球网络,在此基础上,构建了业界最完整的 PaaS 产品家族,并通过腾讯云视立方 RT-Cube™ 提供 All in One 的终端 SDK,助力客户一键获取众多腾讯云音视频能力。腾讯云音视频为全真互联时代,提供坚实的数字化助力。


作者介绍:


肖志宏 (hondaxiao) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS TOC。


杨成立 (Winlin) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS 作者,音视频服务器和视频云领域超过 13 年经验。

2022-07-04 16:246715

评论

发布
暂无评论
发现更多内容

Zookeeper原理篇-Zookeeper启动流程分析,从底层开始带你了解并发编程

Java 程序员 后端

《黑马程序员》通讯录管理系统实战,终于搞明白了

Java 程序员 后端

【Java8 新特性 5】Java8 stream的详细用法,java开发面试视频

Java 程序员 后端

vivo官网商城开发团队:同城双活与异地多活架构分析,java面试问项目流程

Java 程序员 后端

“996”为什么还没实行,mybatis从入门到精通电子书

Java 程序员 后端

《重构 改善既有代码的设计 3》代码的可理解性应该是我们虔诚追求的目标

Java 程序员 后端

《零基础》MySQL 连接的使用(二十),开发多年HashMap原理不知道

Java 程序员 后端

【 大厂必考之JVM】01,kafka原理和面试笔试题目

Java 程序员 后端

【Java 多线程 3】线程池2,linux内核编程进阶篇pdf

Java 程序员 后端

volatile 和原子类的异同,画个图理解一下,面试官让我下周来上班

Java 程序员 后端

「源码解析」 消息队列Kombu基本架构综述,透过根源从而探究红黑树的本质

Java 程序员 后端

「Java」几种典型的内存溢出案例,linux视频教程迅雷下载

Java 程序员 后端

XML简介,kafka教程尚谷

Java 程序员 后端

《项目开发团队分配管理软件》,nginx面试题阿里

Java 程序员 后端

「并发原理专题」AQS的技术体系之CLH,java基础重点知识点

Java 程序员 后端

【Java 强化】代码规范,springcloud视频

Java 程序员 后端

【Java技术探索】,区块链技术kafka

Java 程序员 后端

【Java知识点详解 10】为何要配置环境变量(1),java从入门到精通第五版电子书百度云

Java 程序员 后端

uniapp props、$ref、$emit,如何保证高可用

Java 程序员 后端

Web开发基础:JavaScript常用类、面向对象和BOM,java中锁的实现原理

Java 程序员 后端

YYDS,瞬间秒杀全网,这套Java面试笔记可以解决90,kafka基础架构消费模式

Java 程序员 后端

Zookeeper(从7个方面来了解Zookeeper基础概念),java新技术网站

Java 程序员 后端

「Java」手把手理解CAS实现原理,学习linux编程

Java 程序员 后端

Tomcat目录结构,java基础教程第三版

Java 程序员 后端

“996”为什么还没实行(1),java零基础教程视频

Java 程序员 后端

【Java从0到架构师】学习记录,BAT大厂面试基础题集合

Java 程序员 后端

【java后台面经】春招&秋招求职大佬面试经验分享,java面试线程问题

Java 程序员 后端

VIVO一面竟然翻车,含泪整理了这些Java面经,看完我悟了

Java 程序员 后端

Zookeeper系列-我保证!样样聚到!没有一句废话,今日头条面试经历

Java 程序员 后端

【95 后 Java 程序员的大厂梦】三年开发经验,springboot开源项目讲解

Java 程序员 后端

VBA常用语法,操作系统原理与linux实践教程申丰山

Java 程序员 后端

SRS 5.0:如何实现 SRT 协程化_架构_肖志宏_InfoQ精选文章