Linux内核-I/O多路复用[select]和I/O事件就绪通知源码分析
文章目录
概要
在select、poll和epoll的区别一文中不断地提到Linux内核I/O事件就绪通知
相关字眼,可以说其是多路复用I/O模型的绝对核心,是比阻塞I/O,非阻塞I/O模型高性能的一大关键点。
本文就基于select源码一探究竟,在I/O事件就绪通知源码上poll和epoll是适用的。
PS:基于linux内核V6.7。
一、了解 Linux
1.1、Linux下OSI五层模型
如图所示,当我们创建一个TCP socket后,然后通过select API阻塞进程,此时有个客户端
要建立连接,数据包通往网络抵达当前服务器,就会:
- 先到网卡,网卡收到的数据是光信号或电信号,然后将其还原成数字信息(由1和0组成),数据通过过滤校验后,会通过DMA的方式写入到指定的内存地址(这块地址也叫网卡缓冲区,是操作系统启动时,加载网卡驱动代码的时候,网卡驱动代码申请的),最后给CPU发送中断信号,这一阶段不需要CPU参与;
- CPU收到中断信号,就调用网卡对应的网卡驱动代码(网卡驱动程序每个网卡制造商都要集成到Linux内核中,如华为以太网驱动),驱动代码会将网卡缓冲区的数据包转换成Linux内核网络模块能识别的skb格式(网卡缓冲区的数据包格式只有网卡驱动知道),然后调用IP 层的入口函数上传给Linux内核网络模块的
IP层
; - IP 层的入口函数是 ip_rcv函数,将skb格式的数据包进行校验处理,继续调TCP层入口函数,上传给Linux内核网络模块的
TCP层
; - TCP层的入口函数是tcp_v4_rcv 函数,将skb格式的数据包进行校验处理,放进 socket 的 receive queue中,通过 socket 唤醒进程;
- 进程被唤醒后,就通过Glibc scoket API进行I/O操作,即可通过accept API与
客户端
建立连接。
1.2、Linux的进程状态转换模型
通过Linux任务状态常量可以看到进程状态有很多:
/* Used in tsk->__state: */
#define TASK_RUNNING 0x00000000
#define TASK_INTERRUPTIBLE 0x00000001
#define TASK_UNINTERRUPTIBLE 0x00000002
#define __TASK_STOPPED 0x00000004
#define __TASK_TRACED 0x00000008
/* Used in tsk->exit_state: */
#define EXIT_DEAD 0x00000010
#define EXIT_ZOMBIE 0x00000020
#define EXIT_TRACE (EXIT_ZOMBIE | EXIT_DEAD)
/* Used in tsk->__state again: */
#define TASK_PARKED 0x00000040
#define TASK_DEAD 0x00000080
#define TASK_WAKEKILL 0x00000100
#define TASK_WAKING 0x00000200
#define TASK_NOLOAD 0x00000400
#define TASK_NEW 0x00000800
#define TASK_RTLOCK_WAIT 0x00001000
#define TASK_FREEZABLE 0x00002000
#define __TASK_FREEZABLE_UNSAFE (0x00004000 * IS_ENABLED(CONFIG_LOCKDEP))
#define TASK_FROZEN 0x00008000
#define TASK_STATE_MAX 0x00010000
#define TASK_ANY (TASK_STATE_MAX-1)
/*
* DO NOT ADD ANY NEW USERS !
*/
#define TASK_FREEZABLE_UNSAFE (TASK_FREEZABLE | __TASK_FREEZABLE_UNSAFE)
/* Convenience macros for the sake of set_current_state: */
#define TASK_KILLABLE (TASK_WAKEKILL | TASK_UNINTERRUPTIBLE)
#define TASK_STOPPED (TASK_WAKEKILL | __TASK_STOPPED)
#define TASK_TRACED __TASK_TRACED
#define TASK_IDLE (TASK_UNINTERRUPTIBLE | TASK_NOLOAD)
/* Convenience macros for the sake of wake_up(): */
#define TASK_NORMAL (TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE)
/* get_task_state(): */
#define TASK_REPORT (TASK_RUNNING | TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE | __TASK_STOPPED |
__TASK_TRACED | EXIT_DEAD | EXIT_ZOMBIE | TASK_PARKED)
不过核心的就6个:
- TASK_RUNNING【R】:就绪或运行状态;
- TASK_INTERRUPTIBLE【S】:可中断等待状态。此进程排入任务等待队列中,一旦资源可用时就会被唤醒,
也可
由其他进程通过中断或信号唤醒; - TASK_UNINTERRUPTIBLE【D】:不可中断等待状态。一旦资源可用时就会被唤醒,
不可
由其他进程通过中断或信号唤醒,通常用于进程必须等待某件工作完成,如进程打开设备文件时就会处于该状态,不能被 kill。 - TASK_STOPPED【T】:暂停状态。此状态用于调试,因收到暂停信号(SIGSTOP)而暂停执行,也可能受其他进程的跟踪和调用而暂时让出处理器;
- EXIT_ZOMBIE【Z】:僵尸状态。进程运行完成,已释放了所占用的大部分资源,尚未释放
task_struct结构体
; - EXIT_DEAD【X】:退出状态。进程即将被销毁,非常短暂,几乎不会被ps命令捕捉到。
1.3、Linux下的中断之时钟中断
中断机制是计算机系统的重要组成部分,在Linux中也不例外,中断按照来源分为硬中断和软中断,而硬中断根据硬件范围分为外中断和内中断。时钟中断就是内中断的一种。
之所以要提到时钟中断,是因为时钟是操作系统进行调度工作的重要工具,如维护系统的绝对日期和时间、让分时进程按时间片轮转、让实时进程定时发送换接收控制信号、系统定时唤醒或阻塞进程、对用户进程记账、测量系统性能等,利用定时器能够确保操作系统在必要的时候获得控制权,陷入死循环的进程最终会因时间片耗尽而被迫让出CPU。
计算机里面主要有三类硬件时钟芯片,分别是实时时钟RTC(Real Time Clock)、定时器Timer、计时器Counter。RTC相当于是手表、座钟。定时器相当于是闹钟。计时器相当于是运动会中的计时器。注意是三类硬件时钟芯片,而不是三个,某一类时钟可能基于多个不同的硬件时钟芯片,某一个硬件时钟芯片也可能被用于实现多种时钟类型。当然还有其它的时钟类型,比如晶振时钟,是驱动CPU运行的周期信号,用来触发和同步CPU内部的操作,我们常说某CPU是多少GHz,就是说这个时钟晶振每秒向CPU发送多少信号。
时钟中断是基于硬件时钟芯片,Linux操作启动时:
- 会加载硬件时钟芯片的驱动(参考实时时钟RTC stm32驱动);
- 此时会注册时钟中断回调函数(参考RTC驱动框架);
- 等硬件时钟芯片到时是就会向CPU发送特定的中断信号,触发时钟中断回调函数,唤醒阻塞进程(参考RTC驱动框架最终回调,会调用wake_up_interruptible函数唤醒进程)。
请充分理解Linux RTC驱动这篇博客,将对你理解Linux时钟中断起到事半功倍的效果。
那么Linux 时钟中断架构归纳如下图:
Linux的时钟中断来源于硬件时钟芯片,通过【硬件时钟芯片驱动】作为中间层联通硬件与操作系统任务(进程,线程等)。当我们我们通过
hitmer
启动定时任务挂起进程时,进程状态由TASK_RUNNING变为TASK_INTERRUPTIBLE,当定时任务到期时就会唤醒进程,使其状态由TASK_INTERRUPTIBLE变为TASK_RUNNING。
这个时候肯呢个有小伙伴问了,Linux下一个硬件时钟芯片触发时钟中断,回调时钟中断函数,如何实现管理多个定时器呢?
简单来说就是用特定的数据结构(时间轮、最小堆、红黑树等,比如低精度定时器timer采用时间轮,高精度定时器hrtimer采用红黑树)管理众多的定时器,在时钟中断回调函数中判断哪些定时器超时,然后执行超时处理动作(一般是唤醒等待队列中的进程)。
所以说低精度定时器(ms级)实现不了高精度定时任务(ns级),会导致提前或延后触发定时任务。
二、select源码解析
我们结合select的Glibc库使用来分析。
2.1、select使用案例
select api说明。
对于select fd_set类型参数可以用下面函数进行操作:
void FD_SET(int fd, fd_set *set); //设置文件描述符到集合中
void FD_CLR(int fd, fd_set *set); //从集合中清除文件描述符标志
int FD_ISSET(int fd, fd_set *set); //判断文件描述符在集合中是否被标志
void FD_ZERO(fd_set *set); //清空集合
#define FD_SET(fd,fdsetp) __FD_SET(fd,fdsetp)
#define FD_CLR(fd,fdsetp) __FD_CLR(fd,fdsetp)
#define FD_ISSET(fd,fdsetp) __FD_ISSET(fd,fdsetp)
#define FD_ZERO(fdsetp) __FD_ZERO(fdsetp)
#define __FD_SET(fd, fdsetp) \
(((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] |= (1<<((fd) & 31))) //其实就是将fds_bits数组特定位设置为1
#define __FD_CLR(fd, fdsetp) \
(((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] &= ~(1<<((fd) & 31)))//其实就是将fds_bits数组特定位设置为0
#define __FD_ISSET(fd, fdsetp) \
((((fd_set *)(fdsetp))->fds_bits[(fd) >> 5] & (1<<((fd) & 31))) != 0)//其实就是判定fds_bits数组特定位是否为1
#define __FD_ZERO(fdsetp) \
(memset (fdsetp, 0, sizeof (*(fd_set *)(fdsetp)))) //其实就是将fds_bits数组所有位设置为0
综上可知fd_set结构体包含了一个fds_bits的long型数组,不管32位还是64为系统下,其总位数是FD_SETSIZE(内核默认1024),通过每一位是0或1来描述对应socket描述符状态(即是否有I/O事件)。我们通过select API将我们要监听的文件描述符对应的位设置好后,传递给内核空间,内核会重新设置要监听的文件描述符对应的位是0或1,最后返回到用户空间,告诉我们该描述符是否有I/O事件
DEMO:
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdlib.h>
int main()
{
int server_sockfd, client_sockfd;
int server_len, client_len;
struct sockaddr_in server_address;
struct sockaddr_in client_address;
socklen_t addr_len = sizeof(client_address);
int ret, fd;
fd_set readfds, tempfds;
server_sockfd = socket(AF_INET, SOCK_STREAM, 0);//建立TCP socket
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY); //地址 0.0.0.0
server_address.sin_port = htons(8888);
ret = bind(server_sockfd, (struct sockaddr *)&server_address, sizeof(server_address));
if(ret < 0)
{
printf("bind err: %d\n",errno);
exit(1);
}
ret = listen(server_sockfd, 125); //设置TCP连接队列容量为125
if(ret < 0)
{
printf("listen err: %d\n",errno);
exit(1);
}
FD_ZERO(&readfds);
FD_SET(server_sockfd, &readfds);//将TCP socket加入到监听集合中
int maxfd = server_sockfd; //监听的最大套接字
struct timeval tv = {
.tv_sec=5,
.tv_usec=0,
}; //select超时时间为5s
char buf[BUFSIZ]; //接收客户端数据
while(1)
{
tempfds = readfds;//将需要监视的描述符集copy到select查询队列中,select会对其修改,所以一定要分开使用变量
printf("server to waiting\n");
ret = select(maxfd+1, &tempfds, NULL, NULL, &tv); //FD_SETSIZE:系统默认的最大文件描述符
if(ret < 0)
{
printf("select err: %d\n",errno);
break; //这里出错就退出循环
}
if(ret == 0) //超时继续
{
printf("select timeout.\n");
continue;
}
//先处理下新的客户端连接
if(FD_ISSET(server_sockfd,&tempfds))
{
client_sockfd = accept(server_sockfd, (struct sockaddr *)&client_address, &addr_len);
if(client_sockfd < 0)
{
printf("accept err: %d\n",errno);
}
else if(client_sockfd >= FD_SETSIZE) //最多1024个
{
printf("error: too many client\n");
}
else
{
FD_SET(client_sockfd, &readfds);//将客户端socket加入到集合中
printf("new client fd: %d\n", client_sockfd);
if(maxfd < client_sockfd)
{
maxfd = client_sockfd; //更新maxfd
}
}
}
for(fd = 0; fd <= maxfd; fd++)
{
if(!FD_ISSET(fd, &tempfds) || fd == server_sockfd) //过滤
{
continue;
}
bzero (buf, BUFSIZ);
ret = read (fd, buf, BUFSIZ - 1);
if(ret < 0)
{
printf("read err: %d\n",errno);
}
else if(ret == 0) //关闭
{
printf("close client fd: %d\n", fd);
FD_CLR(fd, &readfds);//将socket 从集合中移除
close(fd);//关闭client socket
}
else
{
printf("msg form client[%d]: %s\n", fd, buf);
write(fd, "hello", 5);
}
}
}
close(server_sockfd); //关闭server socket
return 0;
}
2.2、select源码
我们通过Glibc库调用select API时,其实走的是Linux 内核一个函数:
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct __kernel_old_timeval __user *, tvp)
{
return kern_select(n, inp, outp, exp, tvp);
}
根据源码可知其调用链如下:
2.2.1、相关结构体
先熟悉下若干结构体。
typedef struct poll_table_struct {
poll_queue_proc _qproc; //关注下这个结构体_qproc是一个回调函数就好了,在select源码里面会被设置为__pollwait函数
__poll_t _key;
} poll_table;
/*
* Structures and helpers for select/poll syscall
*/
struct poll_wqueues {
poll_table pt;
struct poll_table_page *table;
struct task_struct *polling_task; //Linux中task_struct结构体是进程控制块(PCB)
int triggered;
int error;
int inline_index;
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};
struct poll_table_entry {
struct file *filp;
__poll_t key;
wait_queue_entry_t wait; //当前进程在等待队列中的节点
wait_queue_head_t *wait_address; //等待队列头
};
2.2.2、core_sys_select
core_sys_select函数主要工作时申请内存,将数据从用户空间copy到内核空间,处理后再从内核空间copy到用户空间。
//注意,省略了一些代码,最好直接看源码
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec64 *end_time)
{
//栈上分配一段内存
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
//n个socket描述符需要多少字节
size = FDS_BYTES(n);
bits = stack_fds;
/*
* 如果栈上的内存太小,那么就重新分配内存
* 为什么是除以6呢?
* 因为每个文件描述符要占6个bit(输入:可读,可写,异常;输出结果:可读,可写,异常)
*/
if (size > sizeof(stack_fds) / 6)
bits = kmalloc(6 * size, GFP_KERNEL);
//设置好bitmap对应的内存空间
fds.in = bits;//可读
fds.out = bits + size;//可写
fds.ex = bits + 2*size;//异常
fds.res_in = bits + 3*size;//用于返回可读结果
fds.res_out = bits + 4*size;//用于返回可写结果
fds.res_ex = bits + 5*size;//用于返回异常结果
//将应用层的监听集合拷贝到内核空间
if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
goto out;
//清空三个输出结果的集合
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
ret = do_select(n, &fds, end_time);//阻塞,直至有I/O事件或超时
//将结果拷贝回用户空间,这里会修改用户select API参数,所以在2.1小节demo中需会有【tempfds = readfds;】
if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;
}
2.2.3、do_select
do_select函数用于阻塞进程,此时进程进入TASK_INTERRUPTIBLE
状态,直至I/O事件就绪或超时,唤醒进程,进入TASK_RUNNING
状态。
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
pt->_qproc = qproc;
pt->_key = ~(__poll_t)0; /* all events enabled */
}
void poll_initwait(struct poll_wqueues *pwq)//初始化
{
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->polling_task = current; //设置当前PCB
pwq->triggered = 0;
pwq->error = 0;
pwq->table = NULL;
pwq->inline_index = 0;
}
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
ktime_t expire, *to = NULL;
struct poll_wqueues table;
poll_table *wait;
int retval, i, timed_out = 0;
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0; //检查下网络是否繁忙
unsigned long busy_start = 0;
rcu_read_lock();
retval = max_select_fd(n, fds);//确定下当前最大的socket描述符
rcu_read_unlock();
if (retval < 0)
return retval;
n = retval;
poll_initwait(&table); //初始化poll_wqueues,辅助select/poll做syscall的,注意这里面table.pt->_qproc会被设置为__pollwait函数
wait = &table.pt;
//需要注意end_time==NULL和end_time!=NULL && end_time->tv_sec==0 && end_time->tv_nsec==0的区别
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
//end_time!=NULL && end_time->tv_sec==0 && end_time->tv_nsec==0情况下
wait->_qproc = NULL; //将wait->_qproc设置NULL,这样下面vfs_poll时,就不会调用__pollwait,进而将当前进程设置到socket等待队列中,
//也就是说这种情况下没有I/O事件就绪通知了
timed_out = 1;//直接这只超时标志位1,这样最外层的死循环只循环一次就结束了
}
//如果end_time==NULL,最外层的死循环会一直进行,这样超过CPU时间片当前进程就会被挂起,稍后再被schedule继续死循环,直至收到I/O事件就绪或收到信号才会退出循环。
if (end_time && !timed_out)
slack = select_estimate_accuracy(end_time);//估计一个松弛量
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
__poll_t mask;
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
mask = EPOLLNVAL;
f = fdget(i);
if (f.file) {
wait_key_set(wait, in, out, bit,
busy_flag);
mask = vfs_poll(f.file, wait);//调用每一个文件描述符对应驱动的poll函数,得到一个掩码
fdput(f);
}
if ((mask & POLLIN_SET) && (in & bit)) {//根据掩码设置可读集合中socket描述符对应位
res_in |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLOUT_SET) && (out & bit)) {//根据掩码设置可写集合中socket描述符对应位
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
if ((mask & POLLEX_SET) && (ex & bit)) {//根据掩码设置异常集合中socket描述符对应位
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
/* got something, stop busy polling */
if (retval) {
can_busy_loop = false;
busy_flag = 0;
} else if (busy_flag & mask)
can_busy_loop = true;
}
if (res_in)
*rinp = res_in;
if (res_out)
*routp = res_out;
if (res_ex)
*rexp = res_ex;
cond_resched();
}
wait->_qproc = NULL;
if (retval || timed_out || signal_pending(current)) //有I/O事件就绪、超时、收到信号就退出死循环
break;
if (table.error) {
retval = table.error;
break;
}
/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
//定时器,设置select超时时间,注意这里是允许进程被socket I/O事件就绪提前唤醒的
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,to, slack))
timed_out = 1;
}
poll_freewait(&table); //将当前进程从readfds、writefds、exceptfds这三个集合中socket的等待队列中移,轮询
return retval;
}
通过源码可知,对于最外围的死循环:
如果进入
do_select函数
时恰好有I/O就绪事件,那么循环一次就结束了;
否则通过poll_schedule_timeout挂起进程,等待超时或I/O事件就绪通知,那么尽行第二次循环后结束;
但是如果参数end_time==NULL,此时poll_schedule_timeout函数直接返回非0值,不会挂起进程,那么就会一直循环,当前进程运行超过 CPU时间片会被挂起,稍后再被schedule继续死循环,直至收到I/O事件就绪通知或收到信号才会退出循环。
poll_schedule_timeout函数详情见第五章。
三、I/O事件就绪通知源码解析
在2.2小节
,我们知道select会通过vfs_poll(f.file, wait)
获取一个掩码来确定该socket是否有就绪的I/O事件,那么这个函数做了什么呢?
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt)
{
if (unlikely(!file->f_op->poll))//该文件描述符驱动不支持poll函数,直接返回默认值
return DEFAULT_POLLMASK;
return file->f_op->poll(file, pt);
}
可以看到直接调用socket文件描述符对应驱动的poll函数,那poll函数是哪里来的的,这就要回到socket创建函数socket()
了。
/* family:被称为协议族。
* type:套接字类型。
* protocol:某个协议的类型常值,可以设置为 0。
* return:返回整型的文件描述符,如果返回 -1 就失败。
*/
int socket(int family, int type, int protocol)
//例如 int socket_fd = socket(AF_INET, SOCK_STREAM, 0);创建一个TCP的socket
我们通过Glibc库的socket创建socket描述符时,其本质调用Linux内核的__sys_socket函数。
根据源码可知其调用链如下图:
3.1、相关结构体
先熟悉下若干结构体。
struct file {
// ...
struct path f_path;
struct inode *f_inode;
const struct file_operations *f_op; //本文需特别关注这个,即文件支持的操作,如open,read等等
atomic_long_t f_count;
unsigned int f_flags;
fmode_t f_mode;
struct mutex f_pos_lock;
loff_t f_pos;
struct fown_struct f_owner;
// ...省略
}
struct file_operations {
struct module *owner;
loff_t (*llseek) (struct file *, loff_t, int);
ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);
ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
ssize_t (*read_iter) (struct kiocb *, struct iov_iter *);
ssize_t (*write_iter) (struct kiocb *, struct iov_iter *);
int (*iopoll)(struct kiocb *kiocb, struct io_comp_batch *,unsigned int flags);
int (*iterate_shared) (struct file *, struct dir_context *);
__poll_t (*poll) (struct file *, struct poll_table_struct *); //本文要特别关注这个,这就是文件对应的poll函数
long (*unlocked_ioctl) (struct file *, unsigned int, unsigned long);
long (*compat_ioctl) (struct file *, unsigned int, unsigned long);
int (*mmap) (struct file *, struct vm_area_struct *);
unsigned long mmap_supported_flags;
int (*open) (struct inode *, struct file *);
int (*flush) (struct file *, fl_owner_t id);
// ...省略
}
static const struct file_operations socket_file_ops = {
.owner = THIS_MODULE,
.llseek = no_llseek,
.read_iter = sock_read_iter,
.write_iter = sock_write_iter,
.poll = sock_poll, //本文要特别关注这个,这就是socket对应的poll函数,在2.2.2小节源码中file->f_op->poll(file, pt)执行的真正函数
.unlocked_ioctl = sock_ioctl,
#ifdef CONFIG_COMPAT
.compat_ioctl = compat_sock_ioctl,
#endif
.uring_cmd = io_uring_cmd_sock,
.mmap = sock_mmap,
.release = sock_close,
// ...省略
};
struct socket {
socket_state state;
short type;
unsigned long flags;
struct file *file;
struct sock *sk;
const struct proto_ops *ops; //本文需要特别关注这个,用于与上层协议TCP or UDP沟通的一个桥梁
struct socket_wq wq; //等待该socket的进程队列和异步通知队列,即同一个socket可能有多个进程都在等待使用
};
struct socket_wq {
/* Note: wait MUST be first field of socket_wq */
wait_queue_head_t wait; //真正起作用的等待队列,Linux等待队列
struct fasync_struct *fasync_list;
unsigned long flags; /* %SOCKWQ_ASYNC_NOSPACE, etc */
struct rcu_head rcu;
} ____cacheline_aligned_in_smp;
3.2、tcp与socket关联源码分析
从前文我们知道了sock_poll
函数就是2.2.2小节源码中file->f_op->poll(file, pt)执行的真正函数,我们先看下该函数干了什么。
static __poll_t sock_poll(struct file *file, poll_table *wait)
{
struct socket *sock = file->private_data;
const struct proto_ops *ops = READ_ONCE(sock->ops);
__poll_t events = poll_requested_events(wait), flag = 0;
if (!ops->poll)
return 0;
// ...省略
return ops->poll(file, sock, wait) | flag;
}
这又来一个ops->poll,只不过是socket结构体下的poll,它是从哪里来的呢?在本章节开始就介绍了socket函数内核调用链的流程图,在__sock_create
函数中有这样的逻辑:
pf = rcu_dereference(net_families[family])//选择给定的协议族
err = pf->create(net, sock, protocol, kern);
if (err < 0)
goto out_module_put;
没错,突破点就在协议簇上,在socket编程中只能是AF_INET协议族,其初始化函数是inet_init,其Linux内核加载原理请看本文。
static const struct net_proto_family inet_family_ops = { //定义AF_INET协议族初始化
.family = PF_INET,
.create = inet_create,
.owner = THIS_MODULE,
};
static struct inet_protosw inetsw_array[] =//定义AF_INET有哪些协议族
{
{
.type = SOCK_STREAM, //tcp
.protocol = IPPROTO_TCP,
.prot = &tcp_prot,
.ops = &inet_stream_ops,
.flags = INET_PROTOSW_PERMANENT | INET_PROTOSW_ICSK,
},
{
.type = SOCK_DGRAM, //udp
.protocol = IPPROTO_UDP,
.prot = &udp_prot,
.ops = &inet_dgram_ops,
.flags = INET_PROTOSW_PERMANENT,
},
// ...省略
};
static struct list_head inetsw[SOCK_MAX];
static int __init inet_init(void)
{
// ...省略
for (q = inetsw_array; q < &inetsw_array[INETSW_ARRAY_LEN]; ++q) //注册tcp,udp等协议栈
inet_register_protosw(q); //将tcp,udp等协议栈注册到全局变量inetsw中
// ...省略
(void)sock_register(&inet_family_ops);//注册
// ...省略
}
可以看到通过sock_register
函数注册了inet_family_ops
,该函数会将inet_family_ops
设置到全局变量net_families
中,需要注意PF_INET与AF_INET关系
#define PF_INET AF_INET
看到这里就可以知道pf->create
函数实际就是inet_create函数了。
const struct proto_ops inet_stream_ops = { //tcp相关操作
.family = PF_INET,
.owner = THIS_MODULE,
.release = inet_release,
.bind = inet_bind,
.connect = inet_stream_connect,
.socketpair = sock_no_socketpair,
.accept = inet_accept,
.getname = inet_getname,
.poll = tcp_poll, //本文需要关注
.ioctl = inet_ioctl,
// ...省略
};
//只列出了需要关注的代码
static int inet_create(struct net *net, struct socket *sock, int protocol,int kern)
{
struct sock *sk;
struct inet_protosw *answer;
// ...省略
rcu_read_lock();
//我们socket函数第二个参数是SOCK_STREAM,那answer就是inetsw_array的第一个元素
list_for_each_entry_rcu(answer, &inetsw[sock->type], list) {
// ...省略
}
sock->ops = answer->ops;//所以sock->ops被赋值为inet_stream_ops
// ...省略
sk = sk_alloc(net, PF_INET, GFP_KERNEL, answer_prot, kern);//初始化sock实例
if (!sk)
goto out;
// ...省略
sock_init_data(sock, sk);//设置sock实例,并与socket关联
// ...省略
}
到这里可以知道本节开头sock_poll
函数中的ops->poll
在tcp类型 的scoket下就是tcp_poll函数。
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
// ...省略
sock_poll_wait(file, sock, wait);
// ...省略
mask = 0;
shutdown = READ_ONCE(sk->sk_shutdown);
if (shutdown == SHUTDOWN_MASK || state == TCP_CLOSE)
mask |= EPOLLHUP;
if (shutdown & RCV_SHUTDOWN)
mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP;
// ...省略
return mask;
}
static inline void sock_poll_wait(struct file *filp, struct socket *sock,
poll_table *p)
{
if (!poll_does_not_wait(p)) {
poll_wait(filp, &sock->wq.wait, p);
smp_mb();
}
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}
可以看到poll_wait
又调了p->_qproc
,其本质就是2.2.3小节 poll_initwait
函数会将p->_qproc设置为__pollwait函数。
static inline void init_waitqueue_func_entry(struct wait_queue_entry *wq_entry, wait_queue_func_t func)
{
wq_entry->flags = 0;
wq_entry->private = NULL;
wq_entry->func = func;
}
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
{
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
struct poll_table_entry *entry = poll_get_entry(pwq);
if (!entry)
return;
entry->filp = get_file(filp);
entry->wait_address = wait_address;
entry->key = p->_key; //这里会把poll_table中的key同步到poll_table_entry中
init_waitqueue_func_entry(&entry->wait, pollwake);
entry->wait.private = pwq; //将ppwq设置到等待队列中,注意pwq中有PCB,这时后面进行进程唤醒的关键
add_wait_queue(wait_address, &entry->wait); //加入socket的等待队列中
}
注意其中的entry->wait
是wait_queue_entry 结构体,init_waitqueue_func_entry
函数会将wait_queue_entry.func设置为pollwake函数。
struct wait_queue_entry {
unsigned int flags;
void *private;
wait_queue_func_t func;
struct list_head entry;
};
探索到这里也就知道了I/O事件就绪时的进程唤醒函数是pollwake函数,那谁去调用pollwake呢?
再看下如何加入到socket的等待队列中:
add_wait_queue。
这个了解下Linux等待队列构造就可以理解了。
void add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
unsigned long flags;
wq_entry->flags &= ~WQ_FLAG_EXCLUSIVE;
spin_lock_irqsave(&wq_head->lock, flags);
__add_wait_queue(wq_head, wq_entry);
spin_unlock_irqrestore(&wq_head->lock, flags);
}
static inline void __add_wait_queue(struct wait_queue_head *wq_head, struct wait_queue_entry *wq_entry)
{
struct list_head *head = &wq_head->head;
struct wait_queue_entry *wq;
list_for_each_entry(wq, &wq_head->head, entry) {
if (!(wq->flags & WQ_FLAG_PRIORITY))
break;
head = &wq->entry;
}
list_add(&wq_entry->entry, head);
}
socket_post_create
3.3、TCP收到数据时回调唤醒进程
在上一小节我们源码分析到了I/O事件就绪时的进程唤醒函数是pollwake函数
,谁去调用还是两眼一抹黑。
小伙伴可以回看下1.1小节Linux下OSI五层模型收到数据后的处理过程
,其中有提到经过网卡中断和层层回调上报,到达传输层的TCP时会走tcp_v4_rcv 函数。
int tcp_v4_rcv(struct sk_buff *skb)
{
// ...省略
struct sock *sk;
lookup:
//查找报文所属的套接口
sk = __inet_lookup_skb(net->ipv4.tcp_death_row.hashinfo,skb,
__tcp_hdrlen(th), th->source,th->dest, sdif, &refcounted);
// ...省略
if (sk->sk_state == TCP_LISTEN) {
ret = tcp_v4_do_rcv(sk, skb); //继续上报
goto put_and_return;
}
// ...省略
}
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
// ...省略
tcp_rcv_established(sk, skb);//继续上报
// ...省略
}
// Fast processing is turned on in tcp_data_queue when everything is OK.
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb)
{
// ...省略
reason = tcp_ack(sk, skb, FLAG_SLOWPATH | FLAG_UPDATE_TS_RECENT);
if ((int)reason < 0) {
reason = -reason;
goto discard;
}
tcp_rcv_rtt_measure_ts(sk, skb);
/* Process urgent data. */
tcp_urg(sk, skb, th);
/* step 7: process the segment text */
tcp_data_queue(sk, skb); //继续上报
// ...省略
}
void tcp_data_ready(struct sock *sk)
{
if (tcp_epollin_ready(sk, sk->sk_rcvlowat) || sock_flag(sk, SOCK_DONE))
sk->sk_data_ready(sk);//唤醒进程
}
static void tcp_data_queue(struct sock *sk, struct sk_buff *skb)
{
struct tcp_sock *tp = tcp_sk(sk);
// ...省略
queue_and_out:
if (tcp_try_rmem_schedule(sk, skb, skb->truesize)) {
/* TODO: maybe ratelimit these WIN 0 ACK ? */
inet_csk(sk)->icsk_ack.pending |=
(ICSK_ACK_NOMEM | ICSK_ACK_NOW);
inet_csk_schedule_ack(sk);
sk->sk_data_ready(sk);//唤醒进程
if (skb_queue_len(&sk->sk_receive_queue)) {
reason = SKB_DROP_REASON_PROTO_MEM;
NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPRCVQDROP);
goto drop;
}
sk_forced_mem_schedule(sk, skb->truesize);
}
// ...省略
if (!sock_flag(sk, SOCK_DEAD))
tcp_data_ready(sk); //继续上报
return;
}
// ...省略
}
我们追踪源码可以看到最后调用sk->sk_data_ready
来唤醒进行,那一起看下struct sock
结构体内容:
struct sock {
// ...省略
/*
* The backlog queue is special, it is always used with
* the per-socket spinlock held and requires low latency access. Therefore we special case it's implementation.
* Note : rmem_alloc is in this structure to fill a hole on 64bit arches, not because its logically part of backlog.
*/
struct {
atomic_t rmem_alloc;
int len;
struct sk_buff *head;
struct sk_buff *tail;
} sk_backlog;
// ...省略
union {
struct socket_wq __rcu *sk_wq;
/* private: */
struct socket_wq *sk_wq_raw;
/* public: */
};
// ...省略
void (*sk_state_change)(struct sock *sk);
void (*sk_data_ready)(struct sock *sk);
void (*sk_write_space)(struct sock *sk);
void (*sk_error_report)(struct sock *sk);
int (*sk_backlog_rcv)(struct sock *sk, struct sk_buff *skb);
// ...省略
};
到这里线索又又断了,这个sk->sk_data_ready
是什么时候设置的呢?一般都会想到是struct sock
初始化时设置的,那我们继续回到3.2小节的inet_create函数
,其中有个sock_init_data(sock, sk)
的函数调用,就是在这里设置的。
void sock_init_data(struct socket *sock, struct sock *sk)
{
kuid_t uid = sock ? SOCK_INODE(sock)->i_uid : make_kuid(sock_net(sk)->user_ns, 0);
sock_init_data_uid(sock, sk, uid);
}
void sock_init_data_uid(struct socket *sock, struct sock *sk, kuid_t uid)
{
// ...省略
if (sock) {
sk->sk_type = sock->type;
RCU_INIT_POINTER(sk->sk_wq, &sock->wq);
sock->sk = sk; //将sk设置到sock中
} else {
RCU_INIT_POINTER(sk->sk_wq, NULL);
}
// ...省略
sk->sk_state_change = sock_def_wakeup;
sk->sk_data_ready = sock_def_readable; //关注
sk->sk_write_space = sock_def_write_space;
sk->sk_error_report = sock_def_error_report;
sk->sk_destruct = sock_def_destruct;
// ...省略
}
所以sk->sk_data_ready
本质就是sock_def_readable函数,看名字就知道其是socket 可读时使用的。
void sock_def_readable(struct sock *sk)
{
struct socket_wq *wq;
trace_sk_data_ready(sk);
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (skwq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI | EPOLLRDNORM | EPOLLRDBAND);//进程唤醒
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
wake_up_interruptible_sync_poll
#define wake_up_interruptible_sync_poll(x, m) __wake_up_sync_key((x), TASK_INTERRUPTIBLE, poll_to_key(m))
void __wake_up_sync_key(struct wait_queue_head *wq_head, unsigned int mode, void *key)
{
if (unlikely(!wq_head))
return;
__wake_up_common_lock(wq_head, mode, 1, WF_SYNC, key);
}
static int __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode, int nr_exclusive, int wake_flags, void *key)
{
unsigned long flags;
int remaining;
spin_lock_irqsave(&wq_head->lock, flags);
remaining = __wake_up_common(wq_head, mode, nr_exclusive, wake_flags,key);
spin_unlock_irqrestore(&wq_head->lock, flags);
return nr_exclusive - remaining;
}
//唤醒
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_entry_t *curr, *next;
lockdep_assert_held(&wq_head->lock);
curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);//获取等待队列wq_head的队头
if (&curr->entry == &wq_head->head)
return nr_exclusive;
list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {//循环处理
unsigned flags = curr->flags;
int ret;
ret = curr->func(curr, mode, wake_flags, key);//调用唤醒函数,即3.2小节的pollwake函数
if (ret < 0)
break;
if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)//跳出循环
break;
}
return nr_exclusive;
}
终于看到pollwake函数是如何被调用的了。
对于pollwake函数,我们在2.2小节select内核源码调用链图示
中已知pollwake函数调用链:
pollwake->__pollwake->default_wake_function->try_to_wake_up->ttwu_do_wakeup->WRITE_ONCE(p->__state, TASK_RUNNING)
WRITE_ONCE作用。
static int pollwake(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
// ...省略
return __pollwake(wait, mode, sync, key);
}
static int __pollwake(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
struct poll_wqueues *pwq = wait->private;
DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);//创建等待队列元素
smp_wmb();
pwq->triggered = 1;
return default_wake_function(&dummy_wait, mode, sync, key);//唤醒
}
int default_wake_function(wait_queue_entry_t *curr, unsigned mode, int wake_flags, void *key)
{
WARN_ON_ONCE(IS_ENABLED(CONFIG_SCHED_DEBUG) && wake_flags & ~(WF_SYNC|WF_CURRENT_CPU));
//这里的参数curr->private就是pwq->polling_task,即PCB,后续逻辑就是通过WRITE_ONCE将其__state设为TASK_RUNNING
return try_to_wake_up(curr->private, mode, wake_flags);
}
可以看到pollwake函数最终会把该进程的状态设置为TASK_RUNNING
状态,等待被调度即可。至此我们就把I/O事件就绪通知源码的整个调用链条理清楚了。
四、socket等待队列
通过第三章的源码追溯,可以知道socket等待队列本质就是Linux等待队列,相关结构体如下:
/*
* A single wait-queue entry structure:
*/
struct wait_queue_entry {
unsigned int flags;
void *private;
wait_queue_func_t func;
struct list_head entry;
};
struct wait_queue_head {
spinlock_t lock;
struct list_head head;
};
typedef struct wait_queue_head wait_queue_head_t;
那么struct socket的wq字段,即struct socket_wq的wait字段,即wait_queue_head_t在什么时候被初始化的呢?
4.1、虚拟文件系统sockfs
Linux中万物皆文件
,socket是一种VFS,在Linux中对应的文件系统叫sockfs,每创建一个socket,就在sockfs中创建了一个特殊的文件,同时创建了sockfs文件系统中的inode,该inode唯一标识当前socket的通信。
在Linux内核中,sock_mnt全局变量是一个指向
struct vfsmount
结构体的指针,它代表了sockfs的挂载点,包含了挂载点的所有信息,如挂载类型、挂载选项、挂载位置等。Linux在内核初始化过程时会通过sock_init函数将sockfs挂载到内核的文件系统层次结构中。
struct vfsmount {
struct dentry *mnt_root; //指向挂载点根目录的结构体指针
struct super_block *mnt_sb; /* pointer to superblock */ //本文需关注这个
int mnt_flags;
struct mnt_idmap *mnt_idmap;
} __randomize_layout;
static const struct super_operations sockfs_ops = {
.alloc_inode = sock_alloc_inode,//需要关注这个,socket创建时会用到
.free_inode = sock_free_inode,
.statfs = simple_statfs,
};
static int sockfs_init_fs_context(struct fs_context *fc)
{
struct pseudo_fs_context *ctx = init_pseudo(fc, SOCKFS_MAGIC);
if (!ctx)
return -ENOMEM;
ctx->ops = &sockfs_ops; //需要关注这个
ctx->dops = &sockfs_dentry_operations;
ctx->xattr = sockfs_xattr_handlers;
return 0;
}
static struct vfsmount *sock_mnt __read_mostly; //sockfs挂载点实例
static struct file_system_type sock_fs_type = {
.name = "sockfs",
.init_fs_context = sockfs_init_fs_context,//需要关注这个,挂载sockfs时会用到
.kill_sb = kill_anon_super,
};
static int __init sock_init(void)
{
int err;
// ...省略
err = register_filesystem(&sock_fs_type);//注册sockfs
if (err)
goto out;
sock_mnt = kern_mount(&sock_fs_type); //挂载sockfs
// ...省略
}
core_initcall(sock_init); /* early initcall */ //一个宏,注册sock_init,这样Linux在内核初始化时会执行sock_init函数
struct vfsmount *kern_mount(struct file_system_type *type)
{
struct vfsmount *mnt;
mnt = vfs_kern_mount(type, SB_KERNMOUNT, type->name, NULL);
// ...省略
return mnt;
}
struct vfsmount *vfs_kern_mount(struct file_system_type *type, int flags, const char *name, void *data)
{
struct fs_context *fc;
struct vfsmount *mnt;
// ...省略
fc = fs_context_for_mount(type, flags); //创建fs_context 并调用sockfs_init_fs_context函数
// ...省略
if (!ret)
mnt = fc_mount(fc); //基于fc创建mnt
else
mnt = ERR_PTR(ret);
put_fs_context(fc);
return mnt;
}
4.2、socket等待队列初始化
我们在第三章开头的socket创建的内核源码调用链图中有个sock_alloc函数。
container_of宏用法。
struct socket_alloc {
struct socket socket; //socket
struct inode vfs_inode;//socket inode
};
struct socket *sock_alloc(void)
{
struct inode *inode;
struct socket *sock;
inode = new_inode_pseudo(sock_mnt->mnt_sb);//开始申请socket inode
if (!inode)
return NULL;
// ...省略
sock = SOCKET_I(inode);
return sock;
}
static inline struct socket *SOCKET_I(struct inode *inode)
{
//这里与下文alloc_inode_sb函数申请的struct socket_alloc实例是联动的,通过inode的地址计算出struct socket_alloc下socket的地址
return &container_of(inode, struct socket_alloc, vfs_inode)->socket;
}
struct inode *new_inode_pseudo(struct super_block *sb)
{
struct inode *inode = alloc_inode(sb);
// ...省略
return inode;
}
static struct inode *alloc_inode(struct super_block *sb)
{
const struct super_operations *ops = sb->s_op;//这里就是调用sockfs_ops
struct inode *inode;
if (ops->alloc_inode)
inode = ops->alloc_inode(sb); //调用sock_alloc_inode函数
else
inode = alloc_inode_sb(sb, inode_cachep, GFP_KERNEL);
if (!inode)
return NULL;
// ...省略
return inode;
}
sock_alloc_inode函数。
alloc_inode_sb函数。
static struct inode *sock_alloc_inode(struct super_block *sb)
{
struct socket_alloc *ei;
//This must be used for allocating filesystems specific inodes to set up the inode reclaim context correctly.
ei = alloc_inode_sb(sb, sock_inode_cachep, GFP_KERNEL);//申请socket inode
if (!ei)
return NULL;
init_waitqueue_head(&ei->socket.wq.wait);//初始化socket的等待队列
// ...省略
return &ei->vfs_inode;//返回socket inode
}
void __init_waitqueue_head(struct wait_queue_head *wq_head, const char *name, struct lock_class_key *key)
{
spin_lock_init(&wq_head->lock);
lockdep_set_class_and_name(&wq_head->lock, key, name);
INIT_LIST_HEAD(&wq_head->head);
}
到这里就理清socket 等待队列如何初始化的了。
五、select超时源码解析
在2.2.3小节对do_select函数
分析可知,当参数end_time != NULL && (end_time->tv_sec > 0 || end_time->tv_nsec > 0)
时,如果第一次循环没有I/O就绪事件,则会通过poll_schedule_timeout函数挂起当前进程。
调用方式poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,to, slack)
。
static int poll_schedule_timeout(struct poll_wqueues *pwq, int state, ktime_t *expires, unsigned long slack)
{
int rc = -EINTR;
set_current_state(state);//设置当前进程状态为TASK_INTERRUPTIBLE
if (!pwq->triggered) //进入poll_schedule_timeout函数时被I/O就绪事件通知,就不等待了
rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);//启动定时器
__set_current_state(TASK_RUNNING);//设置当前进程状态为TASK_RUNNING
smp_store_mb(pwq->triggered, 0);//定时器结束也通过信号量将pwq->triggered设为0。另外被I/O唤醒也会被设为0的,见__pollwake函数。
return rc;
}
int __sched schedule_hrtimeout_range(ktime_t *expires, u64 delta, const enum hrtimer_mode mode)
{
return schedule_hrtimeout_range_clock(expires, delta, mode, CLOCK_MONOTONIC);
}
int __sched schedule_hrtimeout_range_clock(ktime_t *expires, u64 delta,
const enum hrtimer_mode mode, clockid_t clock_id)
{
struct hrtimer_sleeper t;
if (expires && *expires == 0) { //就是end_time != NULL && end_time->tv_sec == 0 && end_time->tv_nsec == 0的条件
__set_current_state(TASK_RUNNING);//直接返回
return 0;
}
/*
* A NULL parameter means "infinite"
*/
if (!expires) { //就是end_time == NULL的条件
schedule(); //主动调度,让出CPU
return -EINTR;
}
if (rt_task(current))//current 就是当前进程PCB
delta = 0;
hrtimer_init_sleeper_on_stack(&t, clock_id, mode);
hrtimer_set_expires_range_ns(&t.timer, *expires, delta);
hrtimer_sleeper_start_expires(&t, mode);
if (likely(t.task))
schedule(); //主动调度,让出CPU
//进程被唤醒后,会继续从这里执行
hrtimer_cancel(&t.timer); //销毁定时器
destroy_hrtimer_on_stack(&t.timer);
__set_current_state(TASK_RUNNING);//设置当前进程状态为TASK_RUNNING
return !t.task ? 0 : -EINTR;
}
static void __hrtimer_init_sleeper(struct hrtimer_sleeper *sl, clockid_t clock_id, enum hrtimer_mode mode)
{
if (IS_ENABLED(CONFIG_PREEMPT_RT)) {
if (task_is_realtime(current) && !(mode & HRTIMER_MODE_SOFT))
mode |= HRTIMER_MODE_HARD;
}
__hrtimer_init(&sl->timer, clock_id, mode);//主要目的是初始化hrtimer的base字段,同时初始化作为红黑树的节点的node字段
sl->timer.function = hrtimer_wakeup; //设置唤醒函数
sl->task = current; //设置当前定时器绑定的进程PCB
}
我们先看看hrtimer_wakeup函数:
static enum hrtimer_restart hrtimer_wakeup(struct hrtimer *timer)
{
struct hrtimer_sleeper *t = container_of(timer, struct hrtimer_sleeper, timer);//根据struct hrtimer指针计算出struct hrtimer_sleeper的指针
struct task_struct *task = t->task;//获取当前定时器绑定进程的PCB
t->task = NULL;
if (task)
wake_up_process(task); //进程还没被销毁的话就唤醒
return HRTIMER_NORESTART;
}
int wake_up_process(struct task_struct *p)
{
return try_to_wake_up(p, TASK_NORMAL, 0);//后续逻辑就是通过WRITE_ONCE将其__state设为TASK_RUNNING,3.3小节末尾已提到
}
那谁去调hrtimer_wakeup函数呢?我们在1.3小节Linux时钟中断
已经了解了其工作原理,经过源码追踪hrtimer高精度定时器的时钟中断函数就是hrtimer_interrupt
void hrtimer_interrupt(struct clock_event_device *dev)
{
struct hrtimer_cpu_base *cpu_base = this_cpu_ptr(&hrtimer_bases);
// ...省略
__hrtimer_run_queues(cpu_base, now, flags, HRTIMER_ACTIVE_HARD);
// ...省略
}
static void __hrtimer_run_queues(struct hrtimer_cpu_base *cpu_base, ktime_t now,
unsigned long flags, unsigned int active_mask)
{
struct hrtimer_clock_base *base;
unsigned int active = cpu_base->active_bases & active_mask;
for_each_active_base(base, cpu_base, active) { //循环所有定时器
struct timerqueue_node *node;
ktime_t basenow;
basenow = ktime_add(now, base->offset);
while ((node = timerqueue_getnext(&base->active))) {
struct hrtimer *timer;
timer = container_of(node, struct hrtimer, node);
if (basenow < hrtimer_get_softexpires_tv64(timer))//定时器没到期就不执行
break;
__run_hrtimer(cpu_base, base, timer, &basenow, flags);//执行每个定时器
if (active_mask == HRTIMER_ACTIVE_SOFT)
hrtimer_sync_wait_running(cpu_base, flags);
}
}
}
static void __run_hrtimer(struct hrtimer_cpu_base *cpu_base,struct hrtimer_clock_base *base,
struct hrtimer *timer, ktime_t *now,unsigned long flags) __must_hold(&cpu_base->lock)
{
// ...省略
__remove_hrtimer(timer, base, HRTIMER_STATE_INACTIVE, 0);//移除
fn = timer->function;//获取定时器到期回调函数,即hrtimer_wakeup函数
// ...省略
restart = fn(timer);//执行hrtimer_wakeup函数
// ...省略
}
六、参考
1]:Linux 内核 container_of 宏详解
2]:Linux等待队列wait_queue
3]:Linux内核中的各种锁
4]:linux调度子系统8 - schedule函数
5]:linux内核的进程调度函数schedule
6]:浅析Linux sockfs文件系统
7:]socket&sock结构体介绍
8]:Linux内核协议栈- 创建socket
9]:linux内核之时间子系统
10]Linux的时间子系统
11]:linux驱动移植-RTC驱动
12]:linux驱动移植-通用时钟框架子系统
13]:Linux时间子系统之六:高精度定时器(HRTIMER)的原理和实现
14]:入门linux内核高精度定时器hrtimer机制
15]:Linux时钟子系统分析
16]: Linux 时间子系统