Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | Related Pages

taskEngine.py

00001 #!/usr/bin/env python
00002 #
00003 #                               Copyright 2004
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST LAT Task Engine class"
00012 __author__   = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = "1/21/2004"
00014 __version__  = "$Revision: 2.2 $"
00015 __credits__  = "SLAC"
00016 
00017 import LATTE.copyright_SLAC
00018 
00019 from   Queue         import Queue
00020 from   threading     import Thread, currentThread
00021 import logging       as     log
00022 
00023 
class TaskEngine(Thread):
  r"""A class for setting up and queueing tasks to a task engine.

  A task engine is a thread that polls a task queue and runs tasks found on
  the queue.  Tasks are functions or class methods accompanied with their
  arguments and keyword arguments.  Each task must run to completion before
  the next task is pulled from the queue.  Returning a return value from the
  task to the caller is currently not supported (easy to add, if the need
  arises).  The queue is a FIFO queue and there is currently no way to jam
  a task to the head of the queue.

  A task of None is reserved: it is the quit signal queued by shutdown().
  An exception escaping from a task is logged and the engine carries on with
  the next task.
  """
  def __init__(self, name = None):
    r"""\brief Construct a task engine.  Call start() to begin servicing tasks.

    \param name  The task engine's name; defaults to the name of the class
    """
    if name is None:
      name = self.__class__.__name__

    # Name the underlying thread here rather than through setName() later on:
    # the name() method below hides the 'name' attribute of recent Thread
    # implementations, where setName() would store a plain string on the
    # instance and thereby break every subsequent call of self.name().
    Thread.__init__(self, name = name)

    self.__name  = name
    self.__queue = Queue()

  def name(self):
    r"""\brief Method for querying the task engine's name

    \return The task engine's name
    """
    return self.__name

  def spawn(self, task, *args, **kwargs):
    r"""\brief Method used to queue a task to the task engine.

    Tasks are queued as a (task, arg0, ..., argN, kwargs) tuple.

    \param task    The task routine to queue (None is the quit signal)
    \param args    The arguments for the task routine
    \param kwargs  The keyword arguments for the task routine
    """
    # Look the queue up by its mangled name so that a forgotten __init__()
    # call is reported by the assertion instead of by an AttributeError.
    queue = getattr(self, "_TaskEngine__queue", None)
    assert queue is not None, (self.__class__.__name__ + ".spawn: " +
                               "TaskEngine.__init__() was not called")
    queue.put((task,) + args + (kwargs,))

  def run(self):
    r"""\brief The task engine main routine.

    This method is invoked by the Thread class and should be considered
    'protected'.  Do not call it.
    """
    while True:
      taskArgsKwargs = self.__queue.get(True) # Blocks
      task           = taskArgsKwargs[0]
      if task is None:  break                 # Quit signal is None

      try:
        task(*taskArgsKwargs[1:-1], **taskArgsKwargs[-1])
      except:
        # Deliberately catch everything: a misbehaving task must never take
        # the engine down with it.
        log.exception(self.name()                               +
                      ": Trapped exception from spawned task "  +
                      self.__taskName(task) + '()')

      # Drop references to the task and its arguments to allow them to be
      # garbage collected while the engine blocks on the queue
      taskArgsKwargs = None
      task           = None

    log.info(self.name() + ": terminating")

  def shutdown(self, tmo = 2.0):
    r"""\brief Method for shutting down the task engine

    This method blocks (with a timeout) until the task engine has exited.
    It makes no sense to spawn this method to the task engine since it waits
    for the task engine to exit.  The quit signal is queued behind any tasks
    that are still pending, so those are run first.

    \param tmo  The amount of time (seconds) to wait for the task engine to exit
    \return  0 for success,
           -1 if the task engine didn't exit,
           -2 if the task engine exited prematurely (or was never started)
    """
    # This method can not be called from the taskEngine thread on purpose.
    # If it could, there would be no way for the instigator to synchronize.
    if currentThread() is self:
      raise RuntimeError("A task engine can't be shut down by its own thread")

    if not self.__isAlive():            # Thread still there or did it crash?
      log.warning(self.name() + ".shutdown: " +
                  "Task engine unexpectedly disappeared")
      return -2

    self.spawn(None)                    # Signal to quit
    self.join(tmo)                      # Let engine exit
    if self.__isAlive():                # Timed out?
      log.error(self.name() + ".shutdown: " +
                "Task engine did not exit when expected")
      return -1

    return 0

  def __isAlive(self):
    r"""\brief Report whether the engine thread is running

    Prefers Thread.is_alive() and falls back on the older isAlive() spelling
    so that the class works with either generation of the threading module.

    \return True if the thread has been started and has not yet exited
    """
    probe = getattr(self, 'is_alive', None)
    if probe is None:
      probe = self.isAlive
    return probe()

  def __taskName(self, task):
    r"""\brief Build a printable name for a task, for use in log messages

    Never raises for lack of a name: callables that don't have a __name__
    (e.g. callable instances) are represented by their repr().

    \param task  The callable that was spawned
    \return 'Class.method' for methods, the plain name for functions
    """
    import types

    name = getattr(task, '__name__', None)
    if name is None:
      return repr(task)

    if isinstance(task, types.MethodType):
      owner = getattr(task, 'im_class', None)     # Old style method objects
      if owner is None:
        bound = getattr(task, '__self__', None)   # New style bound methods
        if bound is not None:
          owner = bound.__class__
      ownerName = getattr(owner, '__name__', None)
      if ownerName is not None:
        return ownerName + '.' + name

    return name
00121 
00122 
00123 #-------------------------------------------------------------------------------
00124 # Unit testing follows:
00125 
if __debug__ and __name__ == '__main__':
  # Self-test of the TaskEngine class.  The checks rely on bare assert
  # statements, which is why the whole section is guarded by __debug__.
  import unittest
  import time

  # Slots filled in by _unbound1(), in the order (a, b, c, x, y, z)
  results = [None, None, None, None, None, None]
  # Progress flag written by _unbound2()
  status  = None

  def _alive(engine):
    """Report whether the engine thread is running, with either spelling of
    the threading module's liveness query"""
    probe = getattr(engine, 'is_alive', None)
    if probe is None:
      probe = engine.isAlive
    return probe()

  def _banner(*lines):
    """Print the given lines of text inside a box of '#' characters"""
    width = max([len(line) for line in lines])
    edge  = "#" * (width + 6)
    blank = "#" + " " * (width + 4) + "#"
    print(edge)
    print(blank)
    for line in lines:
      print("#  " + line.ljust(width) + "  #")
    print(blank)
    print(edge)

  def _expectTrappedException():
    """Announce that the next test logs a trapped exception on purpose"""
    print("")      # Expecting an exception: add space for readability
    _banner("This test should cause a 'Trapped exception from spawned task' "
            "exception")

  class TaskEngineTestCase(unittest.TestCase):
    """Checks of TaskEngine start-up, task dispatch, exception trapping and
    shutdown.  Synchronization with the engine thread is done by sleeping."""

    def checkInstantiation(self):
      """Test class instantiation"""
      engine = TaskEngine("TestEngine")
      assert engine is not None, "Could not instantiate a TaskEngine"

    def checkStartup(self):
      """Test whether the thread starts up"""
      engine = TaskEngine("TestEngine")
      engine.start()
      time.sleep(0.1)                   # Let it start
      assert _alive(engine), "Could not start TaskEngine"
      engine.shutdown()
      time.sleep(.1)

    def __task_1(self, a, b, c, x, y, z):
      # Record everything the engine handed over, for checkSpawn to inspect
      self.__a = a
      self.__b = b
      self.__c = c
      self.__x = x
      self.__y = y
      self.__z = z

    def checkSpawn(self):
      """
      Test whether we can spawn a class method task to the engine and
      have the arguments handled correctly
      """
      self.__a = None
      self.__b = None
      self.__c = None
      self.__x = None
      self.__y = None
      self.__z = None

      a = "Hello world"
      b = 3.1415926
      c = 0xabadcafe
      engine = TaskEngine("TestEngine")
      engine.start()
      time.sleep(0.1)                   # Let it start
      engine.spawn(self.__task_1, a, b, c, z = a, x = b, y = c)
      time.sleep(0.1)                   # Let it run
      assert _alive(engine), "TaskEngine prematurely disappeared"
      # Identity checks: the very same objects must arrive at the task
      assert self.__a is a, "TaskEngine did not correctly handle args[0]"
      assert self.__b is b, "TaskEngine did not correctly handle args[1]"
      assert self.__c is c, "TaskEngine did not correctly handle args[2]"
      assert self.__x is b, "TaskEngine did not correctly handle kwargs[0]"
      assert self.__y is c, "TaskEngine did not correctly handle kwargs[1]"
      assert self.__z is a, "TaskEngine did not correctly handle kwargs[2]"
      engine.shutdown()
      time.sleep(.1)

    def checkUnboundSpawn(self):
      """
      Test whether we can spawn an unbound function task to the engine and
      have the arguments handled correctly
      """
      a = "Test of an unbound function"
      b = 2.718281828
      c = 0xdeadbeef

      engine = TaskEngine("TestEngine")
      engine.start()
      time.sleep(0.1)                   # Let it start
      engine.spawn(_unbound1, a, b, c, y = a, z = b, x = c)
      time.sleep(0.1)                   # Let it run
      assert _alive(engine), "TaskEngine prematurely disappeared"
      assert results[0] is a, "TaskEngine did not correctly handle args[0]"
      assert results[1] is b, "TaskEngine did not correctly handle args[1]"
      assert results[2] is c, "TaskEngine did not correctly handle args[2]"
      assert results[3] is c, "TaskEngine did not correctly handle kwargs[0]"
      assert results[4] is a, "TaskEngine did not correctly handle kwargs[1]"
      assert results[5] is b, "TaskEngine did not correctly handle kwargs[2]"
      engine.shutdown()
      time.sleep(.1)

    def __doException(self):  # 2nd level to give traceback some depth
      raise RuntimeError("Raising RuntimeError to test exception handling")

    def __task_2(self):
      # The flag must still be False afterwards: the exception aborts the
      # task before the second assignment can take place
      self.__status = False
      self.__status = self.__doException()

    def checkException1(self):
      """
      Test that the engine doesn't die if a class method task throws an
      exception
      """
      _expectTrappedException()
      engine = TaskEngine("TestEngine")
      engine.start()
      time.sleep(0.1)                   # Let it start
      engine.spawn(self.__task_2)
      time.sleep(0.1)                   # Let it run
      assert _alive(engine), "TaskEngine died from exception"
      assert self.__status == False, "Exception didn't abort task as expected"
      engine.shutdown()
      time.sleep(.1)

    def checkException2(self):
      """
      Test that the engine doesn't die if an unbound function task throws an
      exception
      """
      _expectTrappedException()
      engine = TaskEngine("TestEngine")
      engine.start()
      time.sleep(0.1)                   # Let it start
      engine.spawn(_unbound2)
      time.sleep(0.1)                   # Let it run
      assert _alive(engine), "TaskEngine died from exception"
      assert status == False, "Exception didn't abort task as expected"
      engine.shutdown()
      time.sleep(.1)

    def checkSelfShutdown(self):
      """Test that attempts to shut down an engine via a task fails"""
      _expectTrappedException()
      engine = TaskEngine("TestEngine")
      engine.start()
      time.sleep(0.1)                   # Let it start
      engine.spawn(engine.shutdown)
      time.sleep(0.1)                   # Let it run
      assert _alive(engine), "TaskEngine died from exception"
      engine.shutdown()
      time.sleep(.1)

    def checkShutdown1(self):
      """
      Test that attempting to shut down an engine from an unrelated thread
      succeeds
      """
      engine = TaskEngine("TestEngine")
      engine.start()
      time.sleep(0.1)                   # Let it start
      rc = engine.shutdown()
      time.sleep(0.1)                   # Let it shut down
      assert not _alive(engine), "TaskEngine didn't shut down"
      assert rc == 0, "Bad status %d returned from shutdown()" % (rc)

    def checkShutdown2(self):
      """Test that attempts to shut down an engine that wasn't started fails"""
      print("")
      _banner("This test should cause a 'Task engine unexpectedly "
              "disappeared' warning")
      engine = TaskEngine("TestEngine")
      rc = engine.shutdown()
      assert rc != 0, "TaskEngine didn't shut down as expected"

  def _unbound1(a, b, c, x, y, z):
    """Task routine that records its arguments in the results list"""
    results[0] = a
    results[1] = b
    results[2] = c
    results[3] = x
    results[4] = y
    results[5] = z

  def _doException():
    """Raise unconditionally; a 2nd level to give the traceback some depth"""
    raise RuntimeError("Raising RuntimeError to test exception handling")

  def _unbound2():
    """Task routine that raises after having set the status flag to False"""
    global status
    status = False
    status = _doException()

  def suite():
    """Build the test suite from the check methods, in a fixed order"""
    loader = unittest.TestLoader()
    testSuite = loader.loadTestsFromNames(
      ["checkInstantiation", "checkStartup", "checkSpawn", "checkUnboundSpawn",
       "checkException1", "checkException2", "checkSelfShutdown",
       "checkShutdown1", "checkShutdown2"], TaskEngineTestCase)
    return testSuite

  def test():
    """Run the suite with logging set up so that trapped exceptions show"""
    _banner("Exceptions are expected from this test.  These can",
            "be ignored if the test completes with status 'OK'")
    print("")

    # Initialize logging
    log.basicConfig()

    runner = unittest.TextTestRunner()
    runner.run(suite())

    # Shut down the logger
    log.shutdown()

  # Now run the tests
  test()
00351 
00352 
00353 #-------------------------------------------------------------------------------

Generated on Fri Jul 21 13:26:32 2006 for LATTE R04-12-00 by doxygen 1.4.3