#!/usr/local/bin/python
#
#                               Copyright 2003
#                                    by
#                       The Board of Trustees of the
#                    Leland Stanford Junior University.
#                           All rights reserved.
#

__facility__ = "Online"
__abstract__ = "GLAST Online consumer"
__author__   = "Lester Miller"
__date__     = "11/20/2003"
__version__  = "$Revision: 1.5 $"
__credits__  = "SLAC"


import LATTE.copyright_SLAC
import LDF
import LATTE.consumer.standardLDF as standardLDF
import LATTE.tools.DataDistributorPoll as DDP
import LATTE.runcontrol.rcArchiver as rcArchiver
import struct
import time
import sys
import getopt
import array
import shutil
import logging as myLog

# NOTE: every print below uses the single-argument form print("..."), which
# produces the same output under the Python 2 print statement and the
# Python 3 print function.


class Consumer(object):
    r"""\brief Consumer class to consume LDF data

    Reads events either from an input file or from a Data Distributor
    server, hands each event to the LDF iterator chain built in the
    constructor and optionally archives the first N events of every run.
    This class itself only handles file open/close, server open/close and
    archive open/close; subclasses provide their own iterators to do real
    analysis.
    """

    # Size in bytes of the (identity, length) header preceding each file record
    HEADER_SIZE = 8

    def __init__(self, argv):
        """Build the default iterator chain and parse the command line.

        \\param argv  Command line arguments without the program name
                     (typically sys.argv[1:]); see usage().
        """
        # subclasses should provide iterators in constructor
        self.lci  = standardLDF.LATcomponentIterator()
        self.eei  = standardLDF.EBFeventIterator(self.lci)
        self.lcti = standardLDF.LATcontributionIterator(self.eei)
        self.ldbi = LDF.LATdataBufferIterator(self.lcti)

        self.__ds = None                     # Data Distributor connection
        self.__file = None                   # input file object
        self.__archiver = None               # rcArchiver of the open archive
        self.__debug = 0
        self.eventData = None                # raw bytes of the current event
        self.__archiveFile = "consumer.ldf"
        self.__inFileName = None
        self.__nEvents = 0                   # events to archive per run
        self.__server = "1"
        self.__runId = ''
        self.sleepTime = 0.1                 # seconds to sleep after an event
        self.startRun = False
        self.endRun = False
        # Always defined so that users of the -c option can test for None
        self.configFile = None
        self.parseArgs(argv)

    def parseEvent(self):
        """Analyse the event currently held in self.eventData.
        """
        # subclasses should provide a custom iterator to iterate in this method
        self.ldbi.iterate(self.eventData, len(self.eventData))
        self.checkUDF()

    def checkUDF(self):
        """Update the startRun/endRun flags from the contribution iterator.

        The iterator's UDF dictionary is inspected for the 'startRun' and
        'endRun' keys; the value stored under 'startRun' becomes the current
        run id.  The dictionary is cleared afterwards so that the flags only
        describe the event just parsed.
        """
        udfDict = self.lcti.getUdfDict()
        self.startRun = 'startRun' in udfDict
        if self.startRun:
            self.setRunId(udfDict['startRun'])
        self.endRun = 'endRun' in udfDict
        self.lcti.clearUdfDict()

    def getRunId(self):
        """Return the id of the most recently started run ('' if none)."""
        return self.__runId

    def setRunId(self, runId):
        """Record the id of the current run."""
        self.__runId = runId

    def getNextEvent(self):
        """Get the next event into the data buffer (self.eventData).

        \\return  From a file: the record length in bytes, or 0 at end of
                 file or on a malformed/truncated record (eventData is then
                 None).  From a server: always 1; eventData is None when the
                 server had nothing to deliver.  0 if no data source is open.
        """
        if self.__file is not None:
            return self.__readFileEvent()
        if self.__ds is not None:
            self.eventData, _sender = self.__ds.receive()
            if self.eventData is None:
                # No data from server
                return 1
            if self.__debug:
                myLog.info("got data %d length from server" % len(self.eventData))
            return 1
        myLog.warning("Consumer.getNextEvent: No data source")
        return 0

    def __readFileEvent(self):
        """Read one length-prefixed record from the open input file."""
        header = self.__file.read(self.HEADER_SIZE)
        # check for file end
        if len(header) < self.HEADER_SIZE:
            myLog.info("Consumer.getNextEvent: End of File")
            self.eventData = None
            return 0
        # Unpack a pair of 4 byte words.  The '!' prefix selects network
        # (big-endian) byte order and standard sizes, so this does not depend
        # on the host architecture.
        (identity, length) = struct.unpack('!LL', header)
        if length < self.HEADER_SIZE:
            # A negative count would make read() slurp the rest of the file
            myLog.warning("Consumer.getNextEvent: invalid record length %d" % (length))
            self.eventData = None
            return 0
        # The length word counts the header itself
        payload = self.__file.read(length - self.HEADER_SIZE)
        if len(payload) < length - self.HEADER_SIZE:
            myLog.warning("Consumer.getNextEvent: truncated record at end of file")
            self.eventData = None
            return 0
        self.eventData = header + payload
        if self.__debug:
            myLog.info("got data %d length from file" % (length))
        return length

    def serverOpen(self, server="1"):
        """ Connect to network server

        Any previously opened file or server is closed first.

        \\param server  Server identifier handed to the Data Distributor
                       connect() call.
        """
        myLog.info("Consumer.serverOpen: Connecting to data server %s"%(server))
        # Close previous data sources
        if self.__file is not None:
            self.fileClose()
        if self.__ds is not None:
            self.serverClose()
        dsName = 'LAT_EBFdata'
        self.__ds = DDP.DataDistributorPoll(dsName)
        # NOTE(review): if DataDistributorPoll is a plain class this test can
        # never fail; kept in case it is a factory that may return None.
        if self.__ds is not None:
            self.__ds.connect(server)
            self.__ds.start()
        else:
            myLog.warning("Consumer.serverOpen: Unable to connect to Data Distribution server %s"%(server))
            sys.exit(2)

    def serverClose(self):
        """ Close network server (no-op if none is connected)
        """
        if self.__ds is not None:
            self.__ds.stop()
            self.__ds.disconnect()
            self.__ds = None

    def fileOpen(self, fn):
        """ Load input data from a file

        Any previously opened file or server is closed first.  The process
        exits with status 2 if the file cannot be opened.

        \\param fn  Path of the LDF file to read.
        """
        # Close previous data sources
        if self.__ds is not None:
            self.serverClose()
        if self.__file is not None:
            self.fileClose()
        try:
            self.__file = open(fn, 'rb')
        except (IOError, OSError):
            myLog.warning("Consumer.fileOpen: Fatal: failed to open file %s"%(fn))
            sys.exit(2)
        myLog.info("Consumer.fileOpen: opening file %s"%(fn))

    def fileClose(self):
        """ Close input data file (no-op if none is open)
        """
        if self.__file is not None:
            self.__file.close()
            self.__file = None

    def archiveOpen(self, fn):
        """ Open data archive

        The archive is written to fn + '_locked' and only renamed to fn by
        archiveClose().  An archive that is already open is closed first.
        On failure a warning is logged and no archive is open.

        \\param fn  Final name of the archive file.
        """
        # make a 'being written to' version of the file
        openfn = fn + '_locked'
        # Close previous archive file
        if self.__archiver is not None:
            self.archiveClose(fn)
        myLog.info("Consumer.archiveOpen: opening file %s"%(fn))
        try:
            self.__archiver = rcArchiver.rcArchiver(fileName=openfn)
        except Exception:
            myLog.warning("Consumer.archiveOpen: failed to open archive file %s"%(fn))
            self.__archiver = None

    def archiveClose(self, fn):
        """ Close data archive and move it to its final name

        Does nothing when no archive is open, so that no rename of a
        non-existent '_locked' file is attempted.

        \\param fn  Final name of the archive file (as given to archiveOpen).
        """
        if self.__archiver is None:
            return
        self.__archiver.close()
        self.__archiver = None
        myLog.info("Consumer.archiveClose: closing archive file %s"%(fn))
        # move it to the requested file name now that it is closed
        shutil.move(fn + '_locked', fn)

    def dataSize(self):
        """Return current event data size in bytes (0 if there is no event)
        """
        if self.eventData is None:
            return 0
        return len(self.eventData)

    def usage(self):
        """Print the command line help."""
        print("\n%s Usage:" % (sys.argv[0]))
        print("\t-h or --help for Help")
        print("\t-a or --archive to specify output to archive file (default consumer.ldf)")
        print("\t-n or --nevents to specify the number of events archived (default 0)")
        print("\t-i or --infile to specify an input from file instead of server")
        print("\t-s or --server to specify server to connect to (default 1)")
        print("\t-c or --config to specify configuration file (default None, used by Plotter)")
        print("\t-r or --rate to specify a rate limit in Hz (default 10)")

    def __convertOption(self, convert, option, value):
        """Apply convert to an option value; print usage and exit(2) on error."""
        try:
            return convert(value)
        except ValueError:
            print("Invalid value '%s' for option %s" % (value, option))
            self.usage()
            sys.exit(2)

    def parseArgs(self, argv):
        """Parse the command line options described by usage().

        Exits with status 2 on an unknown option or a malformed numeric
        value, and with status 0 after printing the help for -h/--help.
        A rate that is not strictly positive leaves sleepTime unchanged.

        \\param argv  Command line arguments without the program name.
        """
        try:
            opts, args = getopt.getopt(argv, "ha:n:i:s:c:r:",
                                       ["help", "archive=", "nevents=", "infile=", "server=","config=","rate="])
        except getopt.GetoptError:
            # print help information and exit:
            self.usage()
            sys.exit(2)
        for o, a in opts:
            if o in ("-h", "--help"):
                self.usage()
                sys.exit()
            elif o in ("-a", "--archive"):
                self.__archiveFile = a
            elif o in ("-n", "--nevents"):
                self.__nEvents = self.__convertOption(int, o, a)
            elif o in ("-i", "--infile"):
                self.__inFileName = a
            elif o in ("-s", "--server"):
                self.__server = a
            elif o in ("-c", "--config"):
                self.configFile = a
            elif o in ("-r", "--rate"):
                rate = self.__convertOption(float, o, a)
                if rate > 0:
                    self.sleepTime = 1./rate

    def __archiveEvent(self):
        """Write the current event to the open archive."""
        # don't know why, but needs byte swapping?
        # NOTE(review): 'L' is the platform's C unsigned long, whose item
        # size is 8 bytes on many 64-bit builds; the swap is only per 32-bit
        # word where the item size is 4 -- confirm on the target platform.
        outputData = array.array('L', self.eventData)
        outputData.byteswap()
        self.__archiver.write(outputData)

    def loop(self):
        """Open the configured data source and process events until it ends.

        Every event is parsed; when nevents > 0 an archive is opened at each
        start of run and closed at end of run or once nevents events have
        been written.  The loop ends when getNextEvent() returns 0 (end of
        file or no data source); the sources and any open archive are then
        closed and the process exits.
        """
        if self.__inFileName is not None:
            self.fileOpen(self.__inFileName)
        else:
            self.serverOpen(self.__server)
        eventCount = 0
        archivedEventCount = 0
        sayWaiting = False
        timeLastEvent = 0
        while True:
            # Compare with 0 explicitly: an overriding getNextEvent() that
            # returns nothing (None) must not terminate the loop.
            if self.getNextEvent() == 0:
                break
            if self.eventData is not None and self.dataSize() > 0:
                timeLastEvent = time.time()
                eventCount += 1
                self.parseEvent()
                # archive
                if self.__archiver is not None and archivedEventCount < self.__nEvents:
                    self.__archiveEvent()
                    archivedEventCount += 1
                # open archive if new run seen
                if self.__nEvents > 0 and self.startRun:
                    self.archiveOpen(self.__archiveFile)
                    # archiveOpen() closes a still-open archive itself, so
                    # restart the count for the new one
                    archivedEventCount = 0
                # close archive if collected enough
                if (self.endRun or archivedEventCount >= self.__nEvents) and self.__archiver is not None:
                    self.archiveClose(self.__archiveFile)
                    archivedEventCount = 0
                if sayWaiting:
                    sayWaiting = False
                    print("resumed acquisition... (eventCount is  %s )" % (eventCount))
                time.sleep(self.sleepTime)  # enforce the rate limit
            else:
                deltaTime = time.time() - timeLastEvent
                if not sayWaiting and deltaTime > 1:
                    sayWaiting = True
                    print("Waiting for data...")
                time.sleep(0.0005)  # conserve cpu...

        myLog.info("Consumer: Read in %d events" % (eventCount))
        # Do not leave a half-written '_locked' archive behind
        if self.__archiver is not None:
            self.archiveClose(self.__archiveFile)
        self.serverClose()
        self.fileClose()
        sys.exit()


if __name__ == '__main__':
    consumer = Consumer(sys.argv[1:])
    consumer.loop()