~cpp #csp.py #Copyright 2003 June Kim <juneaftn at hanmail dot net> from Queue import Queue import threading, thread import netstring, socket from cPickle import dumps,loads class Process: def __init__(self): pass def run(self): pass class ParProcess(Process): def __init__(self,processes): Process.__init__(self) self.ps=processes def run(self): threads=[] for each in self.ps: threads.append(threading.Thread(target=each.run)) for each in threads: each.setDaemon(True) each.start() for each in threads: each.join() class SeqProcess(Process): def __init__(self,processes): Process.__init__(self) self.ps=processes def run(self): for each in self.ps: each.run() class Channel: def __init__(self,buffer=1): self.q=Queue(buffer) def get(self): if self.q.get()!=None: #for block raise "None expected" return self.q.get() def put(self,v): self.q.put(None) #for block return self.q.put(v) class NetChannel: def __init__(self,socket): self.s=socket def get(self): v=loads(netstring.readns(self.s)) #block netstring.writens(self.s,"ACK") return v def put(self,v): netstring.writens(self.s,dumps(v)) ack=netstring.readns(self.s) #block if ack!='ACK': raise IOError, "ACK expected but got %s"%ack class Client(Process): def __init__(self,addr,outstream): self.addr=addr self.out_=outstream def run(self): s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) while 1: try: s.connect(self.addr) except socket.error: continue else: break self.out_.put(s) class Server(Process): def __init__(self,addr,outstream): self.addr=addr self.out_=outstream def run(self): s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(self.addr) s.listen(1) s2,addr=s.accept() self.out_.put(s2) class ViaNet: def __init__(self,addr): self.outc=Channel() background(self._Process(addr,self.outc)) self.netc=None def get(self): nc=self.getNetChannel() return nc.get() def put(self,v): nc=self.getNetChannel() return nc.put(v) def getNetChannel(self): if self.netc: return self.netc s=self.outc.get() self.netc=NetChannel(s) return self.netc class OneToNet(ViaNet): _Process=Client class NetToOne(ViaNet): _Process=Server def background(aProcess,args=()): thread.start_new_thread(aProcess.run,args)
~cpp #remoteprocess.py #Copyright 2003 June Kim <juneaftn at hanmail dot net> from csp import * PORT=8881 class MyP1(Process): def __init__(self,outstream): Process.__init__(self) self.out_=outstream def run(self): print "putting..." self.out_.put(3.14159) class MyP2(Process): def __init__(self,instream): Process.__init__(self) self.in_=instream def run(self): print "getting..." v=self.in_.get() print "got",v if __name__=='__main__': cout=OneToNet(('localhost',PORT)) cin=NetToOne(('localhost',PORT)) ParProcess([MyP1(cout),MyP2(cin)]).run()
~cpp #fib.py #Copyright 2003 June Kim <juneaftn at hanmail dot net> from csp import Process,ParProcess,Channel, background class Cons(Process): def __init__(self,n,instream,outstream): Process.__init__(self) self.n=n self.in_=instream self.out_=outstream def run(self): self.out_.put(self.n) while True: v=self.in_.get() self.out_.put(v) class Plus(Process): def __init__(self,outstream,istream1,istream2): Process.__init__(self) self.out_=outstream self.in1_=istream1 self.in2_=istream2 def run(self): while 1: v1=self.in1_.get() v2=self.in2_.get() self.out_.put(v1+v2) class Dup(Process): def __init__(self,instream,ostream1,ostream2): Process.__init__(self) self.in_=instream self.out1_=ostream1 self.out2_=ostream2 def run(self): while 1: v=self.in_.get() self.out1_.put(v) self.out2_.put(v) class Print(Process): def __init__(self,instream): Process.__init__(self) self.in_=instream def run(self): while 1: v=self.in_.get() print v time.sleep(0.5) def q(): return Channel() class Fib(Process): def __init__(self,outstream): Process.__init__(self) c1d1=q() d1c2=q() c2d2=q() d2p=outstream d2plus=q() plusc1=q() d1plus=q() c1=Cons(1,plusc1,c1d1) d1=Dup(c1d1,d1plus,d1c2) c2=Cons(1,d1c2,c2d2) d2=Dup(c2d2,d2plus,d2p) plus=Plus(plusc1,d1plus,d2plus) self.ps=[c1,d1,c2,d2,plus] self.out_=d2p def run(self): ParProcess(self.ps).run() if __name__=='__main__': outq=q() f=Fib(outq) background(f) for i in xrange(10): print outq.get() #ParProcess([f,Print(outq)]).run()
~cpp #netstring.py #from Steve Holden's Python Web Programming def readns(sock): """read a netstring from a socket.""" size = "" while 1: c = sock.recv(1) if c == ":": break elif not c: raise IOError, "short netstring read" size = size + c size = sz = int(size) s = "" while sz: ss = sock.recv(sz) if not ss: raise IOError, "short netstring read" s += ss sz -= len(ss) if len(s) != size: raise IOError, "short netstring read" if sock.recv(1) != ",": raise IOError, "missing netstring terminator" return s def writens(sock, s): """write a netstring to a socket.""" s = encode(s) while len(s): l = sock.send(s) s = s[l:] def encode(s): return "%d:%s," % (len(s), s) def decode(s): try: if s[-1] != ",": raise ValueError p = s.index(":") l = int(s[0:p]) if len(s) != p + l + 2: raise ValueError return s[p+1:-1] except ValueError: raise ValueError, "netstring format error: " + s def freadns(f): """read a netstring from a file.""" size = "" while 1: c = f.read(1) if c == ":": break elif not c: raise IOError, "short netstring read" size = size + c size = sz = int(size) s = "" while sz: ss = f.read(sz) if not ss: raise IOError, "short netstring read" s += ss sz -= len(ss) if len(s) != size: raise IOError, "short netstring read" if f.read(1) != ",": raise IOError, "missing netstring terminator" return s def fwritens(f, s): """write a netstring to a file.""" s = encode(s) f.write(s)
~cpp #rfib2.py #Copyright 2003 June Kim <juneaftn at hanmail dot net> from csp import * import time MYADDR=('localhost',8121) YOURADDR=('localhost',8142) class Cons(Process): def __init__(self,n,instream,outstream): Process.__init__(self) self.n=n self.in_=instream self.out_=outstream def run(self): self.out_.put(self.n) while True: v=self.in_.get() self.out_.put(v) class Plus(Process): def __init__(self,outstream,istream1,istream2): Process.__init__(self) self.out_=outstream self.in1_=istream1 self.in2_=istream2 def run(self): while 1: v1=self.in1_.get() v2=self.in2_.get() self.out_.put(v1+v2) class Dup(Process): def __init__(self,instream,ostream1,ostream2): Process.__init__(self) self.in_=instream self.out1_=ostream1 self.out2_=ostream2 def run(self): while 1: v=self.in_.get() self.out1_.put(v) self.out2_.put(v) class Print(Process): def __init__(self,instream): Process.__init__(self) self.in_=instream def run(self): while 1: v=self.in_.get() print v time.sleep(0.5) def q(): return Channel() class Fib2(Process): def __init__(self,instream,outstream,printstream): Process.__init__(self) d1c2=instream c2d2=q() d2p=printstream d2plus=outstream c2=Cons(1,d1c2,c2d2) d2=Dup(c2d2,d2plus,d2p) self.ps=[c2,d2] self.out_=d2p def run(self): ParProcess(self.ps).run() if __name__=='__main__': d1c2=NetToOne(MYADDR) d2plus=OneToNet(YOURADDR) outq=q() f=Fib2(d1c2,d2plus,outq) ParProcess([f,Print(outq)]).run()
~cpp #rfib1.py #Copyright 2003 June Kim <juneaftn at hanmail dot net> from csp import * YOURADDR=('localhost',8121) MYADDR=('localhost',8142) class Cons(Process): def __init__(self,n,instream,outstream): Process.__init__(self) self.n=n self.in_=instream self.out_=outstream def run(self): self.out_.put(self.n) while True: v=self.in_.get() self.out_.put(v) class Plus(Process): def __init__(self,outstream,istream1,istream2): Process.__init__(self) self.out_=outstream self.in1_=istream1 self.in2_=istream2 def run(self): while 1: v1=self.in1_.get() v2=self.in2_.get() self.out_.put(v1+v2) class Dup(Process): def __init__(self,instream,ostream1,ostream2): Process.__init__(self) self.in_=instream self.out1_=ostream1 self.out2_=ostream2 def run(self): while 1: v=self.in_.get() self.out1_.put(v) self.out2_.put(v) class Print(Process): def __init__(self,instream): Process.__init__(self) self.in_=instream def run(self): while 1: v=self.in_.get() print v time.sleep(0.5) def q(): return Channel() class Fib1(Process): def __init__(self,instream,outstream): Process.__init__(self) c1d1=q() d1c2=outstream d2plus=instream plusc1=q() d1plus=q() c1=Cons(1,plusc1,c1d1) d1=Dup(c1d1,d1plus,d1c2) plus=Plus(plusc1,d1plus,d2plus) self.ps=[c1,d1,plus] def run(self): ParProcess(self.ps).run() if __name__=='__main__': d1c2=OneToNet(YOURADDR) d2plus=NetToOne(MYADDR) f=Fib1(d2plus,d1c2) f.run()