00001 #!/usr/local/bin/python 00002 # 00003 # Copyright 2006 00004 # by 00005 # The Board of Trustees of the 00006 # Leland Stanford Junior University. 00007 # All rights reserved. 00008 # 00009 00010 __facility__ = "Online" 00011 __abstract__ = "Replacement rcTransitions class for event merging" 00012 __author__ = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online" 00013 __date__ = "6/29/2006" 00014 __updated__ = ("$Date: 2006/07/19 16:26:04 $").split(' ')[1] 00015 __version__ = "$Revision: 1.4 $" 00016 __release__ = "$Name: R04-12-00 $" 00017 __credits__ = "SLAC" 00018 00019 import LATTE.copyright_SLAC 00020 00021 import struct 00022 00023 from LATTE.runcontrol.rcTransitions import rcTransitions 00024 from LATTE.merger.ancillaryContribution import AncillaryContribution 00025 from LDF import EBFevent 00026 from support.gEvtCliBT import EvtCliBT 00027 00028 00029 class rcTransitionsBT(rcTransitions): 00030 def __init__(self, rc, userId, debug, merge = True): 00031 rcTransitions.__init__(self, rc, userId, debug) 00032 self.__debug = debug 00033 self.__merge = merge # Set merge to False to disable merging 00034 self.__autoStopRun = True 00035 00036 def autoStopRun(self, value = None): 00037 if value is not None: self.__autoStopRun = value 00038 return self.__autoStopRun 00039 00040 def rcSetup(self): 00041 if self.__merge: 00042 self.__ecSave = self.evtCli 00043 self.__ec = EvtCliBT(debug=self.__debug) 00044 self.setEvtCli(self.__ec) 00045 return rcTransitions.rcSetup(self) 00046 00047 def rcTeardown(self): 00048 if self.__merge: 00049 self.__ec.shutdown() 00050 self.setEvtCli(self.__ecSave) 00051 return rcTransitions.rcTeardown(self) 00052 00053 def eventHandler(self): 00054 if not self.__merge: return self._rcTransitions__datagramHandler() 00055 00056 sweepEvt = self._rcTransitions__sweepEvent 00057 evtCli = self.evtCli 00058 sweepMkr = rcTransitions._rcTransitions__SWEEP_MARKER 00059 markerMsk = 0x7 << 22 00060 errorMsk = 0x1 << 21 00061 trgParMsk = 0x1 00062 pktErrMsk = 0x7 << 13 00063 00064 self._rcTransitions__evtHandlerQuit = False 00065 00066 # Indicate that the Event Handler is ready to take events 00067 self._rcTransitions__eventHandlerReady.set() 00068 00069 while not (self._rcTransitions__evtHandlerQuit and sweepEvt.isSet()): 00070 statuses, datagram = evtCli.readEvent() 00071 header = struct.unpack('!6L', datagram[0:24]) 00072 00073 status = 0 00074 self.errorEvent = False 00075 self.trgParErrorEvent = False 00076 self.packetErrorEvent = False 00077 self.eventMarker = None 00078 contribId = header[2] & 0x000fffff 00079 if contribId == AncillaryContribution.HeaderId: 00080 self.eventMarker = -1 00081 elif contribId == AncillaryContribution.TrailerId: 00082 if self.__autoStopRun: 00083 import logging 00084 logging.info("rcTransitionsBT.eventHandler: Ancillary trailer seen. Stopping run") 00085 self.rc.doStopRun() 00086 self.eventMarker = -1 00087 elif contribId == AncillaryContribution.EventId: 00088 # Count every event to ensure displayed rate makes sense 00089 self.evtCnt += 1 00090 self._rcTransitions__dgSize = header[1] 00091 self._rcTransitions__dgSumSize += header[1] 00092 00093 ancSize = header[3] 00094 if self._rcTransitions__dgSize > ancSize + 8: 00095 status = statuses[1] #!\todo deal with other statuses 00096 ebfEvent = datagram[ancSize + 8:] 00097 ebfHeader = struct.unpack('!4L', ebfEvent[0:16]) 00098 00099 # Give application quick access to error bit and marker 00100 self.errorEvent = (ebfHeader[3] & errorMsk) != 0 00101 self.trgParErrorEvent = (ebfHeader[3] & trgParMsk) 00102 self.packetErrorEvent = (ebfHeader[2] & pktErrMsk) != 0 00103 self.eventMarker = (ebfHeader[3] & markerMsk) >> 22 00104 elif contribId == EBFevent.ID & 0x000Fffff: 00105 # Count every event to ensure displayed rate makes sense 00106 self.evtCnt += 1 00107 self._rcTransitions__dgSize = header[1] 00108 self._rcTransitions__dgSumSize += header[1] 00109 00110 status = statuses[0] #!\todo deal with other statuses 00111 # Give application quick access to error bit and marker 00112 self.errorEvent = (header[5] & errorMsk) != 0 00113 self.trgParErrorEvent = (header[5] & trgParMsk) 00114 self.packetErrorEvent = (header[4] & pktErrMsk) != 0 00115 self.eventMarker = (header[5] & markerMsk) >> 22 00116 else: 00117 import logging 00118 logging.error("rcTransitionsBT.eventHandler: Unrecognized contribution encounterd: %08x" % (contribId)) 00119 00120 # Process the datagram 00121 latContribs = self._rcTransitions__process(status, datagram) 00122 00123 # Check FSW event delivery to OES status 00124 if status == 0: 00125 # Archive the data 00126 self._rcTransitions__archive(header[0], latContribs) 00127 00128 # Multicast the data 00129 self._rcTransitions__publish(datagram) 00130 00131 # Handle datagrams with TEM contribution errors 00132 if self.eventMarker != sweepMkr and \ 00133 (self.errorEvent or self.trgParErrorEvent or self.packetErrorEvent): 00134 self._rcTransitions__archiveErr(datagram) 00135 # increment internal error counters 00136 if self.errorEvent: self._rcTransitions__errEvents += 1 00137 if self.trgParErrorEvent: self._rcTransitions__trgParErrorEvt += 1 00138 if self.packetErrorEvent: self._rcTransitions__pktErrorEvt += 1 00139 else: 00140 # Handle datagrams for which FSW choked 00141 self._rcTransitions__badEvents += 1 00142 self._rcTransitions__archiveBad(datagram) 00143 00144 # Make sure the above is done before setting the sweepEvt to prevent 00145 # things from getting out of sync. 00146 if self.eventMarker == sweepMkr: sweepEvt.set() 00147 00148 # Flush any events left in the systems. They'll all be incomplete. 00149 evtCli.flushAll() 00150 00151 import logging 00152 logging.debug("rcTransitionsBT.eventHandler: %s terminating" % 00153 (self.getName()))