CvtDbPopulator.py

Go to the documentation of this file.
00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2004
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "Current Value Table Database Populator"
00012 __author__  = "Jim Panetta <panetta@slac.stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = ("$Date: 2006/04/26 01:40:08 $").split(' ')[1]
00014 __version__ = "$Revision: 1.9 $"
00015 __release__  = "$Name: HEAD $"
00016 __credits__ = "SLAC"
00017 
00018 import sys, asyncore, time
00019 import threading
00020 import psycopg
00021 from ConfigParser import *
00022 
00023 from LICOS.lib.currValTable.CurrValClient import CurrValClient
00024 from LICOS.lib.LATconstants               import LAT_EPOCH
00025 from LICOS.tools.proxy.VscProxyPorts import VscProxyPorts
00026 
00027 import LICOS.util.gOptions             as gOptions
00028 from ISOC.TlmUtils.TlmRdbInterface     import TlmRdbDb
00029 
00030 
# Shutdown flag shared between threads: the polling loop in this module
# stops once it is set.  A newly constructed Event is already cleared.
killEvent = threading.Event()

# Mnemonic names that are excluded from database population.
multiplyDefinedMnems = [
  'LHKSPARE10',
  'LHKSPARE08',
  'LHKSPARE13',
  'LHKSPARE16',
]

# Delay, in seconds, between successive batched database updates.
TIMER_GRANULARITY = 1
00037 
def asyncore_loop(timeout=1.0, use_poll=False, map=None):
  """Poll the asyncore socket map until it empties or killEvent is set.

  Replaces the asyncore central loop so that it can be stopped from
  another thread through the module-level killEvent.
  Valid for Python 2.3.x; Python 2.4 has an extra argument for loop, count.

  timeout   -- value handed to every call of the poll function
  use_poll  -- when true, use asyncore.poll3 if select.poll is available
               and asyncore.poll2 otherwise; when false, use asyncore.poll
  map       -- socket map to poll; None selects asyncore.socket_map
  """
  # 'select' is not imported at module level, so the use_poll branch
  # needs it brought into scope here to avoid a NameError.
  import select

  if map is None:
    map = asyncore.socket_map

  if not use_poll:
    poll_fun = asyncore.poll
  elif hasattr(select, 'poll'):
    poll_fun = asyncore.poll3
  else:
    poll_fun = asyncore.poll2

  while map and not killEvent.isSet():
    poll_fun(timeout, map)

  # Show what ended the loop: the remaining map and the kill flag.
  print("%s %s" % (map, killEvent.isSet()))
00058 
class CvtDbPopulator(CurrValClient):
  """CurrValClient that queues received values and writes them to the
  telemetry database in batches from a repeating timer.

  update() only appends the value to an in-memory list; multiUpdate(),
  run every TIMER_GRANULARITY seconds on a threading.Timer, drains that
  list and inserts the rows through the TlmRdbDb interface.
  """

  # Largest number of rows passed to one insertTelemetryCollection call.
  _BATCH_SIZE = 100
  # ANALYZE is issued on the first timer tick and every this many ticks after.
  _ANALYZE_PERIOD = 5000

  def __init__(self, host, port, db, source, build):
    """Populate the telemetry database interface and subscribe to the CVT.

    host, port -- passed through to CurrValClient.__init__
    db         -- database connection; a single cursor is taken from it
    source     -- passed to TlmRdbDb.populate and stored with every row
    build      -- passed to TlmRdbDb.populate
    """
    dbc = db.cursor()
    tlmdb = TlmRdbDb(dbc)

    print("populating... %s" % time.asctime())
    tlmdb.populate(source=source, build=build)
    print("populated %s" % time.asctime())

    mnemList = []
    for item in tlmdb.getAllMnems():
      name = item.name()
      # *don't* add the PBC telemetry...
      if name[0:4] == "LPBC":
        continue
      # multiplyDefinedMnems holds names, so compare the name rather
      # than the mnemonic object itself.
      if name in multiplyDefinedMnems:
        continue
      mnemList.append(name)

    CurrValClient.__init__(self, host, port, mnemList=mnemList)

    self.__tlmDb = tlmdb
    self.__source = source
    self.__dbc   = dbc

    self.__cvListLock = threading.Lock()
    self.__updateLock = threading.Lock()
    self.__cvList = []
    self.__nUpdates = 0
    # Start the timer last so that all state read by multiUpdate exists.
    self.__startTimer()

  def __startTimer(self):
    """Schedule one multiUpdate call TIMER_GRANULARITY seconds from now."""
    self.__timer = threading.Timer(TIMER_GRANULARITY, self.multiUpdate)
    self.__timer.start()

  def update(self, cv):
    """Queue a received current value for the next database flush.

    Values whose name is listed in multiplyDefinedMnems are dropped.
    """
    if cv.name in multiplyDefinedMnems:
      return

    self.__cvListLock.acquire()
    try:
      self.__cvList.append(cv)
    finally:
      self.__cvListLock.release()

  def __takePending(self):
    """Detach and return every queued value, leaving the queue empty."""
    self.__updateLock.acquire()
    try:
      self.__cvListLock.acquire()
      try:
        pending = self.__cvList
        self.__cvList = []
      finally:
        self.__cvListLock.release()
    finally:
      self.__updateLock.release()
    return pending

  def __flushPending(self):
    """Insert all queued values into the database, _BATCH_SIZE rows at a time.

    A batch whose insert raises is reported on stdout and skipped, so one
    bad batch cannot stall the batches that follow it.
    """
    rows = [(cv.name, self.__source, cv.age() + LAT_EPOCH, cv.rawValue())
            for cv in self.__takePending()]

    for start in range(0, len(rows), self._BATCH_SIZE):
      batch = rows[start:start + self._BATCH_SIZE]
      try:
        self.__tlmDb.insertTelemetryCollection(batch)
      except Exception:
        err = sys.exc_info()[1]
        msg = "Error in inserting telemetry.  Data follows:"
        msg += "mnem, source, age, value: \n" + str(batch)
        print("%s %s" % (err, msg))

  def __analyzeIfDue(self):
    """Count this timer tick and re-analyze the table when one is due."""
    # reanalyze the db as per LCS-96:
    # do the first time and every ~5000 seconds hence
    due = (self.__nUpdates % self._ANALYZE_PERIOD == 0)
    # Count the tick before running the query so that a failing ANALYZE
    # is not attempted again on every following tick.
    self.__nUpdates += 1
    if due:
      query = "ANALYZE v3hkvalfields;"
      print("%s %s" % (time.time(), query))
      self.__dbc.execute(query)

  def multiUpdate(self):
    """Timer callback: flush queued values, then schedule the next call.

    The next timer is started even when this call raises, so an error in
    one flush does not end all later flushes.  No further timer is started
    once killEvent is set, which lets the timer thread chain end.
    """
    try:
      self.__flushPending()
      self.__analyzeIfDue()
      sys.stdout.flush()
    finally:
      if not killEvent.isSet():
        self.__startTimer()
00140     
00141     
00142    
def usage():
  """Print the descriptions of the command-line arguments to stdout."""
  argument_help = """
where:
  cvtHost       is the hostname of the CVT server
  config        is the catchall configuration file
  """
  print(argument_help)
00149   
def main():
  """Parse the command line, connect to the database and run the populator.

  Expects the 'cvtHost' and 'config' options.  The configuration file is
  read for 'postgres'/'dsn', 'vsc'/'proxyPortBase', 'tlmdb'/'source' and
  'tlmdb'/'build'.  Exits with status 1 when option parsing fails.
  """
  options = gOptions.Options(['cvtHost', 'config'])
  try:
    options.parse()
  except Exception:
    options.usage(str(sys.exc_info()[1]))
    usage()
    # A command-line error must not be reported as a successful run.
    sys.exit(1)

  config = ConfigParser()
  config.read(options.config)

  # The dsn is a connection string such as
  #   "user=... dbname=... host=... password=..."
  db = psycopg.connect(config.get('postgres', 'dsn').strip())

  ports = VscProxyPorts(config.getint('vsc', 'proxyPortBase'))

  try:
    CvtDbPopulator(options.cvtHost, ports.cvtOut(), db,
                   source=config.getint('tlmdb', 'source'),
                   build=config.get('tlmdb', 'build'))

    # start the asyncore loop.  Termination is set by killEvent
    asyncore_loop()
  finally:
    # Set the flag even when the loop exits through an exception
    # (KeyboardInterrupt included) so other threads see the shutdown.
    killEvent.set()
00174 
00175 
# Run the populator only when this file is executed as a script.
if __name__ == "__main__":
  main()

Generated on Thu Apr 27 20:52:41 2006 for LICOS L02-01-00 by doxygen 1.4.6-NO