CurrValServer.py

Go to the documentation of this file.
00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2004
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "Current Value Table Server"
00012 __author__  = "Jim Panetta <panetta@slac.stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = ("$Date: 2006/01/28 05:35:26 $").split(' ')[1]
00014 __version__ = "$Revision: 1.13 $"
00015 __release__  = "$Name: HEAD $"
00016 __credits__ = "SLAC"
00017 
00018 import LICOS.copyright_SLAC
00019 
00020 import struct, time, threading, copy
00021 import cPickle as pickle
00022 
00023 import asyncore, socket
00024 
00025 from CurrValTable import *
00026 from LICOS.lib.currValTable.StringTcpProtocol import Int16StringProtocol
00027 import LICOS.util.gOptions as gOptions
00028 
00029 from LICOS.lib.cmdTlmDb.LCATtlmDb  import LICOS_TlmPacketFactory
00030 
# Shutdown flag shared between the main thread and the readLoop() worker:
# the main block sets it once the asyncore loop has returned, and readLoop()
# polls it at the top of every iteration.  A newly constructed Event is
# already in the cleared (unset) state.
killEvent = threading.Event()
00033 
class CurrentValueProtocol(Int16StringProtocol):
  """Server side of one client connection to the current value table.

  CurrentValueFactory creates one instance per accepted connection.  The
  instance parses the text queries sent by the client (see stringReceived)
  and adds itself as an observer to the CurrentValue objects the client
  registers for.  Replies are sent with sendString() and start with a
  three character tag: "ok " (greeting), "ob ", "rq " or "up ".
  """

  # Not referenced anywhere in this class; presumably consumed by the
  # base class -- TODO confirm.
  TIMEOUT   = 10    # seconds

  def __init__(self, port, cvt):
    """Attach to an accepted client socket and greet the client.

    @param port  The accepted client socket, handed to Int16StringProtocol.
    @param cvt   The current value table shared by all connections.
    """
    super(CurrentValueProtocol, self).__init__(port)
    self.__cvt  = cvt
    # mnemonic -> list of flag strings from the client's last register
    self.__mnemFlags    = {}
    self.sendString("ok ")

  def handle_close(self):
    """Detach this connection from the table and close the socket."""
    self.__cvt.removeObserver(self)
    self.close()

  def stringReceived(self, line):
    """ here's where the full connection protocol between the clients and
        server is defined.

        observer               # client wishes to be an observer.

        register <mnemonic> <flags>  # client wishes to be an observer of <mnemonic> with flags <flags>  (space separated set of flags)
        unregister <mnemonic>  # client wishes *not* to be an observer of <mnemonic>

        drop                   # client wishes to be no longer an observer
                               #   *REVISIT* -- how to disconnect from mnemonics?
        request <mnem> <flags> # Client wants to request an observable ASAP.

        An empty line, an unknown keyword, or a register/unregister/request
        query without a mnemonic is reported as a bad query and ignored.
    """
    fields = line.split()
    if not fields:
      # Nothing but whitespace: there is no keyword to dispatch on.
      print("bad query: %s" % (line,))
      return

    query = fields[0]
    args  = fields[1:]

    if query == 'observer':  # register an observer
      self.sendString('ob ')
    elif query == 'drop':
      # Accepted but deliberately a no-op; see the REVISIT note above.
      pass
    elif query in ('register', 'unregister', 'request'):
      if not args:
        # These queries need at least a mnemonic.  The data comes straight
        # from the network, so do not let it raise an IndexError here.
        print("bad query: %s" % (line,))
        return
      mnem  = args[0]
      flags = args[1:]
      if query == 'register':
        self.register(mnem, flags)
      elif query == 'unregister':
        self.unregister(mnem)
      else:
        self.request(mnem, flags)
    else:
      print("bad query: %s" % (line,))

  def register(self, mnem, flags):
    """Make this connection an observer of one mnemonic.

    The special mnemonic 'everythingunderthesun' registers for every value
    currently in the table.  Otherwise, if the table already has the
    value, the client is seeded with an immediate update unless the value
    is flagged stale.  If the table does not have it yet, a placeholder
    holding None is created, flagged stale and observed, and nothing is
    sent.

    @param mnem   Mnemonic name.
    @param flags  List of flag strings supplied by the client.
    """
    if mnem == 'everythingunderthesun':
      self.__registerAll(flags)
      return

    cvt = self.__cvt
    if cvt.hasValue(mnem):
      cv = cvt.getValue(mnem)
      cv.addObserver(self)
      if not (cv.flags() & cv.FLAG_STALE):
        self.update(cv)
    else:
      cvt.setValue(mnem, None, time.time())
      cv = cvt.getValue(mnem)
      cv.addObserver(self)
      cv.setFlags(cv.FLAG_STALE)
      # Don't call self.update()  The value is poor.

    self.__mnemFlags[mnem] = flags

  def __registerAll(self, flags):
    """Observe every value currently in the table, recording flags for each."""
    for mnem, cv in self.__cvt.fullTable().items():
      cv.addObserver(self)
      self.__mnemFlags[mnem] = flags

  def unregister(self, mnem):
    """Stop observing one mnemonic and forget the flags stored for it.

    Unknown mnemonics are ignored.

    @param mnem  Mnemonic name.
    """
    cvt = self.__cvt
    if cvt.hasValue(mnem):
      cvt.getValue(mnem).delObserver(self)
    # Drop the bookkeeping entry as well so it cannot go stale.
    self.__mnemFlags.pop(mnem, None)

  def request(self, mnem, flags):
    """Send the current state of one mnemonic to the client right away.

    The reply is "rq " followed by the encoded CurrentValue, or by a
    pickled None when the table has no such mnemonic.

    @param mnem   Mnemonic name.
    @param flags  Flags supplied by the client; currently not used.
    """
    cvt = self.__cvt
    replyStr = "rq "
    if cvt.hasValue(mnem):
      replyStr += encodeCurrentValue(cvt.getValue(mnem))
    else:  # mnem doesn't exist, pass None across
      replyStr += pickle.dumps(None, 2)

    self.sendString(replyStr)

  def update(self, cv):
    """Send an "up " message carrying the encoded value to the client.

    Called by register() to seed a new observer; presumably also the
    callback the observed CurrentValue objects invoke -- TODO confirm.

    @param cv  A CurrentValue object
    """
    self.sendString("up " + encodeCurrentValue(cv))

  def __encode(self, cv):
    """Pickle a stripped-down copy of a CurrentValue.

    Not called from within this class; request() and update() use
    encodeCurrentValue() instead.

    @param cv  A CurrentValue object
    @return    The binary (protocol 2) pickle of the copy.
    """
    # To reduce the size  and complexity of the transferred object, make
    # a copy, and perform a sticky <reinterpret_cast> back
    # to the base class.  Then delete the observers list
    # This list is only useful on the server side, and increases
    # the size of the pickled object by a large factor if full.
    cvCop = copy.copy(cv)
    cvCop.__class__ = CurrentValueBase
    del(cvCop.observers)

    # pickle the object into a binary blob
    return pickle.dumps(cvCop, 2)
00161 
class CurrentValueFactory(asyncore.dispatcher, object):
  """Listening socket that creates a CurrentValueProtocol per client."""

  def __init__(self, port, cvt):
    """Open the listening socket on all interfaces.

    @param port  TCP port number to listen on.
    @param cvt   The current value table handed to every client connection.
    """
    super(CurrentValueFactory, self).__init__()
    self.__cvt = cvt

    # build the main connection socket
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    self.set_reuse_addr()
    self.bind(("", port))
    self.listen(100)

  def handle_accept(self):
    """This is called once per *connection*

    @return  The new CurrentValueProtocol, or None when accept() had no
             connection to hand over.
    """
    accepted = self.accept()
    if accepted is None:
      # asyncore's accept() returns None instead of a (socket, address)
      # pair when no connection is available; unpacking None would raise
      # a TypeError.
      return None

    clientPort, addr = accepted
    print("Adding client connection:  %s" % (addr,))
    return CurrentValueProtocol(clientPort, self.__cvt)
00179 
def readLoop(cvt, packetFile):
  """Replay a telemetry packet file into the current value table.

  Runs until killEvent is set.  Packets are read one at a time with
  getPacket(); when no further packet can be read the file is rewound and
  replayed from the start.  For every packet each decoded mnemonic is
  stored in the table, followed by the raw packet itself under the
  upper-cased packet name, and the loop then sleeps 0.25 s.

  @param cvt         Table to update through setValue(name, value, time).
  @param packetFile  Path of the binary packet file to replay.
  """
  pFh = open(packetFile, 'rb')
  try:
    count = 0
    readSinceRewind = False
    while not killEvent.isSet():
      packet = getPacket(pFh)
      if packet is None:
        pFh.seek(0)
        if not readSinceRewind:
          # Not a single packet came out of a full pass (empty or
          # unparsable file): pause so the loop does not spin at full
          # speed.
          time.sleep(0.25)
        readSinceRewind = False
        continue
      readSinceRewind = True

      apIdPkt = LICOS_TlmPacketFactory(packet)
      aTime = apIdPkt.secs + apIdPkt.usecs/1E6
      print("dispatching apid 0x%04x, time %12d" % (apIdPkt.apid, aTime))
      apIdPkt.decode_payload()
      # toss each mnemonic in the cvt and then...
      for mnem in apIdPkt.get_payload_list():
        cvt.setValue(mnem, apIdPkt.get_payload(mnem), aTime)
      # add in the packet as well
      cvt.setValue(apIdPkt.name.upper(), packet, aTime)
      count += 1
      time.sleep(0.25)
      if count % 100 == 0:
        print("%s %s" % (count, time.time()))
  finally:
    # Release the file handle however the loop ends.
    pFh.close()
00205 
def getPacket(fileHandle):
  """Read one packet from a binary file object.

  A packet is a 6 byte header made of three big-endian unsigned 16 bit
  words, followed by (third header word + 1) further bytes.

  @param fileHandle  File-like object opened for binary reading.
  @return  Header and following bytes as one string, or None when a
           complete packet cannot be read (end of file, truncated header,
           truncated body, or a read error).
  """
  try:
    header = fileHandle.read(6)
    if len(header) < 6:
      # End of file, or a partial header at the end of the file.
      return None
    pktHdr = struct.unpack('!HHH', header)
    pLen = pktHdr[2] + 1
    body = fileHandle.read(pLen)
  except (IOError, OSError, struct.error):
    # Callers treat None as "no packet available".
    return None

  if len(body) < pLen:
    # Never hand a silently truncated packet to the decoder.
    return None
  return header + body
00220 
if __name__ == '__main__':
  # Script entry point: serve the current value table on <cvtPort> while a
  # background thread replays <pktFile> into it.
  import sys
  import traceback

  options = gOptions.Options(['pktFile', 'cvtPort'])
  try:
    options.parse()
  except Exception:
    # sys.exc_info() keeps this valid on every Python version.
    options.usage(str(sys.exc_info()[1]))
    # The original bare name "exit" was a no-op expression, so the script
    # carried on with unusable options.  Really stop here.
    sys.exit(1)

  packetFile = options.pktFile
  cvtPort = int(options.cvtPort)

  cvt = CurrentValueTable()
  # Keep a reference to the listening dispatcher for the life of the loop.
  cvs = CurrentValueFactory(cvtPort, cvt)

  readThread = threading.Thread(name="readLoop", target=readLoop,
                                args=(cvt, packetFile))
  # readThread.setDaemon(True)
  readThread.start()

  try:
    try:
      asyncore.loop(1)
    except KeyboardInterrupt:
      # Ctrl-C is the normal way to stop the server.
      pass
    except Exception:
      # Report unexpected failures instead of hiding them, then fall
      # through to the normal shutdown below.
      traceback.print_exc()
  finally:
    print("doneloop")
    # Always stop the reader thread; it is not a daemon thread and would
    # otherwise keep the process alive.
    killEvent.set()
    readThread.join()
00250 
00251 
00252 
00253 
00254 
00255 
00256 
00257 
00258 #eof

Generated on Thu Apr 27 20:52:41 2006 for LICOS L02-01-00 by doxygen 1.4.6-NO