第15章 高并发服务器编程(1)_非阻塞I/O模型
阅读原文时间:2024年06月05日阅读:1

1. 高性能I/O

(1)通常,recv函数没有数据可用时会阻塞等待。同样,当socket发送缓冲区没有足够多空间来发送消息时,函数send会阻塞。

(2)当socket在非阻塞模式下,这些函数不会阻塞,如果发送/接收缓冲区没有数据时,调用会失败并设置errno为EWOULDBLOCK或EAGAIN。

(3)可以调用fcntl函数实现非阻塞式I/O或调用select实现I/O多路复用以提高使用I/O而出现的效率问题。

2. 非阻塞I/O模型: fcntl函数

【编程实验】echo服务器(非阻塞IO方式实现)

(1)主线程创建一个服务于所有客户端的子线程。

(2)主线程调用accept与客户端建立连接,并将新的socket设置为非阻塞方式。然后将这个新的socket放入数组中。

(3)利用一个子线程遍历(轮询)数组中各个socket,并调用read/write(非阻塞式)与客户端进行通信。

//vector_fd.h

#ifndef __VECTOR_H__
#define __VECTOR_H__

#include

//用于存放sock的动态数组(线程安全!)
typedef struct{
int *fd;
int counter; //元素个数
int max_counter;//最多存数个数,会动态增长
pthread_mutex_t mutex;
}VectorFD, *PVectorFD;

//动态数组相关的操作函数
extern VectorFD* create_vector_fd(void);
extern void destroy_vector_fd(VectorFD* vfd);
extern int get_fd(VectorFD* vfd, int index);
extern void remove_fd(VectorFD* vfd, int fd);
extern void add_fd(VectorFD* vfd, int fd);

#endif

//vector_fd.c  //动态数组操作函数

#include "vector_fd.h"
#include
#include
#include

//查找指定fd在数组中的索引值
static int indexof(VectorFD* vfd, int fd)
{
int ret = -;

int i=;  
for(; i<vfd->counter; i++){  
    if(vfd->fd\[i\] == fd){  
        ret = i;  
        break;  
    }  
}

return ret;  

}

//数组空间的动态增长
static void encapacity(VectorFD* vfd)
{
if(vfd->counter >=vfd->max_counter){
int* fds = (int*)calloc(vfd->counter + , sizeof(int));
assert(fds != NULL);
memcpy(fds, vfd->fd, sizeof(int) * vfd->counter);

    free(vfd->fd);  
    vfd->fd = fds;  
    vfd->max\_counter += ;  
}  

}

//动态数组相关的操作
VectorFD* create_vector_fd(void)
{
VectorFD* vfd = (VectorFD*)calloc(, sizeof(VectorFD));
assert(vfd != NULL);

//分配存放fd的数组空间  
vfd->fd = (int\*)calloc(, sizeof(int));  
assert(vfd->fd != NULL);

vfd->counter = ;  
vfd->max\_counter = ;

//对互斥锁进行初始化  
pthread\_mutex\_init(&vfd->mutex, NULL);

return vfd;  

}

void destroy_vector_fd(VectorFD* vfd)
{
assert(vfd != NULL);
//销毁互斥锁
pthread_mutex_destroy(&vfd->mutex);

free(vfd->fd);  
free(vfd);  

}

int get_fd(VectorFD* vfd, int index)
{
int ret = ;
assert(vfd != NULL);

pthread\_mutex\_lock(&vfd->mutex);

if(( <= index) && (index < vfd->counter)){  
    ret = vfd->fd\[index\];  
}

pthread\_mutex\_unlock(&vfd->mutex);

return ret;  

}

void remove_fd(VectorFD* vfd, int fd)
{
assert(vfd != NULL);

pthread\_mutex\_lock(&vfd->mutex);

int index = indexof(vfd, fd);

if(index >= ){  
    int i = index;  
    for(; i<vfd->counter-; i++){  
         vfd->fd\[i\] = vfd->fd\[i+\];  
    }

    vfd->counter--;  
}

pthread\_mutex\_unlock(&vfd->mutex);  

}

void add_fd(VectorFD* vfd, int fd)
{
assert(vfd != NULL);

encapacity(vfd);  
vfd->fd\[vfd->counter++\] = fd;  

}

//echo_tcp_server_fcntl.c

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "vector_fd.h"
#include

/*基于非阻塞IO的高并发服务器编程
测试:telnet 127.0.0.1 xxxx
http://xxx.xxx.xxx.xxx:端口号
注意:演示时可关闭服务器的防火墙,防火墙口被过滤
#service iptables status 查看防火墙
#service iptables stop 关闭防火墙
*/

VectorFD* vfd;
int sockfd;
int bStop = ;

void sig_handler(int signo)
{
if(signo == SIGINT){
bStop = ;
printf("server close\n");
exit();
}
}

void out_addr(struct sockaddr_in* clientAddr)
{
char ip[];
memset(ip, , sizeof(ip));
int port = ntohs(clientAddr->sin_port);
inet_ntop(AF_INET, &clientAddr->sin_addr.s_addr, ip, sizeof(ip));

printf("%s(%d) connnected!\\n", ip, port);  

}

/*服务程序
* fd对应于某个连接的客户端,和某一个连接的客户端进行双向通信(非阻塞方式)
*/
void do_service(int fd)
{
/*服务端和客户端进行读写操作(双向通信)*/
char buff[];

memset(buff, , sizeof(buff));  
size\_t size = read(fd, buff, sizeof(buff));

//读取客户端发送过来的消息  
//由于采用非阻塞方式,若读不到数据直接返回了,直接服务于下一个客户端  
//因此不需要判断size小于0的情况。  
if(size == ){  //客户端己关闭连接  
    char info\[\] = "client close\\n";  
    write(STDOUT\_FILENO, info, sizeof(info));

    //将fd从动态数组中删除  
    remove\_fd(vfd, fd);  
    close(fd);  
}else if(size > ){  
    write(STDOUT\_FILENO, buff, sizeof(buff));//显示客户端发送的消息  
    //写回客户端(回显功能)  
    if(write(fd, buff, sizeof(buff)) != size){  
        if(errno == EPIPE){  
            //如果客户端己被关闭(相当于管道的读端关闭),会产生SIGPIPE信号  
            //并将errno设置为EPIPE  
            perror("write error");  
            remove\_fd(vfd, fd);  
            close(fd);  
        }  
    }  
}  

}

//线程函数
void* th_fn(void* arg)
{
int i= ;
//轮询动态数组中的socket描述符
while(!bStop){
for(i=; icounter; i++){
do_service(get_fd(vfd, i));
}
}

return (void\*);  

}

int main(int argc, char* argv[])
{
if(argc < ){
printf("usage: %s port\n", argv[]);
exit();
}

//按ctrl-c时中止服务端程序  
if(signal(SIGINT, sig\_handler) == SIG\_ERR){  
    perror("signal sigint error");  
    exit();  
}

/\*步骤1:创建socket(套接字)  
 \*注:socket创建在内核中,是一个结构体  
 \*AF\_INET:IPv4  
 \*SOCK\_STREAM:tcp协议  
 \*/  
sockfd = socket(AF\_INET, SOCK\_STREAM, );

/\*步骤2:将sock和地址(包括ip、port)进行绑定\*/  
struct sockaddr\_in servAddr; //使用专用地址结构体  
memset(&servAddr, , sizeof(servAddr));  
//往地址中填入ip、port和Internet地址族类型  
servAddr.sin\_family = AF\_INET;//IPv4  
servAddr.sin\_port = htons(atoi(argv\[\])); //port  
servAddr.sin\_addr.s\_addr = INADDR\_ANY; //任一可用的IP

if(bind(sockfd, (struct sockaddr\*)&servAddr, sizeof(servAddr)) < ){  
    perror("bind error");  
    exit();  
}

/\*步骤3:调用listen函数启动监听  
 \*       通知系统去接受来自客户端的连接请求  
 \*/  
if(listen(sockfd, ) < ){  //队列中最多允许10个连接请求  
    perror("listen error");  
    exit();  
}

//创建放置套接字描述符的动态数组  
vfd = create\_vector\_fd();

//设置线程的分离属性  
pthread\_t  th;  
pthread\_attr\_t attr;  
pthread\_attr\_init(&attr);  
pthread\_attr\_setdetachstate(&attr, PTHREAD\_CREATE\_DETACHED);  
//启动子线程  
int err;  
if((err = pthread\_create(&th, &attr, th\_fn, (void\*))) != ){  
    perror("pthread create error");  
    exit();  
}  
pthread\_attr\_destroy(&attr);

/\*(1)主线程获得客户端连接,将新的socket描述符放置到动态数组中  
 \*(2)子线程负责遍历动态数组中socket描述符并和对应的客户端进行  
 \*   双向通信(采用非阻塞方式读写)  
 \*/

struct sockaddr\_in clientAddr;  
socklen\_t len = sizeof(clientAddr);

while(!bStop){  
    /\*步骤4:调用accept函数,从请求队列中获取一个连接  
     \*       并返回新的socket描述符  
     \* \*/  
    int fd = accept(sockfd, (struct sockaddr\*)&clientAddr, &len);

    if(fd < ){  
        perror("accept error");  
        continue;  
    }

    //将读写修改为非阻塞方式  
    int val;  
    fcntl(fd, F\_GETFL, &val);  
    val |= O\_NONBLOCK; //非阻塞式  
    fcntl(fd, F\_SETFL, val);

    //输出客户端信息  
    out\_addr(&clientAddr);

    //将返回的新socket描述符加入到动态数组中  
    add\_fd(vfd, fd);  
}

close(sockfd);  
destroy\_vector\_fd(vfd);

return ;  

}
/*输出结果
* [root@localhost 15.AdvNet]# gcc -o bin/echo_tcp_client src/echo_tcp_client.c
* [root@localhost 15.AdvNet]# bin/echo_tcp_server_fcntl 8888
* 127.0.0.1(40694) connnected!
* abcdefaadeafcdafacdaegadeageadfacadegadaddeagdafddeagd^Cserver close
*/

//echo_tcp_client.c

#include
#include
#include
#include
#include
#include

int main(int argc, char* argv[])
{
if(argc < ){
printf("usage: %s ip port\n", argv[]);
exit();
}

/\*步骤1: 创建socket(套接字)\*/  
int sockfd = socket(AF\_INET, SOCK\_STREAM, );  
if(sockfd < ){  
    perror("socket error");  
}

//往servAddr中填入ip、port和地址族类型  
struct sockaddr\_in servAddr;  
memset(&servAddr, , sizeof(servAddr));  
servAddr.sin\_family = AF\_INET;  
servAddr.sin\_port = htons(atoi(argv\[\]));  
//将ip地址转换成网络字节序后填入servAdd中  
inet\_pton(AF\_INET, argv\[\], &servAddr.sin\_addr.s\_addr);

/\*步骤2: 客户端调用connect函数连接到服务器端\*/  
if(connect(sockfd, (struct sockaddr\*)&servAddr, sizeof(servAddr)) < ){  
    perror("connect error");  
    exit();  
}

/\*步骤3: 调用自定义的协议处理函数和服务端进行双向通信\*/  
char buff\[\];  
size\_t size;  
char\* prompt = ">";

while(){  
    memset(buff, , sizeof(buff));  
    write(STDOUT\_FILENO, prompt, );  
    size = read(STDIN\_FILENO, buff, sizeof(buff));  
    if(size < ) continue;

    buff\[size-\] = '\\0';  
    //将键盘输入的内容发送到服务端  
    if(write(sockfd, buff, sizeof(buff)) < ){  
        perror("write error");  
        continue;  
    }else{  
        memset(buff, , sizeof(buff));  
        //读取来自服务端的消息  
        if(read(sockfd, buff, sizeof(buff)) < ){  
            perror("read error");  
            continue;  
        }else{  
            printf("%s\\n", buff);  
        }  
    }  
}

/\*关闭套接字\*/  
close(sockfd);  

}
/*输出结果
*[root@localhost 15.AdvNet]# bin/echo_tcp_client 127.0.0.1 8888

abcdef
abcdef
aade
aade
afcdaf
afcdaf
acdaeg
acdaeg
^C
*/

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章