Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | Related Pages

gEvtCliBT.py

00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2006
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "Replacement evtCli class for event merging"
00012 __author__   = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = "6/29/2006"
00014 __updated__  = ("$Date: 2006/07/19 16:24:56 $").split(' ')[1]
00015 __version__  = "$Revision: 1.2 $"
00016 __release__  = "$Name: R04-12-00 $"
00017 __credits__  = "SLAC"
00018 
00019 import LATTE.copyright_SLAC
00020 
00021 import struct
00022 from   array                              import array
00023 from   select                             import select
00024 import Queue
00025 import time
00026 
00027 import LDF
00028 
00029 from   LATTE.client.gEvtCli               import EvtCli, EvtCliIO
00030 from   LATTE.client.gSocket               import Socket
00031 
00032 import LATTE.merger.geb                   as     geb
00033 from   LATTE.merger.ldfContribution       import LdfContribution
00034 from   LATTE.merger.ancillaryContribution import AncillaryContribution
00035 
00036 
00037 class SocketClosed(Exception):  pass
00038 
00039 class LdfContributor(geb.GebContributor):
00040   """!\brief Concrete contributor class specifically meant to work with EvtCli.
00041   """
00042   def __init__(self, evtCli, offset = 1L):
00043     geb.GebContributor.__init__(self)
00044     self.__socket    = evtCli           # Must be a Socket object
00045     self.__timeout   = 1                # Same as EvtCliIO.__timeout
00046     self.__offset    = offset
00047 
00048   def name(self):
00049     return "LdfContributor"
00050 
00051   def sequence(self, sequence):
00052     """!\brief Method for remapping the ancillary contribution's sequence number
00053     into the same space as the other contributions' sequence numbers.
00054 
00055     This method has the side effect that it can affect the next the value
00056     returned, so it assumes that it is called with sequential sequence numbers.
00057     This would be a problem if were called multiple times to retrieve the
00058     sequence number for the same sequence number.
00059     \todo Fix the side effect nature of this routine
00060     """
00061     # offset does double duty by compensating for the off-by-one nature of the
00062     # two event number spaces and by keeping track of the event number extention
00063     # needed to handle the raw 17 bit event number roll-over problem
00064     seq = sequence - self.__offset
00065     if sequence == 0x0001ffff:  self.__offset -= 0x00020000L
00066     return seq
00067 
00068   def open(self):
00069     # Wait for the socket to be created
00070     while self.__socket.socket() is None:
00071       time.sleep(0.1)
00072 
00073   def close(self):
00074     self.__quit = True
00075 
00076   def read(self):
00077     """!\brief Method to read an LDF contribution from a socket.
00078     """
00079     markerMsk   = 0x7 << 22
00080     sweepFld    = 0x5 << 22             # Marker 5
00081     hdrLen      = 8
00082     self.__quit = 0
00083     rd, wr, ex = select([self.__socket.socket()], [], [], self.__timeout)
00084     while not self.__quit:
00085       if rd:
00086         data   = self.__socket.recvall(4 + hdrLen) # "4 +" for leading status
00087         status, typeId, length = struct.unpack('>3L', data)
00088         if status != 0:
00089           # Don't try to continue if bad status
00090           raise KeyError, "LdfContributor.read: Bad ocsmanager status %08x" % (status)
00091         if typeId != LDF.LATdatagram.ID:
00092           # Don't try to continue if bad typeId: we're lost or it's undefined
00093           raise KeyError, "LdfContributor.read: Unrecognized contribution %08x"%(typeId)
00094         data = self.__socket.recvall(length - hdrLen) # Only the LATcontribution(s) portion
00095         summary = struct.unpack('>L', data[3*4 : 4*4])[0]
00096         if (summary & markerMsk) == sweepFld:
00097           return geb.Geb.Sweep, status, LdfContribution(self, data)
00098         else:
00099           return geb.Geb.Build,     status, LdfContribution(self, data)
00100 
00101       rd, wr, ex = select([self.__socket.socket()], [], [], self.__timeout)
00102 
00103     return geb.Geb.Terminate, None, None
00104 
00105 
00106 class AncillaryContributor(geb.GebContributor):
00107   """!\brief Concrete contributor class specifically meant to work with EvtCli.
00108   """
00109   __typeIds = [AncillaryContribution.HeaderId,
00110                AncillaryContribution.TrailerId,
00111                AncillaryContribution.EventId]
00112   def __init__(self, (host, port), offset = 0L):
00113     geb.GebContributor.__init__(self)
00114     self.__socket    = Socket(port)
00115     self.__timeout   = 1                # Same as EvtCliIO.__timeout
00116     self.__host      = host
00117     self.__port      = port
00118     self.__offset    = offset
00119 
00120   def name(self):
00121     return "AncContributor"
00122 
00123   def sequence(self, sequence):
00124     """!\brief Method for remapping the ancillary contribution's sequence number
00125     into the same space as the other contributions' sequence numbers.
00126     """
00127     # offset does double duty by compensating for the off-by-one nature of the
00128     # two event number spaces and by keeping track of the event number extention
00129     # needed to handle the raw 17 bit event number roll-over problem
00130     seq = sequence - self.__offset
00131     if sequence == 0x0fffffff:  self.__offset -= 0x10000000L
00132     return seq
00133 
00134   def open(self):
00135     """!\brief Method to open and connect to the server socket.
00136     It retries until a connection can be established, or the quit flag
00137     is seen to be set.
00138     """
00139     self.__quit = False
00140     while not self.__quit:
00141       try:
00142         self.__socket.connect(self.__host)
00143         break
00144       except Exception, e:
00145         import logging
00146         logging.debug("AncillaryContributor.open: %s" % (e))
00147         time.sleep(1.0)                 # Don't retry too quickly
00148     import logging
00149     logging.debug("AncillaryContributor.open: socket opened")
00150 
00151   def close(self):
00152     """!\brief Method to signal the read loop to exit.
00153     """
00154     self.__quit = True
00155 
00156   def read(self):
00157     """!\brief Method to read an ancillary contribution from a socket.
00158 
00159     This method deals with reconnecting when the server restarted.
00160     """
00161     self.__quit = 0
00162     while not self.__quit:
00163       try:
00164         return self.__read()
00165       except SocketClosed:
00166         import logging
00167         logging.debug("AncillaryContributor.read: socket closed")
00168         self.__socket.disconnect()
00169         self.__socket = Socket(self.__port)
00170         self.open()
00171       except Exception, e:
00172         if "Connection reset by peer" in str(e):
00173           import logging
00174           logging.debug("AncillaryContributor.read: socket closed")
00175           self.__socket.disconnect()
00176           self.__socket = Socket(self.__port)
00177           self.open()
00178         else:
00179           import logging
00180           logging.exception("AncillaryContributor.read: %s" % (str(e)))
00181           # Do we need to do something here?
00182           break
00183 
00184     return geb.Geb.Terminate, None, None
00185 
00186   def __read(self):
00187     """!\brief Method to read an ancillary contribution from a socket.
00188     """
00189     self.__quit = 0
00190     rd, wr, ex = select([self.__socket.socket()], [], [], self.__timeout)
00191     initLen = 8
00192     while not self.__quit:
00193       if rd:
00194         data            = self.__socket.recvall(initLen)
00195         if len(data) != initLen:
00196           # This will happen only when the socket closed
00197           raise SocketClosed
00198         vTypeId, length = struct.unpack('<2L', data)
00199         typeId = vTypeId & 0x000fffff
00200         if typeId not in AncillaryContributor.__typeIds:
00201           # Don't try to continue if bad typeId: we're lost or it's undefined
00202           raise KeyError, "AncillaryContributor.read: Unrecognized contribution %08x" % (typeId)
00203         length &= 0x0fffffff
00204         data  += self.__socket.recvall(length - initLen)
00205         data = array('I', data)
00206         data.byteswap()                 # Get it into big endian form
00207         data = data.tostring()
00208 
00209         status = 0
00210         if typeId == AncillaryContribution.EventId:
00211           return geb.Geb.Build,     status, AncillaryContribution(self, data)
00212         else:
00213           return geb.Geb.DontBuild, status, AncillaryContribution(self, data)
00214 
00215       rd, wr, ex = select([self.__socket.socket()], [], [], self.__timeout)
00216 
00217     self.__socket.disconnect()
00218     return geb.Geb.Terminate, None, None
00219 
00220 
00221 class Merger(geb.Geb):
00222   def __init__(self, contributors, queue):
00223     geb.Geb.__init__(self, contributors)
00224     self.__queue = queue
00225 
00226   def dispose(self, buffer):
00227     """!\brief Method called by the base class to dispose of an event.
00228     """
00229     self.__queue.put(buffer)
00230 
00231 
00232 def getHostPort(hostPort):
00233   host_port = hostPort.split(':')
00234   host = host_port[0]
00235   if len(host_port) == 1:
00236     port = 2345
00237   else:
00238     port = int(host_port[1])
00239   return host, port
00240 
00241 
00242 class EvtCliBT(EvtCli):
00243   AncHostPort = "localhost:2345"
00244   def __init__(self, debug = 0,
00245                port        = EvtCliIO.PortNumber,
00246                ancHostPort = "localhost:2345"):  #EvtCliBT.AncHostPort):
00247     EvtCli.__init__(self, debug, port)
00248     self.__timeout = 1                  # same as EvtCliIO.__timeout
00249 
00250     ldfContributor = LdfContributor(self)
00251     ancContributor = AncillaryContributor(getHostPort(ancHostPort))
00252     self.__queue   = Queue.Queue()
00253     self.__merger  = Merger([ldfContributor, ancContributor], self.__queue)
00254     #self.__merger  = Merger([ldfContributor], self.__queue)
00255     self.__merger.start()
00256 
00257   def readEvent(self):
00258     """\brief Method used to read an event from the event merging code.
00259     """
00260     self.__abort = 0
00261     while not self.__abort:
00262       try:
00263         return self.__queue.get(block = True, timeout = self.__timeout)
00264       except Queue.Empty:
00265         if self.isTimeoutEnabled():
00266           raise IOError, "EvtCliBT.readEvent: Event read timeout"
00267     # Abort
00268     raise RuntimeError, "EvtCliBT.readEvent: Aborted"
00269 
00270   def flushAll(self):
00271     self.__merger.flushAll()
00272 
00273   def abort(self):
00274     """\brief Abort out of a readEvent().
00275 
00276     Can be used from another thread
00277     """
00278     self.__abort = True
00279 
00280   def shutdown(self):
00281     """\brief Method used to shut down the event merging threads.
00282 
00283     This must be called from a different thread than the one in which the
00284     merger runs.
00285     """
00286     self.__merger.shutdown()
00287     self.__merger.join()   # Wait for the Event Builder thread to exit

Generated on Fri Jul 21 13:26:28 2006 for LATTE R04-12-00 by doxygen 1.4.3