asyncio: Use logging and errno modules.

pull/11/head
Paul Sokolovsky 2014-04-22 02:51:04 +03:00
rodzic 517c6ea084
commit f79b960890
1 zmienionych plików z 28 dodań i 18 usunięć

Wyświetl plik

@ -1,7 +1,12 @@
import __main__
import time import time
import heapq import heapq
import errno
import logging
log = logging.getLogger("asyncio")
def coroutine(f): def coroutine(f):
return f return f
@ -24,7 +29,7 @@ class EventLoop:
def call_at(self, time, callback, *args): def call_at(self, time, callback, *args):
# self.q.append((callback, args)) # self.q.append((callback, args))
# self.cnt is workaround per heapq docs # self.cnt is workaround per heapq docs
# print("Scheduling", (time, self.cnt, callback, args)) log.debug("Scheduling %s", (time, self.cnt, callback, args))
heapq.heappush(self.q, (time, self.cnt, callback, args)) heapq.heappush(self.q, (time, self.cnt, callback, args))
# print(self.q) # print(self.q)
self.cnt += 1 self.cnt += 1
@ -42,6 +47,8 @@ class EventLoop:
while True: while True:
if self.q: if self.q:
t, cnt, cb, args = heapq.heappop(self.q) t, cnt, cb, args = heapq.heappop(self.q)
log.debug("Next task to run: %s", (t, cnt, cb, args))
# __main__.mem_info()
tnow = self.time() tnow = self.time()
delay = t - tnow delay = t - tnow
if delay > 0: if delay > 0:
@ -57,20 +64,22 @@ class EventLoop:
try: try:
if args == (): if args == ():
args = (None,) args = (None,)
print("Send args:", args) log.debug("Gen send args: %s", args)
ret = cb.send(*args) ret = cb.send(*args)
print("ret:", ret) log.debug("Gen yield result: %s", ret)
if isinstance(ret, SysCall): if isinstance(ret, SysCall):
if isinstance(ret, Sleep): if isinstance(ret, Sleep):
delay = ret.args[0] delay = ret.args[0]
elif isinstance(ret, IORead): elif isinstance(ret, IORead):
# self.add_reader(ret.obj.fileno(), lambda self, c, f: self.call_soon(c, f), self, cb, ret.obj)
# self.add_reader(ret.obj.fileno(), lambda c, f: self.call_soon(c, f), cb, ret.obj)
self.add_reader(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) self.add_reader(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj)
continue continue
elif isinstance(ret, IOWrite): elif isinstance(ret, IOWrite):
self.add_writer(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj) self.add_writer(ret.obj.fileno(), lambda f: self.call_soon(cb, f), ret.obj)
continue continue
except StopIteration as e: except StopIteration as e:
print(cb, "finished") log.debug("Gen finished: %s", cb)
continue continue
#self.q.append(c) #self.q.append(c)
self.call_later(delay, cb, *args) self.call_later(delay, cb, *args)
@ -99,22 +108,22 @@ class EpollEventLoop(EventLoop):
self.poller = select.epoll(1) self.poller = select.epoll(1)
def add_reader(self, fd, cb, *args): def add_reader(self, fd, cb, *args):
print("add_reader") log.debug("add_reader%s", (fd, cb, args))
self.poller.register(fd, select.EPOLLIN, (cb, args)) self.poller.register(fd, select.EPOLLIN, (cb, args))
def add_writer(self, fd, cb, *args): def add_writer(self, fd, cb, *args):
print("add_writer") log.debug("add_writer%s", (fd, cb, args))
self.poller.register(fd, select.EPOLLOUT, (cb, args)) self.poller.register(fd, select.EPOLLOUT, (cb, args))
def wait(self, delay): def wait(self, delay):
print("epoll.wait", delay) log.debug("epoll.wait(%d)", delay)
if delay == -1: if delay == -1:
res = self.poller.poll(-1) res = self.poller.poll(-1)
else: else:
res = self.poller.poll(int(delay * 1000)) res = self.poller.poll(int(delay * 1000))
print("poll: ", res) log.debug("epoll result: %s", res)
for cb, ev in res: for cb, ev in res:
print("Calling %s%s" % (cb[0], cb[1])) log.debug("Calling IO callback: %s%s", cb[0], cb[1])
cb[0](*cb[1]) cb[0](*cb[1])
@ -164,11 +173,11 @@ class StreamReader:
self.s = s self.s = s
def readline(self): def readline(self):
print("readline") log.debug("StreamReader.readline()")
s = yield IORead(self.s) s = yield IORead(self.s)
print("after IORead") log.debug("StreamReader.readline(): after IORead: %s", s)
res = self.s.readline() res = self.s.readline()
print("readline res:", res) log.debug("StreamReader.readline(): res: %s", res)
return res return res
@ -178,14 +187,14 @@ class StreamWriter:
self.s = s self.s = s
def write(self, buf): def write(self, buf):
print("Write!")
res = self.s.write(buf) res = self.s.write(buf)
print("write res:", res) log.debug("StreamWriter.write(): %d", res)
s = yield IOWrite(self.s) s = yield IOWrite(self.s)
print("returning write res:", res) log.debug("StreamWriter.write(): returning")
def open_connection(host, port): def open_connection(host, port):
log.debug("open_connection(%s, %s)", host, port)
s = _socket.socket() s = _socket.socket()
s.setblocking(False) s.setblocking(False)
ai = _socket.getaddrinfo(host, port) ai = _socket.getaddrinfo(host, port)
@ -193,8 +202,9 @@ def open_connection(host, port):
try: try:
s.connect(addr) s.connect(addr)
except OSError as e: except OSError as e:
print(e.args[0]) if e.args[0] != errno.EINPROGRESS:
print("After connect") raise
log.debug("open_connection: After connect")
s = yield IOWrite(s) s = yield IOWrite(s)
print("After iowait:", s) log.debug("open_connection: After iowait: %s", s)
return StreamReader(s), StreamWriter(s) return StreamReader(s), StreamWriter(s)