Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | Related Pages

consumer.py

00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2003
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST Online consumer"
00012 __author__   = "Lester Miller"
00013 __date__     = "11/20/2003"
00014 __version__  = "$Revision: 1.5 $"
00015 __credits__  = "SLAC"
00016 
00017 
00018 import LATTE.copyright_SLAC
00019 import LDF
00020 import LATTE.consumer.standardLDF as standardLDF
00021 import LATTE.tools.DataDistributorPoll as DDP
00022 import LATTE.runcontrol.rcArchiver as rcArchiver
00023 import struct
00024 import time
00025 import sys
00026 import getopt
00027 import array
00028 import logging as myLog
00029 
class Consumer(object):
  r"""\brief Consumer class to consume LDF data

  This class handles opening/closing of the event source (an input file
  or the data distribution server), opening/closing of the optional
  output archive, and the main acquisition loop.  Event decoding is
  delegated to the iterator chain built in the constructor; subclasses
  are expected to install their own iterators.
  """

  # Size in bytes of the datagram header read from a file: two 4-byte
  # words (identity, length).  The length word counts the header itself.
  _HEADER_SIZE = 8

  def __init__(self,argv):
    """Build the default iterator chain and parse the command line.

    argv -- list of command line arguments (without the program name),
            in the format accepted by parseArgs().
    """
    # subclasses should provide iterators in constructor
    self.lci       = standardLDF.LATcomponentIterator()
    self.eei       = standardLDF.EBFeventIterator(self.lci)
    self.lcti      = standardLDF.LATcontributionIterator(self.eei)
    self.ldbi      = LDF.LATdataBufferIterator(self.lcti)

    self.__ds          = None             # server connection, if any
    self.__file        = None             # input file object, if any
    self.__archiver    = None             # open output archive, if any
    self.__debug       = 0
    self.eventData     = None             # raw bytes of the current event
    self.__archiveFile = "consumer.ldf"
    self.__inFileName  = None
    self.__nEvents     = 0                # events to archive per run
    self.__server      = "1"
    self.__runId       = ''
    self.sleepTime     = 0.1              # pause after each event (s)
    self.startRun      = False
    self.endRun        = False
    self.parseArgs(argv)

  def parseEvent(self):
    """Analyse the event currently held in self.eventData.
    """
    # subclasses should provide a custom iterator to iterate in this method
    self.ldbi.iterate(self.eventData, len(self.eventData))
    self.checkUDF()

  def checkUDF(self):
    """Update startRun/endRun (and the run id) from the UDF dictionary.

    The dictionary is fetched from the contribution iterator and cleared
    afterwards, so the flags only describe the event just parsed.
    """
    udfDict = self.lcti.getUdfDict()
    self.startRun = False
    if 'startRun' in udfDict:
      self.setRunId(udfDict['startRun'])
      self.startRun = True
    self.endRun = False
    if 'endRun' in udfDict:
      self.endRun = True
    self.lcti.clearUdfDict()

  def getRunId(self):
    """Return the run id recorded by the last 'startRun' marker.
    """
    return self.__runId

  def setRunId(self,runId):
    """Record the current run id.
    """
    self.__runId = runId

  def getNextEvent(self):
    """Get next event into the data buffer (self.eventData).

    Returns the datagram length when reading from a file, 1 when polling
    the server (whether or not data arrived; self.eventData is None when
    nothing was received), and 0 when the file is exhausted or corrupt
    or when no data source is open.  On a 0 return from a file,
    self.eventData is reset to None.
    """
    if self.__file is not None:
      header = self.__file.read(self._HEADER_SIZE)
      # check for file end
      if len(header) < self._HEADER_SIZE:
        myLog.info("Consumer.getNextEvent: End of File")
        self.eventData = None
        return 0
      # unpack pair of big-endian 4 byte words; '!' forces network byte
      # order, so this is independent of the host's endianness
      (identity, length) = struct.unpack('!LL', header)
      if length < self._HEADER_SIZE:
        # a negative read size would slurp the rest of the file
        myLog.warning("Consumer.getNextEvent: invalid datagram length %d", length)
        self.eventData = None
        return 0
      payload = self.__file.read(length - self._HEADER_SIZE)
      if len(payload) < length - self._HEADER_SIZE:
        myLog.warning("Consumer.getNextEvent: truncated datagram at End of File")
        self.eventData = None
        return 0
      self.eventData = header + payload
      if self.__debug:
        myLog.info("got data %d length from file" % (length))
      return length
    if self.__ds is not None:
      self.eventData, _sender = self.__ds.receive()
      if self.eventData is not None and self.__debug:
        myLog.info("got data %d length from server" % len(self.eventData))
      return 1
    myLog.warning("Consumer.getNextEvent: No data source")
    return 0

  def serverOpen(self, server="1"):
    """Connect to network server

    Any previously opened file or server connection is closed first.
    """
    myLog.info("Consumer.serverOpen: Connecting to data server %s"%(server))
    # Close previous data sources
    if self.__file is not None:
      self.fileClose()
    if self.__ds is not None:
      self.serverClose()
    dsName = 'LAT_EBFdata'
    self.__ds = DDP.DataDistributorPoll(dsName)
    if self.__ds is not None:
      self.__ds.connect(server)
      self.__ds.start()
    else:
      myLog.warning("Consumer.serverOpen: Unable to connect to Data Distribution server %s"%(server))
      sys.exit(2)

  def serverClose(self):
    """Close network server connection, if one is open
    """
    if self.__ds is not None:
      self.__ds.stop()
      self.__ds.disconnect()
      self.__ds = None

  def fileOpen(self,fn):
    """Load input data from a file

    Any previously opened file or server connection is closed first.
    Exits the process with status 2 if the file cannot be opened.
    """
    # Close previous data sources
    if self.__ds is not None:
      self.serverClose()
    if self.__file is not None:
      self.fileClose()
    try:
      self.__file = open(fn, 'rb')
    except (IOError, OSError):
      myLog.warning("Consumer.fileOpen: Fatal: failed to open file %s"%(fn))
      sys.exit(2)
    myLog.info("Consumer.fileOpen: opening file %s"%(fn))

  def fileClose(self):
    """Close input data file, if one is open
    """
    if self.__file is not None:
      self.__file.close()
      self.__file = None

  def archiveOpen(self,fn):
    """Open data archive

    The archive is written to fn + '_locked' and renamed to fn by
    archiveClose().  On failure a warning is logged and no archive is
    open afterwards.
    """
    # make a 'being written to' version of the file
    openfn = fn + '_locked'
    # Close previous archive file
    if self.__archiver is not None:
      self.archiveClose(fn)
    myLog.info("Consumer.archiveOpen: opening file %s"%(fn))
    try:
      self.__archiver = rcArchiver.rcArchiver(fileName=openfn)
    except Exception:
      # archiving is best effort: keep consuming without an archive
      myLog.warning("Consumer.archiveOpen: failed to open archive file %s"%(fn))
      self.__archiver = None

  def archiveClose(self,fn):
    """Close data archive and move it to its final name

    fn -- final archive file name; the file being written is
          fn + '_locked'.
    """
    import os
    import shutil
    if self.__archiver is not None:
      self.__archiver.close()
      self.__archiver = None
      myLog.info("Consumer.archiveClose: closing archive file %s"%(fn))
    # move it to the requested file name now that it is closed; skip the
    # move when there is no '_locked' file (nothing was ever opened)
    openfn = fn + '_locked'
    if os.path.exists(openfn):
      shutil.move(openfn,fn)

  def dataSize(self):
    """Return current event data size in bytes (0 if there is no event)
    """
    if self.eventData is None:
      return 0
    return len(self.eventData)

  def usage(self):
    """Print the command line help to stdout
    """
    print("\n%s Usage:" % (sys.argv[0]))
    print("\t-h or --help for Help")
    print("\t-a or --archive to specify output to archive file (default consumer.ldf)")
    print("\t-n or --nevents to specify the number of events archived (default 0)")
    print("\t-i or --infile to specify an input from file instead of server")
    print("\t-s or --server to specify server to connect to (default 1)")
    print("\t-c or --config to specify configuration file (default None, used by Plotter)")
    print("\t-r or --rate to specify a rate limit in Hz (default 10)")

  def parseArgs(self,argv):
    """Parse command line options into the consumer settings

    Unknown options or non-numeric values for -n/-r print the usage and
    exit with status 2; -h prints the usage and exits with status 0.
    self.configFile is only set when -c/--config is given.
    """
    try:
      opts, args = getopt.getopt(argv, "ha:n:i:s:c:r:",
                                 ["help", "archive=", "nevents=", "infile=", "server=","config=","rate="])
    except getopt.GetoptError:
      # print help information and exit:
      self.usage()
      sys.exit(2)
    for o, a in opts:
      if o in ("-h", "--help"):
        self.usage()
        sys.exit()
      if o in ("-a", "--archive"):
        self.__archiveFile = a
      if o in ("-n", "--nevents"):
        try:
          self.__nEvents = int(a)
        except ValueError:
          self.usage()
          sys.exit(2)
      if o in ("-i", "--infile"):
        self.__inFileName = a
      if o in ("-s", "--server"):
        self.__server = a
      if o in ("-c", "--config"):
        self.configFile = a
      if o in ("-r", "--rate"):
        try:
          rate = float(a)
        except ValueError:
          self.usage()
          sys.exit(2)
        # a non-positive rate leaves the default sleep time untouched
        if rate>0:
          self.sleepTime = 1./rate

  def _byteSwapped(self, data):
    """Return data as an array of 32-bit words with each word byte-swapped
    """
    # 'L' is 8 bytes wide on LP64 platforms; pick a 4-byte typecode so
    # the swap really operates on 32-bit words
    typecode = 'L'
    if array.array('I').itemsize == 4:
      typecode = 'I'
    words = array.array(typecode, data)
    words.byteswap()
    return words

  def loop(self):
    """Run the acquisition loop, then clean up and exit the process

    Opens the input file if one was requested, otherwise the server.
    The loop ends when getNextEvent() returns 0 (end of file or no data
    source); a server source returns 1 on every poll, so it keeps
    running until the process is interrupted.
    """
    if self.__inFileName is not None:
      self.fileOpen(self.__inFileName)
    else:
      self.serverOpen(self.__server)
    eventCount=0
    archivedEventCount=0
    sayWaiting = 0
    timeLastEvent = 0
    while True:
      # compare with 0 explicitly so an override returning None goes on
      if self.getNextEvent() == 0:
        break
      if self.eventData is not None and self.dataSize()>0:
        timeLastEvent = time.time()
        eventCount+=1
        self.parseEvent()
        # archive
        if self.__archiver is not None and archivedEventCount<self.__nEvents:
          # the archive is written with the opposite word byte order
          self.__archiver.write(self._byteSwapped(self.eventData))
          archivedEventCount+=1
        # open archive if new run seen and none open
        if self.__nEvents>0 and self.startRun:
          self.archiveOpen(self.__archiveFile)
        # close archive if collected enough
        if (self.endRun or archivedEventCount>=self.__nEvents) and self.__archiver is not None:
          self.archiveClose(self.__archiveFile)
          archivedEventCount=0
        if sayWaiting==1:
          sayWaiting=0
          print("resumed acquisition... (eventCount is  %s )" % (eventCount,))
        time.sleep(self.sleepTime) # conserve cpu...
      else:
        deltaTime = time.time() - timeLastEvent
        if sayWaiting==0 and deltaTime>1:
          sayWaiting=1
          print("Waiting for data...")
      time.sleep(0.0005) # conserve cpu...

    myLog.info("Consumer: Read in %d events" % (eventCount))
    # do not leave a half-written '_locked' archive behind
    if self.__archiver is not None:
      self.archiveClose(self.__archiveFile)
    self.serverClose()
    self.fileClose()
    sys.exit()
00280 
00281 
if __name__ == '__main__':
  # Script entry point: build a consumer from the command line arguments
  # (program name stripped) and hand control to its acquisition loop.
  Consumer(sys.argv[1:]).loop()
00285   

Generated on Fri Jul 21 13:26:27 2006 for LATTE R04-12-00 by doxygen 1.4.3