Source
# print "select ended: (" + str(len(read)) + "," + str(len(write)) + "," + str(len(excep)) + ")"
import threading
import select
import time
import base64
import uuid
import os
import posix
import fcntl
class mindpipes (threading.Thread) :
__change = threading.Lock( )
__queue_lock = threading.Lock( )
__queue = { }
class nub :
__watcher = None
__id = None
def __init__(self,watcher,id):
self.__watcher = watcher
self.__id = str(id)
def id ( self ) :
return self.__id;
def unwatch ( self ) :
self.__watcher.remove(self.__id)
def __init__(self):
threading.Thread.__init__(self)
self.__change.acquire( )
self.setDaemon(True)
def watch ( self, fd, callback, data=None ):
fcntl.fcntl(fd, fcntl.F_SETFL, posix.O_NDELAY)
id = base64.b64encode(uuid.uuid4().bytes,'..')[0:10]
self.__queue_lock.acquire( )
while self.__queue.has_key(id):
id = base64.b64encode(uuid.uuid4().bytes,'..')[0:10]
self.__queue[id] = { 'pipe': fd, 'callback': callback, 'buffer': '', 'data': data }
self.__queue_lock.release( )
while not self.__change.locked( ) :
time.sleep(0.1)
self.__change.release( )
return self.nub(self,id)
def remove ( self, nubbin ):
self.__queue_lock.acquire( )
if self.__queue.has_key(nubbin.id( )):
x = self.queue.pop(nubbin.id( ))
x.pop('buffer')
self.__queue_lock.release( )
while not self.__change.locked( ) :
time.sleep(0.1)
self.__change.release( )
return x
else:
self.__queue_lock.release( )
return None
def run( self ) :
while ( 1 ) :
self.__queue_lock.acquire( )
read_fds = [ ]
for key in self.__queue.keys( ) :