No older revisions available
No older revisions available
~cpp
#csp.py
#Copyright 2003 June Kim <juneaftn at hanmail dot net>
from Queue import Queue
import threading, thread
import netstring, socket
from cPickle import dumps,loads
class Process:
def __init__(self):
pass
def run(self):
pass
class ParProcess(Process):
def __init__(self,processes):
Process.__init__(self)
self.ps=processes
def run(self):
threads=[]
for each in self.ps:
threads.append(threading.Thread(target=each.run))
for each in threads:
each.setDaemon(True)
each.start()
for each in threads:
each.join()
class SeqProcess(Process):
def __init__(self,processes):
Process.__init__(self)
self.ps=processes
def run(self):
for each in self.ps:
each.run()
class Channel:
def __init__(self,buffer=1):
self.q=Queue(buffer)
def get(self):
if self.q.get()!=None: #for block
raise "None expected"
return self.q.get()
def put(self,v):
self.q.put(None) #for block
return self.q.put(v)
class NetChannel:
def __init__(self,socket):
self.s=socket
def get(self):
v=loads(netstring.readns(self.s)) #block
netstring.writens(self.s,"ACK")
return v
def put(self,v):
netstring.writens(self.s,dumps(v))
ack=netstring.readns(self.s) #block
if ack!='ACK':
raise IOError, "ACK expected but got %s"%ack
class Client(Process):
def __init__(self,addr,outstream):
self.addr=addr
self.out_=outstream
def run(self):
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
while 1:
try:
s.connect(self.addr)
except socket.error:
continue
else:
break
self.out_.put(s)
class Server(Process):
def __init__(self,addr,outstream):
self.addr=addr
self.out_=outstream
def run(self):
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(self.addr)
s.listen(1)
s2,addr=s.accept()
self.out_.put(s2)
class ViaNet:
def __init__(self,addr):
self.outc=Channel()
background(self._Process(addr,self.outc))
self.netc=None
def get(self):
nc=self.getNetChannel()
return nc.get()
def put(self,v):
nc=self.getNetChannel()
return nc.put(v)
def getNetChannel(self):
if self.netc:
return self.netc
s=self.outc.get()
self.netc=NetChannel(s)
return self.netc
class OneToNet(ViaNet):
_Process=Client
class NetToOne(ViaNet):
_Process=Server
def background(aProcess,args=()):
thread.start_new_thread(aProcess.run,args)
~cpp
#remoteprocess.py
#Copyright 2003 June Kim <juneaftn at hanmail dot net>
from csp import *
PORT=8881
class MyP1(Process):
def __init__(self,outstream):
Process.__init__(self)
self.out_=outstream
def run(self):
print "putting..."
self.out_.put(3.14159)
class MyP2(Process):
def __init__(self,instream):
Process.__init__(self)
self.in_=instream
def run(self):
print "getting..."
v=self.in_.get()
print "got",v
if __name__=='__main__':
cout=OneToNet(('localhost',PORT))
cin=NetToOne(('localhost',PORT))
ParProcess([MyP1(cout),MyP2(cin)]).run()
~cpp
#fib.py
#Copyright 2003 June Kim <juneaftn at hanmail dot net>
from csp import Process,ParProcess,Channel, background
class Cons(Process):
def __init__(self,n,instream,outstream):
Process.__init__(self)
self.n=n
self.in_=instream
self.out_=outstream
def run(self):
self.out_.put(self.n)
while True:
v=self.in_.get()
self.out_.put(v)
class Plus(Process):
def __init__(self,outstream,istream1,istream2):
Process.__init__(self)
self.out_=outstream
self.in1_=istream1
self.in2_=istream2
def run(self):
while 1:
v1=self.in1_.get()
v2=self.in2_.get()
self.out_.put(v1+v2)
class Dup(Process):
def __init__(self,instream,ostream1,ostream2):
Process.__init__(self)
self.in_=instream
self.out1_=ostream1
self.out2_=ostream2
def run(self):
while 1:
v=self.in_.get()
self.out1_.put(v)
self.out2_.put(v)
class Print(Process):
def __init__(self,instream):
Process.__init__(self)
self.in_=instream
def run(self):
while 1:
v=self.in_.get()
print v
time.sleep(0.5)
def q():
return Channel()
class Fib(Process):
def __init__(self,outstream):
Process.__init__(self)
c1d1=q()
d1c2=q()
c2d2=q()
d2p=outstream
d2plus=q()
plusc1=q()
d1plus=q()
c1=Cons(1,plusc1,c1d1)
d1=Dup(c1d1,d1plus,d1c2)
c2=Cons(1,d1c2,c2d2)
d2=Dup(c2d2,d2plus,d2p)
plus=Plus(plusc1,d1plus,d2plus)
self.ps=[c1,d1,c2,d2,plus]
self.out_=d2p
def run(self):
ParProcess(self.ps).run()
if __name__=='__main__':
outq=q()
f=Fib(outq)
background(f)
for i in xrange(10):
print outq.get()
#ParProcess([f,Print(outq)]).run()
~cpp
#netstring.py
#from Steve Holden's Python Web Programming
def readns(sock):
"""read a netstring from a socket."""
size = ""
while 1:
c = sock.recv(1)
if c == ":":
break
elif not c:
raise IOError, "short netstring read"
size = size + c
size = sz = int(size)
s = ""
while sz:
ss = sock.recv(sz)
if not ss:
raise IOError, "short netstring read"
s += ss
sz -= len(ss)
if len(s) != size:
raise IOError, "short netstring read"
if sock.recv(1) != ",":
raise IOError, "missing netstring terminator"
return s
def writens(sock, s):
"""write a netstring to a socket."""
s = encode(s)
while len(s):
l = sock.send(s)
s = s[l:]
def encode(s):
return "%d:%s," % (len(s), s)
def decode(s):
try:
if s[-1] != ",":
raise ValueError
p = s.index(":")
l = int(s[0:p])
if len(s) != p + l + 2:
raise ValueError
return s[p+1:-1]
except ValueError:
raise ValueError, "netstring format error: " + s
def freadns(f):
"""read a netstring from a file."""
size = ""
while 1:
c = f.read(1)
if c == ":":
break
elif not c:
raise IOError, "short netstring read"
size = size + c
size = sz = int(size)
s = ""
while sz:
ss = f.read(sz)
if not ss:
raise IOError, "short netstring read"
s += ss
sz -= len(ss)
if len(s) != size:
raise IOError, "short netstring read"
if f.read(1) != ",":
raise IOError, "missing netstring terminator"
return s
def fwritens(f, s):
"""write a netstring to a file."""
s = encode(s)
f.write(s)
~cpp
#rfib2.py
#Copyright 2003 June Kim <juneaftn at hanmail dot net>
from csp import *
import time
MYADDR=('localhost',8121)
YOURADDR=('localhost',8142)
class Cons(Process):
def __init__(self,n,instream,outstream):
Process.__init__(self)
self.n=n
self.in_=instream
self.out_=outstream
def run(self):
self.out_.put(self.n)
while True:
v=self.in_.get()
self.out_.put(v)
class Plus(Process):
def __init__(self,outstream,istream1,istream2):
Process.__init__(self)
self.out_=outstream
self.in1_=istream1
self.in2_=istream2
def run(self):
while 1:
v1=self.in1_.get()
v2=self.in2_.get()
self.out_.put(v1+v2)
class Dup(Process):
def __init__(self,instream,ostream1,ostream2):
Process.__init__(self)
self.in_=instream
self.out1_=ostream1
self.out2_=ostream2
def run(self):
while 1:
v=self.in_.get()
self.out1_.put(v)
self.out2_.put(v)
class Print(Process):
def __init__(self,instream):
Process.__init__(self)
self.in_=instream
def run(self):
while 1:
v=self.in_.get()
print v
time.sleep(0.5)
def q():
return Channel()
class Fib2(Process):
def __init__(self,instream,outstream,printstream):
Process.__init__(self)
d1c2=instream
c2d2=q()
d2p=printstream
d2plus=outstream
c2=Cons(1,d1c2,c2d2)
d2=Dup(c2d2,d2plus,d2p)
self.ps=[c2,d2]
self.out_=d2p
def run(self):
ParProcess(self.ps).run()
if __name__=='__main__':
d1c2=NetToOne(MYADDR)
d2plus=OneToNet(YOURADDR)
outq=q()
f=Fib2(d1c2,d2plus,outq)
ParProcess([f,Print(outq)]).run()
~cpp
#rfib1.py
#Copyright 2003 June Kim <juneaftn at hanmail dot net>
from csp import *
YOURADDR=('localhost',8121)
MYADDR=('localhost',8142)
class Cons(Process):
def __init__(self,n,instream,outstream):
Process.__init__(self)
self.n=n
self.in_=instream
self.out_=outstream
def run(self):
self.out_.put(self.n)
while True:
v=self.in_.get()
self.out_.put(v)
class Plus(Process):
def __init__(self,outstream,istream1,istream2):
Process.__init__(self)
self.out_=outstream
self.in1_=istream1
self.in2_=istream2
def run(self):
while 1:
v1=self.in1_.get()
v2=self.in2_.get()
self.out_.put(v1+v2)
class Dup(Process):
def __init__(self,instream,ostream1,ostream2):
Process.__init__(self)
self.in_=instream
self.out1_=ostream1
self.out2_=ostream2
def run(self):
while 1:
v=self.in_.get()
self.out1_.put(v)
self.out2_.put(v)
class Print(Process):
def __init__(self,instream):
Process.__init__(self)
self.in_=instream
def run(self):
while 1:
v=self.in_.get()
print v
time.sleep(0.5)
def q():
return Channel()
class Fib1(Process):
def __init__(self,instream,outstream):
Process.__init__(self)
c1d1=q()
d1c2=outstream
d2plus=instream
plusc1=q()
d1plus=q()
c1=Cons(1,plusc1,c1d1)
d1=Dup(c1d1,d1plus,d1c2)
plus=Plus(plusc1,d1plus,d2plus)
self.ps=[c1,d1,plus]
def run(self):
ParProcess(self.ps).run()
if __name__=='__main__':
d1c2=OneToNet(YOURADDR)
d2plus=NetToOne(MYADDR)
f=Fib1(d2plus,d1c2)
f.run()