diff --git a/multiprocessing/multiprocessing.py b/multiprocessing/multiprocessing.py index cd15d48c..52c5c592 100644 --- a/multiprocessing/multiprocessing.py +++ b/multiprocessing/multiprocessing.py @@ -1,4 +1,5 @@ import os +import pickle class Process: @@ -6,15 +7,56 @@ class Process: def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): self.target = target self.args = args + self.kwargs = kwargs self.pid = 0 + self.r = self.w = None def start(self): self.pid = os.fork() if not self.pid: - self.target(*self.args) + if self.r: + self.r.close() + self.target(*self.args, **self.kwargs) os._exit(0) else: + if self.w: + self.w.close() return def join(self): os.waitpid(self.pid, 0) + + def register_pipe(self, r, w): + """Extension to CPython API: any pipe used for parent/child + communication should be registered with this function.""" + self.r, self.w = r, w + + +class Connection: + + def __init__(self, fd): + self.fd = fd + self.f = open(fd) + + def __repr__(self): + return "" % self.f + + def send(self, obj): + s = pickle.dumps(obj) + self.f.write(len(s).to_bytes(4)) + self.f.write(s) + + def recv(self): + s = self.f.read(4) + l = int.from_bytes(s) + s = self.f.read(l) + return pickle.loads(s) + + def close(self): + self.f.close() + + +def Pipe(duplex=True): + assert duplex == False + r, w = os.pipe() + return Connection(r), Connection(w) diff --git a/multiprocessing/test_pipe.py b/multiprocessing/test_pipe.py new file mode 100644 index 00000000..84591d96 --- /dev/null +++ b/multiprocessing/test_pipe.py @@ -0,0 +1,19 @@ +import sys +import os +from multiprocessing import Process, Pipe, Connection + +def f(conn): + conn.send([42, None, 'hello']) + conn.send([42, 42, 42]) + conn.close() + +if __name__ == '__main__': + parent_conn, child_conn = Pipe(False) + print(parent_conn, child_conn) + p = Process(target=f, args=(child_conn,)) + # Extension: need to call this for uPy + p.register_pipe(parent_conn, child_conn) + p.start() + print(parent_conn.recv()) + print(parent_conn.recv()) + p.join()