协程学习笔记

协程

概念

什么是协程
https://stackoverflow.com/questions/553704/what-is-a-coroutine
协程与线程的区别
https://stackoverflow.com/questions/1934715/difference-between-a-coroutine-and-a-thread
并行与并发的区别
https://stackoverflow.com/questions/1050222/what-is-the-difference-between-concurrency-and-parallelism
Python3.5 协程原理
https://snarky.ca/how-the-heck-does-async-await-work-in-python-3-5/
https://github.com/xitu/gold-miner/blob/master/TODO/how-the-heck-does-async-await-work-in-python-3-5.md
一门关于协程与并发的有趣课程(David Beazley)
http://www.dabeaz.com/coroutines/

代码学习

使用生成器代替线程,实现一个并发网络应用程序(链接页面最底部有一段很长的完整代码,这里不再搬运)
https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p12_using_generators_as_alternative_to_threads.html
看不懂就先背下来、再默写,我就是这么过来的

站在巨人的肩膀上:在上述代码基础上的改进

  1. 调度器的实现,可参考epoll+reactor模型
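  2. yield 和 send() 的配合非常抽象,需要弄清二者的参数和返回值。gen.send(x) 让生成器从暂停处恢复,x 成为当前 yield 表达式的值;send() 的返回值则是生成器下一次 yield 出来的值。让协程跑起来必须先调用 send(None) 或 next(),使它运行到第一个 yield 处(向刚创建的生成器 send 非 None 的值会抛 TypeError)。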
  3. 实现了NewTask类,方便添加新的task
  4. 实现了 TimeWait 类,用最小堆管理定时任务,堆元素为 (超时绝对时间, 自增id, task)。自增 id 有两个作用:超时时间相同时按添加顺序出队;同时避免 heapq 在时间相等时去比较 task(生成器对象之间不支持比较,会抛 TypeError)。
  5. 修复了一个 bug:把 _ioloop() 也作为一个 task 加入 _ready 队列,使 I/O 轮询和定时器检查与普通任务轮流调度。
from collections import deque
import heapq
import select
import socket
import time
import traceback

class YieldEvent:
    """Base class for requests a task yields to the scheduler.

    Subclasses override the hooks below; the defaults do nothing. If
    ``handle_yield`` does not arrange for the task to be rescheduled, the
    task is never resumed.
    """

    def handle_yield(self, sched, task):
        """Hook run by the scheduler right after ``task`` yielded this event."""

    def handle_resume(self, sched, task):
        """Hook run when the I/O that ``task`` was waiting on is ready."""

class Scheduler:
    """FIFO scheduler for generator-based tasks, driven by epoll and a timer heap.

    A task is a generator. Each value it yields is one of:
      * ``None`` -- give up the CPU; the task is re-queued at once.
      * a ``YieldEvent`` -- its ``handle_yield`` decides when the task resumes.

    I/O readiness and timers are serviced by the internal ``_ioloop`` task.
    It is scheduled like any other task but is NOT counted in ``_numtasks``,
    so ``run()`` returns once every task added with ``new()`` has finished.
    """

    def __init__(self) -> None:
        self._numtasks = 0            # live tasks added via new()
        self._ready = deque()         # runnable entries: (task, msg, exc)
        self._waiting = {}            # fileno -> (event, task) parked on I/O
        self._timer_id = 0            # tie-breaker: equal deadlines stay FIFO
        self._timer = []              # min-heap of (deadline, timer_id, task)
        self._epoll = select.epoll()

    def _ioloop(self):
        """Internal task: wake expired timers, then dispatch epoll events."""
        while True:
            timeout = None
            # Monotonic clock: deadlines must not jump with wall-clock changes.
            now = time.monotonic()
            while self._timer and self._timer[0][0] <= now:
                _, _, task = heapq.heappop(self._timer)
                self.add_ready(task)
                timeout = 0
            if timeout is None:
                if self._ready:
                    # Other tasks are runnable: poll without blocking.
                    timeout = 0
                elif self._timer:
                    # Sleep no longer than the nearest deadline.
                    timeout = self._timer[0][0] - now
            # timeout=None blocks until a registered fd becomes ready.
            for fileno, _ in self._epoll.poll(timeout=timeout):
                evt, task = self._waiting.pop(fileno)
                self._epoll.unregister(fileno)
                try:
                    evt.handle_resume(self, task)
                except Exception as exc:
                    # e.g. ConnectionResetError from recv(): deliver the error
                    # to the task that requested the I/O instead of letting it
                    # escape from this loop and kill the whole scheduler.
                    self._ready.append((task, None, exc))
            yield

    def new(self, task):
        """Register ``task`` as a new counted task and queue its first step."""
        self._ready.append((task, None, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        """Queue ``task`` to be resumed with ``task.send(msg)``."""
        self._ready.append((task, msg, None))

    def _read_wait(self, fileno, evt, task):
        """Park ``task`` until ``fileno`` is readable (EPOLLIN)."""
        self._waiting[fileno] = (evt, task)
        self._epoll.register(fileno, select.EPOLLIN)

    def _write_wait(self, fileno, evt, task):
        """Park ``task`` until ``fileno`` is writable (EPOLLOUT)."""
        self._waiting[fileno] = (evt, task)
        self._epoll.register(fileno, select.EPOLLOUT)

    def _time_wait(self, seconds, task):
        """Park ``task`` until ``seconds`` seconds from now."""
        deadline = time.monotonic() + seconds
        heapq.heappush(self._timer, (deadline, self._timer_id, task))
        self._timer_id += 1

    def run(self):
        """Run tasks until every task added with ``new()`` has finished.

        A task that raises is reported (traceback on stderr) and dropped;
        the remaining tasks keep running. An exception from the internal
        I/O loop, or an unrecognized yielded value, propagates to the caller.
        """
        loop = self._ioloop()
        self._ready.append((loop, None, None))  # deliberately not counted
        while self._numtasks:
            task, msg, exc = self._ready.popleft()
            try:
                evt = task.throw(exc) if exc is not None else task.send(msg)
            except StopIteration:
                self._numtasks -= 1
                continue
            except Exception:
                if task is loop:
                    raise
                # One crashing task must not take down every other task.
                traceback.print_exc()
                self._numtasks -= 1
                continue
            if evt is None:
                self.add_ready(task)
            elif isinstance(evt, YieldEvent):
                evt.handle_yield(self, task)
            else:
                raise RuntimeError(f'unrecognized yield event {type(evt)}')

class ReadSocket(YieldEvent):
    """Suspend the yielding task until ``sock`` is readable, then recv()."""

    def __init__(self, sock, nbytes) -> None:
        self.sock, self.nbytes = sock, nbytes

    def handle_yield(self, sched, task):
        # Park the task on the socket's file descriptor (read interest).
        fd = self.sock.fileno()
        sched._read_wait(fd, self, task)

    def handle_resume(self, sched, task):
        # The fd was reported readable; the received bytes become the value
        # of the task's ``yield`` expression.
        sched.add_ready(task, self.sock.recv(self.nbytes))

class WriteSocket(YieldEvent):
    """Suspend the yielding task until ``sock`` is writable, then send()."""

    def __init__(self, sock, data) -> None:
        self.sock, self.data = sock, data

    def handle_yield(self, sched, task):
        # Park the task on the socket's file descriptor (write interest).
        fd = self.sock.fileno()
        sched._write_wait(fd, self, task)

    def handle_resume(self, sched, task):
        # send() may accept only part of ``data``; the byte count it returns
        # is handed back to the task so it can send the remainder.
        sched.add_ready(task, self.sock.send(self.data))

class AcceptSocket(YieldEvent):
    """Suspend the yielding task until a connection can be accepted."""

    def __init__(self, sock) -> None:
        self.sock = sock

    def handle_yield(self, sched, task):
        # A pending connection shows up as readability on the listening fd.
        fd = self.sock.fileno()
        sched._read_wait(fd, self, task)

    def handle_resume(self, sched, task):
        # accept() returns a (connection, address) pair, which is sent to the task.
        sched.add_ready(task, self.sock.accept())

class NewTask(YieldEvent):
    """Spawn ``task`` in the scheduler; the yielding task is re-queued at once."""

    def __init__(self, task) -> None:
        self.task = task

    def handle_yield(self, sched, task):
        spawned = self.task
        sched.new(spawned)
        # Resume the parent with msg=None; it never waits for the child.
        sched.add_ready(task)

    def handle_resume(self, sched, task):
        """No-op: this event never waits on I/O, so it is never resumed."""

class TimeWait(YieldEvent):
    """Suspend the yielding task for ``seconds`` seconds."""

    def __init__(self, seconds) -> None:
        self.seconds = seconds

    def handle_yield(self, sched, task):
        # The scheduler's timer heap re-queues the task when the delay expires,
        # so the inherited no-op handle_resume is never needed.
        sched._time_wait(self.seconds, task)

class Socket:
    """Wrap a plain socket so that its I/O methods produce scheduler events.

    ``recv``/``send``/``accept`` perform no I/O themselves: they return the
    event object a task must ``yield``. Every other attribute (``bind``,
    ``listen``, ``close``, ``fileno``, ...) is forwarded to the wrapped socket.
    """

    def __init__(self, sock) -> None:
        self.sock = sock

    def recv(self, nbytes):
        """Return an event that, when yielded, receives up to ``nbytes`` bytes."""
        return ReadSocket(self.sock, nbytes)

    def send(self, data):
        """Return an event that, when yielded, sends ``data`` (possibly partially)."""
        return WriteSocket(self.sock, data)

    def accept(self):
        """Return an event that, when yielded, accepts one connection."""
        return AcceptSocket(self.sock)

    def __getattr__(self, name: str):
        # Only called for attributes missing on the wrapper itself. Guard
        # 'sock' so a half-initialised instance (e.g. one created by copy or
        # pickle without __init__) raises AttributeError instead of recursing
        # until RecursionError.
        if name == 'sock':
            raise AttributeError(name)
        return getattr(self.sock, name)

def server(addr, backlog=5):
    """Task: accept TCP connections on ``addr`` and spawn one task per client.

    Args:
        addr: ``(host, port)`` tuple passed to ``bind``.
        backlog: ``listen`` backlog (defaults to the previous hard-coded 5).
    """
    sock = Socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0))
    try:
        # Let the server rebind immediately after a restart, while old
        # connections are still in TIME_WAIT.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(addr)
        sock.listen(backlog)
        while True:
            c, a = yield sock.accept()
            yield NewTask(handle_client(Socket(c), a))
    finally:
        # Runs when the generator is closed or an exception is thrown in,
        # so the listening fd is not leaked.
        sock.close()

def handle_client(sock, addr):
    """Task: echo every chunk received on ``sock`` back, after a 1 s delay.

    Args:
        sock: a ``Socket`` wrapper around the accepted connection.
        addr: the peer address returned by ``accept`` (used for logging).
    """
    print(f'accept {addr}')
    try:
        while True:
            data = yield sock.recv(1024)
            if not data:
                break
            yield TimeWait(1)
            # send() may accept only part of the buffer; keep going until
            # every byte has been handed to the kernel.
            while data:
                nsent = yield sock.send(data)
                data = data[nsent:]
        print(f'client close {addr}')
    finally:
        # Close even if an error (e.g. a connection reset) is thrown into us.
        sock.close()

if __name__ == '__main__':
    listen_addr = ('', 8080)  # every interface, TCP port 8080
    scheduler = Scheduler()
    scheduler.new(server(listen_addr))
    scheduler.run()

未完待续

C 语言中可以用 ucontext 实现协程,可参考云风(cloudwu)写的代码
https://github.com/cloudwu/coroutine/

我先研究研究…