00001
00002
00003
00004
00005
00006
00007
00008
00009
# Module metadata.  The listing line numbers that preceded each statement were
# extraction residue (and made the statements invalid Python); the values
# themselves are unchanged.
__facility__ = "Online"
__abstract__ = "GLAST LAT Task Engine class"
__author__ = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
__date__ = "2004/03/09 00:00:00"
# NOTE(review): the three "$Keyword: ... $" values below look like CVS/RCS
# keyword expansions -- presumably rewritten by the version control system on
# check-in, so they should not be edited by hand.  Confirm before changing.
__updated__ = "$Date: 2005/09/22 00:29:31 $"
__version__ = "$Revision: 1.5 $"
__release__ = "$Name: HEAD $"
__credits__ = "SLAC"
00018
00019 import LICOS.copyright_SLAC
00020
00021 from Queue import Queue
00022 from threading import Thread, currentThread
00023 import logging as log
00024
00025
class TaskEngine(Thread):
    """!@brief A worker thread that runs queued tasks one at a time.

    Tasks are callables (functions, bound methods, ...) that spawn() queues
    together with their positional and keyword arguments.  The engine thread
    pulls them off a FIFO queue and runs each one to completion before it
    fetches the next.  A task's return value is discarded.  An exception
    raised by a task is logged and swallowed so the engine keeps running.
    There is no way to jam a task to the head of the queue.

    @note name() shadows the 'name' property that threading.Thread gained in
          Python 2.6.  Nothing in this class may therefore call setName() or
          assign to self.name: either would replace the name() method with a
          plain string on the instance and break every later self.name() call.
    """

    # Class-level defaults let spawn() detect a subclass that forgot to call
    # TaskEngine.__init__() instead of failing with an AttributeError.
    __name = None
    __queue = None

    # Thread.isAlive() was removed in Python 3.9 in favour of is_alive().
    # Keep the historical spelling available so shutdown() and existing
    # callers work on every interpreter.
    if not hasattr(Thread, "isAlive"):
        isAlive = Thread.is_alive

    def __init__(self, name=None):
        """!@brief Create an engine; the thread is not started here.

        @param name The engine's name.  Defaults to the class name.
        """
        if name is None:
            name = self.__class__.__name__
        # Hand the name to Thread directly rather than calling setName()
        # later (see the class note about the name()/property clash).
        Thread.__init__(self, name=name)
        self.__name = name
        self.__queue = Queue()

    def name(self):
        """!@brief Method for querying the task engine's name

        @return The task engine's name
        """
        return self.__name

    def spawn(self, task, *args, **kwargs):
        """!@brief Method used to queue a task to the task engine.

        Queueing None as the task makes the engine exit its main loop once
        everything queued before it has run; shutdown() relies on this.

        @param task   The task routine to queue
        @param args   The arguments for the task routine
        @param kwargs The keyword arguments for the task routine
        @exception RuntimeError if TaskEngine.__init__() was never called
        """
        if self.__queue is None:
            raise RuntimeError(self.__class__.__name__ + ".spawn: " +
                               "TaskEngine.__init__() was not called")
        self.__queue.put((task, args, kwargs))

    def __taskLabel(self, task):
        """!@brief Build a printable label for a task, for log messages.

        Never raises for a task lacking the usual introspection attributes,
        because it is called from the exception handler in run().

        @param task The task routine to describe
        @return 'Class.method()' for methods, 'function()' for other named
                callables, repr(task) for callables without a __name__
        """
        import types

        label = getattr(task, "__name__", None)
        if label is None:
            return repr(task)
        if isinstance(task, types.MethodType):
            # Python 2 methods expose im_class; Python 3 bound methods only
            # carry the instance in __self__.
            owner = getattr(task, "im_class", None)
            if owner is None:
                owner = getattr(task, "__self__", None).__class__
            label = owner.__name__ + '.' + label
        return label + '()'

    def run(self):
        """!@brief The task engine main routine.

        This method is invoked by the Thread class and should be considered
        'protected'.  Do not call it.
        """
        while True:
            task, args, kwargs = self.__queue.get(True)
            if task is None:
                # Sentinel queued by shutdown(): leave the main loop.
                break

            try:
                task(*args, **kwargs)
            except:  # deliberately broad: no task may take the engine down
                log.exception("%s: Trapped exception from spawned task %s",
                              self.name(), self.__taskLabel(task))

            # Drop the references so the finished task and its arguments are
            # not kept alive while the engine blocks waiting for more work.
            task = args = kwargs = None

        log.info("%s: terminating", self.name())

    def shutdown(self, tmo=2.0):
        """!@brief Method for shutting down the task engine

        This method blocks (with a timeout) until the task engine has exited.
        Tasks queued before the call are still run first.  It makes no sense
        to spawn this method to the task engine since it waits for the task
        engine to exit.

        @param tmo The amount of time (seconds) to wait for the task engine
                   to exit
        @return  0 for success,
                -1 if the task engine didn't exit within the timeout,
                -2 if the task engine was not running (it exited prematurely
                   or was never started)
        @exception RuntimeError if called from the engine's own thread
        """
        if currentThread() is self:
            raise RuntimeError(
                "A task engine can't be shut down by its own thread")

        if not self.isAlive():
            log.warning("%s.shutdown: Task engine unexpectedly disappeared",
                        self.name())
            return -2

        self.spawn(None)
        self.join(tmo)
        if self.isAlive():
            log.error("%s.shutdown: Task engine did not exit when expected",
                      self.name())
            return -1

        return 0
00123
00124
00125
00126
00127
if __debug__ and __name__ == '__main__':
    # Self-test, run only when this file is executed as a script.  The checks
    # use plain assert statements, hence the __debug__ guard.
    import unittest
    import time

    # Written by _unbound1(): the six arguments it was called with.
    results = [None, None, None, None, None, None]
    # Written by _unbound2(): stays False when its helper raises as intended.
    status = None

    # Seconds to pause so the engine thread can start up, run a task, or exit.
    _SETTLE = 0.1

    def _banner(*lines):
        """!@brief Print the given lines inside a box of '#' characters."""
        width = max([len(text) for text in lines])
        rule = "#" * (width + 6)
        blank = "#" + " " * (width + 4) + "#"
        print(rule)
        print(blank)
        for text in lines:
            print("#  " + text.ljust(width) + "  #")
        print(blank)
        print(rule)

    def _expectTrappedException():
        """!@brief Announce that the next test logs a trapped exception."""
        print("")
        _banner("This test should cause a 'Trapped exception from "
                "spawned task' exception")

    class TaskEngineTestCase(unittest.TestCase):
        """!@brief Checks of TaskEngine start-up, task dispatch, exception
        trapping and shutdown.
        """

        def checkInstantiation(self):
            """!@brief Test class instantiation.

            """
            engine = TaskEngine("TestEngine")
            assert engine is not None, "Could not instantiate a TaskEngine"

        def checkStartup(self):
            """!@brief Test whether the thread starts up.

            """
            engine = TaskEngine("TestEngine")
            engine.start()
            time.sleep(_SETTLE)
            assert engine.isAlive(), "Could not start TaskEngine"
            engine.shutdown()
            time.sleep(_SETTLE)

        def __task_1(self, a, b, c, x, y, z):
            # Record every argument so checkSpawn() can verify its routing.
            self.__a = a
            self.__b = b
            self.__c = c
            self.__x = x
            self.__y = y
            self.__z = z

        def checkSpawn(self):
            """!@brief Test whether we can spawn a class method task to the
            engine and have the arguments handled correctly.

            """
            self.__a = None
            self.__b = None
            self.__c = None
            self.__x = None
            self.__y = None
            self.__z = None

            a = "Hello world"
            b = 3.1415926
            c = 0xabadcafe
            engine = TaskEngine("TestEngine")
            engine.start()
            time.sleep(_SETTLE)
            engine.spawn(self.__task_1, a, b, c, z=a, x=b, y=c)
            time.sleep(_SETTLE)
            assert engine.isAlive(), "TaskEngine prematurely disappeared"
            # Identity checks: the very objects passed in must come through.
            assert self.__a is a, "TaskEngine did not correctly handle args[0]"
            assert self.__b is b, "TaskEngine did not correctly handle args[1]"
            assert self.__c is c, "TaskEngine did not correctly handle args[2]"
            assert self.__x is b, "TaskEngine did not correctly handle kwargs[0]"
            assert self.__y is c, "TaskEngine did not correctly handle kwargs[1]"
            assert self.__z is a, "TaskEngine did not correctly handle kwargs[2]"
            engine.shutdown()
            time.sleep(_SETTLE)

        def checkUnboundSpawn(self):
            """!@brief Test whether we can spawn an unbound function task to
            the engine and have the arguments handled correctly.

            """
            a = "Test of an unbound function"
            b = 2.718281828
            c = 0xdeadbeef

            engine = TaskEngine("TestEngine")
            engine.start()
            time.sleep(_SETTLE)
            engine.spawn(_unbound1, a, b, c, y=a, z=b, x=c)
            time.sleep(_SETTLE)
            assert engine.isAlive(), "TaskEngine prematurely disappeared"
            assert results[0] is a, "TaskEngine did not correctly handle args[0]"
            assert results[1] is b, "TaskEngine did not correctly handle args[1]"
            assert results[2] is c, "TaskEngine did not correctly handle args[2]"
            assert results[3] is c, "TaskEngine did not correctly handle kwargs[0]"
            assert results[4] is a, "TaskEngine did not correctly handle kwargs[1]"
            assert results[5] is b, "TaskEngine did not correctly handle kwargs[2]"
            engine.shutdown()
            time.sleep(_SETTLE)

        def __doException(self):
            raise RuntimeError("Raising RuntimeError to test exception handling")

        def __task_2(self):
            # The second assignment never completes because the helper raises,
            # so __status stays False.
            self.__status = False
            self.__status = self.__doException()

        def checkException1(self):
            """!@brief Test that the engine doesn't die if a class method task
            throws an exception.

            """
            _expectTrappedException()
            engine = TaskEngine("TestEngine")
            engine.start()
            time.sleep(_SETTLE)
            engine.spawn(self.__task_2)
            time.sleep(_SETTLE)
            assert engine.isAlive(), "TaskEngine died from exception"
            assert self.__status is False, "Exception didn't abort task as expected"
            engine.shutdown()
            time.sleep(_SETTLE)

        def checkException2(self):
            """!@brief Test that the engine doesn't die if an unbound function
            task throws an exception.

            """
            _expectTrappedException()
            engine = TaskEngine("TestEngine")
            engine.start()
            time.sleep(_SETTLE)
            engine.spawn(_unbound2)
            time.sleep(_SETTLE)
            assert engine.isAlive(), "TaskEngine died from exception"
            assert status is False, "Exception didn't abort task as expected"
            engine.shutdown()
            time.sleep(_SETTLE)

        def checkSelfShutdown(self):
            """!@brief Test that attempts to shut down an engine via a task
            fails.

            """
            _expectTrappedException()
            engine = TaskEngine("TestEngine")
            engine.start()
            time.sleep(_SETTLE)
            engine.spawn(engine.shutdown)
            time.sleep(_SETTLE)
            assert engine.isAlive(), "TaskEngine died from exception"
            engine.shutdown()
            time.sleep(_SETTLE)

        def checkShutdown1(self):
            """!@brief Test that attempting to shut down an engine from an
            unrelated thread succeeds.

            """
            engine = TaskEngine("TestEngine")
            engine.start()
            time.sleep(_SETTLE)
            rc = engine.shutdown()
            time.sleep(_SETTLE)
            assert not engine.isAlive(), "TaskEngine didn't shut down"
            assert rc == 0, "Bad status %d returned from shutdown()" % (rc)

        def checkShutdown2(self):
            """!@brief Test that attempts to shut down an engine that wasn't
            started fails.

            """
            print("")
            _banner("This test should cause a 'Task engine unexpectedly "
                    "disappeared' warning")
            engine = TaskEngine("TestEngine")
            rc = engine.shutdown()
            assert rc != 0, "TaskEngine didn't shut down as expected"

    def _unbound1(a, b, c, x, y, z):
        """!@brief Plain-function task: record all six arguments in results."""
        results[0] = a
        results[1] = b
        results[2] = c
        results[3] = x
        results[4] = y
        results[5] = z

    def _doException():
        raise RuntimeError("Raising RuntimeError to test exception handling")

    def _unbound2():
        """!@brief Plain-function task that raises part-way through."""
        global status
        status = False
        # Never completes: _doException() raises, so status stays False.
        status = _doException()

    def suite():
        """!@brief Build the suite of all TaskEngine checks, in run order.

        @return The unittest suite
        """
        loader = unittest.TestLoader()
        return loader.loadTestsFromNames(
            ["checkInstantiation", "checkStartup", "checkSpawn",
             "checkUnboundSpawn", "checkException1", "checkException2",
             "checkSelfShutdown", "checkShutdown1", "checkShutdown2"],
            TaskEngineTestCase)

    def test():
        """!@brief Run the suite with logging routed to the console."""
        _banner("Exceptions are expected from this test.  These can",
                "be ignored if the test completes with status 'OK'")
        print("")

        log.basicConfig()

        runner = unittest.TextTestRunner()
        runner.run(suite())

        log.shutdown()

    test()
00361
00362
00363