#!/usr/local/bin/python
#
#                               Copyright 2006
#                                     by
#                        The Board of Trustees of the
#                      Leland Stanford Junior University.
#                             All rights reserved.
#

__facility__ = "Online"
__abstract__ = "Generic event builder"
__author__ = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
__date__ = "2/27/2006"
__updated__ = ("$Date: 2006/07/19 16:06:44 $").split(' ')[1]
__version__ = "$Revision: 1.2 $"
__release__ = "$Name: R04-12-00 $"
__credits__ = "SLAC"

import LATTE.copyright_SLAC

from Queue import Queue
from threading import Thread
import logging
import struct
import sys
import time

import LDF


class GebContribution(object):
    """!@brief Base class for one contributor's piece of an event.

    Subclasses must implement identity(), length(), sequence() and buffer();
    the event builder (Geb) calls all four.
    """

    def __init__(self, contributor):
        """!@brief Remember which contributor produced this contribution.

        @param contributor - The object that produced this contribution.  Geb
                             calls idMask() on it, so it is expected to be a
                             GebContributor.
        """
        self.__contributor = contributor

    def identity(self):
        """!@brief Method to return the identity (LDF TypeId) of a contribution to
        an event.  Meant to be overridden.
        """
        raise NotImplementedError("identity() method must be implemented in a subclass")

    def length(self):
        """!@brief Method to return the length of a contribution to an event.
        Meant to be overridden.

        Geb adds the lengths of all contributions of an event (plus 8 for the
        datagram header) to obtain the length written into that header.
        """
        raise NotImplementedError("length() method must be implemented in a subclass")

    def sequence(self):
        """!@brief Method to return the sequence value of the contribution.
        Meant to be overridden.

        This method must return a value for which the equality and greater
        than operators are defined.  The sequence values must be common
        amongst all contributors such that if a contribution's sequence
        value is equal to another contribution's sequence value, the two
        contributions belong together as part of the same event.  A
        contribution A's sequence value that is greater than another
        contribution B's sequence value is interpreted to mean that A is
        newer than B.  This information is used to flush out partially
        built older events.
        """
        raise NotImplementedError("sequence() method must be implemented in a subclass")

    def buffer(self):
        """!@brief Method to return the data of the contribution.
        Meant to be overridden.

        Geb appends str(buffer()) of every contribution of an event to the
        datagram header when it disposes of the event.
        """
        raise NotImplementedError("buffer() method must be implemented in a subclass")

    def contributor(self):
        """!@brief Method to return the contributor given to the constructor.
        """
        return self.__contributor


class GebContributor(Thread):
    """!@brief Base class for a thread that reads contributions from one data
    source and posts them on the queue shared by all contributors.
    """

    # Set up one common blocking queue for all contributors
    __contributionQueue = Queue()

    def __init__(self):
        Thread.__init__(self)
        self.setName(self.name())

    def contributionQueue():
        """!@brief Method to get the common contribution queue.

        @returns A Queue object
        """
        return GebContributor.__contributionQueue

    contributionQueue = staticmethod(contributionQueue)

    def read(self):
        """!@brief Method to read a contribution from somewhere.
        Meant to be overridden.

        @returns A (reason, status, contribution) tuple.  run() posts the tuple
                 on the common queue and stops reading once reason is
                 Geb.Terminate.
        """
        raise NotImplementedError("read() method must be implemented in a subclass")

    def name(self):
        """!@brief Method to return the name of the contribution type.
        Meant to be overridden.

        The value is used as the thread name and as the prefix of the
        messages printed and logged by run().
        """
        raise NotImplementedError("name() method must be implemented in a subclass")

    def sequence(self, sequence):
        """!@brief Optional method for remapping the ancillary contribution's
        sequence number into the same space as the other contributions' sequence
        numbers.  If this method is not overridden, the supplied sequence number is
        returned.

        @param sequence - The sequence number to remap
        @returns The remapped sequence number
        """
        return sequence

    def open(self):
        """!@brief Method to open the contributor.
        Meant to be overridden.
        """
        raise NotImplementedError("open() method must be implemented in a subclass")

    def close(self):
        """!@brief Optional method to close the contributor.
        Meant to be overridden.
        """
        pass

    def idMask(self, id = None):
        """!@brief Method for building the contributor mask

        @param id - When not None, the mask value to store for this contributor
                    (Geb hands every contributor its own bit).
        @returns The stored mask value.  Raises AttributeError when no value
                 has been stored yet.
        """
        if id is not None:
            self.__idMask = id
        return self.__idMask

    def run(self):
        """!@brief Loop that continuously reads from the data source
        """
        contributionQueue = self.__contributionQueue
        reason = None
        try:
            while reason != Geb.Terminate:
                reason, status, contribution = self.read()
                contributionQueue.put((reason, status, contribution))

                # Allow other threads to run by forcing a reschedule
                time.sleep(0.0)
        except Exception:
            # sys.exc_info() instead of "except Exception, e": same exception
            # object, but a spelling that every Python version can parse.
            # NOTE(review): nothing is posted on the queue here, so the
            # builder is not told that this contributor has stopped.
            e = sys.exc_info()[1]
            logging.exception(self.name() + ": Trapped exception:\n" + str(e))

        print(self.name() + ": terminating")
        logging.info(self.name() + ": terminating")


class Geb(Thread):
    """!@brief Generic event builder.

    Collects the contributions posted by a set of GebContributor threads,
    groups them by sequence value and hands every complete (or stale) event
    to dispose(), which a subclass must implement.
    """

    # Reason codes accompanying each contribution on the queue
    Build     = 0    # Merge into the event with the same sequence value
    DontBuild = 1    # Dispose of the contribution on its own
    Terminate = 2    # Stop the contributor read loop and the build loop
    Sweep     = 3    # Flush all older events, then dispose of it on its own

    def __init__(self, contributors):
        """!@brief Assign every contributor its bit of the contributor mask.

        @param contributors - Sequence of GebContributor instances.  A
                              TypeError is raised for anything else.
        """
        Thread.__init__(self)
        self.setName(self.name())
        # Plain ints: Python 2.4 and later promote to long automatically when
        # the shift below outgrows a machine word.
        self.__idMask = 0
        idMask = 1
        for contributor in contributors:
            # Make sure that the contributors were inherited from GebContributor
            if not isinstance(contributor, GebContributor):
                raise TypeError("Contributor must be a subclass of GebContributor")
            self.__idMask |= contributor.idMask(idMask)
            idMask <<= 1
        self.__contributors = contributors
        self.__contributionQueue = GebContributor.contributionQueue()
        # sequence -> [mask of contributors seen, (status, contribution), ...]
        self.__contributionsDict = {}

    def name(self):
        """!@brief Method to return the name of the event builder.
        May be overridden.
        """
        return "GEB"

    def shutdown(self):
        """!@brief Method to shutdown the builder
        """
        # Shutdown the contributors
        for contributor in self.__contributors:
            contributor.close()

    def run(self):
        """!@brief Method run by the Thread base class.  Use the start() method
        to start event building.
        """
        try:
            self.build()
        except Exception:
            # See GebContributor.run() for why sys.exc_info() is used
            e = sys.exc_info()[1]
            logging.exception(self.name() + ":\n" + str(e))

    def build(self):
        """!@brief Call this method to start event building

        Opens and starts all contributors, then processes the common queue
        until a contribution with reason Geb.Terminate arrives.  Afterwards the
        contributors are closed and joined and close() is called.
        """
        contributionQueue = self.__contributionQueue
        contributionsDict = self.__contributionsDict
        contributors = self.__contributors
        dTime = 0.0

        # Open the contributors
        for contributor in contributors:
            contributor.open()

        # Start the contributor threads up
        for contributor in contributors:
            contributor.start()

        reason, status, contribution = contributionQueue.get()  # Blocks
        tStart = time.time()

        n = 0                               # Number of completely built events
        while reason != Geb.Terminate:
            if reason == Geb.Build:
                contributor = contribution.contributor()
                sequence = contribution.sequence()
                mask = contributor.idMask()
                event = contributionsDict.get(sequence)
                if event is None:
                    # First contribution seen for this sequence value
                    event = [mask, (status, contribution)]
                    contributionsDict[sequence] = event
                else:
                    event.append((status, contribution))
                    event[0] |= mask

                # All contributors have reported: the event is complete
                if event[0] == self.__idMask:
                    self.__flush(sequence)
                    self.__dispose(contributionsDict.pop(sequence)[1:])
                    if n % 100 == 0:
                        print("......Geb.build: seq %08x built and disposed" % (sequence))
                    n += 1
            elif reason == Geb.Sweep:
                sequence = contribution.sequence()
                # NOTE(review): this replaces any partially built event that
                # has the same sequence value without disposing of it.
                contributionsDict[sequence] = [self.__idMask, (status, contribution)]

                self.__flush(sequence)
                self.__dispose(contributionsDict.pop(sequence)[1:])
                print("......Geb.build: seq %08x disposed (sweep)" % (sequence))
            else:
                print("......Geb.build: id %08x disposed" % (contribution.identity()))

                self.__dispose([(status, contribution)])

            # Only the processing time is accumulated, not the time spent
            # blocked waiting for the next contribution
            dTime += time.time() - tStart
            reason, status, contribution = contributionQueue.get()  # Blocks
            tStart = time.time()

        # Avoid dividing by zero in the summary below
        if n == 0:
            n = 1.0e-99
        if dTime == 0:
            dTime = 1.0e-99
        print("\nGeb.build: N = %d, T = %f, T/N = %f, N/T = %f" %
              (n, dTime, dTime / n, float(n) / dTime))

        # Close the contributors
        for contributor in contributors:
            contributor.close()   # Redundant - normally done by shutdown()
            contributor.join()    # Wait for the contributor threads to exit

        # Let the subclass clean up and close
        self.close()

        print("\nGeb.build: terminating\n")
        logging.info("Geb.build: terminating")

    def flushAll(self):
        """!@brief Method for flushing all stale events from the system.

        Every event still being built is disposed of, oldest first.
        """
        for staleSequence in self.__pendingSequences():
            self.__discard(staleSequence,
                           "...Geb.flushAll: seq %08x flushed, ids = %s")

    def __flush(self, sequence):
        """!@brief Method for flushing stale events from the system.

        Stale events are generally events that are missing contributions and will
        never be completed.  Since we require that events are disposed of in
        chronological order, incomplete events older than the most recently
        completed event are deemed stale.  They are disposed of oldest first.

        @param sequence - The sequence number of the most recently completed event
        """
        for staleSequence in self.__pendingSequences():
            if staleSequence < sequence:
                self.__discard(staleSequence,
                               "......Geb.flush: seq %08x flushed, ids = %s")

    def __pendingSequences(self):
        """!@brief Internal method returning the sequence values of all events
        under construction as a new list in ascending (oldest first) order.

        Returning a copy lets the callers remove entries while looping.
        """
        sequences = list(self.__contributionsDict.keys())
        sequences.sort()
        return sequences

    def __discard(self, staleSequence, logFormat):
        """!@brief Internal method to remove one incomplete event, dispose of
        what was collected for it and log a warning.

        @param staleSequence - The sequence number of the event to remove
        @param logFormat     - Warning format taking (sequence, ids)
        """
        contributions = self.__contributionsDict.pop(staleSequence)[1:]
        self.__dispose(contributions)
        ids = "".join([hex(contribution.identity()) + ", "
                       for status, contribution in contributions])
        logging.warning(logFormat % (staleSequence, ids))

    def __dispose(self, contributions):
        """!@brief Internal method to concatenate the contributions into a single
        event and call the subclass to dispose of it.

        @param contributions - List of (status, contribution) tuples
        """
        length = 8                  # The header is two big-endian 32-bit words
        statuses = []
        payload = []
        for status, contribution in contributions:
            statuses.append(status)
            length += contribution.length()
            payload.append(str(contribution.buffer()))

        header = struct.pack('>2L', LDF.LATdatagram.ID, length)

        self.dispose((tuple(statuses), header + ''.join(payload)))

    def dispose(self, event):
        """!@brief Method to dispose of an event.
        Meant to be overridden.

        @param event - A (statuses, datagram) tuple: the statuses of the
                       contributions and the datagram header followed by the
                       concatenated contribution buffers.
        """
        raise NotImplementedError("dispose() method must be implemented in a subclass")

    def close(self):
        """!@brief Optional method to close down the derived class.
        Meant to be overridden.
        """
        return