Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | Related Pages

geb.py

00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2006
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "Generic event builder"
00012 __author__   = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = "2/27/2006"
00014 __updated__  = ("$Date: 2006/07/19 16:06:44 $").split(' ')[1]
00015 __version__  = "$Revision: 1.2 $"
00016 __release__  = "$Name: R04-12-00 $"
00017 __credits__  = "SLAC"
00018 
00019 import LATTE.copyright_SLAC
00020 
00021 from   Queue         import Queue
00022 from   threading     import Thread
00023 import time
00024 import struct
00025 
00026 import LDF
00027 
00028 
00029 class GebContribution(object):
00030   def __init__(self, contributor):
00031     self.__contributor = contributor
00032 
00033   def identity(self):
00034     """!\brief Method to return the identity (LDF TypeId) of a contribution to
00035     an event.  Meant to be overridden.
00036     """
00037     raise NotImplementedError, "identity() method must be implemented in a subclass"
00038 
00039   def length(self):
00040     """!\brief Method to return the length of a contribution to an event.
00041     Meant to be overridden.
00042     """
00043     raise NotImplementedError, "sequence() method must be implemented in a subclass"
00044 
00045   def sequence(self):
00046     """!\brief Method to return the sequence value of the contribution.
00047     Meant to be overridden.
00048 
00049     This method must return a value for which the equality and greater
00050     than operators are defined.  The sequence values must be common
00051     amoungst all contributors such that a if a contribution's sequence
00052     value is equal to another contributions sequence value, the two
00053     contributions belong together as part of the same event.  A
00054     contribution A's sequence value that is greater than another
00055     contribution B's sequence value is interpreted to mean that A is
00056     newer than B.  This information is used to flush out partially
00057     built older events.
00058     """
00059     raise NotImplementedError, "sequence() method must be implemented in a subclass"
00060 
00061   def contributor(self):
00062     return self.__contributor
00063 
00064 
00065 class GebContributor(Thread):
00066   # Set up one common blocking queue for all contributors
00067   __contributionQueue = Queue()
00068 
00069   def __init__(self):
00070     Thread.__init__(self)
00071     self.setName(self.name())
00072 
00073   def contributionQueue():
00074     """!\brief Method to get the common contribution queue.
00075 
00076     \returns A Queue object
00077     """
00078     return GebContributor.__contributionQueue
00079 
00080   contributionQueue = staticmethod(contributionQueue)
00081 
00082   def read(self):
00083     """!\brief Method to read a contribution from somewhere.
00084     Meant to be overridden.
00085     """
00086     raise NotImplementedError, "read() method must be implemented in a subclass"
00087 
00088   def name(self):
00089     """!\brief Method to return the name of the contribution type.
00090     Meant to be overridden.
00091     """
00092     raise NotImplementedError, "name() method must be implemented in a subclass"
00093 
00094   def sequence(self, sequence):
00095     """!\brief Optional method for remapping the ancillary contribution's
00096     sequence number into the same space as the other contributions' sequence
00097     numbers.  If this method is not overridden, the supplied sequence number is
00098     returned.
00099     """
00100     return sequence
00101 
00102   def open(self):
00103     """!\brief Method to open the contributor.
00104     Meant to be overridden.
00105     """
00106     raise NotImplementedError, "open() method must be implemented in a subclass"
00107 
00108   def close(self):
00109     """!\brief Optional method to close the contributor.
00110     Meant to be overridden.
00111     """
00112     pass
00113 
00114   def idMask(self, id = None):
00115     """!\brief Method for building the contributor mask
00116     """
00117     if id is not None:
00118       self.__idMask = id
00119     return self.__idMask
00120 
00121   def run(self):
00122     """!\brief Loop that continuously reads from the data source
00123     """
00124     contributionQueue = self.__contributionQueue
00125     reason = None
00126     try:
00127       while reason != Geb.Terminate:
00128         reason, status, contribution = self.read()
00129         contributionQueue.put((reason, status, contribution))
00130 
00131         # Allow other threads to run by forcing a reschedule
00132         time.sleep(0.0)
00133     except Exception, e:
00134       import logging
00135       logging.exception(self.name() + ": Trapped exception:\n" + str(e))
00136 
00137     print self.name() + ": terminating"
00138     import logging
00139     logging.info(self.name() + ": terminating")
00140 
00141 
00142 class Geb(Thread):
00143   Build     = 0
00144   DontBuild = 1
00145   Terminate = 2
00146   Sweep     = 3
00147   def __init__(self, contributors):
00148     Thread.__init__(self)
00149     self.setName(self.name())
00150     self.__idMask = 0L
00151     idMask        = 0x1L
00152     for contributor in contributors:
00153       # Make sure that the contributors were inherited from GebContributor
00154       if not isinstance(contributor, GebContributor):
00155         raise TypeError, "Contributor must be a subclass of GebContributor"
00156       self.__idMask |= contributor.idMask(idMask)
00157       idMask <<= 1
00158     self.__contributors      = contributors
00159     self.__contributionQueue = GebContributor.contributionQueue()
00160     self.__contributionsDict = {}
00161 
00162   def name(self):
00163     """!\brief Method to return the name of the event builder.
00164     May be overridden.
00165     """
00166     return "GEB"
00167 
00168   def shutdown(self):
00169     """!\brief Method to shutdown the builder
00170     """
00171     # Shutdown the contributors
00172     for contributor in self.__contributors:
00173       contributor.close()
00174 
00175   def run(self):
00176     """!\brief Method run by the Thread base class.  Use the start() method
00177     to start event building.
00178     """
00179     try:
00180       self.build()
00181     except Exception, e:
00182       import logging
00183       logging.exception(self.name() + ":\n" + str(e))
00184 
00185   def build(self):
00186     """!\brief Call this method to start event building
00187     """
00188     contributionQueue = self.__contributionQueue
00189     contributionsDict = self.__contributionsDict
00190     contributors      = self.__contributors
00191     dTime             = 0.0
00192 
00193     # Open the contributors
00194     for contributor in contributors:
00195       contributor.open()
00196 
00197     # Start the contributor threads up
00198     for contributor in contributors:
00199       contributor.start()
00200 
00201     reason, status, contribution = contributionQueue.get() # Blocks
00202     tStart = time.time()
00203 
00204     n = 0
00205     while reason != Geb.Terminate:
00206       if reason == Geb.Build:
00207         contributor = contribution.contributor()
00208         sequence    = contribution.sequence()
00209         try:
00210           contributionsDict[sequence].append((status, contribution))
00211           contributionsDict[sequence][0] |= contributor.idMask()
00212         except KeyError: # i.e., sequence not in contributionsDict.keys():
00213           contributionsDict[sequence] = [contributor.idMask(), (status, contribution)]
00214 
00215         if contributionsDict[sequence][0] == self.__idMask:
00216           self.__flush(sequence)
00217           self.__dispose(contributionsDict.pop(sequence)[1:])
00218           if n % 100 == 0:
00219             print "......Geb.build: seq %08x built and disposed" % (sequence)
00220           n += 1
00221       elif reason == Geb.Sweep:
00222         sequence = contribution.sequence()
00223         contributionsDict[sequence] = [self.__idMask, (status, contribution)]
00224 
00225         self.__flush(sequence)
00226         self.__dispose(contributionsDict.pop(sequence)[1:])
00227         print "......Geb.build: seq %08x disposed (sweep)" % (sequence)
00228       else:
00229         print "......Geb.build: id %08x disposed" % (contribution.identity())
00230 
00231         self.__dispose([(status, contribution)])
00232 
00233       dTime += time.time() - tStart
00234       reason, status, contribution = contributionQueue.get() # Blocks
00235       tStart = time.time()
00236 
00237     if n     == 0:  n     = 1.0e-99
00238     if dTime == 0:  dTime = 1.0e-99
00239     print "\nGeb.build: N = %d, T = %f, T/N = %f, N/T = %f" % \
00240           (n, dTime, dTime / n, float(n) / dTime)
00241 
00242     # Close the contributors
00243     for contributor in contributors:
00244       contributor.close()    # Redundant - normally done by shutdown()
00245       contributor.join()     # Wait for the contributor threads to exit
00246 
00247     # Let the subclass clean up and close
00248     self.close()
00249 
00250     print "\nGeb.build: terminating\n"
00251     import logging
00252     logging.info("Geb.build: terminating")
00253 
00254   def flushAll(self):
00255     """!\brief Method for flushing all stale events from the system.
00256     """
00257     contribsDict = self.__contributionsDict
00258     sequences    = contribsDict.keys()
00259     for staleSequence in sequences:
00260       contributions = contribsDict.pop(staleSequence)[1:]
00261       self.__dispose(contributions)
00262       ids = ""
00263       for status, contribution in contributions:
00264         ids += hex(contribution.identity()) + ", "
00265       import logging
00266       logging.warning("...Geb.flushAll: seq %08x flushed, ids = %s" % (staleSequence, ids))
00267 
00268   def __flush(self, sequence):
00269     """!\brief Method for flushing stale events from the system.
00270 
00271     Stale events are generally events that are missing contributions and will
00272     never be completed.  Since we require that events are disposed of in
00273     chronological order, incomplete events older than the most recently
00274     completed event are deemed stale.
00275 
00276     \param sequence - The sequence number of the most recently completed event
00277     """
00278     contribsDict = self.__contributionsDict
00279     sequences    = contribsDict.keys()
00280     for staleSequence in sequences:
00281       if staleSequence < sequence:
00282         contributions = contribsDict.pop(staleSequence)[1:]
00283         self.__dispose(contributions)
00284         ids = ""
00285         for status, contribution in contributions:
00286           ids += hex(contribution.identity()) + ", "
00287         import logging
00288         logging.warning("......Geb.flush: seq %08x flushed, ids = %s" % (staleSequence, ids))
00289 
00290   def __dispose(self, contributions):
00291     """!\brief Internal method to concatenate the contributions into a single
00292     event and call the subclass to dispose of it.
00293     """
00294     length   = 0
00295     buffer   = ''
00296     statuses = ()
00297     for status, contribution in contributions:
00298       statuses += (status,)
00299       length   += contribution.length()
00300       buffer   += str(contribution.buffer())
00301 
00302     length += 8
00303     header  = struct.pack('>2L', LDF.LATdatagram.ID, length)
00304 
00305     self.dispose((statuses, header + buffer))
00306 
00307   def close(self):
00308     """!\brief Optional method to close down the derived class.
00309     Meant to be overridden.
00310     """
00311     return

Generated on Fri Jul 21 13:26:28 2006 for LATTE R04-12-00 by doxygen 1.4.3