00001
00002
00003
00004
00005
00006
00007
00008
00009
# Module metadata.  The "$...$" values look like version-control keyword
# strings and are reproduced verbatim.
__facility__ = "Online"
__abstract__ = "Current Value Table Database Populator"
__author__   = "Jim Panetta <panetta@slac.stanford.edu> SLAC - GLAST LAT I&T/Online"
# Second space-separated field of the $Date$ string, i.e. the date part.
__date__     = "$Date: 2006/04/26 01:40:08 $".split(' ')[1]
__version__  = "$Revision: 1.9 $"
__release__  = "$Name: HEAD $"
__credits__  = "SLAC"
00017
00018 import sys, asyncore, time
00019 import threading
00020 import psycopg
00021 from ConfigParser import *
00022
00023 from LICOS.lib.currValTable.CurrValClient import CurrValClient
00024 from LICOS.lib.LATconstants import LAT_EPOCH
00025 from LICOS.tools.proxy.VscProxyPorts import VscProxyPorts
00026
00027 import LICOS.util.gOptions as gOptions
00028 from ISOC.TlmUtils.TlmRdbInterface import TlmRdbDb
00029
00030
# Shutdown flag: asyncore_loop() stops polling once it is set, and main()
# sets it after the loop has ended.
killEvent = threading.Event()
killEvent.clear()

# Mnemonic names that CvtDbPopulator.update() discards instead of queueing.
multiplyDefinedMnems = [
    'LHKSPARE10',
    'LHKSPARE08',
    'LHKSPARE13',
    'LHKSPARE16',
]

# Interval, in seconds, given to the threading.Timer that schedules
# CvtDbPopulator.multiUpdate().
TIMER_GRANULARITY = 1
00037
def asyncore_loop(timeout=1.0, use_poll=False, map=None):
    """Run the asyncore polling loop until the kill event is set.

    Stand-in for asyncore.loop() that additionally checks the module-level
    killEvent before every pass, so the loop can be stopped from outside
    (for example when it runs in its own thread).

    Valid for Python 2.3.x.  Python 2.4 has an extra argument for loop,
    count, which is not supported here.

    timeout   -- seconds passed to each poll call
    use_poll  -- if true, use a poll()-based poller instead of select()
    map       -- socket map to service; defaults to asyncore.socket_map

    Returns when the map is empty or killEvent is set, after printing both.
    """
    # 'select' is not among this module's visible imports, so it is brought
    # into scope here; without it use_poll=True fails with a NameError.
    import select

    if map is None:
        map = asyncore.socket_map

    if not use_poll:
        poll_fun = asyncore.poll
    elif hasattr(select, 'poll'):
        poll_fun = asyncore.poll3
    else:
        poll_fun = asyncore.poll2

    while map and not killEvent.isSet():
        poll_fun(timeout, map)

    # Show why the loop ended: the remaining map and the kill flag.
    print("%s %s" % (map, killEvent.isSet()))
00058
class CvtDbPopulator(CurrValClient):
    """CurrValClient that stores the values it is given in the telemetry DB.

    Values passed to update() are queued in memory.  A threading.Timer runs
    multiUpdate(), which writes the queue to the database through TlmRdbDb
    in batches and then re-arms the timer for TIMER_GRANULARITY seconds
    later, until the module-level killEvent is set.
    """

    # Largest number of rows passed to one insertTelemetryCollection() call.
    INSERT_BATCH_SIZE = 100

    # "ANALYZE v3hkvalfields;" is issued on every ANALYZE_INTERVAL-th run of
    # multiUpdate(), starting with the first one.
    ANALYZE_INTERVAL = 5000

    def __init__(self, host, port, db, source, build):
        """Load the mnemonic list, start the client and arm the flush timer.

        host, port -- passed on to CurrValClient.__init__
        db         -- database connection; only its cursor() method is used
        source     -- passed to TlmRdbDb.populate() and written into every
                      inserted row
        build      -- passed to TlmRdbDb.populate()
        """
        dbc = db.cursor()
        tlmdb = TlmRdbDb(dbc)

        print("populating... %s" % time.asctime())
        tlmdb.populate(source=source, build=build)
        print("populated %s" % time.asctime())

        mnemList = []
        for item in tlmdb.getAllMnems():
            name = item.name()
            if name[0:4] == "LPBC":
                continue
            # Compare by name: the list holds mnemonic names, so testing the
            # mnemonic object itself does not filter by name.  update() drops
            # these names anyway, so nothing else changes downstream.
            if name in multiplyDefinedMnems:
                continue
            mnemList.append(name)

        CurrValClient.__init__(self, host, port, mnemList=mnemList)

        self.__tlmDb = tlmdb
        self.__source = source
        self.__dbc = dbc

        self.__cvListLock = threading.Lock()
        self.__updateLock = threading.Lock()
        self.__cvList = []
        # Everything multiUpdate() reads must exist before the timer starts.
        self.__nUpdates = 0
        self.__timer = threading.Timer(TIMER_GRANULARITY, self.multiUpdate)
        self.__timer.start()

    def update(self, cv):
        """Queue one current value for the next database flush.

        cv -- value object; its name attribute is read here, and its age()
              and rawValue() methods are called later by multiUpdate().

        Values whose name is in multiplyDefinedMnems are discarded.
        """
        # NOTE(review): the looked-up entry is never used.  The call is kept
        # in case it is relied on to raise for a mnemonic the database does
        # not know; confirm and drop it if not.
        self.__tlmDb.getMnemList([cv.name])[0]
        if cv.name in multiplyDefinedMnems:
            return

        self.__cvListLock.acquire()
        try:
            self.__cvList.append(cv)
        finally:
            self.__cvListLock.release()

    def multiUpdate(self):
        """Write all queued values to the database and re-arm the timer.

        Timer callback.  An exception raised while flushing still
        propagates to the timer thread, but no longer ends the periodic
        flushing: the next run is scheduled in any case, unless killEvent
        is set, so that the timer thread does not keep the process alive
        after shutdown.
        """
        try:
            self.__insertBatches(self.__takePending())
            self.__analyzeIfDue()
            sys.stdout.flush()
        finally:
            if not killEvent.isSet():
                self.__timer = threading.Timer(TIMER_GRANULARITY,
                                               self.multiUpdate)
                self.__timer.start()

    def __takePending(self):
        """Detach and return the queued values, leaving an empty queue."""
        self.__updateLock.acquire()
        try:
            self.__cvListLock.acquire()
            try:
                pending = self.__cvList
                self.__cvList = []
            finally:
                self.__cvListLock.release()
        finally:
            self.__updateLock.release()
        return pending

    def __insertBatches(self, pending):
        """Insert the given values in batches of INSERT_BATCH_SIZE rows."""
        sendList = [(item.name, self.__source, item.age() + LAT_EPOCH,
                     item.rawValue())
                    for item in pending]

        while sendList:
            batch = sendList[:self.INSERT_BATCH_SIZE]
            # Advance before inserting: a batch that fails is reported once
            # and skipped, rather than retried forever in a tight loop.
            sendList = sendList[self.INSERT_BATCH_SIZE:]
            try:
                self.__tlmDb.insertTelemetryCollection(batch)
            except Exception:
                e = sys.exc_info()[1]
                msg = "Error in inserting telemetry. Data follows:"
                msg += "mnem, source, age, value: \n" + str(batch)
                print("%s %s" % (e, msg))

    def __analyzeIfDue(self):
        """Issue the ANALYZE statement on every ANALYZE_INTERVAL-th call."""
        due = (self.__nUpdates % self.ANALYZE_INTERVAL) == 0
        # Count the run before executing, so that a failing ANALYZE is not
        # attempted again on every following timer tick.
        self.__nUpdates += 1
        if due:
            query = "ANALYZE v3hkvalfields;"
            print("%s %s" % (time.time(), query))
            self.__dbc.execute(query)
00140
00141
00142
def usage():
    """Print the description of the command-line arguments."""
    text = """
where:
cvtHost is the hostname of the CVT server
config is the catchall configuration file
"""
    print(text)
00149
def main():
    """Parse the command line, connect to the database and run the populator.

    Reads the 'cvtHost' and 'config' options, loads the configuration
    file, opens the database connection named by [postgres] dsn, creates a
    CvtDbPopulator on the port derived from [vsc] proxyPortBase and runs
    asyncore_loop() until it returns.

    Exits with status 1 if the command line cannot be parsed.
    """
    options = gOptions.Options(['cvtHost', 'config'])
    try:
        options.parse()
    except Exception:
        options.usage(str(sys.exc_info()[1]))
        usage()
        # A bare sys.exit() reports success (status 0); use a non-zero
        # status so a calling script can detect the bad command line.
        sys.exit(1)

    cvtHost = options.cvtHost
    configFile = options.config
    config = ConfigParser()
    config.read(configFile)

    db = psycopg.connect(config.get('postgres', 'dsn').strip())

    ports = VscProxyPorts(config.getint('vsc', 'proxyPortBase'))
    # The instance is not bound to a name here.
    CvtDbPopulator(cvtHost,
                   ports.cvtOut(),
                   db,
                   source=config.getint('tlmdb', 'source'),
                   build=config.get('tlmdb', 'build'))

    try:
        asyncore_loop()
    finally:
        # Signal shutdown even when the loop ends with an exception
        # (for example KeyboardInterrupt).
        killEvent.set()
00174
00175
# Run the populator only when this file is executed as a script.
if __name__ == "__main__":
    main()