00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 __facility__ = "Online"
00011 __abstract__ = "Current Value Table Server"
00012 __author__ = "Jim Panetta <panetta@slac.stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__ = ("$Date: 2006/01/28 05:35:26 $").split(' ')[1]
00014 __version__ = "$Revision: 1.13 $"
00015 __release__ = "$Name: HEAD $"
00016 __credits__ = "SLAC"
00017
00018 import LICOS.copyright_SLAC
00019
00020 import struct, time, threading, copy
00021 import cPickle as pickle
00022
00023 import asyncore, socket
00024
00025 from CurrValTable import *
00026 from LICOS.lib.currValTable.StringTcpProtocol import Int16StringProtocol
00027 import LICOS.util.gOptions as gOptions
00028
00029 from LICOS.lib.cmdTlmDb.LCATtlmDb import LICOS_TlmPacketFactory
00030
# Module-wide shutdown flag: the main thread sets it once the asyncore loop
# has ended, and readLoop() polls it to know when to stop.  A new Event
# starts out cleared, so no explicit clear() is required.
killEvent = threading.Event()
00033
00034 class CurrentValueProtocol(Int16StringProtocol):
00035 TIMEOUT = 10
00036 def __init__(self, port, cvt):
00037 super(CurrentValueProtocol, self).__init__(port)
00038 self.__cvt = cvt
00039 self.__mnemFlags = {}
00040 self.sendString("ok ")
00041
00042 def handle_close(self):
00043
00044 self.__cvt.removeObserver(self)
00045 self.close()
00046
00047
00048 def stringReceived(self, line):
00049 """ here's where the full connection protocol between the clients and
00050 server is defined.
00051
00052 observer # client wishes to be an observer.
00053
00054 register <mnemonic> <flags> # client wishes to be an observer of <mnemonic> with flags <flags> (space separated set of flags)
00055 unregister <mnemonic> # client wishes *not* to be an observer of <mnemonic>
00056
00057 drop # client wishes to be no longer an observer
00058 # *REVISIT* -- how to disconnect from mnemonics?
00059 request <mnem> <flags> # Client wants to request an observable ASAP.
00060 """
00061 fields = line.split()
00062 query = fields[0]
00063
00064 cvt = self.__cvt
00065 egu = False
00066
00067
00068 if query == 'observer':
00069 resp = 'ob '
00070 self.sendString(resp)
00071 pass
00072 elif query == 'drop':
00073 pass
00074 elif query == 'register':
00075 mnem = fields[1]
00076 flags = fields[2:]
00077 self.register(mnem, flags)
00078 elif query == 'unregister':
00079 mnem = fields[1]
00080 self.unregister(mnem)
00081 elif query == 'request':
00082 mnem = fields[1]
00083 flags = fields[2:]
00084 self.request(mnem, flags)
00085 else:
00086 print "bad query:", line
00087
00088
00089
00090 def register(self, mnem, flags):
00091 cvt = self.__cvt
00092 if mnem == 'everythingunderthesun':
00093 self.__registerAll(flags)
00094 else:
00095
00096 if cvt.hasValue(mnem):
00097 cv = cvt.getValue(mnem)
00098 cv.addObserver(self)
00099
00100 if not(cv.flags() & cv.FLAG_STALE):
00101 self.update(cv)
00102 else:
00103 cvt.setValue(mnem, None, time.time())
00104 cv = cvt.getValue(mnem)
00105 cv.addObserver(self)
00106 cv.setFlags(cv.FLAG_STALE)
00107
00108
00109 self.__mnemFlags[mnem] = flags
00110
00111
00112 def __registerAll(self, flags):
00113 for mnem, cv in self.__cvt.fullTable().items():
00114 cv.addObserver(self)
00115 self.__mnemFlags[mnem] = flags
00116
00117 def unregister(self, mnem):
00118 cvt = self.__cvt
00119 if cvt.hasValue(mnem):
00120 cvt.getValue(mnem).delObserver(self)
00121
00122 def request(self, mnem, flags):
00123 cvt = self.__cvt
00124 replyStr = "rq "
00125
00126 if cvt.hasValue(mnem):
00127 cv = cvt.getValue(mnem)
00128 replyStr += encodeCurrentValue(cv)
00129
00130 else:
00131 replyStr += pickle.dumps(None,2)
00132
00133
00134 self.sendString(replyStr)
00135
00136 def update(self, cv):
00137 """update method
00138 \param cv A CurrentValue object
00139 """
00140
00141 replyStr = "up "
00142 replyStr += encodeCurrentValue(cv)
00143
00144
00145 self.sendString(replyStr)
00146
00147 def __encode(self, cv):
00148
00149
00150
00151
00152
00153
00154
00155 cvCop = copy.copy(cv)
00156 cvCop.__class__ = CurrentValueBase
00157 del(cvCop.observers)
00158
00159
00160 return pickle.dumps(cvCop,2)
00161
00162 class CurrentValueFactory(asyncore.dispatcher, object):
00163 def __init__(self, port, cvt):
00164 super(CurrentValueFactory, self).__init__()
00165 self.__cvt = cvt
00166
00167
00168 self.create_socket(socket.AF_INET,socket.SOCK_STREAM)
00169 self.set_reuse_addr()
00170 self.bind(("", port))
00171 self.listen(100)
00172
00173 def handle_accept(self):
00174 """This is called once per *connection*
00175 """
00176 clientPort, addr = self.accept()
00177 print "Adding client connection: ", addr
00178 return CurrentValueProtocol(clientPort, self.__cvt)
00179
00180 def readLoop(cvt, packetFile):
00181
00182 pFh = open(packetFile, 'rb')
00183 count = 0
00184 delta = 0
00185 delta2 = 0
00186 while not killEvent.isSet():
00187 packet = getPacket(pFh)
00188 if packet is None:
00189 pFh.seek(0)
00190 continue
00191
00192 apIdPkt = LICOS_TlmPacketFactory(packet)
00193 aTime = apIdPkt.secs + apIdPkt.usecs/1E6
00194 print "dispatching apid 0x%04x, time %12d" %( apIdPkt.apid, aTime )
00195 apIdPkt.decode_payload()
00196
00197 for mnem in apIdPkt.get_payload_list():
00198 cvt.setValue(mnem, apIdPkt.get_payload(mnem), aTime)
00199
00200 cvt.setValue(apIdPkt.name.upper(), packet, aTime)
00201 count += 1
00202 time.sleep(0.25)
00203 if count%100 == 0:
00204 print count, time.time()
00205
def getPacket(fileHandle):
    """Read one complete packet from fileHandle.

    A packet is a 6-byte header of three big-endian unsigned 16-bit words
    followed by a payload whose length is the third header word plus one
    (presumably a CCSDS primary header -- TODO confirm).

    fileHandle -- a file-like object opened in binary mode

    Returns the header and payload concatenated, or None when a complete
    packet cannot be read (end of file, truncated header or payload, or a
    read error).
    """
    try:
        header = fileHandle.read(6)
        if len(header) < 6:
            # End of file, or a header cut short.
            return None

        pLen = struct.unpack('!HHH', header)[2] + 1
        payload = fileHandle.read(pLen)
    except (IOError, OSError):
        # Keep the original best-effort contract: a failed read means
        # "no packet available".
        return None

    if len(payload) < pLen:
        # Never hand a truncated packet to the decoder.
        return None
    return header + payload
00220
00221 if __name__ == '__main__':
00222 options = gOptions.Options(['pktFile', 'cvtPort'])
00223 try:
00224 options.parse()
00225 except Exception, msg:
00226 options.usage(str(msg))
00227 exit
00228
00229 packetFile = options.pktFile
00230 cvtPort = int(options.cvtPort)
00231
00232 cvt = CurrentValueTable()
00233 cvs = CurrentValueFactory(cvtPort, cvt)
00234
00235
00236 readThread = threading.Thread(name="readLoop", target=readLoop, args= (cvt, packetFile))
00237
00238 readThread.start()
00239
00240
00241 try:
00242 asyncore.loop(1)
00243 except:
00244 pass
00245 print "doneloop"
00246
00247
00248 killEvent.set()
00249 readThread.join()
00250
00251
00252
00253
00254
00255
00256
00257
00258