python-高级编程-05-异步IO
阅读原文时间:2023年07月15日阅读:1

【异步非阻塞IO】

-------------------------------------------------------------------------------------------------------------------

小明和小强是笔友,他们通过有邮件的方式联系,小明发一封,小强回一封

邮差有点时候天气好,早上发出的信件,晚上就能收的到,然后有的时候遇到意外,

可能好几天都不能收到,小强就在邮箱前面等啊等,一直等到天荒地老

cont =1

mailbox = xxxxx

while 1:

  mail = mailbox.cleck()

  #look mail

  sleep(cont)

但是这样有个问题

cont的大小 如果小了就非常占用cpu 设置太大就效率低

我们能如果节约呢?

小强在邮箱上放了个旗子,让邮递员放好了信就把上面的旗子立起来,然后小强知道了 就去拿信

这样小强就能在等信的时候做其他的事情了。

这就是异步io的思想。

【select pool   epoll 】

---------------------------------------------------------------------------------------------------------------

随时小强的人脉越来越广,他就搞了很多邮箱,一个邮箱对应一个人,然后上面都放好旗子

select 就相当于循环的检查邮箱上面的旗子,一有旗子立起来的话就通知小强,但是他的限制只有1024个(这里的1024 在操作系统里面值得并不是同时打开文件描述符的个数,而是号,当文件描述符超过了这个号,即便是前头的都关闭了,还是不能够增加新的)

pool 跟select的区别就是去掉了1024的限制,但是如果连接增多那么他还是会面的很慢。因为没次循环都要对所有邮箱进行检查。

[epoll] -- epoll 是linux内核的可扩展I/O事件通知机制,特点就是让需要大量操作文件描述符的程序得以 更优异的性能。

select和poll的时间复杂度是O(n) 而epoll 是O(1)

这里的e 值得event 事件

说白了就是 给每个邮箱编号然后改造成电子邮箱,一但有邮件过来了,立马会在手机app上面显示 xxx号邮箱 有邮件啦!!!

【epoll的ET和LT】

---------------------------------------------------------------------------------------------------------------

ET:边缘触发  来了信,手机上面只响一下

LT:水平触发 来了信,手机一直响,一直到你打开app处理

ET在处理的时候,如果处理方式不够谨慎 ,很容易造成消息丢失

epoll 默认是 LT

######################################################################

#!/usr/bin/env python
#coding:utf-8

###################
#from:Rico.xia #
#time:2017-08-19 #
###################

import socket
import time
import select
import pdb

#############################################
class STATE:
def __init__(self):
self.state = 'accept'
self.have_read = 0
self.need_read = 5
self.have_write = 0
self.need_write = 5
self.buff_write = ""
self.buff_read = ""
self.sock_obj = ""

##############################################
class nbNetBase:
def setFd(self,sock):

    #实例化STATE() 并初始化参数  
    tmp\_state = STATE()

    #将socket对象存到实例化的对象的字段中  
    tmp\_state.sock\_obj = sock

    #获取这个socket的文件描述符,并作为key,value 为实例化的状态(STATE)对象。  
    self.conn\_state\[sock.fileno()\]=tmp\_state

def accept(self,fd):

    #dbpPrint("\\n --accept start")  
    #获取sock对象  
    sock = self.conn\_state\[fd\].sock\_obj  
    #获取客户端的sock对象,和ip  
    conn,addr  = sock.accept()  
    #设置为非阻塞状态  
    conn.setblocking(0)  
    #返回  
    return conn

def close(self,fd):  
    print 'closeing'  
    try:  
        sock = self.conn\_state\[fd\].sock\_obj  
        sock.close()  
    finally:  
        self.epoll\_sock.unregister(fd)  
        self.conn\_state.pop(fd)

def read(self,fd):  
    print 'readng'  
    try:  
        #获取客户端的sock信息  
        sock\_state = self.conn\_state\[fd\]  
        conn  = sock\_state.sock\_obj  
        #防止宇宙射线导致的字符反转  
        if sock\_state.need\_read <=0:  
            raise socket.error  
        #第一读取,读取之前定义好的需要读取的字节数  
        one\_read = conn.recv(sock\_state.need\_read)  
        #如果读取的信息为0就抛出异常  
        if len(one\_read) == 0:  
            raise socket.error  
        #判断开头是不是回车 在telnet里面敲击一次回车会发送数据如果没有输入就是 \\r\\n  
        if one\_read\[0:2\] == "\\r\\n":  
            one\_read = one\_read\[2:\]  
        #buff里面缓存住读取内容  
        sock\_state.buff\_read += one\_read  
        #已经读了的加上去读取的字节数  
        sock\_state.have\_read += len(one\_read)  
        #需要读取的剪掉读取的字节数  
        sock\_state.need\_read -= len(one\_read)  
        #sock\_state.printState()

        #这里如果我们已经读了5个字节的头部之后 我们需要去读取需要出的处理的数据  
        #我们的协议是 00003abc -->00003cbc 我们通过头部知道需要读多少处理数据  
        if sock\_state.have\_read == 5:  
            header\_said\_need\_read = int(sock\_state.buff\_read)  
            if header\_said\_need\_read <= 0:  
                raise socket.error  
            sock\_state.need\_read += header\_said\_need\_read  
            sock\_state.buff\_read = ""  
            #如果满足条件 返回 readcontent  取阅读内容  
            return "readcontent"  
        elif sock\_state.need\_read == 0:  
            #如果需要读取的已经读完了 那么我们处理数据  
            return "process"

        else:  
            return "readmore"

    except (socket.error,ValueError),msg:  
        try:  
            if msg.error == 11:  
                return "retry"  
        except:  
            pass  
        return "closing"

def write(self,fd):  
    #跟read方法类似  
    sock\_state = self.conn\_state\[fd\]  
    conn = sock\_state.sock\_obj  
    last\_have\_send = sock\_state.have\_write  
    try:  
        have\_send  = conn.send(sock\_state.buff\_write\[last\_have\_send:\])  
        sock\_state.have\_write += have\_send  
        sock\_state.need\_write -= have\_send  
        if sock\_state.need\_write == 0 and sock\_state.have\_write != 0:  
            return "writecomplete"  
        else:  
            return "writemore"  
    except socket.error,msg:  
        return "closing"

def run(self):  
    while True:  
        # poll()返回的epoll\_list就是有事件发生的fd的list  
        # 需要在循环中按照event的类型分别处理,一般分为以下几种类型  
        #  EPOLLIN :表示对应的文件描述符可以读;  
        #  EPOLLOUT:表示对应的文件描述符可以写;  
        #  EPOLLPRI:表示对应的文件描述符有紧急的数据可读;一般不需要特殊处理  
        #  EPOLLERR:表示对应的文件描述符发生错误;后面这两种需要关闭socket  
        #  EPOLLHUP:表示对应的文件描述符被挂断  
        #我们这里出现 EPOLLERR 和 EPOLLERR 情况将改变成'closing' 然后将fd扔到状态机中  
        epoll\_list = self.epoll\_sock.poll()  
        print epoll\_list  
        for fd,events in epoll\_list:  
            #dbgPrint("\\n --epoll return fd:%d.event:%s"%(fd,events))  
            sock\_state = self.conn\_state\[fd\]  
            if select.EPOLLHUP & events:  
                sock\_state.state='closing'  
            elif select.EPOLLERR & events:  
                sock\_state.state = 'closing'  
            self.state\_machine(fd)

def state\_machine(self,fd):  
    #dbgPrint('\\n - state machine:fd:%d,status:%s'%(fd,self.conn\_state\[fd\].state))  
    print 'machine is use'  
    ##根据连接状态做对应处理  
    #获取状态对象  
    sock\_state =self.conn\_state\[fd\]  
    #根据字典sm里的处理方法处理其对应的状态  
    self.sm\[sock\_state.state\](fd)

class nbNet(nbNetBase):
def __init__(self,addr,port,logic):
dbgPrint("\n __init__:start !")
#定义一个空字典
self.conn_state = {}
# 初始化监听socket socket.AF_INET指的是以太网 socket.SOCK_STREAM指的是TCP
self.listen_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#设置socket 开启SO_REUSEADDR,这样当监听端口处于各种xxx_WAIT的状态的时候 也可以listen、bind
self.listen_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
#绑定端口
self.listen_sock.bind((addr,port))
# 指定backlog数
self.listen_sock.listen(10)
##################################
#设置文件描述符的相关信息
self.setFd(self.listen_sock)
#实例化epool对象
self.epoll_sock = select.epoll()
#将文件描述符传给epoll 并告诉它只只关注EPOLLIN,即connect过来的连接
self.epoll_sock.register(self.listen_sock.fileno(),select.EPOLLIN)
#初始化方法数据
self.logic = logic
#初始化逻辑处理数据,告诉状态机如果程序在某一状态应该用什么方法处理
self.sm = {"accept":self.accept2read,
'read':self.read2process,
"write":self.write2read,
"process":self.process,
'closing':self.close}

def accept2read(self,fd):  
    #获取客户端socket对象  
    conn = self.accept(fd)  
    #将客户端也注册一次epoll,监听EPOLLIN状态,也就当客户端有数据的时候  
    self.epoll\_sock.register(conn.fileno(),select.EPOLLIN)  
    #注册客户端fd  
    self.setFd(conn)  
    #初始化设置为状态为read  
    self.conn\_state\[conn.fileno()\].state = 'read'

def read2process(self,fd):  
    read\_ret = ""  
    try:  
        #去读取  
        read\_ret = self.read(fd)  
    except(Exception),msg:  
        #dbgPrint(msg)  
        read\_ret = 'closing'

    #我们已经读取头部了根据read方法的返回做各种处理  
    if read\_ret == 'process':  
        self.process(fd)  
    elif read\_ret == "readcontent":pass  
    elif read\_ret == 'readmore':pass  
    elif read\_ret == 'retry':pass  
    elif read\_ret == 'closing':  
        self.conn\_state\[fd\].state = 'closing'  
        self.state\_machine(fd)  
    else:  
        raise Exception('impossible state returned by self.read')  
def process(self,fd):  
    #获取socket 对象  
    sock\_state = self.conn\_state\[fd\]  
    #通过传入的logic方法得到要返回给客户端的值  
    response = self.logic(sock\_state.buff\_read)  
    sock\_state.buff\_write = "%05d%s" %(len(response),response)  
    sock\_state.need\_write = len(sock\_state.buff\_write)  
    sock\_state.state = 'write'  
    self.epoll\_sock.modify(fd,select.EPOLLOUT)  
def write2read(self,fd):  
    try:  
        write\_ret = self.write(fd)  
    except socket.error,msg:  
        write\_ret = 'closing'  
    if write\_ret == 'writemore':  
        pass  
    elif write\_ret == 'writecomplete':  
        sock\_state = self.conn\_state\[fd\]  
        conn = sock\_state.sock\_obj  
        self.setFd(conn)  
        self.conn\_state\[fd\].state = 'read'  
        self.epoll\_sock.modify(fd,select.EPOLLIN)  
    elif write\_ret == 'cldsing':  
        self.conn\_state\[fd\].state = 'closing'  
        self.state\_machine(fd)

if __name__ == '__main__':
def logic(d_in):
return (d_in[::-1])
#将参数传到nbNet类并且实例化,监听本地 6789 端口
reverseD = nbNet('0.0.0.0',6789,logic)
#执行run函数
reverseD.run()

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章