00001 #!/usr/local/bin/python 00002 # 00003 # Copyright 2006 00004 # by 00005 # The Board of Trustees of the 00006 # Leland Stanford Junior University. 00007 # All rights reserved. 00008 # 00009 00010 __facility__ = "Online" 00011 __abstract__ = "Replacement evtCli class for event merging" 00012 __author__ = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online" 00013 __date__ = "6/29/2006" 00014 __updated__ = ("$Date: 2006/07/19 16:24:56 $").split(' ')[1] 00015 __version__ = "$Revision: 1.2 $" 00016 __release__ = "$Name: R04-12-00 $" 00017 __credits__ = "SLAC" 00018 00019 import LATTE.copyright_SLAC 00020 00021 import struct 00022 from array import array 00023 from select import select 00024 import Queue 00025 import time 00026 00027 import LDF 00028 00029 from LATTE.client.gEvtCli import EvtCli, EvtCliIO 00030 from LATTE.client.gSocket import Socket 00031 00032 import LATTE.merger.geb as geb 00033 from LATTE.merger.ldfContribution import LdfContribution 00034 from LATTE.merger.ancillaryContribution import AncillaryContribution 00035 00036 00037 class SocketClosed(Exception): pass 00038 00039 class LdfContributor(geb.GebContributor): 00040 """!\brief Concrete contributor class specifically meant to work with EvtCli. 00041 """ 00042 def __init__(self, evtCli, offset = 1L): 00043 geb.GebContributor.__init__(self) 00044 self.__socket = evtCli # Must be a Socket object 00045 self.__timeout = 1 # Same as EvtCliIO.__timeout 00046 self.__offset = offset 00047 00048 def name(self): 00049 return "LdfContributor" 00050 00051 def sequence(self, sequence): 00052 """!\brief Method for remapping the ancillary contribution's sequence number 00053 into the same space as the other contributions' sequence numbers. 00054 00055 This method has the side effect that it can affect the next the value 00056 returned, so it assumes that it is called with sequential sequence numbers. 00057 This would be a problem if were called multiple times to retrieve the 00058 sequence number for the same sequence number. 00059 \todo Fix the side effect nature of this routine 00060 """ 00061 # offset does double duty by compensating for the off-by-one nature of the 00062 # two event number spaces and by keeping track of the event number extention 00063 # needed to handle the raw 17 bit event number roll-over problem 00064 seq = sequence - self.__offset 00065 if sequence == 0x0001ffff: self.__offset -= 0x00020000L 00066 return seq 00067 00068 def open(self): 00069 # Wait for the socket to be created 00070 while self.__socket.socket() is None: 00071 time.sleep(0.1) 00072 00073 def close(self): 00074 self.__quit = True 00075 00076 def read(self): 00077 """!\brief Method to read an LDF contribution from a socket. 00078 """ 00079 markerMsk = 0x7 << 22 00080 sweepFld = 0x5 << 22 # Marker 5 00081 hdrLen = 8 00082 self.__quit = 0 00083 rd, wr, ex = select([self.__socket.socket()], [], [], self.__timeout) 00084 while not self.__quit: 00085 if rd: 00086 data = self.__socket.recvall(4 + hdrLen) # "4 +" for leading status 00087 status, typeId, length = struct.unpack('>3L', data) 00088 if status != 0: 00089 # Don't try to continue if bad status 00090 raise KeyError, "LdfContributor.read: Bad ocsmanager status %08x" % (status) 00091 if typeId != LDF.LATdatagram.ID: 00092 # Don't try to continue if bad typeId: we're lost or it's undefined 00093 raise KeyError, "LdfContributor.read: Unrecognized contribution %08x"%(typeId) 00094 data = self.__socket.recvall(length - hdrLen) # Only the LATcontribution(s) portion 00095 summary = struct.unpack('>L', data[3*4 : 4*4])[0] 00096 if (summary & markerMsk) == sweepFld: 00097 return geb.Geb.Sweep, status, LdfContribution(self, data) 00098 else: 00099 return geb.Geb.Build, status, LdfContribution(self, data) 00100 00101 rd, wr, ex = select([self.__socket.socket()], [], [], self.__timeout) 00102 00103 return geb.Geb.Terminate, None, None 00104 00105 00106 class AncillaryContributor(geb.GebContributor): 00107 """!\brief Concrete contributor class specifically meant to work with EvtCli. 00108 """ 00109 __typeIds = [AncillaryContribution.HeaderId, 00110 AncillaryContribution.TrailerId, 00111 AncillaryContribution.EventId] 00112 def __init__(self, (host, port), offset = 0L): 00113 geb.GebContributor.__init__(self) 00114 self.__socket = Socket(port) 00115 self.__timeout = 1 # Same as EvtCliIO.__timeout 00116 self.__host = host 00117 self.__port = port 00118 self.__offset = offset 00119 00120 def name(self): 00121 return "AncContributor" 00122 00123 def sequence(self, sequence): 00124 """!\brief Method for remapping the ancillary contribution's sequence number 00125 into the same space as the other contributions' sequence numbers. 00126 """ 00127 # offset does double duty by compensating for the off-by-one nature of the 00128 # two event number spaces and by keeping track of the event number extention 00129 # needed to handle the raw 17 bit event number roll-over problem 00130 seq = sequence - self.__offset 00131 if sequence == 0x0fffffff: self.__offset -= 0x10000000L 00132 return seq 00133 00134 def open(self): 00135 """!\brief Method to open and connect to the server socket. 00136 It retries until a connection can be established, or the quit flag 00137 is seen to be set. 00138 """ 00139 self.__quit = False 00140 while not self.__quit: 00141 try: 00142 self.__socket.connect(self.__host) 00143 break 00144 except Exception, e: 00145 import logging 00146 logging.debug("AncillaryContributor.open: %s" % (e)) 00147 time.sleep(1.0) # Don't retry too quickly 00148 import logging 00149 logging.debug("AncillaryContributor.open: socket opened") 00150 00151 def close(self): 00152 """!\brief Method to signal the read loop to exit. 00153 """ 00154 self.__quit = True 00155 00156 def read(self): 00157 """!\brief Method to read an ancillary contribution from a socket. 00158 00159 This method deals with reconnecting when the server restarted. 00160 """ 00161 self.__quit = 0 00162 while not self.__quit: 00163 try: 00164 return self.__read() 00165 except SocketClosed: 00166 import logging 00167 logging.debug("AncillaryContributor.read: socket closed") 00168 self.__socket.disconnect() 00169 self.__socket = Socket(self.__port) 00170 self.open() 00171 except Exception, e: 00172 if "Connection reset by peer" in str(e): 00173 import logging 00174 logging.debug("AncillaryContributor.read: socket closed") 00175 self.__socket.disconnect() 00176 self.__socket = Socket(self.__port) 00177 self.open() 00178 else: 00179 import logging 00180 logging.exception("AncillaryContributor.read: %s" % (str(e))) 00181 # Do we need to do something here? 00182 break 00183 00184 return geb.Geb.Terminate, None, None 00185 00186 def __read(self): 00187 """!\brief Method to read an ancillary contribution from a socket. 00188 """ 00189 self.__quit = 0 00190 rd, wr, ex = select([self.__socket.socket()], [], [], self.__timeout) 00191 initLen = 8 00192 while not self.__quit: 00193 if rd: 00194 data = self.__socket.recvall(initLen) 00195 if len(data) != initLen: 00196 # This will happen only when the socket closed 00197 raise SocketClosed 00198 vTypeId, length = struct.unpack('<2L', data) 00199 typeId = vTypeId & 0x000fffff 00200 if typeId not in AncillaryContributor.__typeIds: 00201 # Don't try to continue if bad typeId: we're lost or it's undefined 00202 raise KeyError, "AncillaryContributor.read: Unrecognized contribution %08x" % (typeId) 00203 length &= 0x0fffffff 00204 data += self.__socket.recvall(length - initLen) 00205 data = array('I', data) 00206 data.byteswap() # Get it into big endian form 00207 data = data.tostring() 00208 00209 status = 0 00210 if typeId == AncillaryContribution.EventId: 00211 return geb.Geb.Build, status, AncillaryContribution(self, data) 00212 else: 00213 return geb.Geb.DontBuild, status, AncillaryContribution(self, data) 00214 00215 rd, wr, ex = select([self.__socket.socket()], [], [], self.__timeout) 00216 00217 self.__socket.disconnect() 00218 return geb.Geb.Terminate, None, None 00219 00220 00221 class Merger(geb.Geb): 00222 def __init__(self, contributors, queue): 00223 geb.Geb.__init__(self, contributors) 00224 self.__queue = queue 00225 00226 def dispose(self, buffer): 00227 """!\brief Method called by the base class to dispose of an event. 00228 """ 00229 self.__queue.put(buffer) 00230 00231 00232 def getHostPort(hostPort): 00233 host_port = hostPort.split(':') 00234 host = host_port[0] 00235 if len(host_port) == 1: 00236 port = 2345 00237 else: 00238 port = int(host_port[1]) 00239 return host, port 00240 00241 00242 class EvtCliBT(EvtCli): 00243 AncHostPort = "localhost:2345" 00244 def __init__(self, debug = 0, 00245 port = EvtCliIO.PortNumber, 00246 ancHostPort = "localhost:2345"): #EvtCliBT.AncHostPort): 00247 EvtCli.__init__(self, debug, port) 00248 self.__timeout = 1 # same as EvtCliIO.__timeout 00249 00250 ldfContributor = LdfContributor(self) 00251 ancContributor = AncillaryContributor(getHostPort(ancHostPort)) 00252 self.__queue = Queue.Queue() 00253 self.__merger = Merger([ldfContributor, ancContributor], self.__queue) 00254 #self.__merger = Merger([ldfContributor], self.__queue) 00255 self.__merger.start() 00256 00257 def readEvent(self): 00258 """\brief Method used to read an event from the event merging code. 00259 """ 00260 self.__abort = 0 00261 while not self.__abort: 00262 try: 00263 return self.__queue.get(block = True, timeout = self.__timeout) 00264 except Queue.Empty: 00265 if self.isTimeoutEnabled(): 00266 raise IOError, "EvtCliBT.readEvent: Event read timeout" 00267 # Abort 00268 raise RuntimeError, "EvtCliBT.readEvent: Aborted" 00269 00270 def flushAll(self): 00271 self.__merger.flushAll() 00272 00273 def abort(self): 00274 """\brief Abort out of a readEvent(). 00275 00276 Can be used from another thread 00277 """ 00278 self.__abort = True 00279 00280 def shutdown(self): 00281 """\brief Method used to shut down the event merging threads. 00282 00283 This must be called from a different thread than the one in which the 00284 merger runs. 00285 """ 00286 self.__merger.shutdown() 00287 self.__merger.join() # Wait for the Event Builder thread to exit