阿里、蚂蚁、晟腾、中科加禾精彩分享 AI 基础设施洞见,现购票可享受 9 折优惠 |AICon 了解详情
写点什么

SRS 5.0:如何实现 SRT 协程化

  • 2022-07-04
  • 本文字数:6071 字

    阅读完需:约 20 分钟

SRS 5.0:如何实现 SRT 协程化

协程是现代服务器的核心技术,能极大简化逻辑和提升维护性;SRT 是逐渐在取代 RTMP 推流的新协议,但它有自己的 IO 框架;只有实现了 SRT 的协程化,才能成为 SRS 的核心的成熟的协议,这是 SRS 5.0 迈出的第一步,也是至关重要的一步。


SRS 5.0 从 2022 年初启动以来,经过摸索和探讨,确定了以媒体网关作为核心方向,详细请看SRS 5.0核心问题定义和解法


SRT 作为主播/广播推流领域广泛采用的协议,而 Web 浏览器却不支持播放 SRT 流,这恰恰是媒体网关的核心价值,可以将 SRT 转成 RTMP/HLS/WebRTC 后,实现广播领域的 Web 超低延迟方案,还可以把 SRT 强大的跨国传输能力用起来。


而这些美好愿景的基础,就是这次要介绍的:改造 SRT 支持协程化(Coroutine Native SRT)。这是 SRS 5.0 至关重要的一步,也是具备深远影响的一步,详细代码请参考PR#3010


我们先了解下详细的背景介绍。

Introduction


在直播推流领域,RTMP 是事实上的工业标准,广泛使用,也是直播源站之间兼容性最好的协议。


随着场景的丰富和直播的发展, 几个比较严重的问题逐渐暴露出来:


  1. TCP 推流在长距离传输下,受丢包以及 RTT 抖动影响非常大,效果很差。

  2. RTMP 协议不支持多音轨,以及 H265、AV1 等一系列新的编解码。

  3. Adobe 已经放弃 RTMP 协议,很多年没有更新了,未来也不会更新。


为了解决这些问题,2018 年左右,广播电视领域开始广泛应用 SRT 协议推流,越来越多的推流设备和平台都支持了 SRT 协议。


SRS 在 2019 年底,SRS 4.0 支持了 SRT 推流,目前存在以下的问题:


  1. SRT 在 SRS 上使用多线程+异步实现,某些异常导致的程序 Crash,难以排查。

  2. SRT 在 SRS 实现是异步方式,代码复杂,维护难度高。

  3. HTTP 回调,SRT 播放不生效;SRT 推流依赖转 RTMP 后,RTMP 触发的回调。

  4. SRT 无法直接转 WebRTC,而是先转 RTMP 再转 WebRTC,导致延迟高。


这些问题的核心原因,是由于 SRT 使用了独立的异步 IO 和多线程,无法和 SRS 已有的 ST 协程结合起来。


要彻底解决这个问题,必须将 SRT 协程化,和 SRS 使用同一套 ST 协程框架。SRS 5.0 已经完成,详细代码请参考PR#3010,这是非常重要的一个功能。


在介绍 SRT 协程化之前,先介绍下什么是协程化,我们看下 ST 的 TCP 协程化(Coroutine Native),这是最好的例子。

Coroutine Native TCP

首先,非协程化的代码,也就是 epoll 驱动的异步代码,大概逻辑是这样:


int fd = accept(listen_fd); // Got a TCP connection.
int n = read(fd, buf, sizeof(buf));if (n == -1) { if (errno == EAGAIN) { // Not ready return epoll_ctl(fd, EPOLLIN); // Wait for fd to be ready. } return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


Note: 为了方便表达关键逻辑,我们使用示意代码,具体代码可以参考 epoll 的示例代码。


一般 read 是业务逻辑,读出来的数据也是业务数据,但是这里却需要在 EAGAIN 时调用 epoll 这个底层框架处理 fd。这就是把底层逻辑和业务逻辑混合在一起,导致难以维护。


Note: 尽管 NGINX 包装了一层框架,但是本质上并不能解决这个异步回调问题,当 fd 没有准备好必须返回当前函数,所以导致很多状态需要保存和恢复,在复杂的逻辑中状态机也变得非常复杂。


下面我们看协程化的逻辑会是怎样的,同样以上面代码为例:


st_netfd_t fd = st_accept(listen_fd); // Got a TCP connection
int n = st_read(fd, buf, sizeof(buf));if (n == -1) { return n; // Error.}
printf("Got %d size of data %p", n, buf);
复制代码


简单的看,就是没有 EAGAIN,看起来要么读取到了数据,要么就是出现了错误,不会出现 fd 没有准备好的情况。这就给整个业务逻辑带来了非常好的体验,因为不需要保存状态了,不会反复的尝试读取。


同样用 epoll,为何st_read就没有 EAGAIN 呢?这就是协程化,不是没有,只是在底下处理了这个事件。我们看st_readv这个函数:


ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte) {    while ((n = read(fd->osfd, buf, nbyte)) < 0) {        if (errno == EINTR)            continue;        if (!_IO_NOT_READY_ERROR) // Error, if not EAGAIN.            return -1;
/* Wait until the socket becomes readable */ if (st_netfd_poll(fd, POLLIN) < 0) // EAGAIN return -1; } return n;}
复制代码


很明显在EAGAIN时,会调用st_netfd_poll。在st_netfd_poll函数里,会将当前协程切换出让,调度线程执行下一个协程。并且在未来某个时刻,会因为 IO 事件到达或者超时错误,而将当前协程恢复。


Note: 由于协程切换和恢复,都是在这个函数中实现的,对于上层调用的代码,看起来没有发生什么,所以就不仅没有 EAGAIN 这个错误消息,也不会返回上一层函数,当然也不需要保存状态和恢复状态。


我们可以总结下,如何协程化任何协议的思路:


  1. 直接对 API 进行一次调用,如果成功,那么直接返回。

  2. 如果 API 返回失败,检查错误,非 IO 等待的错误直接返回。

  3. 将当前协程出让,调度器运行其他协程,直到该 FD 上的事件返回或者超时;如果超时,则返回错误;如果事件到达,则重复上面的步骤。


我们可以按照这个思路将 SRT 进行协程化(Coroutine Native)。

Coroutine Native SRT

我们以srt_recvmsg函数的协程化为例,这个函数类似 TCP 的read函数,定义如下:


SRT_API int srt_recvmsg (SRTSOCKET u, char* buf, int len);
复制代码


我们同样,提供一个SrsSrtSocket::recvmsg的函数,类似st_read函数,实现如下:


srs_error_t SrsSrtSocket::recvmsg(void* buf, size_t size, ssize_t* nread) {  while (true) {    int ret = srt_recvmsg(srt_fd_, (char*)buf, size);    if (ret >= 0) { // Receive message ok.      recv_bytes_ += ret;       *nread = ret;      return err;    }        // Got something error, return immediately.    if (srt_getlasterror(NULL) != SRT_EASYNCRCV) {      return srs_error_new(ERROR_SRT_IO, "srt_recvmsg");    }        // Wait for the fd ready or error, switch to other coroutines.    if ((err = wait_readable()) != srs_success) { // EAGAIN.      return srs_error_wrap(err, "wait readable");    }  }    return err;}
复制代码


可以看到和st_read非常类似,在wait_readable中也会实现协程的切换和恢复,只是我们使用st_cond_t条件变量来实现:


srs_error_t SrsSrtSocket::wait_readable() {  srt_poller_->mod_socket(this, SRT_EPOLL_IN);  srs_cond_timedwait(read_cond_);}
复制代码


Note: 这里先修改了 epoll 侦听的事件SRT_EPOLL_IN,等待 fd 可读后,再等待条件变量触发。


而触发这个条件变量的函数,是在SrsSrtPoller::wait,实现如下:


srs_error_t SrsSrtPoller::wait(int timeout_ms, int* pn_fds) {  int ret = srt_epoll_uwait(srt_epoller_fd_, events_.data(), events_.size());  for (int i = 0; i < ret; ++i) {    if (event.events & SRT_EPOLL_IN) {      srt_skt->notify_readable();    }  }}
void SrsSrtSocket::notify_readable() { srs_cond_signal(read_cond_);}
复制代码


这样就完全做到了将 SRT API 协程化,其他的 API 比如 srt_sendmsg, srt_connnect, srt_accept 也是类似的操作。


下面我们对比下,协程化(Coroutine Native)之后,和原始的回调(Callback)的区别。

Coroutine Native PK Callback

将 SRT 协程化以后, 业务逻辑和底层代码分离,上层的代码逻辑清晰明了。


先看看 accept 这个逻辑,之前也是由 epoll 触发的事件处理,创建srt_conn这个数据结构:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_connection(status, read_fds[index], "read fd");  }}
void srt_server::srt_handle_connection(SRT_SOCKSTATUS status, SRTSOCKET input_fd) { if (status == SRTS_LISTENING) { conn_fd = srt_accept(input_fd, (sockaddr*)&scl, &sclen); _handle_ptr->add_newconn(conn_fd, SRT_EPOLL_IN); }}
void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) { _push_conn_map.insert(std::make_pair(conn_ptr->get_path(), conn_ptr)); _conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr)); int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);}
复制代码


Note: 创建的srt_conn本身就是保存在全局数据结构之中,在后续的回调事件中持续修改和变更这个数据结构。


我们对比下协程化之后的业务逻辑,收到会话之后启动处理协程:


srs_error_t SrsSrtListener::cycle() {  while (true) {    srs_srt_t client_srt_fd = srs_srt_socket_invalid();    srt_skt_->accept(&client_srt_fd);        srt_server_->accept_srt_client(srt_fd);  }}
srs_error_t SrsSrtServer::accept_srt_client(srs_srt_t srt_fd) { fd_to_resource(srt_fd, &srt_conn); conn_manager_->add(srt_conn); srt_conn->start();}
复制代码


Note: 虽然有全局变量维护srt_conn,但这里不会关注到 epoll 的处理,而是由协程主导的执行逻辑,而不是由回调主导的逻辑。


回调主导的逻辑,维护和了解代码时,必须要从 epoll 回调事件开始看,而且不同事件都在修改srt_conn这个对象的状态,要了解对象生命周期是很有难度的。而协程主导的逻辑,它的生命周期是在协程中,收到srt_conn就启动协程处理它,后续的读写也在协程中。


我们继续看srt_conn的读处理逻辑,之前直接使用原生 SRT 的 read 函数,同样是由 epoll 的事件触发回调:


while (run_flag) {  int ret = srt_epoll_wait(_pollid, read_fds, &rfd_num, write_fds);  for (int index = 0; index < rfd_num; index++) {    SRT_SOCKSTATUS status = srt_getsockstate(read_fds[index]);    srt_handle_data(status, read_fds[index], "read fd");  }}
void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { auto conn_ptr = get_srt_conn(conn_fd); int mode = conn_ptr->get_mode(); if (mode == PUSH_SRT_MODE && status == SRTS_CONNECTED) { handle_push_data(status, path, subpath, conn_fd); }}
void srt_handle::handle_push_data(SRT_SOCKSTATUS status, SRTSOCKET conn_fd) { srt_conn_ptr = get_srt_conn(conn_fd); if (status != SRTS_CONNECTED) { // Error. close_push_conn(conn_fd); return; }
ret = srt_conn_ptr->read_async(data, DEF_DATA_SIZE); if (ret <= 0) { // Error. if (srt_getlasterror(NULL) != SRT_EASYNCRCV) { return; } close_push_conn(conn_fd); return; }
srt2rtmp::get_instance()->insert_data_message(data, ret, subpath);}
复制代码


Note: 在回调中我们需要处理各种状态,而这个srt_conn的状态变化,是由各种回调决定的,很难一次了解到这个会话的主要处理逻辑。


我们看看 SRT 协程化之后,这个业务逻辑是怎样写的:


srs_error_t SrsMpegtsSrtConn::do_publishing() {  while (true) {    ssize_t nb = 0;    if ((err = srt_conn_->read(buf, sizeof(buf), &nb)) != srs_success) {      return srs_error_wrap(err, "srt: recvmsg");    }        if ((err = on_srt_packet(buf, nb)) != srs_success) {      return srs_error_wrap(err, "srt: process packet");    }  }}
复制代码


Note: 这里srt_conn的生命周期非常明确,它的状态就是直接在这里返回错误,对于这个会话来说,这就是它的主循环,不会因为read而导致进入 SRT 的 epoll 大循环,我们在维护时也不用关注这个异步事件触发和处理。


再次强调一次,维护代码时,我们需要了解的信息量是非常不同的。在基于异步回调的逻辑中,我们在回调函数中,是需要关注目前对象有哪些状态,修改了哪些状态,其他异步事件又有哪些影响。而基于协程的逻辑中,没有这些状态,协程的创建和执行,就是线性的,或者说这些状态就是在协程的函数调用中。


Note: 为何异步回调的状态就不能在函数调用中呢?因为异步回调的堆栈中不能保存srt_conn的状态,它本质上就是一个协程,保存的是 epoll 的循环的状态。而协程是根据每个srt_conn所创建的,它的堆栈中保存的都是这个对应的srt_conn的状态。


这本质上,是由于异步回调的状态,只能保存在全局数据结构之中。而协程的状态,是可以保存在各个局部变量之中,每个函数的局部变量,都是这个协程所独有的,协程没有结束前都可以使用。

What is the Next

SRS 虽然完成了 SRT 协程化,并没有解决所有的问题。后续的计划包括:


  1. SRT 直接转 WebRTC,低延迟直播的另外一种方式。

  2. 某些服务器之间的长链路可以将 TCP 替换为 SRT 传输, 比如跨国的 RTMP 转发。

  3. SRT 工具链的完善,比如srs-bench,支持压测 SRT 流。


欢迎加入 SRS 开源社区,一起做好一个流媒体服务器,让全世界都来白嫖。

One More Thing

有些朋友也很好奇,真正商用的视频云的 SRT,和开源的 SRT 的服务器,有什么区别,都做过哪些优化。


Note: 由于开源服务器侧重标准协议和兼容性,有些优化并不适合在开源项目中实现,所以在云计算的商业化服务器和开源服务器,一定是存在很大差异的。就算是 Linux 系统,其实云计算的 Linux 内核,和开源的 Linux 内核,也有很大的差异。


腾讯云在 SRT 的实战中积累很多经验,请参考之前分享的文章:



其中,最为严重的是 SRT 重传率过高、限带宽下表现不如 TCP/QUIC 等,腾讯云针对这些问题,做了几个优化:


  1. SRT 重传乱序度自适应:当接收到乱序报文,首次发起重传时,会根据当前的乱序度,等待 N 个包之后才发起重传。原生 SRT 这个乱序度是固定值,我们修改成为根据网络乱序情况自适应。

  2. SRT 传输参数优化:通过对参数优化,减少了一半的重传率。

  3. 加入了 BBR 拥塞控制算法:原生 SRT 拥塞控制非常弱,评估的带宽波动也非常大。我们加入了 BBR 拥塞控制算法,针对性的解决了这个问题。

  4. 强化了 SRT 多链路传输,增加了带宽聚合的模式:原生 SRT 只有 backup,broadcast 两种多链路传输模式,我们针对直播场景增加了 auto 模式,能够做到讲多个网卡的带宽聚合后进行直播,并智能动态选择链路。


Note: 腾讯云音视频在音视频领域已有超过 21 年的技术积累,持续支持国内 90%的音视频客户实现云上创新,独家具备 RT-ONE™ 全球网络,在此基础上,构建了业界最完整的 PaaS 产品家族,并通过腾讯云视立方 RT-Cube™ 提供 All in One 的终端 SDK,助力客户一键获取众多腾讯云音视频能力。腾讯云音视频为全真互联时代,提供坚实的数字化助力。


作者介绍:


肖志宏 (hondaxiao) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS TOC。


杨成立 (Winlin) 腾讯云工程师,全球 TOP1 开源音视频服务器 SRS 作者,音视频服务器和视频云领域超过 13 年经验。

2022-07-04 16:245942

评论

发布
暂无评论
发现更多内容

2021秋招必刷题:Redis+Mybatis,java使用教程答案

Java 程序员 后端

2021阿里大牛最新发布:Java高频面试题和核心技术(已涨薪6K

Java 程序员 后端

APP性能优化系列-自定义启动器(三),阿里巴巴java面试几轮

Java 程序员 后端

2021版阿里Java亿级并发设计手册:基础+数据库,linux服务器开发需要的技术

Java 程序员 后端

谈一谈区块链项目使用的数据库LevelDB

Regan Yue

区块链 leveldb 11月日更

BAT面试必考Java面试题100+:Kafka,mysql连接查询原理

Java 程序员 后端

2021毕业的Java应届生,面试需要掌握哪些技能,才能收割offer

Java 程序员 后端

双维度第一!百度智能云领衔中国“AI+工业互联网”市场领导者阵营

百度大脑

人工智能 百度

95 后程序员一出校门就拿年薪 32 万?,java入门视频教学

Java 程序员 后端

如何将字符串截取成一个集合

卢卡多多

内容合集 11月日更

09 K8S之对象类资源配置

穿过生命散发芬芳

k8s 11月日更

最佳实践|Apache Pulsar 在拉卡拉的技术实践

Apache Pulsar

开源 架构 中间件 Apache Pulsar 消息系统 Apache 分布式

97 道大厂 Java 核心面试题出炉,来试试看你会几道题?

Java 程序员 后端

BATJ互联网公司面试必问知识点:Spring全家桶全解,java分布式框架技术方案

Java 程序员 后端

【Java 原理剖析系列】深度分析 Semaphore工作原理分析

洛神灬殇

Java 并发编程 Semaphore 11月日更

BATJ互联网公司必问知识点:Spring十个面试专题及答案(1)

Java 程序员 后端

在线文本去空行工具

入门小站

工具

BATJ互联网月薪45K的Java岗面试题首次曝光,掌握这些Offer指定跑不了

Java 程序员 后端

50道Java面试常问的基础知识,虽是基础但是避坑之路可得小心谨慎

Java 程序员 后端

5年Java经验字节社招:半月3次面试,成功拿到Offer,大厂Mysql高频面试题

Java 程序员 后端

7张图带你轻松理解Java 线程安全,java开发架构思想

Java 程序员 后端

AcWing 1532,java教程下载网盘

Java 程序员 后端

一文了解 PG PITR 即时恢复

青云技术社区

数据库 postgresql 云计算

30岁,转行学编程靠谱吗?,java银行面试的问题

Java 程序员 后端

6种新方法帮你提高Java学习能力,mysql教程入门到精通pdf

Java 程序员 后端

linux之git入门命令

入门小站

Linux

@RequestMapping详解,隔壁都馋哭了

Java 程序员 后端

2021版最新!字节跳动3面+腾讯6面一次过,java高级特性面试题

Java 程序员 后端

30天熬夜苦学这本Java后端架构设计精讲,大厂三面架构问题so easy

Java 程序员 后端

3分钟快速搞懂Java的桥接方法,Java多态实现原理解析

Java 程序员 后端

BATJ互联网公司必问知识点:Spring十个面试专题及答案,java技术面试总结评语

Java 程序员 后端

SRS 5.0:如何实现 SRT 协程化_架构_肖志宏_InfoQ精选文章