"""Minimal cooperative scheduler built on greenlets.

Provides ``Tasklet`` (a schedulable greenlet wrapper), ``Channel`` (a FIFO
queue on which a tasklet can block until a value is sent) and a module-level
default ``scheduler`` with the convenience functions ``tasklet``, ``channel``
and ``run``.
"""
from greenlet import greenlet
import traceback
from collections import deque


class Scheduler(object):
    """Round-robin runner for ``Tasklet`` objects.

    Attributes:
        running: the tasklet currently being executed by ``run``, else None.
        rgtasklet: list of tasklets waiting to be run in the next round.
        g: the greenlet that is executing ``run``, else None.
    """

    def __init__(self):
        self.running = None
        self.rgtasklet = []
        self.g = None

    def schedule(self, tasklet, *args):
        """Queue ``tasklet`` to be switched to with ``args`` on the next round.

        Scheduling a tasklet that is already queued overwrites its pending
        arguments; ``run`` then resumes it only once for that wake-up.
        """
        tasklet.args = args
        tasklet.scheduled = True
        self.rgtasklet.append(tasklet)

    def kill(self, tasklet):
        """Terminate ``tasklet`` and drop it from the pending queue.

        If the tasklet's greenlet is still alive, it is re-parented to the
        calling greenlet and ``throw()`` is invoked on it with no arguments
        (per the greenlet documentation this raises ``GreenletExit`` inside
        the target), so control comes back to the caller once it has exited.
        """
        if not tasklet.g.dead:
            tasklet.g.parent = greenlet.getcurrent()
            tasklet.g.throw()
        try:
            self.rgtasklet.remove(tasklet)
        except ValueError:
            # The tasklet was not queued; nothing to remove.
            pass

    def run(self):
        """Run queued tasklets until the queue is empty.

        Each round takes the current queue as a batch and switches to every
        tasklet in it; tasklets scheduled during a round run in the next one.
        An ``Exception`` escaping a tasklet is printed with
        ``traceback.print_exc`` and the tasklet is killed; the loop then
        continues. ``KeyboardInterrupt`` and ``SystemExit`` are not swallowed
        and propagate to the caller of ``run``.
        """
        self.g = greenlet.getcurrent()
        try:
            while self.rgtasklet:
                batch = self.rgtasklet
                self.rgtasklet = []
                for tasklet in batch:
                    try:
                        args = tasklet.args
                        if args is None:
                            # Stale duplicate entry: this tasklet was queued
                            # more than once and an earlier entry already
                            # consumed its arguments. Switching with
                            # ``*None`` would raise TypeError and get the
                            # tasklet killed, so skip it instead.
                            continue
                        self.running = tasklet
                        tasklet.args = None
                        tasklet.scheduled = False
                        # Make the tasklet return here when it finishes.
                        tasklet.g.parent = self.g
                        tasklet.g.switch(*args)
                    except Exception:
                        traceback.print_exc()
                        self.kill(tasklet)
                    finally:
                        self.running = None
        finally:
            # Always leave the scheduler in its idle state, even when an
            # exception (e.g. KeyboardInterrupt) aborts the loop.
            self.running = None
            self.g = None

    def suspend(self):
        """Switch from the current tasklet back to the greenlet running ``run``.

        Only meaningful while ``run`` is active, since it switches to
        ``self.g``, which is None otherwise.
        """
        self.g.switch()


class Tasklet(object):
    """A callback wrapped in a greenlet and bound to a scheduler.

    Attributes:
        g: the greenlet created from the callback.
        args: pending switch arguments (a tuple) while queued, else None.
        scheduler: the ``Scheduler`` this tasklet is queued on.
        scheduled: flag set by ``Scheduler.schedule`` and cleared by
            ``Scheduler.run`` just before the tasklet is switched to.
    """

    def __init__(self, cb, scheduler):
        self.g = greenlet(cb)
        self.args = None
        self.scheduler = scheduler
        self.scheduled = False

    def __call__(self, *args):
        """Schedule this tasklet to be switched to with ``args``."""
        self.scheduler.schedule(self, *args)

    def kill(self):
        """Terminate this tasklet via its scheduler."""
        self.scheduler.kill(self)


class Channel(object):
    """FIFO queue on which a tasklet can block waiting for a value.

    Attributes:
        queue: deque of values sent but not yet received.
        tasklet: the tasklet currently blocked in ``receive``, else None.
            Only a single waiter is remembered.
        scheduler: the ``Scheduler`` used to suspend and wake tasklets.
    """

    def __init__(self, scheduler):
        self.queue = deque()
        self.tasklet = None
        self.scheduler = scheduler

    def receive(self):
        """Return the oldest queued value, suspending until one is available.

        Must be called from a tasklet being executed by the scheduler's
        ``run`` loop, because it reads ``scheduler.running`` and suspends.
        """
        while not self.queue:
            self.tasklet = self.scheduler.running
            # NOTE(review): the flag is set while the tasklet is blocked
            # here; nothing in this file reads it, kept for compatibility.
            self.tasklet.scheduled = True
            self.scheduler.suspend()
        self.tasklet = None
        return self.queue.popleft()

    def send(self, val):
        """Append ``val`` to the queue and wake the waiting tasklet, if any."""
        self.queue.append(val)
        if self.tasklet is not None:
            self.scheduler.schedule(self.tasklet)


# Default scheduler shared by the module-level helpers below.
scheduler = Scheduler()


def tasklet(cb):
    """Create a ``Tasklet`` for ``cb`` bound to the default scheduler."""
    return Tasklet(cb, scheduler)


def channel():
    """Create a ``Channel`` bound to the default scheduler."""
    return Channel(scheduler)


def run():
    """Run the default scheduler until no tasklets remain queued."""
    scheduler.run()