网络编程-Linux高性能服务器编程

image.png

目录

第六章 I/O复用

I/O复用使得程序同时监听多个文件描述符

通常网络程序在以下情况需要使用到I/O复用

  • 客户端程序(用户进程)要同时处理多个socket
  • 客户端程序要同时处理用户输入和网络连接
  • TCP服务要同时处理监听socket和连接socket(这是I/O复用最多的场景)
  • 服务器需要同时处理TCP请求和UDP请求
  • 服务器要同时监听多个端口,或者处理多种服务

Linux下实现I/O复用的系统调用主要有select、poll和epoll

select系统调用

select系统调用的用途是:在一段时间内,监听用户感兴趣的文件描述符上的可读、可写和异常事件。

代码流程

1.系统调用开始

1
2
3
4
5
// 代码基于linux5-19-RC8 linux/fs/select.c
// 系统调用别名
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,fd_set __user *, exp, struct __kernel_old_timeval __user *, tvp){
return kern_select(n, inp, outp, exp, tvp);
}
1
2
3
4
5
6
7
8
9
10
#undef __FD_SETSIZE
#define __FD_SETSIZE 1024 //fd_set最大支持1024个描述符

typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;

//linux/types.h
typedef __kernel_fd_set fd_set;

2.调用kern_select函数执行

判断是否是带有超时时间如果带有则需要复制到内核空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* select调用入口
* @param n 指定被监听文件描述符的总数
* @param __user 用户空间宏定义 表示后面的指针是用户空间的数据
* @param inp 读描述符集指针
* @param outp 写描述符集指针
* @param exp 异常描述符集指针
* @param tvp 超时时间指针
* @return 返回可以操作的文件描述符数
*/
static int kern_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct __kernel_old_timeval __user *tvp){
struct timespec64 end_time, *to = NULL;//超时结构体
struct __kernel_old_timeval tv;//旧版本超时结构体
int ret;
//如果是带有超时调用
if (tvp) {
//将超时参数从用户空间的数据拷贝到内核空间
if (copy_from_user(&tv, tvp, sizeof(tv)))return -EFAULT;
//将超时时间设置给end_time变量
to = &end_time;
if (poll_select_set_timeout(to,tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))return -EINVAL;
}
//(主线)执行select调用
ret = core_sys_select(n, inp, outp, exp, to);
//将剩余时间写回
return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);
}

3.调用core_sys_select执行

分配位图内存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* 核心系统调用select
* @param n 指定被监听文件描述符的总数
* @param __user 用户空间宏定义 表示后面的指针是用户空间的数据
* @param inp 读描述符集指针
* @param outp 写描述符集指针
* @param exp 异常描述符集指针
* @param 超时时间指针(已复制到内核空间)
* @return 返回可以操作的文件描述符数
*/
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,fd_set __user *exp, struct timespec64 *end_time){
fd_set_bits fds;//文件描述符集位图(结构体看3.1)
void *bits;
int ret, max_fds;//最大文件描述符
size_t size, alloc_size;
struct fdtable *fdt;//文件描述符表
/* Allocate small arguments on the stack to save memory and be faster */
//在栈上分配一段内存
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
ret = -EINVAL;
//参数验证小于0直接返回
if (n < 0)goto out_nofds;
/* max_fds can increase, so grab it once to avoid race */
//获得当前进程打开的文件 fd 表,获取最大的 fd
rcu_read_lock();
//读取文件描述符表
fdt = files_fdtable(current->files);
//从files结构中获取最大值(当前进程能够处理的最大文件数目)
max_fds = fdt->max_fds;
//释放rcu锁
rcu_read_unlock();
//防止n超过最大的fds
if (n > max_fds)n = max_fds;
/*
* We need 6 bitmaps (in/out/ex for both incoming and outgoing),
* since we used fdset we need to allocate memory in units of
* long-words.
*/
//根据传入的需要监控的fd数量获取其需要分配的字节大小
size = FDS_BYTES(n);
bits = stack_fds;
/* 如果栈上的内存太小,那么就在堆上重新分配内存
* 为什么是除以6呢?
* 因为每个文件描述符要占6个bit(输入:可读,可写,异常;输出结果:可读,可写,异常)*/
if (size > sizeof(stack_fds) / 6) {
/* Not enough space in on-stack array; must use kmalloc */
ret = -ENOMEM;
if (size > (SIZE_MAX / 6))goto out_nofds;
alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);//分配虚拟内存
if (!bits)goto out_nofds;//分配失败直接结束
}
//设置好bitmap对应的内存地址
fds.in = bits;//可读
fds.out = bits + size;//可写
fds.ex = bits + 2*size;//异常
fds.res_in = bits + 3*size;//返回结果,可读
fds.res_out = bits + 4*size;//返回结果,可写
fds.res_ex = bits + 5*size;//返回结果,异常
//将fd从用户空间(用户进程)拷贝到内核空间
if ((ret = get_fd_set(n, inp, fds.in)) ||(ret = get_fd_set(n, outp, fds.out)) ||(ret = get_fd_set(n, exp, fds.ex)))goto out;
//清空返回结果的文件描述符集
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);
//执行select(主线)
ret = do_select(n, &fds, end_time);
//错误结束
if (ret < 0)goto out;
//超时结束
if (!ret) {
ret = -ERESTARTNOHAND;
// 检测到有信号则系统调用退出,返回用户空间执行信号处理函数
if (signal_pending(current))goto out;
ret = 0;
}
//将fd拷贝到用户空间(用户进程)
if (set_fd_set(n, inp, fds.res_in) ||set_fd_set(n, outp, fds.res_out) ||set_fd_set(n, exp, fds.res_ex))ret = -EFAULT;

out:
//如果是堆内存需要主动释放
if (bits != stack_fds)kvfree(bits);//释放堆内存
out_nofds:
return ret;
}

3.1结构体 fd_set_bits

1
2
3
4
5
typedef struct {
//指针都是用来指向描述符集合
unsigned long *in, *out, *ex;//输入的文件描述符集事件
unsigned long *res_in, *res_out, *res_ex;//响应的文件描述符集事件
} fd_set_bits;

3.2结构体 fdtable文件描述符表

1
2
3
4
5
6
7
8
struct fdtable {
unsigned int max_fds;
struct file __rcu **fd; /* current fd array */
unsigned long *close_on_exec;
unsigned long *open_fds;
unsigned long *full_fds_bits;
struct rcu_head rcu;
};

4.调用 do_select 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/**
* select 的具体实现
* @param n 指定被监听文件描述符的总数
* @param fds 文件描述符位图
* @param end_time 超时时间
* @return
*/
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time){
ktime_t expire, *to = NULL;//
struct poll_wqueues table;//定义一个poll_wqueues变量
poll_table *wait;
int retval, i, timed_out = 0;//超时标识
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;

//Read Copy Update 锁机制
rcu_read_lock();
//根据设置的fd位图fds,检查确认所有位置对应的fd确实被打开了,并返回最大的fd
retval = max_select_fd(n, fds);
//释放锁
rcu_read_unlock();
//如果为负值直接返回
if (retval < 0) return retval;
n = retval;
//初始化 poll_wqueues (重点)
//poll_wqueues.poll_table.qproc函数指针初始化,该函数是驱动程序中poll函数(fop->poll)实现中必须要调用的poll_wait()中使用的函数。
poll_initwait(&table);
wait = &table.pt;//poll_table封装在poll_wqueues中
//判断是否超时
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
wait->_qproc = NULL;
timed_out = 1;
}
//重新估算时间
if (end_time && !timed_out)slack = select_estimate_accuracy(end_time);

retval = 0;
for (;;) {//死循环
//六种类型指针
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;
//给上面指针赋值,指向fds对应的类型
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
//遍历所有的fd(n个文件描述符)
for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;
__poll_t mask;
// 先取出当前循环周期中的32(设long占32位)个文件描述符对应的bitmaps
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;//按位或,组合所有类型
if (all_bits == 0) {//当前位图块没有需要处理的文件描述符(关心的fd),则结束本块fd,调到下一个fd位图块
i += BITS_PER_LONG;//BITS_PER_LONG 位图宏定义
continue;
}
//遍历当前所有位
for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
struct fd f;
//struct fd {
// struct file *file;//文件指针
// unsigned int flags;
//};
//i超出n范围直接跳出
if (i >= n)break;
//跳过不关心的bit
if (!(bit & all_bits))continue;
mask = EPOLLNVAL;
//获取当前文件描述符的file结构指针
f = fdget(i);
//因为没有rdlock加锁,因此当前进程中描述符i对应的文件可能已经 被异步关闭。这就是为什么需要判断file是否为空的原因
//(重点主线)如果文件存在
if (f.file) {
// 设置poll_table智能柜想要监听的事件
wait_key_set(wait, in, out, bit,busy_flag);
//(重点)调用文件的poll操作,返回准备好的事件
mask = vfs_poll(f.file, wait);
// 关闭文件监听
fdput(f);
}
/* events验证,其中retval表示就绪的资源数 */
//可读
if ((mask & POLLIN_SET) && (in & bit)) {
res_in |= bit;//设置响应
retval++;
wait->_qproc = NULL;
}
//可写
if ((mask & POLLOUT_SET) && (out & bit)) {
res_out |= bit;
retval++;
wait->_qproc = NULL;
}
//异常
if ((mask & POLLEX_SET) && (ex & bit)) {
res_ex |= bit;
retval++;
wait->_qproc = NULL;
}
/* got something, stop busy polling */
if (retval) {
can_busy_loop = false;
busy_flag = 0;
/*
* only remember a returned
* POLL_BUSY_LOOP if we asked for it
*/
} else if (busy_flag & mask)can_busy_loop = true;

}
/* 写出结果,注意写出的目的地是传进来的fd_set_bits */
if (res_in)*rinp = res_in;
if (res_out)*routp = res_out;
if (res_ex)*rexp = res_ex;
//主动让出cpu等待下次循环
cond_resched();
}//遍历完n个fd
wait->_qproc = NULL;
//如果当前这轮循环有准备好的事件|超时|(中断)检测到有信号则系统调用退出,返回用户空间执行信号处理函数 跳出死循环
if (retval || timed_out || signal_pending(current))break;
//存在错误
if (table.error) {
retval = table.error;
break;
}

/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))continue;
}
busy_flag = 0;

/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
/* 能够到达这一步就说明没有发生就绪、中断以及超时 */
/*
* 判断poll_wqueues是否已触发,如果还没有触发,那就设置当前运行状态为可中断阻塞并进行睡眠,等待被唤醒...
* 被唤醒之后重新进行迭代,获取资源就绪情况...
* 在向资源注册监听与判断poll_wqueues是否已触发这段时间内,可能资源异步就绪了,如果没有触发标志,那么可能就
* 会丢失资源就绪这个事件,可能导致select()永久沉睡...
* 这就是为什么需要poll_wqueues.triggered字段的原因...
*/
if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,to, slack))timed_out = 1;
}
/*
* 1. 卸载安装到资源监听队列上的poll_table_entry
* 2. 释放poll_wqueues占用的资源
*/
poll_freewait(&table);
//返回就绪的资源数
return retval;
}

4.1 结构体

1
2
3
4
5
6
7
8
9
struct poll_wqueues {
poll_table pt;
struct poll_table_page *table;
struct task_struct *polling_task;//保存当前调用select的用户进程struct task_struct结构体
int triggered;//当前用户进程被唤醒后置成1,以免该进程接着进睡眠
int error;//错误码
int inline_index;//数组inline_entries的引用下标
struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};
1
2
3
4
5
struct poll_table_page {
struct poll_table_page * next;//指向下一个申请的物理页
struct poll_table_entry * entry;//指向entries[]中首个待分配(空的) poll_table_entry地址
struct poll_table_entry entries[];//该page页后面剩余的空间都是待分配的poll_table_entry结构体
};
1
2
3
4
typedef struct poll_table_struct {
poll_queue_proc _qproc;
__poll_t _key;
} poll_table;
1
2
3
4
5
6
struct poll_table_entry {
struct file *filp;//指向特定fd对应的file结构体;
__poll_t key;//等待特定fd对应硬件设备的事件掩码,如POLLIN、 POLLOUT、POLLERR
wait_queue_entry_t wait;//代表调用select()的应用进程,等待在fd对应设备的特定事件 (读或者写)的等待队列头上,的等待队列项
wait_queue_head_t *wait_address;//设备驱动程序中特定事件的等待队列头(该fd执行fop->poll,需要等待时在哪等,所以叫等待地址)
};

4.2 调用 poll_initwait 进行初始化

1
2
3
4
5
6
7
8
void poll_initwait(struct poll_wqueues *pwq){
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->polling_task = current;
pwq->triggered = 0;
pwq->error = 0;
pwq->table = NULL;
pwq->inline_index = 0;
}

4.4 调用 init_poll_funcptr 初始化开启监听事件

1
2
3
4
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc){
pt->_qproc = qproc;
pt->_key = ~(__poll_t)0;//缺省开启全部事件监听
}

4.5 调用wait_key_set设置要监听的事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#define POLLIN_SET (EPOLLRDNORM | EPOLLRDBAND | EPOLLIN | EPOLLHUP | EPOLLERR | EPOLLNVAL)
#define POLLOUT_SET (EPOLLWRBAND | EPOLLWRNORM | EPOLLOUT | EPOLLERR | EPOLLNVAL)
#define POLLEX_SET (EPOLLPRI | EPOLLNVAL)

/**
* 设置 poll_table 要监听的事件
* @param wait
* @param in
* @param out
* @param bit
* @param ll_flag
*/
static inline void wait_key_set(poll_table *wait, unsigned long in,unsigned long out, unsigned long bit,__poll_t ll_flag) {
wait->_key = POLLEX_SET | ll_flag;
if (in & bit)wait->_key |= POLLIN_SET;//in事件
if (out & bit)wait->_key |= POLLOUT_SET;//out事件
}

4.6 调用 vfs_poll 返回准备好的事件

1
2
3
4
5
6
7
8
9
10
11
/**
* 调用文件操作的poll方法
* @param file 文件
* @param pt poll_table_struct 结构体指针
* @return 返回准备好的事件
*/
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt){
//如果 fd 所在的文件系统的 file_operations 实现了 poll ,那么就会直接调用,如果没有,那么就会报告响应的错误码
if (unlikely(!file->f_op->poll))return DEFAULT_POLLMASK;
return file->f_op->poll(file, pt);
}

4.7 结构体file

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//linux-5.19-rc8/include/linux/fs.h
struct file {
union {
struct llist_node fu_llist;
struct rcu_head fu_rcuhead;
} f_u;
struct path f_path;
struct inode *f_inode; /* cached value */
const struct file_operations *f_op;//(主线)文件支持的操作

/*
* Protects f_ep, f_flags.
* Must not be taken from IRQ context.
*/
spinlock_t f_lock;
atomic_long_t f_count;
unsigned int f_flags;
fmode_t f_mode;
struct mutex f_pos_lock;
loff_t f_pos;
struct fown_struct f_owner;
const struct cred *f_cred;
struct file_ra_state f_ra;

u64 f_version;
#ifdef CONFIG_SECURITY
void *f_security;
#endif
/* needed for tty driver, and maybe others */
void *private_data;

#ifdef CONFIG_EPOLL
/* Used by fs/eventpoll.c to link all the hooks to this file */
struct hlist_head *f_ep;
#endif /* #ifdef CONFIG_EPOLL */
struct address_space *f_mapping;
errseq_t f_wb_err;
errseq_t f_sb_err; /* for syncfs */
} __randomize_layout
__attribute__((aligned(4))); /* lest something weird decides that 2 is OK */

4.8 结构体 file_operations (4.6中调用的poll)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
struct file_operations {
struct module *owner;
loff_t (*llseek) (struct file *, loff_t, int);
ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);
ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
ssize_t (*read_iter) (struct kiocb *, struct iov_iter *);
ssize_t (*write_iter) (struct kiocb *, struct iov_iter *);
int (*iopoll)(struct kiocb *kiocb, struct io_comp_batch *,unsigned int flags);
int (*iterate) (struct file *, struct dir_context *);
int (*iterate_shared) (struct file *, struct dir_context *);
__poll_t (*poll) (struct file *, struct poll_table_struct *);//(主线)调用文件的poll方法操作
long (*unlocked_ioctl) (struct file *, unsigned int, unsigned long);
long (*compat_ioctl) (struct file *, unsigned int, unsigned long);
int (*mmap) (struct file *, struct vm_area_struct *);
unsigned long mmap_supported_flags;
int (*open) (struct inode *, struct file *);
int (*flush) (struct file *, fl_owner_t id);
int (*release) (struct inode *, struct file *);
int (*fsync) (struct file *, loff_t, loff_t, int datasync);
int (*fasync) (int, struct file *, int);
int (*lock) (struct file *, int, struct file_lock *);
ssize_t (*sendpage) (struct file *, struct page *, int, size_t, loff_t *, int);
unsigned long (*get_unmapped_area)(struct file *, unsigned long, unsigned long, unsigned long, unsigned long);
int (*check_flags)(int);
int (*flock) (struct file *, int, struct file_lock *);
ssize_t (*splice_write)(struct pipe_inode_info *, struct file *, loff_t *, size_t, unsigned int);
ssize_t (*splice_read)(struct file *, loff_t *, struct pipe_inode_info *, size_t, unsigned int);
int (*setlease)(struct file *, long, struct file_lock **, void **);
long (*fallocate)(struct file *file, int mode, loff_t offset,loff_t len);
void (*show_fdinfo)(struct seq_file *m, struct file *f);
#ifndef CONFIG_MMU
unsigned (*mmap_capabilities)(struct file *);
#endif
ssize_t (*copy_file_range)(struct file *, loff_t, struct file *,loff_t, size_t, unsigned int);
loff_t (*remap_file_range)(struct file *file_in, loff_t pos_in,struct file *file_out, loff_t pos_out,loff_t len, unsigned int remap_flags);
int (*fadvise)(struct file *, loff_t, loff_t, int);
int (*uring_cmd)(struct io_uring_cmd *ioucmd, unsigned int issue_flags);
} __randomize_layout;

4.9 释放 poll_wqueues 的资源占用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 释放 poll_wqueues 的资源占用
* @param pwq
*/
void poll_freewait(struct poll_wqueues *pwq)
{
struct poll_table_page * p = pwq->table;
int i;
for (i = 0; i < pwq->inline_index; i++)free_poll_entry(pwq->inline_entries + i);
while (p) {
struct poll_table_entry * entry;
struct poll_table_page *old;

entry = p->entry;
do {
entry--;
free_poll_entry(entry);
} while (entry > p->entries);
old = p;
p = p->next;
free_page((unsigned long) old);
}
}

流程图示

相关总结

优点:

  • 用户可以在一个线程内同时处理多个socket的IO请求。同时没有多线程多进程那样耗费系统资源
  • 目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点

缺点:

  • 可处理的fd集有限,默认1024个
  • 采用轮询遍历每个fd位图其效率较为低下,调用处于阻塞状态
  • 客户进程获取返回的数据后还需遍历fd集才知道哪些fd准备好
  • 每次调用需要复制大量的句柄数据结构到内核空间,产生巨大的开销

poll系统调用

poll的代码和select在同一个文件中

执行流程

1.系统调用函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 代码基于linux5-19-RC8 linux/fs/select.c
/**
* poll系统调用
* @param ufds 用户空间poll文件描述符
* @param nfds ufds的长度
* @param timeout_msecs 超时参数
*/
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,int, timeout_msecs){
struct timespec64 end_time, *to = NULL;//超时时间
int ret;//响应结果
if (timeout_msecs >= 0) {//计算超时时间
to = &end_time;
poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,
NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));
}
//(主线)核心调用
ret = do_sys_poll(ufds, nfds, to);
//错误码(系统错误)-ERESTARTNOHAND表明,被中断的系统调用
if (ret == -ERESTARTNOHAND) {
struct restart_block *restart_block;

restart_block = &current->restart_block;
restart_block->poll.ufds = ufds;
restart_block->poll.nfds = nfds;
//是否超时
if (timeout_msecs >= 0) {
restart_block->poll.tv_sec = end_time.tv_sec;
restart_block->poll.tv_nsec = end_time.tv_nsec;
restart_block->poll.has_timeout = 1;
} else
restart_block->poll.has_timeout = 0;
//重启服务
ret = set_restart_fn(restart_block, do_restart_poll);
}
return ret;
}

1.1 pollfd结构体

1
2
3
4
5
6
7
8
/**
* poll文件描述符结构体
*/
struct pollfd {
int fd;//文件描述符
short int events;//关心的events
short int revents;//返回就绪的events
};

2.调用do_sys_poll方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

/**
* poll系统调用核心:
* - 分配链表空间
* - 初始化 poll_wqueues 控制块
* - 调用 do_poll 方法
* @param __user
* @return 返回
*/
static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, struct timespec64 *end_time) {
//定义一个poll调用控制块(表)
struct poll_wqueues table;
int err = -EFAULT, fdcount, len;
/* Allocate small arguments on the stack to save memory and be
faster - use long to make sure the buffer is aligned properly
on 64 bit archs to avoid unaligned access */
//优先在栈上分配空间
long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
struct poll_list *const head = (struct poll_list *)stack_pps;
struct poll_list *walk = head;
unsigned long todo = nfds;
//检查是否超出长度限制
if (nfds > rlimit(RLIMIT_NOFILE))return -EINVAL;
//获取分配的空间长度
len = min_t(unsigned int, nfds, N_STACK_PPS);
//为每一个节点(pollfd)进行分配空间
for (;;) {
walk->next = NULL;
walk->len = len;
//所需长度为0则可跳出循环
if (!len)break;
//复制文件描述符到内核空间
if (copy_from_user(walk->entries, ufds + nfds-todo,sizeof(struct pollfd) * walk->len))goto out_fds;

todo -= walk->len;
if (!todo)break;

len = min(todo, POLLFD_PER_PAGE);
//在堆区分配空间
walk = walk->next = kmalloc(struct_size(walk, entries, len),GFP_KERNEL);
if (!walk) {
err = -ENOMEM;
goto out_fds;
}
}
//初始化poll控制块
poll_initwait(&table);
//(主线)执行poll调用
fdcount = do_poll(head, &table, end_time);
//释放poll控制块
poll_freewait(&table);
//如果用户没有写入权限
if (!user_write_access_begin(ufds, nfds * sizeof(*ufds)))goto out_fds;

for (walk = head; walk; walk = walk->next) {
struct pollfd *fds = walk->entries;
int j;

for (j = walk->len; j; fds++, ufds++, j--)
unsafe_put_user(fds->revents, &ufds->revents, Efault);
}
user_write_access_end();

err = fdcount;
out_fds:
walk = head->next;
while (walk) {
struct poll_list *pos = walk;
walk = walk->next;
kfree(pos);
}

return err;

Efault:
user_write_access_end();
err = -EFAULT;
goto out_fds;
}

2.1调用poll_initwait(&table)初始化poll控制块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 初始化poll队列
* @param pwq
*/
void poll_initwait(struct poll_wqueues *pwq) {
//调用__pollwait初始化监听队列
init_poll_funcptr(&pwq->pt, __pollwait);
pwq->polling_task = current;//当前调用的进程
pwq->triggered = 0;//是否已经触发
pwq->error = 0;//是否错误
pwq->table = NULL;
pwq->inline_index = 0;
}
EXPORT_SYMBOL(poll_initwait);

2.1.1调用__pollwait添加监听文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* Add a new entry */
/**
* 将poll_table_entry挂载到资源文件的监听队列
* @param filp 被监听的资源文件
* @param wait_address 被监听的资源文件的等待队列头
* @param p 在poll_initwait()中设置的poll_tbale
*/
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,poll_table *p){
//获取 poll_wqueues
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
//获取 poll_table_entry
struct poll_table_entry *entry = poll_get_entry(pwq);
//如果获取不到直接返回
if (!entry)return;
//增加资源文件引用计数并关联到entry的filp属性
entry->filp = get_file(filp);
//保存资源文件到队列头
entry->wait_address = wait_address;
//赋值监听事件给entry的key
entry->key = p->_key;
//初始化一个等待队列节点,其中唤醒函数设置为pollwake(重点)
init_waitqueue_func_entry(&entry->wait, pollwake);
//
entry->wait.private = pwq;
//添加到监听队列
add_wait_queue(wait_address, &entry->wait);
}

3.调用 do_poll 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* 核心调用poll
* @param list fd列表的链表
* @param wait 调度控制块
* @param end_time 超时时间
* @return 返回准备好的fd个数
*/
static int do_poll(struct poll_list *list, struct poll_wqueues *wait,struct timespec64 *end_time){
//获取poll_table
poll_table* pt = &wait->pt;
//时间
ktime_t expire, *to = NULL;
int timed_out = 0, count = 0;
u64 slack = 0;
__poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;
unsigned long busy_start = 0;
/* Optimise the no-wait case */
//判断是否超时
if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
pt->_qproc = NULL;
timed_out = 1;
}
//预估时间
if (end_time && !timed_out)slack = select_estimate_accuracy(end_time);
//死循环遍历
for (;;) {
//当前遍历的链表节点(节点中包含pollfd数组)
struct poll_list *walk;
bool can_busy_loop = false;
//遍历链表
for (walk = list; walk != NULL; walk = walk->next) {//walk = walk->next链表移动
struct pollfd * pfd, * pfd_end;
pfd = walk->entries;
pfd_end = pfd + walk->len;
//遍历当前节点的所有fd
for (; pfd != pfd_end; pfd++) {
/*
* Fish for events. If we found one, record it
* and kill poll_table->_qproc, so we don't
* needlessly register any other waiters after
* this. They'll get immediately deregistered
* when we break out and return.
*/
//(主线) 调用do_pollfd 返回触发的事件
if (do_pollfd(pfd, pt, &can_busy_loop,busy_flag)) {
count++;
pt->_qproc = NULL;
/* found something, stop busy polling */
busy_flag = 0;
can_busy_loop = false;
}
}
}
/*
* All waiters have already been registered, so don't provide
* a poll_table->_qproc to them on the next loop iteration.
*/
pt->_qproc = NULL;
if (!count) {
count = wait->error;
if (signal_pending(current))count = -ERESTARTNOHAND;
}
//有准备好的fd或者超时则跳出死循环
if (count || timed_out)break;

/* only if found POLL_BUSY_LOOP sockets && not out of time */
if (can_busy_loop && !need_resched()) {
if (!busy_start) {
busy_start = busy_loop_current_time();
continue;
}
if (!busy_loop_timeout(busy_start))
continue;
}
busy_flag = 0;

/*
* If this is the first loop and we have a timeout
* given, then we convert to ktime_t and set the to
* pointer to the expiry value.
*/
if (end_time && !to) {
expire = timespec64_to_ktime(*end_time);
to = &expire;
}
//判断是否超时
if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))timed_out = 1;
}
return count;
}

3.1 调用 do_pollfd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 调用fd驱动函数
* @param pollfd pollfd结构体
* @param pwait
* @param can_busy_poll
* @param busy_flag
* @return 返回准备好的事件
*/
static inline __poll_t do_pollfd(struct pollfd *pollfd, poll_table *pwait,bool *can_busy_poll,__poll_t busy_flag) {
//文件描述符
int fd = pollfd->fd;
__poll_t mask = 0, filter;
struct fd f;
//fd无效直接返回
if (fd < 0)goto out;
mask = EPOLLNVAL;
//获取真正的文件
f = fdget(fd);
//验证文件有效性
if (!f.file)goto out;
/* userland u16 ->events contains POLL... bitmap */
filter = demangle_poll(pollfd->events) | EPOLLERR | EPOLLHUP;
pwait->_key = filter | busy_flag;
//(核心)调用file的驱动 vfs_poll 返回该文件已经准备好的事件
mask = vfs_poll(f.file, pwait);
if (mask & busy_flag)*can_busy_poll = true;
mask &= filter; /* Mask out unneeded events. */
fdput(f);

out:
/* ... and so does ->revents */
pollfd->revents = mangle_poll(mask);
return mask;
}

2.2 poll_freewait(&table);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 释放 poll_wqueues 的资源占用
* @param pwq
*/
void poll_freewait(struct poll_wqueues *pwq){
struct poll_table_page * p = pwq->table;
int i;
for (i = 0; i < pwq->inline_index; i++)free_poll_entry(pwq->inline_entries + i);
while (p) {
struct poll_table_entry * entry;
struct poll_table_page *old;

entry = p->entry;
do {
entry--;
free_poll_entry(entry);
} while (entry > p->entries);
old = p;
p = p->next;
free_page((unsigned long) old);
}
}
EXPORT_SYMBOL(poll_freewait);

流程图示

相关总结

优点:

  • 和select相比能监听的fd个数更多(采用链表的方式存储)
  • poll 事件相比 select 的in/out/err ,明显支持的情况更多

缺点:

  • 和select一样每次监听都需要不断的遍历每个fd,效率不是很高
  • 每次调用都需要将用户态的fd复制到内核态中
  • 每次用户进程都需要遍历fd才能知道那个准备好

epoll系统调用

epoll没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于2048, 一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

epoll函数

  • do_epoll_create : 创建epoll控制块
  • do_epoll_ctl : 增删改操作
  • do_epoll_wait : 陷入内核等待

创建流程

1. 创建epoll系统调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* epoll系统调用
*/
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
return do_epoll_create(flags);
}

SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;

return do_epoll_create(0);
}

2.调用do_epoll_create创建epoll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 创建epoll的fd
* @param flags
* @return
*/
static int do_epoll_create(int flags) {
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;

/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

if (flags & ~EPOLL_CLOEXEC)return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
//传入应用创建eventpoll的结构体
error = ep_alloc(&ep);
//出现错误返回错误
if (error < 0)return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
//创建设置eventpoll文件所需的所有项。即文件结构和自由文件描述符。
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {//创建失败
error = fd;
goto out_free_ep;
}
//创建一个名字为“[eventpoll]”的eventpollfs文件描述符
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
//赋值个ep
ep->file = file;
//将文件加入fd数组表(绑定file和fd)
fd_install(fd, file);
//返回fd
return fd;

out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}

2.1.创建分配一个eventpoll结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 创建分配一个eventpoll
* @param pep
* @return
*/
static int ep_alloc(struct eventpoll **pep){
int error;
struct user_struct *user;
struct eventpoll *ep;

user = get_current_user();//当前用户
error = -ENOMEM;
ep = kzalloc(sizeof(*ep), GFP_KERNEL);//申请内存
if (unlikely(!ep))goto free_uid;//分配失败
//初始化互斥锁
mutex_init(&ep->mtx);
rwlock_init(&ep->lock);//初始化读写锁
init_waitqueue_head(&ep->wq);//初始化等待队列头
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);//初始化就绪队列
ep->rbr = RB_ROOT_CACHED;//初始化红黑树
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;//当前用户

*pep = ep;

return 0;//正常返回0
//出现错误
free_uid:
free_uid(user);
return error;
}

修改流程

1.修改epoll系统调用

1
2
3
4
5
6
7
8
9
10
11
12
// 代码基于linux5-19-RC8 linux/fs/eventpoll.c
/**
* epoll系统调用函数 实现插入/删除/修改fd的功能
*/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,struct epoll_event __user *, event)
{
struct epoll_event epds;
//判断是否需要来自用户空间的事件副本 && 拷贝事件到内核空间
if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event)))return -EFAULT;
//执行操作
return do_epoll_ctl(epfd, op, fd, &epds, false);
}

1.1 epoll事件结构体

1
2
3
4
5
6
7
/**
* epoll事件结构体
*/
struct epoll_event {
__poll_t events;//事件
__u64 data;//数据
} EPOLL_PACKED;

2. 调用 do_epoll_ctl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156

/**
* epoll操作方法
* @param epfd
* @param op 操作类型 [EPOLL_CTL_ADD,EPOLL_CTL_DEL,EPOLL_CTL_MOD]
* @param fd 文件描述符
* @param epds epoll_event结构体
* @param nonblock 是否是非阻塞
* @return
*/
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,bool nonblock) {
int error;
int full_check = 0;
struct fd f, tf;
//eventpoll结构体
struct eventpoll *ep;
//epitem结构体
struct epitem *epi;
struct eventpoll *tep = NULL;

error = -EBADF;
f = fdget(epfd);//获取文件描述符
if (!f.file) goto error_return;

/* Get the "struct file *" for the target file */
//目标文件
tf = fdget(fd);
if (!tf.file)goto error_fput;

/* The target file descriptor must support poll */
error = -EPERM;
//如果文件不支持则直接退出
if (!file_can_poll(tf.file))goto error_tgt_fput;

/* Check if EPOLLWAKEUP is allowed */
// 检查是否支持epoll唤醒
if (ep_op_has_event(op))ep_take_care_of_epollwakeup(epds);
/*
* We have to check that the file structure underneath the file descriptor
* the user passed to us _is_ an eventpoll file. And also we do not permit
* adding an epoll file descriptor inside itself.
*/
error = -EINVAL;
//检查文件
if (f.file == tf.file || !is_file_epoll(f.file))
goto error_tgt_fput;

/*
* epoll adds to the wakeup queue at EPOLL_CTL_ADD time only,
* so EPOLLEXCLUSIVE is not allowed for a EPOLL_CTL_MOD operation.
* Also, we do not currently supported nested exclusive wakeups.
*/
if (ep_op_has_event(op) && (epds->events & EPOLLEXCLUSIVE)) {
if (op == EPOLL_CTL_MOD)
goto error_tgt_fput;
if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) ||
(epds->events & ~EPOLLEXCLUSIVE_OK_BITS)))
goto error_tgt_fput;
}

/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;

/*
* When we insert an epoll file descriptor inside another epoll file
* descriptor, there is the chance of creating closed loops, which are
* better be handled here, than in more critical paths. While we are
* checking for loops we also determine the list of files reachable
* and hang them on the tfile_check_list, so we can check that we
* haven't created too many possible wakeup paths.
*
* We do not need to take the global 'epumutex' on EPOLL_CTL_ADD when
* the epoll file descriptor is attaching directly to a wakeup source,
* unless the epoll file descriptor is nested. The purpose of taking the
* 'epmutex' on add is to prevent complex toplogies such as loops and
* deep wakeup paths from forming in parallel through multiple
* EPOLL_CTL_ADD operations.
*/
//需要获取mutex对红黑树进行操作
error = epoll_mutex_lock(&ep->mtx, 0, nonblock);
if (error)
goto error_tgt_fput;
if (op == EPOLL_CTL_ADD) {
if (READ_ONCE(f.file->f_ep) || ep->gen == loop_check_gen ||
is_file_epoll(tf.file)) {
mutex_unlock(&ep->mtx);
error = epoll_mutex_lock(&epmutex, 0, nonblock);
if (error)
goto error_tgt_fput;
loop_check_gen++;
full_check = 1;
if (is_file_epoll(tf.file)) {
tep = tf.file->private_data;
error = -ELOOP;
if (ep_loop_check(ep, tep) != 0)
goto error_tgt_fput;
}
error = epoll_mutex_lock(&ep->mtx, 0, nonblock);
if (error)
goto error_tgt_fput;
}
}

/*
* Try to lookup the file inside our RB tree. Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
//从红黑树中寻找添加的fd是否存在,存在则返回到ep中,否则返回NULL
epi = ep_find(ep, tf.file, fd);

error = -EINVAL;
switch (op) {//区分op类型增删改
case EPOLL_CTL_ADD://新增
//查找不到则添加
if (!epi) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, epds, tf.file, fd, full_check);//插入
} else//否则不重复添加
error = -EEXIST;
break;
case EPOLL_CTL_DEL://删除
if (epi)
error = ep_remove(ep, epi);//删除
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD://修改
if (epi) {
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, epds);
}
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);

error_tgt_fput:
if (full_check) {
clear_tfile_check_list();
loop_check_gen++;
mutex_unlock(&epmutex);
}

fdput(tf);
error_fput:
fdput(f);
error_return:

return error;
}

2.1 结构体eventpoll控制块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
struct eventpoll {
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;

/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;//调用进程等待队列

/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;//等待队列队头(如果被监听的文件是一个epoll类型)

/* List of ready file descriptors */
struct list_head rdllist;//已经准备好的文件描述符队列队头(双向链表)

/* Lock which protects rdllist and ovflist */
rwlock_t lock;//读写锁

/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;//红黑树root节点(存储epitem)

/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;//

/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;

/* The user that created the eventpoll descriptor */
struct user_struct *user;//当前进程用户

struct file *file;//真正的文件

/* used to optimize loop detection check */
u64 gen;
struct hlist_head refs;

#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
#endif

#ifdef CONFIG_DEBUG_LOCK_ALLOC
/* tracks wakeup nests for lockdep validation */
u8 nests;
#endif
};

2.2 结构体epitem红黑树节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
struct epitem {

union {
/* RB tree node links this structure to the eventpoll RB tree */
struct rb_node rbn;//指向红黑树的节点
/* Used to free the struct epitem */
struct rcu_head rcu;
};

/* List header used to link this structure to the eventpoll ready list */
struct list_head rdllink;

/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next;//

/* The file descriptor information this item refers to */
struct epoll_filefd ffd;//文件fd

/* List containing poll wait queues */
struct eppoll_entry *pwqlist;//等待队列

/* The "container" of this item */
struct eventpoll *ep;//指向eventpoll控制块

/* List header used to link this item to the "struct file" items list */
struct hlist_node fllink;

/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws;

/* The structure that describe the interested events and the source fd */
struct epoll_event event;//事件
};

2.3 红黑树查找对应的fd的epitem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 红黑树查找对应的fd的epitem
* @param ep
* @param file
* @param fd
* @return
*/
static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd){
int kcmp;
struct rb_node *rbp;
struct epitem *epi, *epir = NULL;
struct epoll_filefd ffd;

ep_set_ffd(&ffd, file, fd);//初始化指针
//二分搜索
for (rbp = ep->rbr.rb_root.rb_node; rbp; ) {
epi = rb_entry(rbp, struct epitem, rbn);
kcmp = ep_cmp_ffd(&ffd, &epi->ffd);
if (kcmp > 0)
rbp = rbp->rb_right;
else if (kcmp < 0)
rbp = rbp->rb_left;
else {
epir = epi;
break;
}
}
return epir;
}

插入操作

1.调用ep_insert插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* 插入操作(必须在持有“mtx”的情况下调用)
* @param ep 控制块
* @param event
* @param tfile
* @param fd
* @param full_check
* @return
*/
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,struct file *tfile, int fd, int full_check)
{
int error, pwake = 0;
__poll_t revents;
struct epitem *epi;//红黑树节点结构体
struct ep_pqueue epq;//epoll队列
struct eventpoll *tep = NULL;
//判断是否是epoll文件
if (is_file_epoll(tfile))tep = tfile->private_data;

lockdep_assert_irqs_enabled();
//epoll可监控的最大值
//unsigned int sysctl_nr_open __read_mostly = 1024*1024;
if (unlikely(percpu_counter_compare(&ep->user->epoll_watches,max_user_watches) >= 0))return -ENOSPC;
//增加cpu计数
percpu_counter_inc(&ep->user->epoll_watches);
//从缓存中申请内存,申请失败减掉计数
//epi_cache内存池在epoll模块初始化时已经分配,这里根据slab直接取一个epitem
if (!(epi = kmem_cache_zalloc(epi_cache, GFP_KERNEL))) {
percpu_counter_dec(&ep->user->epoll_watches);
return -ENOMEM;
}
//初始化工作 epitem
/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
epi->ep = ep;//设置指向ep的指针
ep_set_ffd(&epi->ffd, tfile, fd);//设置ffd
epi->event = *event;//设置监听的事件
epi->next = EP_UNACTIVE_PTR;
//加锁
if (tep)mutex_lock_nested(&tep->mtx, 1);
/* Add the current item to the list of active epoll hook for this file */
//将当前项添加到此文件的活动epoll钩子列表
if (unlikely(attach_epitem(tfile, epi) < 0)) {
if (tep)mutex_unlock(&tep->mtx);
kmem_cache_free(epi_cache, epi);
percpu_counter_dec(&ep->user->epoll_watches);
return -ENOMEM;
}

if (full_check && !tep)list_file(tfile);

/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
//插入到红黑树中
ep_rbtree_insert(ep, epi);
if (tep)mutex_unlock(&tep->mtx);

/* now check if we've created too many backpaths */
if (unlikely(full_check && reverse_path_check())) {
ep_remove(ep, epi);
return -EINVAL;
}
//如果events里设置了EPOLLWAKEUP, 还需要为autosleep创建一个唤醒源 ep_create_wakeup_source
if (epi->event.events & EPOLLWAKEUP) {
error = ep_create_wakeup_source(epi);
if (error) {
ep_remove(ep, epi);
return error;
}
}

/* Initialize the poll table using the queue callback */
//初始化
epq.epi = epi;

/* 设置epq的回调函数为ep_ptable_queue_proc,当调用poll_wait时会调用该回调函数,
* 而函数体ep_ptable_queue_proc内部所做的主要工作,
* 就是把epitem对应fd的事件到来时的回调函数设置为ep_poll_callback。
* ep_poll_callback所做的主要工作就是把就绪的fd放到就绪链表rdllist上,
* 然后唤醒epoll_wait的调用者, 被唤醒的进程再把rdllist上就绪的fd的events拷贝给用户进程,
* 完成一个闭环。
*/
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
//如果当前监听的事件刚好发生,则直接返回
revents = ep_item_poll(epi, &epq.pt, 1);

/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
if (unlikely(!epq.epi)) {
ep_remove(ep, epi);
return -ENOMEM;
}

/* We have to drop the new item inside our item list to keep track of it */
write_lock_irq(&ep->lock);

/* record NAPI ID of new item if present */
ep_set_busy_poll_napi_id(epi);

/* If the file is already "ready" we drop it inside the ready list */
//如果当前的epi已经就绪了,上面的revents返回就绪的事件,则把其加入就绪列表rdllink
if (revents && !ep_is_linked(epi)) {
//加入就序列表尾部
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
//主动唤醒通知等待任务事件可用
if (waitqueue_active(&ep->wq))wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))pwake++;
}
//释放ep的锁
write_unlock_irq(&ep->lock);

/* We have to call this outside the lock */
if (pwake)ep_poll_safewake(ep, NULL);

return 0;
}

2.插入节点到红黑树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 插入epi到红黑树中
* @param ep eventpoll控制块
* @param epi 插入的节点
*/
static void ep_rbtree_insert(struct eventpoll *ep, struct epitem *epi)
{
int kcmp;
struct rb_node **p = &ep->rbr.rb_root.rb_node, *parent = NULL;
struct epitem *epic;
bool leftmost = true;

while (*p) {
parent = *p;
epic = rb_entry(parent, struct epitem, rbn);
kcmp = ep_cmp_ffd(&epi->ffd, &epic->ffd);
if (kcmp > 0) {
p = &parent->rb_right;
leftmost = false;
} else
p = &parent->rb_left;
}
//通过rbn关联(注意实际上插入红黑树的是rbn,通过其进行关联epitem)
rb_link_node(&epi->rbn, parent, p);
rb_insert_color_cached(&epi->rbn, &ep->rbr, leftmost);
}
/**
* 红黑树的插入操作
* @param node
* @param parent
* @param rb_link
*/
static inline void rb_link_node(struct rb_node *node, struct rb_node *parent,struct rb_node **rb_link)
{
node->__rb_parent_color = (unsigned long)parent;//父节点颜色
node->rb_left = node->rb_right = NULL;//设置其左右节点为NULL
*rb_link = node;//将rb_link指针指向node
}

2.1查找到对应插入的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 插入epi到红黑树中
* @param ep eventpoll控制块
* @param epi 插入的节点
*/
static void ep_rbtree_insert(struct eventpoll *ep, struct epitem *epi)
{
int kcmp;
struct rb_node **p = &ep->rbr.rb_root.rb_node, *parent = NULL;
struct epitem *epic;
bool leftmost = true;

while (*p) {
parent = *p;
epic = rb_entry(parent, struct epitem, rbn);
kcmp = ep_cmp_ffd(&epi->ffd, &epic->ffd);
if (kcmp > 0) {
p = &parent->rb_right;
leftmost = false;
} else
p = &parent->rb_left;
}
//通过rbn关联
rb_link_node(&epi->rbn, parent, p);
rb_insert_color_cached(&epi->rbn, &ep->rbr, leftmost);
}

2.2 插入节点

1
2
3
4
5
6
static inline void rb_link_node(struct rb_node *node, struct rb_node *parent,struct rb_node **rb_link)
{
node->__rb_parent_color = (unsigned long)parent;
node->rb_left = node->rb_right = NULL;
*rb_link = node;
}

3.初始化poll回调函数指针

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 初始化
* @param pt
* @param qproc
*/
static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
//pt是poll_table_struct结构体类型,设置其回调函数
pt->_qproc = qproc;
//默认监听所有事件
pt->_key = ~(__poll_t)0; /* all events enabled */
}

3.1 结构体poll_table_struct

存储回调函数和监听的事件类型

1
2
3
4
typedef struct poll_table_struct {
poll_queue_proc _qproc;
__poll_t _key;
} poll_table;

3.2 唤醒时回调的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,poll_table *pt)
{
struct ep_pqueue *epq = container_of(pt, struct ep_pqueue, pt);
struct epitem *epi = epq->epi;
struct eppoll_entry *pwq;//结构体见3.3

if (unlikely(!epi)) // an earlier allocation has failed
return;

pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
if (unlikely(!pwq)) {
epq->epi = NULL;
return;
}
//初始化等待队列,当唤醒时调用ep_poll_callback
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
if (epi->event.events & EPOLLEXCLUSIVE)add_wait_queue_exclusive(whead, &pwq->wait);
else add_wait_queue(whead, &pwq->wait);
pwq->next = epi->pwqlist;
epi->pwqlist = pwq;
}

3.3 结构体 eppoll_entry

1
2
3
4
5
6
7
8
9
10
11
/* Wait structure used by the poll hooks */
struct eppoll_entry {
/* List header used to link this structure to the "struct epitem" */
struct eppoll_entry *next;
/* The "base" pointer is set to the container "struct epitem" */
struct epitem *base;
/* Wait queue item that will be linked to the target file wait queue head.*/
wait_queue_entry_t wait;
/* The wait queue head that linked the "wait" wait queue item */
wait_queue_head_t *whead;
};

3.4 回调函数ep_poll_callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
* epoll回调函数主要的功能是将被监视文件的等待事件就绪时,
* 将文件对应的epitem实例添加到就绪队列中,当用户调用epoll_wait()时,
* 内核会将就绪队列中的事件报告给用户
*
* @param wait
* @param mode
* @param sync
* @param key
* @return
*/
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
struct epitem *epi = ep_item_from_wait(wait);//从等待队列指针获取“struct epitem”
struct eventpoll *ep = epi->ep;//ep控制块
__poll_t pollflags = key_to_poll(key);
unsigned long flags;
int ewake = 0;
//获取读锁
read_lock_irqsave(&ep->lock, flags);

ep_set_busy_poll_napi_id(epi);

/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
//如果当前epitem不感兴趣任何事件则直接跳出
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;

/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
if (pollflags && !(pollflags & epi->event.events))
goto out_unlock;

/*
* If we are transferring events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happen during that period of time are
* chained in ep->ovflist and requeued later on.
*/
// ovflist默认值是EP_UNACTIVE_PTR,epoll_wait()遍历rdllist之前会把ovflist设置为NULL,
// 遍历完再恢复为EP_UNACTIVE_PTR,因此通过判断ovflist的值是不是EP_UNACTIVE_PTR可知此时rdllist是不是正在被访问。
if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
if (chain_epi_lockless(epi))ep_pm_stay_awake_rcu(epi);
} else if (!ep_is_linked(epi)) {
/* In the usual case, add event to ready list. */
//将事件添加到就绪列表
if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))ep_pm_stay_awake_rcu(epi);
}

/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
//如果等待队列不为空,则将他们进行唤醒
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!(pollflags & POLLFREE)) {
switch (pollflags & EPOLLINOUT_BITS) {
case EPOLLIN:
if (epi->event.events & EPOLLIN)
ewake = 1;
break;
case EPOLLOUT:
if (epi->event.events & EPOLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
//唤醒等待的进程
wake_up(&ep->wq);
}
if (waitqueue_active(&ep->poll_wait))
pwake++;

out_unlock:
read_unlock_irqrestore(&ep->lock, flags);

/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(ep, epi);

if (!(epi->event.events & EPOLLEXCLUSIVE))
ewake = 1;

if (pollflags & POLLFREE) {
/*
* If we race with ep_remove_wait_queue() it can miss
* ->whead = NULL and do another remove_wait_queue() after
* us, so we can't use __remove_wait_queue().
*/
list_del_init(&wait->entry);
/*
* ->whead != NULL protects us from the race with ep_free()
* or ep_remove(), ep_remove_wait_queue() takes whead->lock
* held by the caller. Once we nullify it, nothing protects
* ep/epi or even wait.
*/
smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
}

return ewake;
}

4.调用ep_item_poll函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 核心调用
* @param epi epitem 节点
* @param pt poll_table
* @param depth
* @return
*/
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth)
{
struct file *file = epi->ffd.file;
__poll_t res;

pt->_key = epi->event.events;
if (!is_file_epoll(file))//不支持epoll则调用poll
res = vfs_poll(file, pt);
else//(主线)否则调用epoll
res = __ep_eventpoll_poll(file, pt, depth);
return res & epi->event.events;
}

4.1 调用 __ep_eventpoll_poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 主线
* @param file 文件
* @param wait poll_table
* @param depth
* @return
*/
static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int depth)
{
struct eventpoll *ep = file->private_data;//获取ep控制块
LIST_HEAD(txlist);
struct epitem *epi, *tmp;
poll_table pt;
__poll_t res = 0;
//重置回调函数为NULL
init_poll_funcptr(&pt, NULL);
/* Insert inside our poll wait queue */
//主线
poll_wait(file, &ep->poll_wait, wait);
/*
* Proceed to find out if wanted events are really available inside
* the ready list.
*/
mutex_lock_nested(&ep->mtx, depth);
//
ep_start_scan(ep, &txlist);
list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
if (ep_item_poll(epi, &pt, depth + 1)) {
res = EPOLLIN | EPOLLRDNORM;
break;
} else {
/*
* Item has been dropped into the ready list by the poll
* callback, but it's not actually ready, as far as
* caller requested events goes. We can remove it here.
*/
__pm_relax(ep_wakeup_source(epi));
list_del_init(&epi->rdllink);
}
}
ep_done_scan(ep, &txlist);
mutex_unlock(&ep->mtx);
return res;
}

4.2 调用poll_wait函数触发回调

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 调用回调函数
* @param filp
* @param wait_address
* @param p
*/
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
//调用init_poll_funcptr设置的回调函数
p->_qproc(filp, wait_address, p);
}

删除操作

1. 调用ep_remove删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 移除 epitem 操作
* @param ep ep控制块
* @param epi 需要移除的epitem
* @return
*/
static int ep_remove(struct eventpoll *ep, struct epitem *epi)
{
struct file *file = epi->ffd.file;//获取文件
struct epitems_head *to_free;//epitem头指针
struct hlist_head *head;

lockdep_assert_irqs_enabled();

/*
* Removes poll wait queue hooks.
*/
ep_unregister_pollwait(ep, epi);

/* Remove the current item from the list of epoll hooks */
spin_lock(&file->f_lock);//获取锁
to_free = NULL;
head = file->f_ep;
//遍历fllink获取要移除的节点
if (head->first == &epi->fllink && !epi->fllink.next) {
file->f_ep = NULL;
if (!is_file_epoll(file)) {
struct epitems_head *v;
v = container_of(head, struct epitems_head, epitems);
if (!smp_load_acquire(&v->next))
to_free = v;
}
}
hlist_del_rcu(&epi->fllink);
spin_unlock(&file->f_lock);
free_ephead(to_free);//释放

rb_erase_cached(&epi->rbn, &ep->rbr);

write_lock_irq(&ep->lock);
if (ep_is_linked(epi))
list_del_init(&epi->rdllink);
write_unlock_irq(&ep->lock);

wakeup_source_unregister(ep_wakeup_source(epi));
/*
* At this point it is safe to free the eventpoll item. Use the union
* field epi->rcu, since we are trying to minimize the size of
* 'struct epitem'. The 'rbn' field is no longer in use. Protected by
* ep->mtx. The rcu read side, reverse_path_check_proc(), does not make
* use of the rbn field.
*/
call_rcu(&epi->rcu, epi_rcu_free);//释放

percpu_counter_dec(&ep->user->epoll_watches);

return 0;
}

2.移除进程等待队列wq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 取消注册的进程
* @param ep
* @param epi
*/
static void ep_unregister_pollwait(struct eventpoll *ep, struct epitem *epi)
{
struct eppoll_entry **p = &epi->pwqlist;
struct eppoll_entry *pwq;

while ((pwq = *p) != NULL) {
*p = pwq->next;
ep_remove_wait_queue(pwq);//移除
kmem_cache_free(pwq_cache, pwq);//释放内存
}
}

修改操作

1.调用ep_modify修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* 修改操作:将epi设置为感兴趣的event
* @param ep控制块
* @param epi epitem
* @param event 更改后的事件
* @return
*/
static int ep_modify(struct eventpoll *ep, struct epitem *epi,const struct epoll_event *event)
{
int pwake = 0;
poll_table pt;

lockdep_assert_irqs_enabled();
//将pt的回调函数置空
init_poll_funcptr(&pt, NULL);

/*
* Set the new event interest mask before calling f_op->poll();
* otherwise we might miss an event that happens between the
* f_op->poll() call and the new event set registering.
*/
//设置新的感兴趣的事件
epi->event.events = event->events; /* need barrier below */
epi->event.data = event->data; /* protected by mtx */
//加入唤醒源
if (epi->event.events & EPOLLWAKEUP) {
if (!ep_has_wakeup_source(epi))
ep_create_wakeup_source(epi);
} else if (ep_has_wakeup_source(epi)) {
ep_destroy_wakeup_source(epi);
}

/*
* The following barrier has two effects:
*
* 1) Flush epi changes above to other CPUs. This ensures
* we do not miss events from ep_poll_callback if an
* event occurs immediately after we call f_op->poll().
* We need this because we did not take ep->lock while
* changing epi above (but ep_poll_callback does take
* ep->lock).
*
* 2) We also need to ensure we do not miss _past_ events
* when calling f_op->poll(). This barrier also
* pairs with the barrier in wq_has_sleeper (see
* comments for wq_has_sleeper).
*
* This barrier will now guarantee ep_poll_callback or f_op->poll
* (or both) will notice the readiness of an item.
*/
smp_mb();//内存屏障

/*
* Get current event bits. We can safely use the file* here because
* its usage count has been increased by the caller of this function.
* If the item is "hot" and it is not registered inside the ready
* list, push it inside.
*/
//如果在当前有准备好的事件了则将其加入到就绪队列中去
if (ep_item_poll(epi, &pt, 1)) {
write_lock_irq(&ep->lock);
if (!ep_is_linked(epi)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);

/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
write_unlock_irq(&ep->lock);
}

/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(ep, NULL);

return 0;
}

陷入内核

1.系统调用

1
2
3
4
5
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,int, maxevents, int, timeout)
{
struct timespec64 to;
return do_epoll_wait(epfd, events, maxevents,ep_timeout_to_timespec(&to, timeout));
}

2.调用do_epoll_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 等待事件
* @param epfd
* @param events 用户空间事件
* @param maxevents 最大事件数
* @param to 超时时间
* @return 返回准备好的事件个数/调用错误响应码
*/
static int do_epoll_wait(int epfd, struct epoll_event __user *events,int maxevents, struct timespec64 *to)
{
int error;
struct fd f;//文件描述符
struct eventpoll *ep;//ep控制块

/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)return -EINVAL;

/* Verify that the area passed by the user is writeable */
//检查是否能正常访问
if (!access_ok(events, maxevents * sizeof(struct epoll_event)))return -EFAULT;

/* Get the "struct file *" for the eventpoll file */
//根据epfd获取对应的fd
f = fdget(epfd);
if (!f.file)return -EBADF;

/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
//判断当前文件是否支持epoll
error = -EINVAL;
if (!is_file_epoll(f.file))goto error_fput;

/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;

/* Time to fish for events ... */
//(主线)调用ep_poll函数
error = ep_poll(ep, events, maxevents, to);

error_fput:
fdput(f);
return error;
}

3.调用ep_poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

/**
* ep_poll - Retrieves ready events, and delivers them to the caller-supplied event buffer.
*
* @ep: Pointer to the eventpoll context.
* @events: Pointer to the userspace buffer where the ready events should be stored.
* @maxevents: Size (in terms of number of events) of the caller event buffer.
* @timeout: Maximum timeout for the ready events fetch operation, in
* timespec. If the timeout is zero, the function will not block,
* while if the @timeout ptr is NULL, the function will block
* until at least one event has been retrieved (or an error occurred).
* Return: the number of ready events which have been fetched, or an error code, in case of error.
*/
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,int maxevents, struct timespec64 *timeout)
{
int res, eavail, timed_out = 0;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;

lockdep_assert_irqs_enabled();
//是否带有超时时间
if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
slack = select_estimate_accuracy(timeout);
to = &expires;
*to = timespec64_to_ktime(*timeout);
} else if (timeout) {
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation.
*/
timed_out = 1;
}

/*
* This call is racy: We may or may not see events that are being added
* to the ready list under the lock (e.g., in IRQ callbacks). For cases
* with a non-zero timeout, this thread will check the ready list under
* lock and will add to the wait queue. For cases with a zero
* timeout, the user by definition should not care and will have to
* recheck again.
*/
//检查是否有可用的事件(查看rdlist)
eavail = ep_events_available(ep);
//死循环
while (1) {
if (eavail) {//如果已经有了可用的事件
/*
* Try to transfer events to user space. In case we get
* 0 events and there's still timeout left over, we go
* trying again in search of more luck.
*/
//(主线)尝试将事件传输到用户空间
res = ep_send_events(ep, events, maxevents);
if (res)return res;
}
//超时返回
if (timed_out)return 0;

eavail = ep_busy_loop(ep, timed_out);
if (eavail)continue;
//中断
if (signal_pending(current))return -EINTR;

/*
* Internally init_wait() uses autoremove_wake_function(),
* thus wait entry is removed from the wait queue on each
* wakeup. Why it is important? In case of several waiters
* each new wakeup will hit the next waiter, giving it the
* chance to harvest new event. Otherwise wakeup can be
* lost. This is also good performance-wise, because on
* normal wakeup path no need to call __remove_wait_queue()
* explicitly, thus ep->lock is not taken, which halts the
* event delivery.
*/
init_wait(&wait);

write_lock_irq(&ep->lock);
/*
* Barrierless variant, waitqueue_active() is called under
* the same lock on wakeup ep_poll_callback() side, so it
* is safe to avoid an explicit barrier.
*/
//设置当前状态位任务中断
__set_current_state(TASK_INTERRUPTIBLE);

/*
* Do the final check under the lock. ep_scan_ready_list()
* plays with two lists (->rdllist and ->ovflist) and there
* is always a race when both lists are empty for short
* period of time although events are pending, so lock is
* important.
*/
//再次判断就绪队列
eavail = ep_events_available(ep);
if (!eavail)__add_wait_queue_exclusive(&ep->wq, &wait);

write_unlock_irq(&ep->lock);

if (!eavail)timed_out = !schedule_hrtimeout_range(to, slack,HRTIMER_MODE_ABS);
__set_current_state(TASK_RUNNING);

/*
* We were woken up, thus go and try to harvest some events.
* If timed out and still on the wait queue, recheck eavail
* carefully under lock, below.
*/
eavail = 1;

if (!list_empty_careful(&wait.entry)) {
write_lock_irq(&ep->lock);
/*
* If the thread timed out and is not on the wait queue,
* it means that the thread was woken up after its
* timeout expired before it could reacquire the lock.
* Thus, when wait.entry is empty, it needs to harvest
* events.
*/
if (timed_out)eavail = list_empty(&wait.entry);
__remove_wait_queue(&ep->wq, &wait);
write_unlock_irq(&ep->lock);
}
}
}

4.拷贝事件到用户空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* 发送事件到用户空间
* @param ep eventpoll控制块
* @param events
* @param maxevents
* @return event cnt
*/
static int ep_send_events(struct eventpoll *ep,struct epoll_event __user *events, int maxevents)
{
struct epitem *epi, *tmp;
LIST_HEAD(txlist);
poll_table pt;
int res = 0;

/*
* Always short-circuit for fatal signals to allow threads to make a
* timely exit without the chance of finding more events available and
* fetching repeatedly.
*/
if (fatal_signal_pending(current))return -EINTR;
//重置poll_table回调函数
init_poll_funcptr(&pt, NULL);

mutex_lock(&ep->mtx);
//开始扫描就绪的fd
ep_start_scan(ep, &txlist);

/*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop we are holding ep->mtx.
*/
//传入链表进行遍历
list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
struct wakeup_source *ws;
__poll_t revents;

if (res >= maxevents)break;//超出最大长度结束循环

/*
* Activate ep->ws before deactivating epi->ws to prevent
* triggering auto-suspend here (in case we reactive epi->ws
* below).
*
* This could be rearranged to delay the deactivation of epi->ws
* instead, but then epi->ws would temporarily be out of sync
* with ep_is_linked().
*/
ws = ep_wakeup_source(epi);
if (ws) {
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}

list_del_init(&epi->rdllink);

/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, we are holding ep->mtx,
* so no operations coming from userspace can change the item.
*/
//(主线)调用 ep_item_poll
revents = ep_item_poll(epi, &pt, 1);
if (!revents)continue;//如果没有准备好的事件直接结束当前循环
//(主线) 拷贝事件到用户空间
events = epoll_put_uevent(revents, epi->event.data, events);
if (!events) {
list_add(&epi->rdllink, &txlist);//添加进链表
ep_pm_stay_awake(epi);
if (!res)res = -EFAULT;
break;
}
res++;
if (epi->event.events & EPOLLONESHOT)epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {//边缘触发方式
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, no one can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);//添加进链表
ep_pm_stay_awake(epi);
}
}
ep_done_scan(ep, &txlist);//完成扫描
mutex_unlock(&ep->mtx);

return res;
}

4.1调用ep_start_scan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/*
* ep->mutex needs to be held because we could be hit by
* eventpoll_release_file() and epoll_ctl().
*/
static void ep_start_scan(struct eventpoll *ep, struct list_head *txlist)
{
/*
* Steal the ready list, and re-init the original one to the
* empty list. Also, set ep->ovflist to NULL so that events
* happening while looping w/out locks, are not lost. We cannot
* have the poll callback to queue directly on ep->rdllist,
* because we want the "sproc" callback to be able to do it
* in a lockless way.
*/
lockdep_assert_irqs_enabled();
write_lock_irq(&ep->lock);
list_splice_init(&ep->rdllist, txlist);//拼接两个链表
WRITE_ONCE(ep->ovflist, NULL);//写入ovflist链表
write_unlock_irq(&ep->lock);
}

4.2调用ep_done_scan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

static void ep_done_scan(struct eventpoll *ep,struct list_head *txlist)
{
struct epitem *epi, *nepi;

write_lock_irq(&ep->lock);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
for (nepi = READ_ONCE(ep->ovflist); (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
/*
* We need to check if the item is already in the list.
* During the "sproc" callback execution time, items are
* queued into ->ovflist but the "txlist" might already
* contain them, and the list_splice() below takes care of them.
*/
if (!ep_is_linked(epi)) {
/*
* ->ovflist is LIFO, so we have to reverse it in order
* to keep in FIFO.
*/
list_add(&epi->rdllink, &ep->rdllist);//添加到链表
ep_pm_stay_awake(epi);
}
}
/*
* We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);

/*
* Quickly re-inject items left on "txlist".
*/
list_splice(txlist, &ep->rdllist);//拼接
__pm_relax(ep->ws);

if (!list_empty(&ep->rdllist)) {
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
}
//释放锁
write_unlock_irq(&ep->lock);
}

流程图示

相关总结

和select/poll相比,epoll进行了功能分离,将添加进阻塞队列和等待数据进行分离,先用 epoll_ctl 维护等待队列,再调用 epoll_wait 阻塞进程。具体流程是先用epoll_create 创建一个 epoll 对象 epfd,再通过 epoll_ctl 将需要监视的 socket 添加到 epfd 中,最后调用 epoll_wait 等待数据。

优点:

  • 相比 select/poll,epoll 拆分了功能,将操作和监听分离,操作粒度更小,效率更高
  • epoll采用回调方式将其准备好的fd加入就绪链表,用户进程无需遍历整个fd集

缺点:

  • epoll适合的是大量连接但操作不平凡的socket,如果是少了连接且触发频繁select和poll效率可能会更高

资料

https://github.com/torvalds/linux/find/master
Linux高性能服务器编程.pdf


网络编程-Linux高性能服务器编程
https://mikeygithub.github.io/2022/09/15/yuque/网络编程-《Linux高性能服务器编程》/
作者
Mikey
发布于
2022年9月15日
许可协议