网络socket编程--epoll 模型原理详解以及实例
阅读原文时间:2021年04月21日阅读:1

1.简介

Linux I/O多路复用技术在比较多的TCP网络服务器中有使用,即比较多的用到select函数。Linux 2.6内核中有提高网络I/O性能的新方法,即epoll 。 
epoll是什么?按照man手册的说法是为处理大批量句柄而作了改进的poll。要使用epoll只需要以下的三个系统函数调用: epoll_create(2),epoll_ctl(2),epoll_wait(2)。

2.select模型的缺陷

(1) 在Linux内核中,select所用到的FD_SET是有限的 
内核中有个参数__FD_SETSIZE定义了每个FD_SET的句柄个数:#define __FD_SETSIZE 1024。也就是说,如果想要同时检测1025个句柄的可读状态是不可能用select实现的;或者同时检测1025个句柄的可写状态也是不可能的。 
(2) 内核中实现select是使用轮询方法 
每次检测都会遍历所有FD_SET中的句柄,显然select函数的执行时间与FD_SET中句柄的个数有一个比例关系,即select要检测的句柄数越多就会越费时

3.Windows IOCP模型的缺陷

windows完成端口实现的AIO,实际上也只是使用内部用线程池实现的,最后的结果是IO有个线程池,你的应用程序也需要一个线程池。很多文档其实已经指出了这引发的线程context-switch所带来的代价。

4.EPOLL模型的优点

(1) 支持一个进程打开大数目的socket描述符(FD) 
epoll没有select模型中的限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于select 所支持的2048。下面是我的小PC机上的显示: 
pt@ubuntu:~$ cat /proc/sys/fs/file-max 
6815744 
那么对于服务器而言,这个数目会更大。 

(2) IO效率不随FD数目增加而线性下降 
传统select/poll的另一个致命弱点就是当你拥有一个很大的socket集合,由于网络得延时,使得任一时间只有部分的socket是”活跃”的,而select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对”活跃”的socket进行操作:这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。于是,只有”活跃”的socket才会主动去调用callback函数,其他idle状态的socket则不会。在这点上,epoll实现了一个”伪”AIO”,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的,比如一个高速LAN环境,epoll也不比select/poll低多少效率,但若过多使用的调用epoll_ctl,效率稍微有些下降。然而一旦使用idle connections模拟WAN环境,那么epoll的效率就远在select/poll之上了。 
(3) 使用mmap加速内核与用户空间的消息传递 
无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就显得很重要。在这点上,epoll是通过内核于用户空间mmap同一块内存实现。

5.EPOLL模型的工作模式

  • ET(边缘模式)
  • LT(水平模式)

(1) LT模式 
LT:level triggered,这是缺省的工作方式,同时支持block和no-block socket,在这种模式中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。 

是标准模式,意味着每次epoll_wait()返回后,事件处理后,如果之后还有数据,会不断触发,也就是说,一个套接字上一次完整的数据,epoll_wait()可能会返回多次,直到没有数据为止。
(2) ET模式 
LT:edge-triggered,这是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核就通过epoll告诉你,然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作而导致那个文件描述符不再是就绪状态(比如你在发送,接收或是接受请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核就不会发送更多的通知(only once)。不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。

也称高效模式,有数据过来后,epoll_wait()会返回一次,一段时间内,该套接字就算有数据源源不断地过来,epoll_wait()也不会返回了。这里注意,是一段时间,不代表这个套接字上有数据就只触发一次。时间过长,还是会返回多次的。比如我写FTP用了epoll+多线程,但是每次套接字上有信息就开线程处理,同一时间内希望一个套接字只被一个线程持有,但是因为文件传输时间过长,就算使用ET模式,套接字还是会返回多次。这里要特别强调一个参数EPOLLONESHOT,如果要保证套接字同一时段只被一个线程处理,必须加上。解决方案:给accept()后的套接字加上参数EPOLLONESHOT,线程结束后处理完之后,再重置EPOLLONESHOT属性,但是,千万不可以给listen()后的监听套接字设置此属性,这会造成同一时刻只能处理一个连接的情况。

深入理解EPOLLONESHOT事件

即使使用ET模式,一个socket上的某个事件还是可能被触发多次,这是跟数据报的大小有关系,常见的情景就是一个线程,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒处理这些新的数据,于是出现了两个线程同时操作一个socket,为了避免这种情况,就可以采用epoll的EPOLLONESPOT事件。同时要注意,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket的EPOLLONESHOT的事件,以确保这个socket下次可读时,其EPOLLIN事件被触发,进而让其他的工作线程有机会继续处理这个socket。

网络事件EAGIN

          在一个非阻塞的socket上调用read/write函数, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)从字面上看, 意思是:EAGAIN: 再试一次,EWOULDBLOCK:

如果这是一个阻塞socket, 操作将被block,perror输出: Resource temporarily unavailable.

这个错误表示资源暂时不够,能read时,读缓冲区没有数据,或者write时,写缓冲区满了。遇到这种情况,如果是阻塞socket,read/write就要阻塞掉。而如果是非阻塞socket,read/write立即返回-1, 同时errno设置为EAGAIN。所以,对于阻塞socket,read/write返回-1代表网络出错了。但对于非阻塞socket,read/write返回-1不一定网络真的出错了。可能是Resource temporarily unavailable。这时你应该再试,直到Resource available。

综上,对于non-blocking的socket,正确的读写操作为: 
读:忽略掉errno = EAGAIN的错误,下次继续读 
写:忽略掉errno = EAGAIN的错误,下次继续写

对于select和epoll的LT模式,这种读写方式是没有问题的。但对于epoll的ET模式,这种方式还有漏洞。

epoll的两种模式LT和ET

二者的差异在于level-trigger模式下只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket;而edge-trigger模式下只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。如下两个示意图: 
从socket读数据: 

从socket写数据: 

所以,在epoll的ET模式下,正确的读写方式为: 
读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN 写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN。

正确的读:

n = 0;
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
if (nread == -1 && errno != EAGAIN)
{
    perror("read error");
}
n += nread;
}

正确的写:

int nwrite, data_size = strlen(buf);
n = data_size;
while (n > 0)
{
    nwrite = write(fd, buf + data_size - n, n);
    if (nwrite < n)
    {
        if (nwrite == -1 && errno != EAGAIN)
        {
            perror("write error");
        }
        break;
     }
     n -= nwrite;
}

accept上的问题

  • 阻塞模式 accept 存在的问题 
    考虑这种情况:TCP连接被客户端关闭,即在服务器调用accept之前,客户端主动发送RST终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在accept调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept调用上,就绪队列中的其他描述符都得不到处理。

解决方案:把监听套接口设置为非阻塞,当客户在服务器调用accept之前中止某个连接时,accept调用可以立即返回-1,这时源自Berkeley的实现会在内核中处理该事件,并不会将该事件通知给epoll,而其他实现把errno设置为ECONNABORTED或者EPROTO错误,我们应该忽略这两个错误。

  • ET模式下accept存在的问题。

考虑这种情况:多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。 
解决办法:是用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。

综合以上两种情况,服务器应该使用非阻塞地accept,accept在ET模式下的正确使用方式为:

while ((conn_sock = accept(listenfd,(struct sockaddr *) &remote, (size_t *)&addrlen)) > 0)
{
    handle_client(conn_sock);
}
if (conn_sock == -1)
{
    if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR)
    perror("accept");
}

面试题:

使用Linux epoll模型,水平触发模式;当socket可写时,会不停的触发socket可写的事件,如何处理? 
+ 第一种最普遍的方式: 
需要向socket写数据的时候才把socket加入epoll,等待可写事件。接受到可写事件后,调用write或者send发送数据。当所有数据都写完后,把socket移出epoll。 
这种方式的缺点是,即使发送很少的数据,也要把socket加入epoll,写完后在移出epoll,有一定操作代价。

+ 第二种的方式: 

开始不把socket加入epoll,需要向socket写数据的时候,直接调用write或者send发送数据。如果返回EAGAIN,把socket加入epoll,在epoll的驱动下写数据,全部数据发送完毕后,再移出epoll。 
这种方式的优点是:数据不多的时候可以避免epoll的事件处理,提高效率。

总结,ET模式下: 
如果read返回0,那么说明已经接受所有数据 
如果errno=EAGAIN,说明还有数据未接收,等待下一次通知 
如果read返回-1,说明发生错误,停止处理。

6.EPOLL模型的使用方法

epoll用到的所有函数都是在头文件sys/epoll.h中声明的,下面简要说明所用到的数据结构和函数: 
(1) epoll_data、epoll_data_t、epoll_event 
typedef union epoll_data { 
void *ptr; 
int fd; 
__uint32_t u32; 
__uint64_t u64; 
} epoll_data_t;

struct epoll_event { 
__uint32_t events; /* Epoll events */ 
epoll_data_t data; /* User data variable */ 
};

结构体epoll_event 被用于注册所感兴趣的事件和回传所发生待处理的事件。epoll_event 结构体的events字段是表示感兴趣的事件和被触发的事件,可能的取值为: 
EPOLLIN: 表示对应的文件描述符可以读; 
EPOLLOUT: 表示对应的文件描述符可以写; 
EPOLLPRI: 表示对应的文件描述符有紧急的数据可读; 
EPOLLERR: 表示对应的文件描述符发生错误; 
EPOLLHUP: 表示对应的文件描述符被挂断; 
EPOLLET: 表示对应的文件描述符有事件发生;

联合体epoll_data用来保存触发事件的某个文件描述符相关的数据。例如一个client连接到服务器,服务器通过调用accept函数可以得到于这个client对应的socket文件描述符,可以把这文件描述符赋给epoll_data的fd字段,以便后面的读写操作在这个文件描述符上进行。

(2)epoll_create 
函数声明:int epoll_create(intsize) 
函数说明:该函数生成一个epoll专用的文件描述符,其中的参数是指定生成描述符的最大范围。

(3) epoll_ctl函数 
函数声明:int epoll_ctl(int epfd,int op, int fd, struct epoll_event *event) 
函数说明:该函数用于控制某个文件描述符上的事件,可以注册事件、修改事件、删除事件。 
epfd:由 epoll_create 生成的epoll专用的文件描述符; 
op:要进行的操作,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除; 
fd:关联的文件描述符; 
event:指向epoll_event的指针; 
如果调用成功则返回0,不成功则返回-1。

(4) epoll_wait函数 
函数声明:int epoll_wait(int epfd, structepoll_event * events, int maxevents, int timeout) 
函数说明:该函数用于轮询I/O事件的发生。 
epfd:由epoll_create 生成的epoll专用的文件描述符; 
epoll_event:用于回传代处理事件的数组; 
maxevents:每次能处理的事件数; 
timeout:等待I/O事件发生的超时值; 
返回发生事件数。

7 设计思路及模板

首先通过create_epoll(int maxfds)来创建一个epoll的句柄,其中maxfds为你的epoll所支持的最大句柄数。这个函数会返回一个新的epoll句柄,之后的所有操作都将通过这个句柄来进行操作。在用完之后,记得用close()来关闭这个创建出来的epoll句柄。 
然后在你的网络主循环里面,调用epoll_wait(int epfd, epoll_event events, int max_events,int timeout)来查询所有的网络接口,看哪一个可以读,哪一个可以写。基本的语法为: 
nfds = epoll_wait(kdpfd, events, maxevents, -1); 
其中kdpfd为用epoll_create创建之后的句柄,events是一个epoll_event*的指针,当epoll_wait函数操作成功之后,events里面将储存所有的读写事件。max_events是当前需要监听的所有socket句柄数。最后一个timeout参数指示 epoll_wait的超时条件,为0时表示马上返回;为-1时表示函数会一直等下去直到有事件返回;为任意正整数时表示等这么长的时间,如果一直没有事件,则会返回。一般情况下如果网络主循环是单线程的话,可以用-1来等待,这样可以保证一些效率,如果是和主循环在同一个线程的话,则可以用0来保证主循环的效率。epoll_wait返回之后,应该进入一个循环,以便遍历所有的事件。 
对epoll 的操作就这么简单,总共不过4个API:epoll_create, epoll_ctl,epoll_wait和close。以下是man中的一个例子。

struct epoll_event ev, *events;
for(;;) 
{
   nfds = epoll_wait(kdpfd, events, maxevents, -1);    //等待IO事件
   for(n = 0; n < nfds; ++n)
   {
   //如果是主socket的事件,则表示有新连接进入,需要进行新连接的处理。
      if(events[n].data.fd == listener)
      {
         client = accept(listener, (struct sockaddr *) &local,  &addrlen);
         if(client < 0)
         {
            perror("accept error");
            continue;
         }

         // 将新连接置于非阻塞模式
         setnonblocking(client);
         ev.events = EPOLLIN | EPOLLET; 
         //注意这里的参数EPOLLIN | EPOLLET并没有设置对写socket的监听,
         //如果有写操作的话,这个时候epoll是不会返回事件的,
         //如果要对写操作也监听的话,应该是EPOLLIN | EPOLLOUT | EPOLLET。
         // 并且将新连接也加入EPOLL的监听队列
         ev.data.fd = client;
         // 设置好event之后,将这个新的event通过epoll_ctl
         if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0)
         {
            //加入到epoll的监听队列里,这里用EPOLL_CTL_ADD
            //来加一个新的 epoll事件。可以通过EPOLL_CTL_DEL来减少
            //一个epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。
            fprintf(stderr, "epoll set insertion error: fd=%d"0, client);
            return -1;
         }
      }
      else
      // 如果不是主socket的事件的话,则代表这是一个用户的socket的事件,
      // 则用来处理这个用户的socket的事情是,比如说read(fd,xxx)之类,或者一些其他的处理。
         do_use_fd(events[n].data.fd);
   }
}

8 EPOLL模型的简单实例

服务器

#include <iostream>
#include <sys/socket.h> 
#include <sys/epoll.h>
#include <netinet/in.h> 
#include <arpa/inet.h>
#include <fcntl.h> 
#include <unistd.h> 
#include <stdio.h>

#define MAXLINE 10 
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5555 
#define INFTIM 1000

void setnonblocking(int sock)
{
   int opts;
   opts = fcntl(sock, F_GETFL);
   if(opts < 0)
   {
      perror("fcntl(sock, GETFL)");
      exit(1);
   }
   opts = opts | O_NONBLOCK;
   if(fcntl(sock, F_SETFL, opts) < 0)
   {
      perror("fcntl(sock,SETFL,opts)");
      exit(1);
   }
}

int main()
{
   int i, maxi, listenfd, connfd, sockfd, epfd, nfds; 
   ssize_t n; 
   char line[MAXLINE];
   socklen_t clilen;
   //声明epoll_event结构体的变量, ev用于注册事件, events数组用于回传要处理的事件
   struct epoll_event ev,events[20];
   //生成用于处理accept的epoll专用的文件描述符, 指定生成描述符的最大范围为256 
   epfd = epoll_create(256);
   struct sockaddr_in clientaddr; 
   struct sockaddr_in serveraddr;

   listenfd = socket(AF_INET, SOCK_STREAM, 0);

   setnonblocking(listenfd);       //把用于监听的socket设置为非阻塞方式

   ev.data.fd = listenfd;          //设置与要处理的事件相关的文件描述符
   ev.events = EPOLLIN | EPOLLET;  //设置要处理的事件类型

   epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);     //注册epoll事件

   bzero(&serveraddr, sizeof(serveraddr)); 
   serveraddr.sin_family = AF_INET;
   char *local_addr = "200.200.200.204";
   inet_aton(local_addr, &(serveraddr.sin_addr));
   serveraddr.sin_port = htons(SERV_PORT);  //或者htons(SERV_PORT);

   bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));

   listen(listenfd, LISTENQ);

   maxi = 0;
   for( ; ; )
   { 
      nfds = epoll_wait(epfd, events, 20, 500); //等待epoll事件的发生
      for(i = 0; i < nfds; ++i)                 //处理所发生的所有事件
      {
         if(events[i].data.fd == listenfd)      //监听事件
         {
            connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen); 
            if(connfd < 0)
            {
               perror("connfd<0");
               exit(1);
            }
            setnonblocking(connfd);           //把客户端的socket设置为非阻塞方式
            char *str = inet_ntoa(clientaddr.sin_addr);
            printf("connect from");

            ev.data.fd=connfd;                //设置用于读操作的文件描述符
            ev.events=EPOLLIN | EPOLLET;      //设置用于注测的读操作事件
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            //注册ev事件
         }
         else if(events[i].events&EPOLLIN)      //读事件
         {
            if ( (sockfd = events[i].data.fd) < 0)
            {
               continue;
            }
            if ( (n = read(sockfd, line, MAXLINE)) < 0) // 这里和IOCP不同
            {
               if (errno == ECONNRESET)
               {
                  close(sockfd);
                  events[i].data.fd = -1; 
               }
               else
               {          
                  printf("readline error");
               }
            }
            else if (n == 0)
            {
               close(sockfd); 
               events[i].data.fd = -1; 
            }
            ev.data.fd=sockfd;              //设置用于写操作的文件描述符
            ev.events=EPOLLOUT | EPOLLET;   //设置用于注测的写操作事件 

            //修改sockfd上要处理的事件为EPOLLOUT
            epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
         } 
         else if(events[i].events&EPOLLOUT)//写事件
         {
            sockfd = events[i].data.fd;
            write(sockfd, line, n);
            ev.data.fd = sockfd;               //设置用于读操作的文件描述符
            ev.events = EPOLLIN | EPOLLET;     //设置用于注册的读操作事件
            //修改sockfd上要处理的事件为EPOLIN
            epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev);
         } 
      }
   }
}

9.epoll进阶思考

9.1. 问题来源 
最近学习EPOLL模型,介绍中说将EPOLL与Windows IOCP模型进行比较,说其的优势在于解决了IOCP模型大量线程上下文切换的开销,于是可以看出,EPOLL模型不需要多线程,即单线程中可以处理EPOLL逻辑。如果引入多线程反而会引起一些问题。但是EPOLL模型的服务器端到底可以不可以用多线程技术,如果可以,改怎么取舍,这成了困扰我的问题。上网查了一下,有这样几种声音: 
(1) “要么事件驱动(如epoll),要么多线程,要么多进程,把这几个综合起来使用,感觉更加麻烦。”; 
(2) “单线程使用epoll,但是不能发挥多核;多线程不用epoll。”; 
(3) “主通信线程使用epoll所有需要监控的FD,有事件交给多线程去处理”; 
(4) “既然用了epoll, 那么线程就不应该看到fd, 而只看到的是一个一个的业务请求/响应; epoll将网络数据组装成业务数据后, 转交给业务线程进行处理。这就是常说的半同步半异步”。 
我比较赞同上述(3)、(4)中的观点 
EPOLLOUT只有在缓冲区已经满了,不可以发送了,过了一会儿缓冲区中有空间了,就会触发EPOLLOUT,而且只触发一次。如果你编写的程序的网络IO不大,一次写入的数据不多的时候,通常都是epoll_wait立刻就会触发 EPOLLOUT;如果你不调用 epoll,直接写 socket,那么情况就取决于这个socket的缓冲区是不是足够了。如果缓冲区足够,那么写就成功。如果缓冲区不足,那么取决你的socket是不是阻塞的,要么阻塞到写完成,要么出错返回。所以EPOLLOUT事件具有较大的随机性,ET模式一般只用于EPOLLIN, 很少用于EPOLLOUT。 
9.2. 具体做法 
(1) 主通信线程使用epoll所有需要监控的FD,负责监控listenfd和connfd,这里只监听EPOLLIN事件,不监听EPOLLOUT事件; 
(2) 一旦从Client收到了数据以后,将其构造成一个消息,放入消息队列中; 
(3) 若干工作线程竞争,从消息队列中取出消息并进行处理,然后把处理结果发送给客户端。发送客户端的操作由工作线程完成。直接进行write。write到EAGAIN或EWOULDBLOCK后,线程循环continue等待缓冲区队列 
发送函数代码如下:
 

bool send_data(int connfd, char *pbuffer, unsigned int &len,int flag)
{
   if ((connfd < 0) || (0  == pbuffer))
   {
      return false;
   }

   int result = 0;
   int remain_size = (int) len;
   int send_size = 0;
   const char *p = pbuffer; 

   time_t start_time = time(NULL);
   int time_out = 3;

   do
   {
      if (time(NULL) > start + time_out)
      {
         return false;
      }

      send_size = send(connfd, p, remain_size, flag);
      if (nSentSize < 0)
      {
         if ((errno == EAGAIN) || (errno == EWOULDBLOCK) || (errno == EINTR))
         {
            continue;
         }
         else
         {
            len -= remain_size;
            return false;
         }
      }

      p += send_size;
      remain_size -= send_size;
   }while(remain_size > 0);

   return true;
}

10 epoll 实现服务器和客户端例子

最后我们用C++实现一个简单的客户端回射,所用到的代码文件是

net.h  server.cpp   client.cpp

服务器端:epoll实现的,干两件事分别为:1.等待客户端的链接,2.接收来自客户端的数据并且回射;

客户端:select实现,干两件事为:1.等待键盘输入,2.发送数据到服务器端并且接收服务器端回射的数据;

/***********
net.h
***********/
#include <stdio.h>

#ifndef _NET_H
#define _NET_H

#include <iostream>
#include <vector>
#include <algorithm>

#include <stdio.h>
#include <sys/types.h>
#include <sys/epoll.h>  //epoll ways file
#include <sys/socket.h>
#include <fcntl.h>    //block and noblock

#include <stdlib.h>
#include <error.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <signal.h>

using namespace std;


#define hand_error(msg) do{perror(msg); exit(EXIT_FAILURE);}while(0)
#endif


/***********
server.c
***********/
#include "net.h"
#define MAX_EVENTS 10000

int setblock(int sock)
{
    int ret =  fcntl(sock, F_SETFL, 0);
    if (ret < 0 )
        hand_error("setblock");
    return 0;
}
int setnoblock(int sock)  //设置非阻塞模式
{
    int ret = fcntl(sock,  F_SETFL, O_NONBLOCK );
    if(ret < 0)
        hand_error("setnoblock");
    return 0;
}

int main()
{
    signal(SIGPIPE,SIG_IGN);
  int listenfd;
    listenfd = socket( AF_INET, SOCK_STREAM,0 );   //create a socket stream
    if( listenfd < 0 )
        hand_error( "socket_create");
    setnoblock(listenfd);
    int on = 1;
    if( setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))< 0)
        hand_error("setsockopt");

    struct sockaddr_in my_addr;
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(18000);   //here is host  sequeue
    my_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    if( bind( listenfd, (struct sockaddr *)&my_addr, sizeof(my_addr)) < 0)
        hand_error("bind");

    int lisId = listen(listenfd, SOMAXCONN);
    if( lisId < 0)   //LISTEN
        hand_error("listen");

    struct sockaddr_in peer_addr;   //用来 save client addr
    socklen_t peerlen;  
    //下面是一些初始化,都是关于epoll的。
    vector<int> clients;
    int count = 0;
    int cli_sock = 0;
    int epfd = 0;  //epoll 的文件描述符
    int ret_events;  //epoll_wait()的返回值
  struct epoll_event ev_remov, ev, events[MAX_EVENTS];  //events 用来存放从内核读取的的事件
    ev.events = EPOLLET | EPOLLIN;   //边缘方式触发
    ev.data.fd = listenfd;

    epfd = epoll_create(MAX_EVENTS);   //create epoll,返回值为epoll的文件描述符
    //epfd = epoll_create(EPOLL_CLOEXEC);  //新版写法
    if(epfd < 0)
        hand_error("epoll_create");
    int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);   //添加时间
    if(ret < 0)
        hand_error("epoll_ctl");


    while(1)
    {
        ret_events = epoll_wait(epfd, events, MAX_EVENTS, -1);   //类似于select函数,这里是等待事件的到来。
        if(ret_events == -1)
        {
            cout<<"ret_events = "<<ret_events<<endl;
            hand_error("epoll_wait");
        }

        if( ret_events == 0)
        {
            cout<<"ret_events = "<<ret_events<<endl;
            continue;
        }

//      cout<<"ret_events = "<<ret_events<<endl;
        for( int num = 0; num < ret_events; num ++)
        {
            cout<<"num = "<<num<<endl;
            cout<<"events[num].data.fd = "<<events[num].data.fd<<endl;
            if(events[num].data.fd == listenfd) //client connect
            {
                cout<<"listen sucess and listenfd = "<<listenfd<<endl;
                cli_sock = accept(listenfd, (struct sockaddr*)&peer_addr, &peerlen);
                if(cli_sock < 0)
                    hand_error("accept");
                cout<<"count = "<<count++;
                printf("ip=%s,port = %d\n", inet_ntoa(peer_addr.sin_addr),peer_addr.sin_port);
                clients.push_back(cli_sock);
                setnoblock(cli_sock);   //设置为非阻塞模式
                ev.data.fd = cli_sock;// 将新连接也加入EPOLL的监听队列
                ev.events = EPOLLIN | EPOLLET ;
                if(epoll_ctl(epfd, EPOLL_CTL_ADD, cli_sock, &ev)< 0)
                    hand_error("epoll_ctl");
            }

            else if( events[num].events & EPOLLIN)
            {
                cli_sock = events[num].data.fd;
                if(cli_sock < 0)
                    hand_error("cli_sock");
                char recvbuf[1024];
                memset(recvbuf, 0 , sizeof(recvbuf));
                int num = read( cli_sock, recvbuf, sizeof(recvbuf));
                if(num == -1)
                    hand_error("read have some problem:");
                if( num == 0 )  //stand of client have exit
                {
                    cout<<"client have exit"<<endl;
                    close(cli_sock);
                    ev_remov = events[num];
                    epoll_ctl(epfd, EPOLL_CTL_DEL, cli_sock, &ev_remov);
                    clients.erase(remove(clients.begin(), clients.end(), cli_sock),clients.end());
                }
                fputs(recvbuf,stdout);
                write(cli_sock, recvbuf, strlen(recvbuf));
            }
        }
    }

    return 0;
}



/***********
client.c
***********/

#include "net.h"

int main()
{
    signal(SIGPIPE,SIG_IGN);
  int sock;
    sock = socket( AF_INET, SOCK_STREAM,0 );   //create a socket stream
    if( sock< 0 )
        hand_error( "socket_create");

    struct sockaddr_in my_addr;

    //memset my_addr;
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(18000);   //here is host sequeue
//  my_addr.sin_addr.s_addr = htonl( INADDR_ANY );
    my_addr.sin_addr.s_addr = inet_addr("127.0.0.1");

    int conn = connect(sock, (struct sockaddr *)&my_addr, sizeof(my_addr)) ;
    if(conn != 0)
        hand_error("connect");

    char recvbuf[1024] = {0};
    char sendbuf[1024] = {0};
    fd_set rset;
    FD_ZERO(&rset);     

    int nready = 0;
    int maxfd;
    int stdinof = fileno(stdin);
    if( stdinof > sock)
        maxfd = stdinof;
    else
        maxfd = sock;
    while(1)
    {
        //select返回后把原来待检测的但是仍没就绪的描述字清0了。所以每次调用select前都要重新设置一下待检测的描述字
        FD_SET(sock, &rset);  
        FD_SET(stdinof, &rset);
        nready = select(maxfd+1, &rset, NULL, NULL, NULL); 
        cout<<"nready = "<<nready<<"  "<<"maxfd = "<<maxfd<<endl;
        if(nready == -1 )
            break;
        else if( nready == 0)
            continue;
        else
        {
            if( FD_ISSET(sock, &rset) )  //检测sock是否已经在集合rset里面。
            {
                int ret = read( sock, recvbuf, sizeof(recvbuf));  //读数据
                if( ret == -1)
                    hand_error("read");
                else if( ret == 0)
                {
                    cout<<"sever have close"<<endl;
                    close(sock);
                    break;
                }
                else
                {
                    fputs(recvbuf,stdout);    //输出数据
                    memset(recvbuf, 0, strlen(recvbuf));
                }   
            }

            if( FD_ISSET(stdinof, &rset))   //检测stdin的文件描述符是否在集合里面
            {   
                if(fgets(sendbuf, sizeof(sendbuf), stdin) != NULL)
                {
                    int num = write(sock, sendbuf, strlen(sendbuf));   //写数据
                    cout<<"sent num = "<<num<<endl;
                    memset(sendbuf, 0, sizeof(sendbuf));
                }
            }
        }
    }
    return 0;
}