00001 #!/usr/bin/env python 00002 # 00003 # Copyright 2004 00004 # by 00005 # The Board of Trustees of the 00006 # Leland Stanford Junior University. 00007 # All rights reserved. 00008 # 00009 00010 __facility__ = "Online" 00011 __abstract__ = "GLAST LAT Task Engine class" 00012 __author__ = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online" 00013 __date__ = "1/21/2004" 00014 __version__ = "$Revision: 2.2 $" 00015 __credits__ = "SLAC" 00016 00017 import LATTE.copyright_SLAC 00018 00019 from Queue import Queue 00020 from threading import Thread, currentThread 00021 import logging as log 00022 00023 00024 class TaskEngine(Thread): 00025 """A class for setting up and queueing tasks to a task engine. 00026 00027 A task engine is a thread that polls a task queue and runs tasks found on 00028 the queue. Tasks are functions or class methods accompanied with their 00029 arguments and keyword arguments. Each task must run to completion before 00030 the next task is pulled from the queue. Returning a return value from the 00031 task to the caller is currently not supported (easy to add, if the need 00032 arises). The queue is a FIFO queue and there is currently no way to jam 00033 a task to the head of the queue. 00034 """ 00035 def __init__(self, name = None): 00036 Thread.__init__(self) 00037 00038 if name is None: 00039 name = self.__class__.__name__ 00040 self.__name = name 00041 self.__queue = Queue() 00042 00043 def name(self): 00044 """\brief Method for querying the task engine's name 00045 00046 \return The task engine's name 00047 """ 00048 return self.__name 00049 00050 def spawn(self, task, *args, **kwargs): 00051 """\brief Method used to queue a task to the task engine. 00052 00053 \param task The task routine to queue 00054 \param args The arguments for the task routine 00055 \param kwargs The keyword arguments for the task routine 00056 """ 00057 assert self.__queue is not None, (self.name + ".spawn: " + \ 00058 "TaskEngine.__init__() was not called") 00059 self.__queue.put((task,) + args + (kwargs,)) 00060 00061 def run(self): 00062 """\brief The task engine main routine. 00063 00064 This method is invoked by the Thread class and should be considered 00065 'protected'. Do not call it. 00066 """ 00067 self.setName(self.name()) 00068 00069 while True: 00070 taskArgsKwargs = self.__queue.get(True) # Blocks 00071 if taskArgsKwargs[0] is None: break # Quit signal is None 00072 00073 try: 00074 taskArgsKwargs[0](*taskArgsKwargs[1:-1], **taskArgsKwargs[-1]) 00075 except: 00076 import types 00077 if isinstance(taskArgsKwargs[0], types.MethodType): 00078 log.exception(self.name() + \ 00079 ": Trapped exception from spawned task " + \ 00080 taskArgsKwargs[0].im_class.__name__ + '.' + \ 00081 taskArgsKwargs[0].__name__ + '()') 00082 else: 00083 log.exception(self.name() + \ 00084 ": Trapped exception from spawned task " + \ 00085 taskArgsKwargs[0].__name__ + '()') 00086 00087 # Drop references to this object to allow it to be garbage collected 00088 taskArgsKwargs = None 00089 00090 log.info(self.name() + ": terminating") 00091 00092 def shutdown(self, tmo = 2.0): 00093 """\brief Method for shutting down the task engine 00094 00095 This method blocks (with a timeout) until the task engine has exited. 00096 It makes no sense to spawn this method to the task engine since it waits 00097 for the task engine to exit. 00098 00099 \param tmo The amount of time (seconds) to wait for the task engine to exit 00100 \return 0 for success, 00101 -1 if the task engine didn't exit, 00102 -2 if the task engine exited prematurely 00103 """ 00104 # This method can not be called from the taskEngine thread on purpose. 00105 # If it could, there would be no way for the instigator to synchronize. 00106 if currentThread() is self: 00107 raise RuntimeError, "A task engine can't be shut down by its own thread" 00108 if self.isAlive(): # Thread still there or did it crash? 00109 self.spawn(None) # Signal to quit 00110 self.join(tmo) # Let engine exit 00111 if self.isAlive(): # Timed out? 00112 log.error(self.name() + ".shutdown: " + \ 00113 "Task engine did not exit when expected") 00114 return -1 00115 else: 00116 log.warn(self.name() + ":.shutdown: " + \ 00117 "Task engine unexpectedly disappeared") 00118 return -2 00119 00120 return 0 00121 00122 00123 #------------------------------------------------------------------------------- 00124 # Unit testing follows: 00125 00126 if __debug__ and __name__ == '__main__': 00127 import unittest 00128 import time 00129 00130 results = [None, None, None, None, None, None] 00131 status = None 00132 00133 class TaskEngineTestCase(unittest.TestCase): 00134 def __init__(self, arg): 00135 unittest.TestCase.__init__(self, arg) 00136 00137 def checkInstantiation(self): 00138 """Test class instantiation""" 00139 engine = None 00140 engine = TaskEngine("TestEngine") 00141 assert engine != None, "Could not instantiate a TaskEngine" 00142 00143 def checkStartup(self): 00144 """Test whether the thread starts up""" 00145 engine = TaskEngine("TestEngine") 00146 engine.start() 00147 time.sleep(0.1) # Let it start 00148 assert engine.isAlive(), "Could not start TaskEngine" 00149 engine.shutdown() 00150 time.sleep(.1) 00151 00152 def __task_1(self, a, b, c, x, y, z): 00153 self.__a = a 00154 self.__b = b 00155 self.__c = c 00156 self.__x = x 00157 self.__y = y 00158 self.__z = z 00159 00160 def checkSpawn(self): 00161 """ 00162 Test whether we can spawn a class method task to the engine and 00163 have the arguments handled correctly 00164 """ 00165 self.__a = None 00166 self.__b = None 00167 self.__c = None 00168 self.__x = None 00169 self.__y = None 00170 self.__z = None 00171 00172 a = "Hello world" 00173 b = 3.1415926 00174 c = 0xabadcafeL 00175 engine = TaskEngine("TestEngine") 00176 engine.start() 00177 time.sleep(0.1) # Let it start 00178 engine.spawn(self.__task_1, a, b, c, z = a, x = b, y = c) 00179 time.sleep(0.1) # Let it run 00180 assert engine.isAlive(), "TaskEngine prematurely disappeared" 00181 assert self.__a is a, "TaskEngine did not correctly handle args[0]" 00182 assert self.__b is b, "TaskEngine did not correctly handle args[1]" 00183 assert self.__c is c, "TaskEngine did not correctly handle args[2]" 00184 assert self.__x is b, "TaskEngine did not correctly handle kwargs[0]" 00185 assert self.__y is c, "TaskEngine did not correctly handle kwargs[1]" 00186 assert self.__z is a, "TaskEngine did not correctly handle kwargs[2]" 00187 engine.shutdown() 00188 time.sleep(.1) 00189 00190 def checkUnboundSpawn(self): 00191 """ 00192 Test whether we can spawn an unbound function task to the engine and 00193 have the arguments handled correctly 00194 """ 00195 a = "Test of an unbound function" 00196 b = 2.718281828 00197 c = 0xdeadbeefL 00198 00199 engine = TaskEngine("TestEngine") 00200 engine.start() 00201 time.sleep(0.1) # Let it start 00202 engine.spawn(_unbound1, a, b, c, y = a, z = b, x = c) 00203 time.sleep(0.1) # Let it run 00204 assert engine.isAlive(), "TaskEngine prematurely disappeared" 00205 assert results[0] is a, "TaskEngine did not correctly handle args[0]" 00206 assert results[1] is b, "TaskEngine did not correctly handle args[1]" 00207 assert results[2] is c, "TaskEngine did not correctly handle args[2]" 00208 assert results[3] is c, "TaskEngine did not correctly handle kwargs[0]" 00209 assert results[4] is a, "TaskEngine did not correctly handle kwargs[1]" 00210 assert results[5] is b, "TaskEngine did not correctly handle kwargs[2]" 00211 engine.shutdown() 00212 time.sleep(.1) 00213 00214 def __doException(self): # 2nd level to give traceback some depth 00215 raise RuntimeError, "Raising RuntimeError to test exception handling" 00216 return True 00217 00218 def __task_2(self): 00219 self.__status = False 00220 self.__status = self.__doException() 00221 00222 def checkException1(self): 00223 """ 00224 Test that the engine doesn't die if a class method task throws an 00225 exception 00226 """ 00227 print "" 00228 print "##############################################################################" 00229 print "# #" 00230 print "# This test should cause a 'Trapped exception from spawned task' exception #" 00231 print "# #" 00232 print "##############################################################################" 00233 engine = TaskEngine("TestEngine") 00234 engine.start() 00235 time.sleep(0.1) # Let it start 00236 engine.spawn(self.__task_2) 00237 time.sleep(0.1) # Let it run 00238 assert engine.isAlive(), "TaskEngine died from exception" 00239 assert self.__status == False, "Exception didn't abort task as expected" 00240 engine.shutdown() 00241 time.sleep(.1) 00242 00243 def checkException2(self): 00244 """ 00245 Test that the engine doesn't die if an unbound function task throws an 00246 exception 00247 """ 00248 print "" # Expecting an exception: add space for readability 00249 print "##############################################################################" 00250 print "# #" 00251 print "# This test should cause a 'Trapped exception from spawned task' exception #" 00252 print "# #" 00253 print "##############################################################################" 00254 engine = TaskEngine("TestEngine") 00255 engine.start() 00256 time.sleep(0.1) # Let it start 00257 engine.spawn(_unbound2) 00258 time.sleep(0.1) # Let it run 00259 assert engine.isAlive(), "TaskEngine died from exception" 00260 assert status == False, "Exception didn't abort task as expected" 00261 engine.shutdown() 00262 time.sleep(.1) 00263 00264 def checkSelfShutdown(self): 00265 """Test that attempts to shut down an engine via a task fails""" 00266 print "" # Expecting an exception: add space for readability 00267 print "##############################################################################" 00268 print "# #" 00269 print "# This test should cause a 'Trapped exception from spawned task' exception #" 00270 print "# #" 00271 print "##############################################################################" 00272 engine = TaskEngine("TestEngine") 00273 engine.start() 00274 time.sleep(0.1) # Let it start 00275 engine.spawn(engine.shutdown) 00276 time.sleep(0.1) # Let it run 00277 assert engine.isAlive(), "TaskEngine died from exception" 00278 engine.shutdown() 00279 time.sleep(.1) 00280 00281 def checkShutdown1(self): 00282 """ 00283 Test that attempting to shut down an engine from an unrelated thread 00284 succeeds 00285 """ 00286 engine = TaskEngine("TestEngine") 00287 engine.start() 00288 time.sleep(0.1) # Let it start 00289 status = engine.shutdown() 00290 time.sleep(0.1) # Let it shut down 00291 assert not engine.isAlive(), "TaskEngine didn't shut down" 00292 assert status == 0, "Bad status %d returned from shutdown()" % (status) 00293 00294 def checkShutdown2(self): 00295 """Test that attempts to shut down an engine that wasn't started fails""" 00296 print "" 00297 print "#############################################################################" 00298 print "# #" 00299 print "# This test should cause a 'Task engine unexpectedly disappeared' warning #" 00300 print "# #" 00301 print "#############################################################################" 00302 engine = TaskEngine("TestEngine") 00303 status = engine.shutdown() 00304 assert status != 0, "TaskEngine didn't shut down as expected" 00305 00306 def _unbound1(a, b, c, x, y, z): 00307 results[0] = a 00308 results[1] = b 00309 results[2] = c 00310 results[3] = x 00311 results[4] = y 00312 results[5] = z 00313 00314 def _doException(): 00315 raise RuntimeError, "Raising RuntimeError to test exception handling" 00316 return True 00317 00318 def _unbound2(): 00319 global status 00320 status = False 00321 status = _doException() 00322 00323 def suite(): 00324 loader = unittest.TestLoader() 00325 testSuite = loader.loadTestsFromNames( 00326 ["checkInstantiation", "checkStartup", "checkSpawn", "checkUnboundSpawn", 00327 "checkException1", "checkException2", "checkSelfShutdown", 00328 "checkShutdown1", "checkShutdown2"], TaskEngineTestCase) 00329 return testSuite 00330 00331 def test(): 00332 print "########################################################" 00333 print "# #" 00334 print "# Exceptions are expected from this test. These can #" 00335 print "# be ignored if the test completes with status 'OK' #" 00336 print "# #" 00337 print "########################################################" 00338 print "" 00339 00340 # Initialize logging 00341 log.basicConfig() 00342 00343 runner = unittest.TextTestRunner() 00344 runner.run(suite()) 00345 00346 # Shut down the logger 00347 log.shutdown() 00348 00349 # Now run the tests 00350 test() 00351 00352 00353 #-------------------------------------------------------------------------------