00001 #!/usr/local/bin/python
00002 #
00003 # Copyright 2003
00004 # by
00005 # The Board of Trustees of the
00006 # Leland Stanford Junior University.
00007 # All rights reserved.
00008 #
00009
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST Online consumer"
00012 __author__ = "Lester Miller"
00013 __date__ = "11/20/2003"
00014 __version__ = "$Revision: 1.5 $"
00015 __credits__ = "SLAC"
00016
00017
00018 import LATTE.copyright_SLAC
00019 import LDF
00020 import LATTE.consumer.standardLDF as standardLDF
00021 import LATTE.tools.DataDistributorPoll as DDP
00022 import LATTE.runcontrol.rcArchiver as rcArchiver
00023 import struct
00024 import time
00025 import sys
00026 import getopt
00027 import array
00028 import logging as myLog
00029
class Consumer(object):
    """Consumer class to consume LDF data.

    This class handles opening/closing the event source (an input file or
    a data server connection) and an optional output archive, and drives
    the LDF iterators over every event it receives.  Subclasses are
    expected to install their own iterators in their constructor and/or
    override parseEvent().
    """

    def __init__(self, argv):
        """Build the default iterator chain and parse the command line.

        argv -- list of command line arguments (without the program name),
                in the form accepted by parseArgs().
        """
        # subclasses should provide iterators in constructor
        self.lci = standardLDF.LATcomponentIterator()
        self.eei = standardLDF.EBFeventIterator(self.lci)
        self.lcti = standardLDF.LATcontributionIterator(self.eei)
        self.ldbi = LDF.LATdataBufferIterator(self.lcti)

        self.__ds = None            # data server connection, if any
        self.__file = None          # input file object, if any
        self.__archiver = None      # open archive writer, if any
        self.__debug = 0
        self.eventData = None       # raw bytes of the current event
        self.__archiveFile = "consumer.ldf"
        self.__inFileName = None
        self.__nEvents = 0          # events to archive per run (0: no archive)
        self.__server = "1"
        self.__runId = ''
        self.sleepTime = 0.1        # pause after each event (10 Hz default)
        self.startRun = False
        self.endRun = False
        # Usage advertises "default None" for --config, so make sure the
        # attribute always exists; keep a value a subclass set beforehand.
        if not hasattr(self, 'configFile'):
            self.configFile = None
        self.parseArgs(argv)

    def parseEvent(self):
        """Analyse the event currently held in self.eventData.
        """
        # subclasses should provide a custom iterator to iterate in this method
        self.ldbi.iterate(self.eventData, len(self.eventData))
        self.checkUDF()

    def checkUDF(self):
        """Update startRun/endRun (and the run id) from the UDF dictionary.

        The dictionary is fetched from the contribution iterator and is
        cleared afterwards so each event is judged on its own.
        """
        udfDict = self.lcti.getUdfDict()
        self.startRun = False
        if 'startRun' in udfDict:
            self.setRunId(udfDict['startRun'])
            self.startRun = True
        self.endRun = 'endRun' in udfDict
        self.lcti.clearUdfDict()

    def getRunId(self):
        """Return the run id recorded by the last 'startRun' entry."""
        return self.__runId

    def setRunId(self, runId):
        """Record the current run id."""
        self.__runId = runId

    def getNextEvent(self):
        """Get the next event into self.eventData.

        Returns the datagram length when reading from a file, 1 when
        polling a server (whether or not data arrived; self.eventData is
        None when nothing was received), and 0 at end of file, on a
        corrupt/truncated datagram, or when no data source is open.
        """
        if self.__file is not None:
            return self._readFileEvent()
        if self.__ds is not None:
            self.eventData, sender = self.__ds.receive()
            if self.eventData is not None and self.__debug:
                myLog.info("got data %d length from server" % len(self.eventData))
            return 1
        myLog.warning("Consumer.getNextEvent: No data source")
        return 0

    def _readFileEvent(self):
        """Read one datagram from the open input file (see getNextEvent)."""
        evtPtr = self.__file.tell()
        # datagram header: two 4-byte words (identity, length)
        header = self.__file.read(8)
        if len(header) < 8:
            if header:
                myLog.warning("Consumer.getNextEvent: truncated header at offset %d"
                              % (evtPtr))
            myLog.info("Consumer.getNextEvent: End of File")
            self.eventData = None
            return 0
        # '!' decodes the words as big-endian regardless of host byte order
        (identity, length) = struct.unpack('!LL', header)
        if length < 8:
            # a negative read size would swallow the remainder of the file
            myLog.warning("Consumer.getNextEvent: bad datagram length %d at offset %d"
                          % (length, evtPtr))
            self.eventData = None
            return 0
        body = self.__file.read(length - 8)
        if len(body) < length - 8:
            myLog.warning("Consumer.getNextEvent: truncated datagram at offset %d"
                          % (evtPtr))
            self.eventData = None
            return 0
        self.eventData = header + body
        if self.__debug:
            myLog.info("got data %d length from file" % (length))
        return length

    def serverOpen(self, server="1"):
        """ Connect to network server

        Any previously opened file or server connection is closed first.
        """
        myLog.info("Consumer.serverOpen: Connecting to data server %s"%(server))
        # Close previous data sources
        if self.__file is not None:
            self.fileClose()
        if self.__ds is not None:
            self.serverClose()
        dsName = 'LAT_EBFdata'
        self.__ds = DDP.DataDistributorPoll(dsName)
        # NOTE(review): if DataDistributorPoll is a plain class this check
        # can never fail; connection errors would surface as exceptions.
        if self.__ds is not None:
            self.__ds.connect(server)
            self.__ds.start()
        else:
            myLog.warning("Consumer.serverOpen: Unable to connect to Data Distribution server %s"%(server))
            sys.exit(2)

    def serverClose(self):
        """ Close network server
        """
        if self.__ds is not None:
            self.__ds.stop()
            self.__ds.disconnect()
            self.__ds = None

    def fileOpen(self, fn):
        """ Load input data from a file

        Any previously opened server connection or file is closed first.
        Exits the process with status 2 if the file cannot be opened.
        """
        # Close previous data sources
        if self.__ds is not None:
            self.serverClose()
        if self.__file is not None:
            self.fileClose()
        try:
            self.__file = open(fn, 'rb')
        except (IOError, OSError):
            myLog.warning("Consumer.fileOpen: Fatal: failed to open file %s"%(fn))
            sys.exit(2)
        myLog.info("Consumer.fileOpen: opening file %s"%(fn))

    def fileClose(self):
        """ Close input data file
        """
        if self.__file is not None:
            self.__file.close()
            self.__file = None

    def archiveOpen(self, fn):
        """ Open data archive

        The archive is written to fn + '_locked' and only renamed to fn
        by archiveClose().  On failure a warning is logged and no archive
        is open afterwards.
        """
        # make a 'being written to' version of the file
        openfn = fn + '_locked'
        # Close previous archive file
        if self.__archiver is not None:
            self.archiveClose(fn)
        myLog.info("Consumer.archiveOpen: opening file %s"%(fn))
        try:
            self.__archiver = rcArchiver.rcArchiver(fileName=openfn)
        except Exception:
            myLog.warning("Consumer.archiveOpen: failed to open archive file %s"%(fn))
            self.__archiver = None

    def archiveClose(self, fn):
        """ Close data archive and move it to its final name fn.

        Does nothing when no archive is open.
        """
        if self.__archiver is not None:
            self.__archiver.close()
            self.__archiver = None
            myLog.info("Consumer.archiveClose: closing archive file %s"%(fn))
            # move it to the requested file name now that it is closed
            openfn = fn + '_locked'
            import shutil
            shutil.move(openfn, fn)

    def dataSize(self):
        """Return current event data size (0 when there is no event).
        """
        if self.eventData is None:
            return 0
        return len(self.eventData)

    def usage(self):
        """Print the command line help."""
        print("\n%s Usage:" % (sys.argv[0]))
        print("\t-h or --help for Help")
        print("\t-a or --archive to specify output to archive file (default consumer.ldf)")
        print("\t-n or --nevents to specify the number of events archived (default 0)")
        print("\t-i or --infile to specify an input from file instead of server")
        print("\t-s or --server to specify server to connect to (default 1)")
        print("\t-c or --config to specify configuration file (default None, used by Plotter)")
        print("\t-r or --rate to specify a rate limit in Hz (default 10)")

    def parseArgs(self, argv):
        """Parse command line options (see usage()) into attributes.

        Prints the usage and exits with status 2 on an unknown option or
        a non-numeric value for --nevents/--rate; exits with status 0
        after --help.
        """
        try:
            opts, args = getopt.getopt(argv, "ha:n:i:s:c:r:",
                                       ["help", "archive=", "nevents=", "infile=",
                                        "server=", "config=", "rate="])
        except getopt.GetoptError:
            # print help information and exit:
            self.usage()
            sys.exit(2)
        for o, a in opts:
            if o in ("-h", "--help"):
                self.usage()
                sys.exit()
            try:
                if o in ("-a", "--archive"):
                    self.__archiveFile = a
                elif o in ("-n", "--nevents"):
                    self.__nEvents = int(a)
                elif o in ("-i", "--infile"):
                    self.__inFileName = a
                elif o in ("-s", "--server"):
                    self.__server = a
                elif o in ("-c", "--config"):
                    self.configFile = a
                elif o in ("-r", "--rate"):
                    rate = float(a)
                    # a non-positive rate leaves the default sleep time alone
                    if rate > 0:
                        self.sleepTime = 1. / rate
            except ValueError:
                # int()/float() rejected the option value
                self.usage()
                sys.exit(2)

    def loop(self):
        """Open the configured source and process events until it ends.

        Every event is parsed; if --nevents is positive, up to that many
        events per run are archived, the archive being opened on a
        'startRun' marker and closed on 'endRun' or when full.  The loop
        ends when getNextEvent() returns 0 (end of file or no source);
        the sources are then closed and the process exits.
        """
        if self.__inFileName is not None:
            self.fileOpen(self.__inFileName)
        else:
            self.serverOpen(self.__server)
        eventCount = 0
        archivedEventCount = 0
        sayWaiting = 0
        timeLastEvent = 0
        while True:
            if self.getNextEvent() == 0:
                # end of input: stop instead of polling a dead source forever
                break
            if self.eventData is not None and self.dataSize() > 0:
                timeLastEvent = time.time()
                eventCount += 1
                self.parseEvent()
                # archive
                if self.__archiver is not None and archivedEventCount < self.__nEvents:
                    # don't know why, but needs byte swapping?
                    # NOTE(review): 'L' items are 8 bytes wide on some
                    # platforms; this presumably expects 4-byte words -- confirm.
                    outputData = array.array('L', self.eventData)
                    outputData.byteswap()
                    self.__archiver.write(outputData)
                    archivedEventCount += 1
                # open archive if new run seen and none open
                if self.__nEvents > 0 and self.startRun:
                    self.archiveOpen(self.__archiveFile)
                # close archive if collected enough
                if (self.endRun or archivedEventCount >= self.__nEvents) and self.__archiver is not None:
                    self.archiveClose(self.__archiveFile)
                    archivedEventCount = 0
                if sayWaiting == 1:
                    sayWaiting = 0
                    print("resumed acquisition... (eventCount is  %s )" % (eventCount))
                time.sleep(self.sleepTime)  # conserve cpu...
            else:
                deltaTime = time.time() - timeLastEvent
                if sayWaiting == 0 and deltaTime > 1:
                    sayWaiting = 1
                    print("Waiting for data...")
                time.sleep(0.0005)  # conserve cpu...

        myLog.info("Consumer: Read in %d events" % (eventCount))
        # do not leave a half-written '_locked' archive behind
        if self.__archiver is not None:
            self.archiveClose(self.__archiveFile)
        self.serverClose()
        self.fileClose()
        sys.exit()
00280
00281
# Script entry point: build a consumer from the command line arguments
# (program name stripped) and run its event loop.
if __name__ == '__main__':
    Consumer(sys.argv[1:]).loop()
00285