taskEngine.py

Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 #
00003 #                               Copyright 2005
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST LAT Task Engine class"
00012 __author__   = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = "2004/03/09 00:00:00"
00014 __updated__  = "$Date: 2005/09/22 00:29:31 $"
00015 __version__  = "$Revision: 1.5 $"
00016 __release__  = "$Name: HEAD $"
00017 __credits__  = "SLAC"
00018 
00019 import LICOS.copyright_SLAC
00020 
00021 from   Queue         import Queue
00022 from   threading     import Thread, currentThread
00023 import logging       as     log
00024 
00025 
00026 class TaskEngine(Thread):
00027   """!\brief A class for setting up and queueing tasks to a task engine.
00028 
00029   A task engine is a thread that polls a task queue and runs tasks found on
00030   the queue.  Tasks are functions or class methods accompanied with their
00031   arguments and keyword arguments.  Each task must run to completion before
00032   the next task is pulled from the queue.  Returning a return value from the
00033   task to the caller is currently not supported (easy to add, if the need
00034   arises).  The queue is a FIFO queue and there is currently no way to jam
00035   a task to the head of the queue.
00036   """
00037   def __init__(self, name = None):
00038     Thread.__init__(self)
00039 
00040     if name is None:
00041       name = self.__class__.__name__
00042     self.__name  = name
00043     self.__queue = Queue()
00044 
00045   def name(self):
00046     """!\brief Method for querying the task engine's name
00047 
00048     \return The task engine's name
00049     """
00050     return self.__name
00051 
00052   def spawn(self, task, *args, **kwargs):
00053     """!\brief Method used to queue a task to the task engine.
00054 
00055     \param task    The task routine to queue
00056     \param args    The arguments for the task routine
00057     \param kwargs  The keyword arguments for the task routine
00058     """
00059     assert self.__queue is not None, (self.name + ".spawn: " + \
00060                                       "TaskEngine.__init__() was not called")
00061     self.__queue.put((task,) + args + (kwargs,))
00062 
00063   def run(self):
00064     """!\brief The task engine main routine.
00065 
00066     This method is invoked by the Thread class and should be considered
00067     'protected'.  Do not call it.
00068     """
00069     self.setName(self.name())
00070 
00071     while True:
00072       taskArgsKwargs = self.__queue.get(True) # Blocks
00073       if taskArgsKwargs[0] is None:  break    # Quit signal is None
00074 
00075       try:
00076         taskArgsKwargs[0](*taskArgsKwargs[1:-1], **taskArgsKwargs[-1])
00077       except:
00078         import types
00079         if isinstance(taskArgsKwargs[0], types.MethodType):
00080           log.exception(self.name()                               + \
00081                         ": Trapped exception from spawned task "  + \
00082                         taskArgsKwargs[0].im_class.__name__ + '.' + \
00083                         taskArgsKwargs[0].__name__ + '()')
00084         else:
00085           log.exception(self.name()                               + \
00086                         ": Trapped exception from spawned task "  + \
00087                         taskArgsKwargs[0].__name__ + '()')
00088 
00089       # Drop references to this object to allow it to be garbage collected
00090       taskArgsKwargs = None
00091 
00092     log.info(self.name() + ": terminating")
00093 
00094   def shutdown(self, tmo = 2.0):
00095     """!\brief Method for shutting down the task engine
00096 
00097     This method blocks (with a timeout) until the task engine has exited.
00098     It makes no sense to spawn this method to the task engine since it waits
00099     for the task engine to exit.
00100 
00101     \param tmo  The amount of time (seconds) to wait for the task engine to exit
00102     \return  0 for success,
00103            -1 if the task engine didn't exit,
00104            -2 if the task engine exited prematurely
00105     """
00106     # This method can not be called from the taskEngine thread on purpose.
00107     # If it could, there would be no way for the instigator to synchronize.
00108     if currentThread() is self:
00109       raise RuntimeError, "A task engine can't be shut down by its own thread"
00110     if self.isAlive():                  # Thread still there or did it crash?
00111       self.spawn(None)                  # Signal to quit
00112       self.join(tmo)                    # Let engine exit
00113       if self.isAlive():                # Timed out?
00114         log.error(self.name() + ".shutdown: " + \
00115                   "Task engine did not exit when expected")
00116         return -1
00117     else:
00118       log.warn(self.name() + ":.shutdown: " + \
00119                "Task engine unexpectedly disappeared")
00120       return -2
00121 
00122     return 0
00123 
00124 
00125 #-------------------------------------------------------------------------------
00126 # Unit testing follows:
00127 
00128 if __debug__ and __name__ == '__main__':
00129   import unittest
00130   import time
00131 
00132   results = [None, None, None, None, None, None]
00133   status  = None
00134 
00135   class TaskEngineTestCase(unittest.TestCase):
00136     def __init__(self, arg):
00137       unittest.TestCase.__init__(self, arg)
00138 
00139     def checkInstantiation(self):
00140       """!\brief Test class instantiation.
00141 
00142       """
00143       engine = None
00144       engine = TaskEngine("TestEngine")
00145       assert engine != None, "Could not instantiate a TaskEngine"
00146 
00147     def checkStartup(self):
00148       """!\brief Test whether the thread starts up.
00149 
00150       """
00151       engine = TaskEngine("TestEngine")
00152       engine.start()
00153       time.sleep(0.1)                   # Let it start
00154       assert engine.isAlive(), "Could not start TaskEngine"
00155       engine.shutdown()
00156       time.sleep(.1)
00157 
00158     def __task_1(self, a, b, c, x, y, z):
00159       self.__a = a
00160       self.__b = b
00161       self.__c = c
00162       self.__x = x
00163       self.__y = y
00164       self.__z = z
00165 
00166     def checkSpawn(self):
00167       """!\brief Test whether we can spawn a class method task to the engine and
00168       have the arguments handled correctly.
00169 
00170       """
00171       self.__a = None
00172       self.__b = None
00173       self.__c = None
00174       self.__x = None
00175       self.__y = None
00176       self.__z = None
00177 
00178       a = "Hello world"
00179       b = 3.1415926
00180       c = 0xabadcafeL
00181       engine = TaskEngine("TestEngine")
00182       engine.start()
00183       time.sleep(0.1)                   # Let it start
00184       engine.spawn(self.__task_1, a, b, c, z = a, x = b, y = c)
00185       time.sleep(0.1)                   # Let it run
00186       assert engine.isAlive(), "TaskEngine prematurely disappeared"
00187       assert self.__a is a, "TaskEngine did not correctly handle args[0]"
00188       assert self.__b is b, "TaskEngine did not correctly handle args[1]"
00189       assert self.__c is c, "TaskEngine did not correctly handle args[2]"
00190       assert self.__x is b, "TaskEngine did not correctly handle kwargs[0]"
00191       assert self.__y is c, "TaskEngine did not correctly handle kwargs[1]"
00192       assert self.__z is a, "TaskEngine did not correctly handle kwargs[2]"
00193       engine.shutdown()
00194       time.sleep(.1)
00195 
00196     def checkUnboundSpawn(self):
00197       """!\brief Test whether we can spawn an unbound function task to the engine and
00198       have the arguments handled correctly.
00199 
00200       """
00201       a = "Test of an unbound function"
00202       b = 2.718281828
00203       c = 0xdeadbeefL
00204 
00205       engine = TaskEngine("TestEngine")
00206       engine.start()
00207       time.sleep(0.1)                   # Let it start
00208       engine.spawn(_unbound1, a, b, c, y = a, z = b, x = c)
00209       time.sleep(0.1)                   # Let it run
00210       assert engine.isAlive(), "TaskEngine prematurely disappeared"
00211       assert results[0] is a, "TaskEngine did not correctly handle args[0]"
00212       assert results[1] is b, "TaskEngine did not correctly handle args[1]"
00213       assert results[2] is c, "TaskEngine did not correctly handle args[2]"
00214       assert results[3] is c, "TaskEngine did not correctly handle kwargs[0]"
00215       assert results[4] is a, "TaskEngine did not correctly handle kwargs[1]"
00216       assert results[5] is b, "TaskEngine did not correctly handle kwargs[2]"
00217       engine.shutdown()
00218       time.sleep(.1)
00219 
00220     def __doException(self):  # 2nd level to give traceback some depth
00221       raise RuntimeError, "Raising RuntimeError to test exception handling"
00222       return True
00223 
00224     def __task_2(self):
00225       self.__status = False
00226       self.__status = self.__doException()
00227 
00228     def checkException1(self):
00229       """!\brief Test that the engine doesn't die if a class method task throws an
00230       exception.
00231 
00232       """
00233       print ""
00234       print "##############################################################################"
00235       print "#                                                                            #"
00236       print "#  This test should cause a 'Trapped exception from spawned task' exception  #"
00237       print "#                                                                            #"
00238       print "##############################################################################"
00239       engine = TaskEngine("TestEngine")
00240       engine.start()
00241       time.sleep(0.1)                   # Let it start
00242       engine.spawn(self.__task_2)
00243       time.sleep(0.1)                   # Let it run
00244       assert engine.isAlive(), "TaskEngine died from exception"
00245       assert self.__status == False, "Exception didn't abort task as expected"
00246       engine.shutdown()
00247       time.sleep(.1)
00248 
00249     def checkException2(self):
00250       """!\brief Test that the engine doesn't die if an unbound function task throws an
00251       exception.
00252 
00253       """
00254       print ""     # Expecting an exception: add space for readability
00255       print "##############################################################################"
00256       print "#                                                                            #"
00257       print "#  This test should cause a 'Trapped exception from spawned task' exception  #"
00258       print "#                                                                            #"
00259       print "##############################################################################"
00260       engine = TaskEngine("TestEngine")
00261       engine.start()
00262       time.sleep(0.1)                   # Let it start
00263       engine.spawn(_unbound2)
00264       time.sleep(0.1)                   # Let it run
00265       assert engine.isAlive(), "TaskEngine died from exception"
00266       assert status == False, "Exception didn't abort task as expected"
00267       engine.shutdown()
00268       time.sleep(.1)
00269 
00270     def checkSelfShutdown(self):
00271       """!\brief Test that attempts to shut down an engine via a task fails.
00272 
00273       """
00274       print ""     # Expecting an exception: add space for readability
00275       print "##############################################################################"
00276       print "#                                                                            #"
00277       print "#  This test should cause a 'Trapped exception from spawned task' exception  #"
00278       print "#                                                                            #"
00279       print "##############################################################################"
00280       engine = TaskEngine("TestEngine")
00281       engine.start()
00282       time.sleep(0.1)                   # Let it start
00283       engine.spawn(engine.shutdown)
00284       time.sleep(0.1)                   # Let it run
00285       assert engine.isAlive(), "TaskEngine died from exception"
00286       engine.shutdown()
00287       time.sleep(.1)
00288 
00289     def checkShutdown1(self):
00290       """!\brief Test that attempting to shut down an engine from an unrelated thread
00291       succeeds.
00292 
00293       """
00294       engine = TaskEngine("TestEngine")
00295       engine.start()
00296       time.sleep(0.1)                   # Let it start
00297       status = engine.shutdown()
00298       time.sleep(0.1)                   # Let it shut down
00299       assert not engine.isAlive(), "TaskEngine didn't shut down"
00300       assert status == 0, "Bad status %d returned from shutdown()" % (status)
00301 
00302     def checkShutdown2(self):
00303       """!\brief Test that attempts to shut down an engine that wasn't started fails.
00304 
00305       """
00306       print ""
00307       print "#############################################################################"
00308       print "#                                                                           #"
00309       print "#  This test should cause a 'Task engine unexpectedly disappeared' warning  #"
00310       print "#                                                                           #"
00311       print "#############################################################################"
00312       engine = TaskEngine("TestEngine")
00313       status = engine.shutdown()
00314       assert status != 0, "TaskEngine didn't shut down as expected"
00315 
00316   def _unbound1(a, b, c, x, y, z):
00317     results[0] = a
00318     results[1] = b
00319     results[2] = c
00320     results[3] = x
00321     results[4] = y
00322     results[5] = z
00323 
00324   def _doException():
00325     raise RuntimeError, "Raising RuntimeError to test exception handling"
00326     return True
00327 
00328   def _unbound2():
00329     global status
00330     status = False
00331     status = _doException()
00332 
00333   def suite():
00334     loader = unittest.TestLoader()
00335     testSuite = loader.loadTestsFromNames(
00336       ["checkInstantiation", "checkStartup", "checkSpawn", "checkUnboundSpawn",
00337        "checkException1", "checkException2", "checkSelfShutdown",
00338        "checkShutdown1", "checkShutdown2"], TaskEngineTestCase)
00339     return testSuite
00340 
00341   def test():
00342     print "########################################################"
00343     print "#                                                      #"
00344     print "#  Exceptions are expected from this test.  These can  #"
00345     print "#  be ignored if the test completes with status 'OK'   #"
00346     print "#                                                      #"
00347     print "########################################################"
00348     print ""
00349 
00350     # Initialize logging
00351     log.basicConfig()
00352 
00353     runner = unittest.TextTestRunner()
00354     runner.run(suite())
00355 
00356     # Shut down the logger
00357     log.shutdown()
00358 
00359   # Now run the tests
00360   test()
00361 
00362 
00363 #-------------------------------------------------------------------------------

Generated on Thu Apr 27 20:52:43 2006 for LICOS L02-01-00 by doxygen 1.4.6-NO