【3】IO复用:select和poll

Source

select 函数

该函数允许进程指示内核等待多个事件中的任何一个发生,并且只在有一个或者多个事件发生或者超时之后才唤醒它,函数原型如下:

#include <sys/select.h>
#include <sys/time.h>

/*
 * maxfdp: 待测试的描述符个数,是最大描述符个数+1,例如1,3,5的最大的描述符为5,那么要填充6
 * readfds: 读描述符,用不到设定为NULL
 * writefds: 写描述符,用不到设定为NULL
 * errorfds: 异常条件描述符,用不到设定为NULL
 * timeout: 告诉内核等待所指定描述符中任意一个就绪可以花费多少时间,NULL:永远等待,0:不等待,其他值:超时时间
 * 
 * 返回值:返回状态发生变化的描述符总数
*/
int select(int maxfdp, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval* timeout);

fd_set 数据类型和操作方式

fd_set是一个描述符集合,通过下面的宏函数来操作即可:

void FD_ZERO(fd_set* fdset); // 清空fd_set集合
void FD_SET(int fd, fd_set* fdset); // 将描述符fd填充进fd_set集合中
void FD_CLR(int fd, fd_set* fdset); // 从fd_set集合中移除描述符fd
void FD_ISSET(int fd, fd_set* fdset); // 检查描述符fd是否在这个fd_set集合里面

描述符就绪的条件

低水平标记:标记可读写的最小的数据大小,例如64bit、32bit这样

可读的条件

  1. 套接字缓冲区中的数据字节数大于等于套接字接收缓冲区低水平标记的当前大小
  2. 连接的读半部关闭,读操作将不再阻塞,直接返回0
  3. 套接字为监听套接字且已完成的连接数不为0
  4. 有一个套接字错误待处理

可写的条件

  1. 套接字发送缓冲区的可用空间字节数大于等于套接字发送缓冲区低水平标记的当前大小
  2. 连接的写半部关闭,写操作将会产生SIGPIPE信号
  3. 使用非阻塞式connect的套接字已经建立连接,或者connect已经失败了
  4. 有一个套接字错误待处理

条件 可读吗? 可写吗? 异常吗?
有数据可读
关闭连接的读一半
给监听套接字准备好新连接
有可用于写的空间
关闭连接的写一半
待处理错误
TCP带外数据

shutdown 函数

shutdown与close函数功能一致,都是关闭网络连接,不同点如下:

  1. close是将描述符的引用-1,当计数变为0的时候关闭套接字,而shutdown不管引用计数
  2. close是终止读和写两个方向的通道,而shutdown是可选的,关闭读或者关闭写或者关闭读写都可以

函数原型如下:

#include <sys/socket.h>

/*
 * sockfd: 套接字描述符
 * howto: 行为定义,SHUT_RD:关闭读,SHUT_WR:关闭写,SHUT_RDWR:关闭读写
*/
int shutdown(int sockfd, int howto);

select 服务端程序

照着UNIX网络编程,套接字联网一书中的代码搞的,测试起来没有同时通讯的功能,因为会阻塞在read函数中,等我强大了,再回头来看吧。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <arpa/inet.h>

static int g_client[FD_SETSIZE] = {
    
      -1};

static void do_msg_process(int accsocfd) {
    
      
    printf("do_msg_process enter!\n");
    int size = 0;
    char read_buffer[512] = {
    
      0};
    char write_buffer[1024] = {
    
      0};

    while (true) {
    
      
        memset(read_buffer, 0x00, sizeof(read_buffer));
        memset(write_buffer, 0x00, sizeof(write_buffer));

        size = read(accsocfd, read_buffer, 1024);
        printf("size: %d\n", size);
        if (size > 0) {
    
      
            printf("pid: %d, client say: %s\n", getpid(), read_buffer);

            snprintf(write_buffer, 1024, "%s%s", "[SERVER RECV]: ", read_buffer);
            size = write(accsocfd, write_buffer, strlen(write_buffer));
        } else {
    
      
            break;
        }
    }
    printf("do_msg_process exit!\n");
}

static void do_client_msg(int socketfd) {
    
      
    int i = 0;
    int nready = 0; // 发生变化的描述符总数
    int maxfd = socketfd; // 当前最大描述符
    int maxi = -1; // 当前描述符个数
    fd_set allset; // 所有的描述符集合
    fd_set rset; // 当前所用的描述符集合

    for (i = 0; i < FD_SETSIZE; i++) {
    
      
        g_client[i] = -1;
    }

    FD_ZERO(&allset);
    FD_SET(socketfd, &allset); // 将监听套接字设置进描述符集合

    while (true) {
    
      
        printf("start while()\n");

        // 开始等待监听套接字描述符发生变化,当发生变化的时候,意味着有客户端发送连接了
        rset = allset;
        nready = select(maxfd + 1, &rset, NULL, NULL, NULL);
        printf("nready: %d, maxfd: %d, maxi: %d\n", nready, maxfd, maxi);

        // 客户端连接过来了
        if (FD_ISSET(socketfd, &rset)) {
    
      
            printf("start accept\n");
            // 调用accept,服务器进入睡眠状态,等待客户的连接,完成3次握手,然后返回一个握手成功的描述符,新描述符用于与客户端通讯
            int accsocfd = accept(socketfd, NULL, NULL);

            // 将已连接套接字,放入到全局数组
            for (i = 0; i < FD_SETSIZE; i++) {
    
      
                if (g_client[i] < 0) {
    
      
                    g_client[i] = accsocfd;
                    break;
                }
            }

            // 超过最大支持连接数
            if (i == FD_SETSIZE) {
    
      
                printf("client is full!\n");
            }

            // 将已连接套接字,设置进描述符集合
            FD_SET(accsocfd, &allset);

            // 更改最大描述符和描述符数量
            maxfd = accsocfd > maxfd ? accsocfd : maxfd;
            maxi = i > maxi ? i : maxi;

            printf("end accept\n");
            // 连接操作完成,如果发送变化的描述符未处理完,则继续处理,否则开始读取数据部分
            if (--nready <= 0) {
    
      
                continue;
            }
        }

        // 根据上面accept连接的客户端数组,与其通讯
        printf("start read/write msg\n");
        for (i = 0; i <= maxi; i++) {
    
      
            int temp_sockfd = g_client[i];
            printf("nready:%d, temp_sockfd:%d\n", nready, temp_sockfd);
            if (temp_sockfd < 0) {
    
      
                continue;
            }

            // 当有可读的数据了
            if (FD_ISSET(temp_sockfd, &rset)) {
    
      
                do_msg_process(temp_sockfd);
                // 客户端断开了,关闭套接字,从描述符集合中移除
                close(temp_sockfd);
                FD_CLR(temp_sockfd, &allset);
                g_client[i] = -1;
            }

            // 所有待操作的客户端都操作完成
            if (--nready <= 0) {
    
      
                break;
            }
        }

        printf("end while()\n");
    }
}

int main(int argc, char const *argv[]) {
    
      
    // 调用socket函数来创建一个IPV4(AF_INET)字节流(SOCK_STREAM)套接字,返回标识符
    int socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socketfd < 0) {
    
      
        perror("socket create failed!\n");
        return 0;
    }

    // 将服务器的IP地址和端口号填入sockaddr_t结构的变量中,地址族设定为AF_INET,IP地址和端口必须要是特地格式,IP地址使用INADDR_ANY表示任意IP
    struct sockaddr_in s_addr;
    memset(&s_addr, 0x00, sizeof(struct sockaddr_in));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(6666);
    s_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    // 调用bind函数绑定所创建的套接字
    bind(socketfd, (struct sockaddr*)&s_addr, sizeof(struct sockaddr));

    // 调用listen将套接字转换为监听套接字,这样外来连接可以通过该套接字被内核接受
    listen(socketfd, 10);

    // 处理客户端连接
    do_client_msg(socketfd);

    close(socketfd);
    return 0;
}


select 服务端运行结果

在这里插入图片描述


poll 函数

函数原型:

#include <poll.h>

/*
 * fdarray: pollfd结构体数组
 * nfds: 用来指定第一个参数数组元素个数
 * timeout: 指定超时等待的毫秒数,INFTIM(-1):永远等待,0:立即返回,其他数:等待的毫秒数
*/
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);

struct pollfd 结构体

/* struct pllfd 定义 
 * fd: 文件描述符
 * events: 等待的事件
 * revents: 实际发生的事件
*/
struct pollfd {
    
      
	int fd;	
	short events;
	short revents;
};

events和revents

常值 作为events的输入吗? 作为revents的结果吗 说明
POLLIN 普通或者优先级带数据可读
POLLRDNORM 普通数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级数据可读
POLLOUT 普通数据可写
POLLWRNORM 普通数据可写
POLLWRBAND 优先级带数据可写
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述符不是一个打开的文件

poll 服务端程序

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <unistd.h>
#include <poll.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <arpa/inet.h>

#define CLIENT_MAX 10
static struct pollfd g_client[CLIENT_MAX] = {
    
      -1};

static void do_msg_process(int accsocfd) {
    
      
    printf("do_msg_process enter!\n");
    int size = 0;
    char read_buffer[512] = {
    
      0};
    char write_buffer[1024] = {
    
      0};

    while (true) {
    
      
        memset(read_buffer, 0x00, sizeof(read_buffer));
        memset(write_buffer, 0x00, sizeof(write_buffer));

        size = read(accsocfd, read_buffer, 1024);
        printf("size: %d\n", size);
        if (size > 0) {
    
      
            printf("pid: %d, client say: %s\n", getpid(), read_buffer);

            snprintf(write_buffer, 1024, "%s%s", "[SERVER RECV]: ", read_buffer);
            size = write(accsocfd, write_buffer, strlen(write_buffer));
        } else {
    
      
            break;
        }
    }
    printf("do_msg_process exit!\n");
}

static void do_client_msg(int socketfd) {
    
      
    int i = 0;
    int maxi = 0;
    int nready = 0;

    g_client[0].fd = socketfd;
    g_client[0].events = POLLIN;
    for (i = 1; i < CLIENT_MAX; i++) {
    
      
        g_client[i].fd = -1;
    }

    while (true) {
    
      
        printf("start while() maxi:%d\n", maxi);
        nready = poll(g_client, maxi + 1, -1);

        // 客户端连接过来了
        if (g_client[0].revents & POLLIN) {
    
      
            printf("start accept\n");
            // 调用accept,服务器进入睡眠状态,等待客户的连接,完成3次握手,然后返回一个握手成功的描述符,新描述符用于与客户端通讯
            int accsocfd = accept(socketfd, NULL, NULL);

            // 将已连接套接字,放入到全局数组
            for (i = 1; i < CLIENT_MAX; i++) {
    
      
                if (g_client[i].fd < 0) {
    
      
                    g_client[i].fd = accsocfd;
                    g_client[i].events = POLLIN;
                    break;
                }
            }

            // 超过最大支持连接数
            if (i == CLIENT_MAX) {
    
      
                printf("client is full!\n");
            }

            // 更改描述符数量
            maxi = i > maxi ? i : maxi;

            printf("end accept\n");
            // 连接操作完成,如果发送变化的描述符未处理完,则继续处理,否则开始读取数据部分
            if (--nready <= 0) {
    
      
                continue;
            }
        }

        // 根据上面accept连接的客户端数组,与其通讯
        printf("start read/write msg\n");
        for (i = 1; i <= maxi; i++) {
    
      
            int temp_sockfd = g_client[i].fd;
            printf("nready:%d, temp_sockfd:%d\n", nready, temp_sockfd);
            if (temp_sockfd < 0) {
    
      
                continue;
            }

            // 当有可读的数据了
            if (g_client[i].revents & (POLLIN | POLLERR)) {
    
      
                do_msg_process(temp_sockfd);
                // 客户端断开了,关闭套接字,从描述符集合中移除
                close(temp_sockfd);
                g_client[i].fd = -1;
            }

            // 所有待操作的客户端都操作完成
            if (--nready <= 0) {
    
      
                break;
            }
        }

        printf("end while()\n");
    }
}

int main(int argc, char const *argv[]) {
    
      
    // 调用socket函数来创建一个IPV4(AF_INET)字节流(SOCK_STREAM)套接字,返回标识符
    int socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socketfd < 0) {
    
      
        perror("socket create failed!\n");
        return 0;
    }

    // 将服务器的IP地址和端口号填入sockaddr_t结构的变量中,地址族设定为AF_INET,IP地址和端口必须要是特地格式,IP地址使用INADDR_ANY表示任意IP
    struct sockaddr_in s_addr;
    memset(&s_addr, 0x00, sizeof(struct sockaddr_in));
    s_addr.sin_family = AF_INET;
    s_addr.sin_port = htons(6666);
    s_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    // 调用bind函数绑定所创建的套接字
    bind(socketfd, (struct sockaddr*)&s_addr, sizeof(struct sockaddr));

    // 调用listen将套接字转换为监听套接字,这样外来连接可以通过该套接字被内核接受
    listen(socketfd, 10);

    // 处理客户端连接
    do_client_msg(socketfd);

    close(socketfd);
    return 0;
}


poll 服务端运行结果

在这里插入图片描述


客户端程序

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

static void do_msg_process(int socketfd) {
    
      
    printf("do_msg_process enter!\n");
    int size = 0;
    char read_buffer[512] = {
    
      0};
    char write_buffer[1024] = {
    
      0};

    while(true) {
    
      
        memset(read_buffer, 0x00, sizeof(read_buffer));
        memset(write_buffer, 0x00, sizeof(write_buffer));

        printf("input:> ");
        fgets(write_buffer, 1024, stdin);

        size = write(socketfd, write_buffer, strlen(write_buffer));

        size = read(socketfd, read_buffer, 1024);
        printf("server say: %s\n", read_buffer);
    }
    printf("do_msg_process exit!\n");
}

int main(int argc, char const *argv[]) {
    
      
    // 利用socket函数来创建一个IPV4(AF_INET)字节流(SOCK_STREAM)套接字,返回标识符
    int socketfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (socketfd < 0) {
    
      
        perror("socket create failed!\n");
        return 0;
    }

    // 将服务器的IP地址和端口号填入sockaddr_t结构的变量中,地址族设定为AF_INET,IP地址和端口必须要是特地格式,使用htons转换端口,使用inet_pton转换IP地址
    struct sockaddr_in c_addr;
    memset(&c_addr, 0x00, sizeof(struct sockaddr_in));
    c_addr.sin_family = AF_INET;
    c_addr.sin_port = htons(6666);
    inet_pton(AF_INET, "127.0.0.1", &c_addr.sin_addr);

    // connet将和指定的服务器建立一个TCP连接
    int result = connect(socketfd, (struct sockaddr*)&c_addr, sizeof(struct sockaddr_in));
    if (result < 0) {
    
      
        perror("connect failed!\n");
        return 0;
    }

    // 处理消息
    do_msg_process(socketfd);

    close(socketfd);
    return 0;
}