This repository has been archived by the owner on Feb 12, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy path13-4-shm-chatroom.c
420 lines (390 loc) · 14.1 KB
/
13-4-shm-chatroom.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
#include <sys/socket.h>
#include <netinet/in.h>
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <arpa/inet.h>
#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536
//处理一个客户连接必要的数据
struct client_data
{
sockaddr_in address; //客户端的socket地址
int connfd; //socket文件描述符
pid_t pid; //处理这个连接的子进程的PID
int pipefd[2]; //和父进程通信用的管道
};
static const char* shm_name = "/my_shm";
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_mem = 0;
//客户连接数组。进程用客户连接的编号来索引这个数组,即可取得相关的客户连接数据
client_data* users = 0;
//子进程和客户连接的映射关系。用进程的PID来索引这个数组,即可取得该进程所处理的客户连接的编号
int* sub_process = 0;
//当前客户数量
int user_count = 0;
bool stop_child = false;
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
void addfd(int epollfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
void sig_handler(int sig)
{
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1], (char *)&msg, 1, 0);
errno = save_errno;
}
void addsig(int sig, void(*handler)(int), bool restart = true)
{
struct sigaction sa;
memset(&sa, '\0', sizeof(sa));
sa.sa_handler = handler;
if (restart)
{
sa.sa_flags |= SA_RESTART;
}
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
void del_resource()
{
close(sig_pipefd[0]);
close(sig_pipefd[1]);
close(listenfd);
close(epollfd);
shm_unlink(shm_name);
delete [] users;
delete [] sub_process;
}
//停止一个子进程
void child_term_handler(int sig)
{
stop_child = true;
}
//子进程运行的函数。参数idx指出该子进程处理的客户连接的编号,users是保存所有客户端连接数据的数组,参数share_mem指出共享内存的起始地址
int run_child(int idx, client_data* users, char* share_mem)
{
epoll_event events[MAX_EVENT_NUMBER];
//子进程使用I/O复用技术来同时监听两个文件描述符:客户连接socket、与父进程通信的管道文件描述符
int child_epollfd = epoll_create(5);
assert(child_epollfd != -1);
int connfd = users[idx].connfd;
addfd(child_epollfd, connfd);
int pipefd = users[idx].pipefd[1];
addfd(child_epollfd, pipefd);
int ret;
//子进程需要设置自己的信号处理函数
addsig(SIGTERM, child_term_handler, false);
while(!stop_child)
{
int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
if((number < 0) && (errno != EINTR))
{
printf("epoll failure\n");
break;
}
for(int i = 0; i < number; i++)
{
int sockfd = events[i].data.fd;
//本子进程负责的客户连接有数据到达
if ((sockfd == connfd) && (events[i].events &EPOLLIN))
{
memset(share_mem + idx * BUFFER_SIZE, '\0', BUFFER_SIZE);
//将客户数据读取到对应的读缓存中。该读缓存是共享内存的一段,它开始于idx*BUFFER_SIZE处,
//长度为BUFFER_SIZE字节。因此,各个客户连接的读缓存是共享的
ret = recv(connfd, share_mem + idx*BUFFER_SIZE, BUFFER_SIZE-1, 0);
if (ret < 0)
{
if (errno != EAGAIN)
{
stop_child = true;
}
}
else if (ret == 0)
{
stop_child = true;
}
else
{
//成功读取客户数据后就通知主进程(通过管道)来处理
send(pipefd, (char*)&idx, sizeof(idx), 0);
}
}
//主进程通知本进程(通过管道)将第client个客户的数据发送到本进程负责的客户端
else if ((sockfd == pipefd) && (events[i].events & EPOLLIN))
{
int client = 0;
//接收主进程发送来的数据, 即有客户数据到达的连接的编号
ret = recv(sockfd, (char*)&client, sizeof(client), 0);
if (ret < 0)
{
if (errno != EAGAIN)
{
stop_child = true;
}
}
else if (ret == 0)
{
stop_child = true;
}
else
{
send(connfd, share_mem + client * BUFFER_SIZE, BUFFER_SIZE, 0);
}
}
else
{
continue;
}
}
}
close(connfd);
close(pipefd);
close(child_epollfd);
return 0;
}
int main(int argc, char *argv[])
{
if (argc < 2)
{
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd > 0);
ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
user_count = 0;
users = new client_data[USER_LIMIT + 1];
sub_process = new int [PROCESS_LIMIT];
for( int i = 0; i < PROCESS_LIMIT; ++ i)
{
sub_process[i] = -1;
}
epoll_event events[MAX_EVENT_NUMBER];
epollfd = epoll_create(1);
assert(epollfd != -1);
addfd(epollfd, listenfd);
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
assert(ret != -1);
setnonblocking(sig_pipefd[1]);
addfd(epollfd, sig_pipefd[0]);
addsig(SIGCHLD, sig_handler);
addsig(SIGTERM, sig_handler);
addsig(SIGINT, sig_handler);
addsig(SIGPIPE, SIG_IGN);
bool stop_server = false;
bool terminate = false;
// 创建共享内存, 作为所有客户socket连接的缓存
shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
assert(shmfd != -1);
ret = ftruncate(shmfd, USER_LIMIT * BUFFER_SIZE);
assert( ret != -1);
share_mem = (char *)mmap(NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ| PROT_WRITE, MAP_SHARED, shmfd, 0);
assert(*share_mem != MAP_SHARED);
close(shmfd);
while( !stop_server )
{
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if ((number < 0) && (errno != EINTR))
{
printf("epoll failure\n");
break;
}
for (int i = 0; i < number; i ++ )
{
int sockfd = events[i].data.fd;
//新的客户连接到来
if (sockfd == listenfd )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength);
if (connfd < 0)
{
printf("errno is: %d\n", errno);
continue;
}
if (user_count >= USER_LIMIT)
{
const char* info = "too many users\n";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}
//保存user_count个客户连接的相关数据
users[user_count].address = client_address;
users[user_count].connfd = connfd;
//在主进程和子进程间建立管道, 以传递必要的数据
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
assert(ret != -1);
pid_t pid = fork();
if (pid < 0)
{
close(connfd);
continue;
}
else if (pid == 0)
{
close(epollfd);
close(listenfd);
close(users[user_count].pipefd[0]);
close(sig_pipefd[0]);
close(sig_pipefd[1]);
run_child(user_count, users, share_mem);
munmap((void *)share_mem, USER_LIMIT * BUFFER_SIZE);
exit(0);
}
else
{
close(connfd);
close(users[user_count].pipefd[1]);
addfd(epollfd, users[user_count].pipefd[0]);
users[user_count].pid = pid;
//记录新的客户连接在数组users中的索引值,建立进程pid和该索引值之间的映射关系
sub_process[pid] = user_count;
user_count ++;
}
}
else if ((sockfd == sig_pipefd[0]) && (events[i].events & EPOLLIN))
{
/* int sig; */
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if (ret == -1)
{
continue;
}
else if (ret == 0)
{
continue;
}
else
{
for (int i = 0; i < ret; ++ i)
{
switch( signals[i] )
{
//子进程退出,表示有某个客户端关闭了连接
case SIGCHLD:
{
pid_t pid;
int stat;
while((pid == waitpid(-1, &stat, WNOHANG)) > 0)
{
//用子进程的pid取得被关闭的客户连接的编号
int del_user = sub_process[pid];
sub_process[pid] = -1;
if ((del_user < 0) || (del_user > USER_LIMIT))
{
continue;
}
//清除第del_user个客户连接使用的相关数据
epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);
close(users[del_user].pipefd[0]);
users[del_user] = users[--user_count];
sub_process[users[del_user].pid] = del_user;
}
if (terminate && user_count == 0)
{
stop_server = true;
}
break;
}
case SIGTERM:
case SIGINT:
{
//结束服务程序
printf("kill all the clild now\n");
if(user_count == 0)
{
stop_server = true;
break;
}
for( int i = 0; i < user_count; ++i)
{
int pid = users[i].pid;
kill(pid, SIGTERM);
}
terminate = true;
break;
}
default:
{
break;
}
}
}
}
}
//某个子进程向父进程写入了数据
else if (events[i].events & EPOLLIN)
{
int child = 0;
//读取管道数据, child变量记录了是哪个客户连接有数据到达
ret = recv(sockfd, (char *)&child, sizeof(child), 0);
printf("read data from child accross pipe\n");
if (ret == -1)
{
continue;
}
else if (ret == 0)
{
continue;
}
else
{
//向负责处理第child个客户连接的子进程之外的其他子进程发送消息,通知它们有客户数据要写
for(int j = 0; j < user_count; j++)
{
if (users[j].pipefd[0] != sockfd)
{
printf("send data to child accross pipe\n");
send(users[j].pipefd[0], (char *)&child, sizeof(child), 0);
}
}
}
}
}
}
del_resource();
return 0;
}