#!/usr/local/bin/python
#
# monitoringDbSyncOut.py
#
#
#
#
#                               Copyright 2005
#                                     by
#                        The Board of Trustees of the
#                     Leland Stanford Junior University.
#                            All rights reserved.
#

__facility__ = "Online"
__abstract__ = "Housekeeping Database Synch Out"
__author__ = "J. Panetta <panetta@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
__date__ = "11/20/2004"
__version__ = "$Revision: 1.5 $"
__credits__ = "SLAC"

import LATTE.copyright_SLAC

import os, sys, struct, time, gzip
from LATTE.client.gOptions import Options
import MySQLdb

# Column lists used to build the SELECT statements below.  tlmFieldsList is
# also written verbatim as the header line of tlm.out.
tlmFieldsList = "MNEMID, MNEM, DESTTYPE, EUCONV, DESCRIP, OPSYMBOL, SUBS, EUSTRING, EVENTFLAG, ARYLENGTH, LIMITS, INIT, NOTASSIGNABLE"
valFieldsList = "USECS, VALUE, MNEMID, SOURCE"
srcFieldsList = "SOURCE, DESCRIP"

# Statements issued around every locked update (see lockTransact).
noAuto = "set AUTOCOMMIT=0"
lockMsg = "LOCK TABLES valfields WRITE;"
unlockMsg = "UNLOCK TABLES;"

# Binary layout of one val.out record, in valFieldsList order: network byte
# order, double USECS, double VALUE, unsigned long MNEMID, unsigned short
# SOURCE.
valPackFmt = "!ddLH"
# Maximum number of rows flagged per locked UPDATE statement.
chunkSize = 1000
nullValue = 1000000000.0000001  # Standard python does not have IEEE NaN available


def lockTransact(db, msg):
    """Locking database transactor for INNODB databases.

    Turns autocommit off, takes a WRITE lock on the valfields table,
    executes ``msg`` on the cursor ``db``, commits, and unlocks the tables.

    Returns the cursor's ``rowcount`` after executing ``msg``.  An exception
    raised by the statement or by the commit propagates to the caller, but
    only after the table lock has been released.
    """
    db.execute(noAuto)
    db.execute(lockMsg)
    try:
        db.execute(msg)
        rc = db.rowcount
        db.connection.commit()
    finally:
        # Never leave valfields write-locked, even when the statement fails.
        db.execute(unlockMsg)
    return rc


def dateMnemSelect(db, start, end):
    """Returns all used mnemonics in a time period.

    ``start`` and ``end`` are exclusive bounds on the usecs column.  The
    result is a list holding the first column (mnemid) of every row
    returned by the query.
    """
    msg = "select distinct mnemid from valfields where %f<usecs and usecs<%f" % (start, end)
    db.execute(msg)
    return [row[0] for row in db.fetchall()]


def dumpVALfields(db, outFile, start, end):
    """Write the unsynchronised valfields rows to outFile and flag them.

    For each mnemonic seen between ``start`` and ``end`` (both exclusive),
    every row whose VALIDITY is NULL is packed with ``valPackFmt`` and
    written to ``outFile``; those rows are then updated to VALIDITY=1 in
    locked chunks of at most ``chunkSize`` rows.

    A NULL VALUE column is written as ``nullValue``.

    NOTE(review): the SELECT and the UPDATE are separate statements, so a
    row inserted between them that matches the qualifier would be flagged
    without having been written -- confirm that writers cannot insert into
    the selected time window while this runs.
    """
    # An earlier, disabled variant of this routine selected and updated
    # the rows chunk by chunk over the whole time window instead of
    # iterating per mnemonic.
    selectMsg = "select %s from valfields" % (valFieldsList)
    updMsg = "update valfields set VALIDITY=1"
    limitMsg = " LIMIT %d" % chunkSize

    # Get the mnemonic space to iterate over
    for mnem in dateMnemSelect(db, start, end):
        qualifier = " where mnemid=%d and %f<usecs and usecs<%f and VALIDITY is NULL" % \
                    (mnem, start, end)
        db.execute(selectMsg + qualifier)  # doesn't need locking transaction

        for row in db.fetchall():
            if row[1] is None:  # Deal with NULL returns
                fields = list(row)
                fields[1] = nullValue
                row = fields
            outFile.write(struct.pack(valPackFmt, *row))

        # Flag the rows in bounded chunks so each table lock is held only
        # briefly; a short chunk means nothing is left for this mnemonic.
        while True:
            rows = lockTransact(db, updMsg + qualifier + limitMsg)
            if rows < chunkSize:
                break


def findMinTime(db):
    """Return the smallest usecs among rows whose validity is NULL.

    Returns None when the query yields a NULL minimum, i.e. when no such
    row exists.
    """
    msg = "select min(usecs) from valfields where validity is NULL"
    db.execute(msg)
    m = db.fetchall()  # returns tuple(tuple,)
    return m[0][0]


def dumpTLMfields(db, outFile):
    """Write the whole tlmfields table to outFile as text.

    The first line is ``tlmFieldsList``; each following line is the string
    form of one row as returned by the cursor.
    """
    msg = "select %s from tlmfields;" % (tlmFieldsList)
    db.execute(msg)

    outFile.write(tlmFieldsList + "\n")
    for row in db.fetchall():
        outFile.write("%s\n" % (row,))


def usage():
    """Print the command line help text to standard output."""
    sys.stdout.write("""
monitoringDbSyncOut Usage:
  $ monitoringDbSyncOut --outdir <dir> [--start <start>] [--end <end>]

  Options:
    --outdir    Specify the output directory root.  A subdirectory will be created there
                of the form YYYYMMDD-HHMMSS containing the output files.  The time used is
                from the --end option, below.
    --start     Specify the start time in seconds since Jan 1 1970.
                Default is the time of the earliest record not yet synchronized.
    --end       Specify the end time in seconds since Jan 1 1970
                Default is the output of time.time() when the program is run.
""" + "\n")


if __name__ == '__main__':

    # Options, first list mandatory, second list optional
    options = Options(["outdir"],
                      ["start", "end"])
    try:
        options.parse()
    except Exception:
        # Show the help text, then re-raise unchanged so the original
        # exception type and traceback are preserved.
        usage()
        raise

    start = -1  # -1 means "begin at the earliest unsynchronised record"
    end = int(time.time())
    if options.start:
        start = int(options.start)
    if options.end:
        end = int(options.end)

    endTimeStr = time.strftime("%Y%m%d-%H%M%S", time.gmtime(end))
    outDir = os.path.join(options.outdir, endTimeStr)

    if not os.path.isdir(outDir):
        os.mkdir(outDir)

    sqlDb = MySQLdb.connect(db='itosdb', user='itosdb', host='localhost')
    try:
        cursor = sqlDb.cursor()

        if start == -1:
            minTime = findMinTime(cursor)
            if minTime is None:
                # Nothing is waiting to be synchronised.
                start = None
            else:
                # The dump queries use a strict lower bound
                # (start < usecs); step below the minimum so the earliest
                # record is not skipped on every run.
                start = minTime - 1

        valOut = gzip.open(os.path.join(outDir, 'val.out'), mode='wb+')
        try:
            if start is not None:
                dumpVALfields(cursor, valOut, start, end)
        finally:
            valOut.close()

        tlmOut = open(os.path.join(outDir, 'tlm.out'), mode='wt+')
        try:
            dumpTLMfields(cursor, tlmOut)
        finally:
            tlmOut.close()
    finally:
        sqlDb.close()

    # Empty marker file, created only after both dumps completed.
    done = open(os.path.join(outDir, 'zzz.done'), mode='w')
    done.close()