00001
00002
00003
00004
00005
00006
00007
00008
00009
# --- Module identification --------------------------------------------------
__facility__ = "Online"
__abstract__ = "Interface to the Run activities table"
__author__ = (
    "S. Tuvi <stuvi@SLAC.Stanford.edu> "
    "SLAC - GLAST LAT I&T/Online"
)
__credits__ = "SLAC"

# --- Revision bookkeeping ---------------------------------------------------
# The "$Keyword: ... $" values look like revision-control keyword fields;
# presumably they are rewritten by the version-control system -- keep the
# surrounding "$" markers intact.
__date__ = "2005/07/23 00:08:27"
__updated__ = "$Date: 2006/04/21 23:39:01 $"
__version__ = "$Revision: 1.17 $"
__release__ = "$Name: HEAD $"
00018
# Best-effort import: nothing from LICOS.copyright_SLAC is referenced in
# this file, so a missing LICOS package must not prevent the module from
# loading.  Only ImportError is tolerated; any other failure (including
# KeyboardInterrupt/SystemExit, which a bare "except:" would also swallow)
# is allowed to propagate.
try:
    import LICOS.copyright_SLAC
except ImportError:
    pass
00023
00024 import MySQLdb
00025 import logging as log
00026 import os
00027 import time
00028
# Prefer the constants shipped with LICOS (presumably including
# ONLINE_ROOT -- TODO confirm).  When the LICOS package is not importable,
# fall back to the ONLINE_ROOT environment variable, which must then be
# set (a KeyError is raised otherwise).  Only ImportError triggers the
# fallback, so a genuine error raised while loading the constants module
# is no longer hidden.
try:
    from LICOS.lib.LATconstants import *
except ImportError:
    ONLINE_ROOT = os.environ['ONLINE_ROOT']
00033
class ActivitiesInterface(object):
    """!@brief Interface to the run activities table of the elogbook database.

    On construction the object connects to MySQL and reads the primary key
    of every known state from the states table.  Each key is then available
    as an instance attribute named after the state (for example
    self.DataStarted), and stateStr() maps a key back to its name.

    Every value written to or compared against the database is handed to
    the driver as a query parameter, so strings that contain quote
    characters are stored correctly.  Table and column names cannot be
    parameterised and are still spliced into the SQL text; they must come
    from the constants below, never from untrusted input.
    """

    # Default connection settings.
    __stdDbName = "elogbook"
    __stdDbUser = "elogbook"
    __stdDbPass = "elogbook"
    __stdDbPort = 3306

    # Table names.
    TableStates = "LICOS_states"
    TableActivities = "LICOS_activities"
    TableReport = "elogreport"

    # State category columns (valid "category" arguments).
    CommandingState = "CommandingState_FK"
    DataState = "DataState_FK"
    AnalysisState = "AnalysisState_FK"

    # Time tag columns (valid "timeColumn" arguments).
    DataStart = "DataStart"
    DataStop = "DataStop"

    # Remaining column names of the activities table.
    SuiteIdentifier = "SuiteIdentifier"
    LDFname = "LdfFileName"
    ScriptOutput = "ScriptOutput"
    JobComplete = "JobCompleteList"
    PrimaryKey = "RunID_FK"

    # State names looked up in the states table by initialize().
    __states = ["ActivityCreated",
                "CommandingStarted",
                "CommandingComplete",
                "NoData",
                "DataStarted",
                "DataComplete",
                "NoAnalysis",
                "AnalysisStarted",
                "AnalysisComplete", ]

    def __init__(self,
                 dbName=__stdDbName,
                 dbUser=__stdDbUser,
                 dbPass=__stdDbPass,
                 dbPort=__stdDbPort):
        """!@brief Store the connection settings and connect immediately.

        @param dbName Database name (overridden by $ELOGBOOK_DB, see connect())
        @param dbUser Database user
        @param dbPass Database password
        @param dbPort TCP port of the MySQL server
        """
        # Set first so that __del__ is safe even if connecting fails.
        self.__db = None
        self.__dbName = dbName
        self.__dbUser = dbUser
        self.__dbPass = dbPass
        self.__dbPort = dbPort
        self.__stateKeys = {}

        self.initialize()

    def __del__(self):
        """!@brief Make sure the table ends up unlocked when the object dies.

        Nothing is attempted when no connection was ever established.
        """
        self.unlockRow()
        self.__db = None

    def initialize(self):
        """!@brief Connect to the database and load the state keys.

        For every known state name an attribute of the same name is set to
        the state's primary key, and the reverse mapping used by stateStr()
        is filled in.

        @exception RuntimeError A known state is missing from the states table
        """
        self.connect()

        sql = self.__selectSQL(table=self.TableStates,
                               column="state_PK",
                               constraint="stateName = %s")
        for stateName in self.__states:
            keys = self.__pull(sql, (stateName,))
            if not keys:
                raise RuntimeError("State '%s' is not defined in table %s"
                                   % (stateName, self.TableStates))
            setattr(self, stateName, keys[0])
            self.__stateKeys[keys[0]] = stateName

    def connect(self):
        """!@brief Connect to the database.

        The host is taken from $ELOGBOOK_HOST (default "localhost"); when
        $ELOGBOOK_DB is set it replaces the configured database name.

        @exception MySQLdb.OperationalError The connection attempt failed
                   (logged, then re-raised unchanged)
        """
        host = os.environ.get("ELOGBOOK_HOST", "localhost")

        if 'ELOGBOOK_DB' in os.environ:
            self.__dbName = os.environ['ELOGBOOK_DB']

        try:
            self.__db = MySQLdb.connect(host=host,
                                        db=self.__dbName,
                                        user=self.__dbUser,
                                        passwd=self.__dbPass,
                                        port=self.__dbPort)
        except MySQLdb.OperationalError:
            # sys.exc_info() is used instead of an exception binding so the
            # clause is spelled the same way on Python 2 and Python 3.
            import sys
            msg = "Error encountered when attempting to connect to MySQL server on host: '%s'\n" % host
            msg += " Actual exception was: %s" % (sys.exc_info()[1],)
            log.exception(msg)
            raise

    def newActivity(self, runID, suiteID=None):
        """!@brief Create a new activity in the activities database.

        The new row starts in the states ActivityCreated / NoData /
        NoAnalysis.

        @param runID   Run identifier
        @param suiteID Suite linkage identifier; None is stored as SQL NULL.
                       No check is made to see whether the suite exists.
        """
        sql = "insert into %s " % self.TableActivities
        sql += "(RunID_FK, SuiteIdentifier, CommandingState_FK, DataState_FK, AnalysisState_FK) values "
        sql += "(%s, %s, %s, %s, %s)"
        values = (runID, suiteID,
                  self.ActivityCreated, self.NoData, self.NoAnalysis)
        self.__push(sql, values, "BadQuery")

    def setState(self, runID, category, newState):
        """!@brief Set the state of a run in the database.

        @param runID    Integer run ID
        @param category One of the state category columns
        @param newState New state (one of the state keys)
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "set state")
        self.__setColumn(runID_fk, category, newState)

    def getState(self, runID, category):
        """!@brief Retrieve the state of a run from the database.

        @param runID    Integer run ID
        @param category One of the state category columns
        @return The state key stored for that category
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "retreive state")
        return self.__getColumn(runID_fk, category)[0]

    def getRunsByState(self, category, state):
        """!@brief Get the run IDs which are in a particular state.

        @param category One of the state category columns
        @param state    One of the state keys
        @return A list (possibly empty) of run IDs in the requested state
        """
        sql = self.__selectSQL(table=self.TableActivities,
                               column="runID_fk",
                               constraint=category + " = %s")
        return self.__pull(sql, (state,))

    def setTime(self, runID, timeColumn, theTime):
        """!@brief Set a time tag of a run.

        @param runID      Integer run ID
        @param timeColumn Time tag column, e.g. DataStart or DataStop
        @param theTime    Unix timestamp
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "set time")
        # The conversion is done by the server; only the number is a parameter.
        self.__setColumn(runID_fk, timeColumn, theTime,
                         expression="FROM_UNIXTIME(%s)")

    def getTime(self, runID, timeColumn):
        """!@brief Retrieve a time tag of a run.

        @param runID      Integer run ID
        @param timeColumn Time tag column, e.g. DataStart or DataStop
        @return The value of UNIX_TIMESTAMP(timeColumn) for the run
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "retrieve time")
        return self.__getColumn(runID_fk,
                                "UNIX_TIMESTAMP(%s)" % timeColumn)[0]

    def setLdfFileName(self, runID, ldfFileName):
        """!@brief Set the output LDF/LSF format data file name for a run.

        @param runID       Integer run ID
        @param ldfFileName Fully qualified path name.  Environment variables
                           are expanded before insertion.
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "set LDF file name")
        self.__setColumn(runID_fk, self.LDFname,
                         os.path.expandvars(ldfFileName))

    def getLdfFileName(self, runID):
        """!@brief Retrieve the full path to the LDF file from the database.

        @param runID Integer run ID
        @return The stored file name
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "get LDF file name")
        return self.__getColumn(runID_fk, self.LDFname)[0]

    def setScriptOutput(self, runID, blob):
        """!@brief Set the script output blob.

        @param runID Integer run ID
        @param blob  An arbitrary byte stream representation of the script
                     output; quote characters are allowed.
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "set script output")
        self.__setColumn(runID_fk, self.ScriptOutput, blob)

    def getScriptOutput(self, runID):
        """!@brief Retrieve the script output.

        @param runID Integer run ID
        @return The stored representation of the script output
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "get script output")
        return self.__getColumn(runID_fk, self.ScriptOutput, blob=True)[0]

    def setJobCompleteList(self, runID, jobListString):
        """!@brief Set the job complete list.

        @param runID         Integer run ID
        @param jobListString String representing the list of completed jobs;
                             quote characters are allowed.
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "set jobCompleteList")
        self.__setColumn(runID_fk, self.JobComplete, jobListString)

    def getJobCompleteList(self, runID):
        """!@brief Retrieve the job complete list.

        @param runID Integer run ID
        @return A string (possibly empty) denoting which jobs are complete
        @exception RuntimeError The run ID is None or does not exist
        """
        runID_fk = self.__requireRun(runID, "get jobCompleteList")
        return self.__getColumn(runID_fk, self.JobComplete, blob=True)[0]

    def lockRow(self, runID_fk):
        """!@brief Take a write lock for updating a run.

        Despite the name, the statement issued locks the whole activities
        table; the run identifier does not narrow the lock.

        @param runID_fk Run identifier (kept for interface compatibility)
        """
        cursor = self.__db.cursor()
        try:
            cursor.execute("lock tables %s write" % (self.TableActivities))
        finally:
            cursor.close()

    def unlockRow(self):
        """!@brief Release every table lock held by this connection.

        Does nothing when there is no open connection, so it is always safe
        to call from cleanup code.
        """
        if self.__db is None:
            return
        cursor = self.__db.cursor()
        try:
            cursor.execute("unlock tables")
        finally:
            cursor.close()

    def stateStr(self, state):
        """!@brief Convert a state key to a string for printing.

        @param state A state key as loaded by initialize()
        @return The state name, or an empty string for an unknown key
        """
        return self.__stateKeys.get(state, "")

    def getRunID_fk(self, runID):
        """!@brief Retrieve the run's key from the activities table.

        @param runID Integer run ID
        @return The key stored in the activities table for this run
        @exception RuntimeError runID is None, or no such run exists
        """
        if runID is None:
            raise RuntimeError("Cannot have a null run identifier.  Input runID = %s" % runID)

        sql = self.__selectSQL(table=self.TableActivities,
                               column="runid_fk",
                               constraint="runid_fk=%s")
        found = self.__pull(sql, (runID,))
        if not found:
            # Raised as RuntimeError because that is what every accessor
            # of this class catches and rewords.
            raise RuntimeError("Run %s does not exist in table %s"
                               % (runID, self.TableActivities))
        return found[0]

    def __requireRun(self, runID, action):
        """!@brief Look up a run, rewording a failure for the public caller.

        @param runID  Integer run ID
        @param action Short description of what the caller was doing; used
                      in the error message only
        @return The run's key as returned by getRunID_fk()
        @exception RuntimeError The lookup failed
        """
        try:
            return self.getRunID_fk(runID)
        except RuntimeError:
            import sys
            raise RuntimeError("Cannot %s for runID %s. Received an exception during query: %s"
                               % (action, runID, sys.exc_info()[1]))

    def __setColumn(self, runID_fk, column, value, expression="%s"):
        """!@brief Update one column of one run's row.

        @param runID_fk   Run key selecting the row
        @param column     Column name (spliced into the SQL text)
        @param value      New value (passed as a query parameter)
        @param expression SQL expression wrapping the value placeholder
        """
        sql = self.__updateSQL(table=self.TableActivities,
                               column=column,
                               value=expression,
                               constraint="runID_fk = %s")
        self.__push(sql, (value, runID_fk))

    def __getColumn(self, runID_fk, column, blob=False):
        """!@brief Read one column (or column expression) of one run's row.

        @param runID_fk Run key selecting the row
        @param column   Column name or expression (spliced into the SQL text)
        @param blob     Convert array-like blob values to strings when True
        @return Sequence of the selected values, one per matching row
        """
        sql = self.__selectSQL(table=self.TableActivities,
                               column=column,
                               constraint="runID_fk = %s")
        if blob:
            return self.__pullBlob(sql, (runID_fk,))
        return self.__pull(sql, (runID_fk,))

    def __selectSQL(self, table, column, constraint):
        """!@brief Build a single-column select statement."""
        return "select %s from %s where %s" % (column, table, constraint)

    def __updateSQL(self, table, column, value, constraint):
        """!@brief Build a single-column update statement.

        @param value SQL text for the new value, normally a "%s" placeholder
        """
        sql = "update %s " % table
        sql += "set %s = %s " % (column, value)
        sql += "where %s" % constraint
        return sql

    def __execute(self, cursor, sql, params):
        """!@brief Run a statement, with or without query parameters.

        The parameter argument is omitted entirely when there are none so
        that the driver does not treat "%" in the statement as a placeholder.
        """
        if params is None:
            cursor.execute(sql)
        else:
            cursor.execute(sql, params)

    def __reportBadQuery(self, label, sql, params):
        """!@brief Print a failed statement (and its parameters) to stdout."""
        text = "%s: \n  %s" % (label, sql)
        if params is not None:
            text += "\n  parameters: %r" % (params,)
        print(text)

    def __pull(self, sql, params=None):
        """!@brief Execute a single-column query and return its values.

        @param sql    Statement text, with "%s" placeholders if params given
        @param params Sequence of parameter values, or None
        @return List with the first column of every row (empty if no rows)
        """
        # Commit before reading -- presumably so the query starts a fresh
        # transaction and sees rows committed by other connections
        # (TODO confirm against the table's storage engine).
        self.__db.commit()
        cursor = self.__db.cursor()
        try:
            try:
                self.__execute(cursor, sql, params)
            except Exception:
                self.__reportBadQuery("Bad query", sql, params)
                raise  # bare raise keeps the original traceback
            rows = cursor.fetchall() or ()
        finally:
            cursor.close()
        return [row[0] for row in rows]

    def __pullBlob(self, sql, params=None):
        """!@brief Like __pull(), converting array-like blobs to strings.

        @return A tuple of converted values when every value offers
                tostring(); otherwise the unconverted list from __pull()
        """
        fetched = self.__pull(sql, params)
        try:
            return tuple([item.tostring() for item in fetched])
        except AttributeError:
            # Values are already plain strings (or None): return them as is.
            return fetched

    def __push(self, sql, params=None, label="Bad query"):
        """!@brief Execute a modifying statement and commit it.

        @param sql    Statement text, with "%s" placeholders if params given
        @param params Sequence of parameter values, or None
        @param label  Prefix of the diagnostic printed when execution fails
        """
        cursor = self.__db.cursor()
        try:
            try:
                self.__execute(cursor, sql, params)
            except Exception:
                self.__reportBadQuery(label, sql, params)
                raise  # bare raise keeps the original traceback
        finally:
            cursor.close()
        self.__db.commit()
00464
00465
00466
00467
if __name__ == '__main__':
    # Self-test.  Exercises every accessor against a live elogbook database
    # using a scratch run ID; any failure propagates and aborts the script.
    import copy
    import sys
    import threading

    try:
        db = ActivitiesInterface()
    except Exception:
        log.exception(sys.exc_info()[1])
        sys.exit(1)  # non-zero status: the self-test could not even start

    runID = 999000999
    suiteID = 999
    LDFname = os.path.join(ONLINE_ROOT, "Work/scratch/foo.ldf")

    # Remove a row left behind by a previous self-test.  This is best
    # effort (the insert below fails loudly if a stale row survives), but
    # a failure is reported instead of being silently dropped.
    try:
        sql = "delete from %s where runID_fk = %s" % (db.TableActivities, runID)
        db._ActivitiesInterface__db.cursor().execute(sql)
    except Exception:
        log.warning("Could not remove stale test row: %s", sys.exc_info()[1])

    print("attempting to create run id %s in elogbook.%s..." %
          (runID, db.TableActivities))
    db.newActivity(runID, suiteID)
    print("...Created it\n")

    print("attempting to retreive run id %s from elogbook.%s..." %
          (runID, db.TableReport))
    runID_fk = db.getRunID_fk(runID)
    print("...got it: %s\n" % runID_fk)

    print("attempting to set the commanding state to CommandingStarted...")
    db.setState(runID, db.CommandingState, db.CommandingStarted)
    print("...and read it back...")
    state = db.getState(runID, db.CommandingState)
    print("...read: state: %d == %s\n" % (state, db.stateStr(state)))

    print("attempting to set the timestamp to now (%s)..." % time.time())
    db.setTime(runID, db.DataStart, time.time())
    print("...and read it back...")
    t = db.getTime(runID, db.DataStart)
    print("...read: time=%s\n" % t)

    print("attempting to set the LDFfile name to %s" % (LDFname))
    db.setLdfFileName(runID, LDFname)
    print("...and read it back, noticing variables were expanded...")
    readStr = db.getLdfFileName(runID)
    print("...LDFfile = %s\n" % readStr)

    # NOTE(review): the eval() calls below execute text read back from the
    # database.  That is tolerable here only because this script wrote the
    # very same text a moment earlier; never copy this pattern for data of
    # unknown origin (ast.literal_eval is the safe choice where available).
    print("attempting to set script output to range(10)...")
    # list(...) keeps the stored text a list literal on every Python version
    db.setScriptOutput(runID, str(list(range(10))))
    print("...and read it back: %s" % (db.getScriptOutput(runID)))
    q = db.getScriptOutput(runID)
    k = eval(q)
    print("...and eval it: %s is of type %s \n" % (k, type(k)))

    print("attempting to set the jobComplete list to something with double quotes...")
    tmp = [" a b c", "d e f", 'zqxkjshdfadsy']
    db.setJobCompleteList(runID, str(tmp))
    print("...and read it back: %s" % (db.getJobCompleteList(runID)))
    q = db.getJobCompleteList(runID)
    k = eval(q)
    print("...and eval it: %s is of type %s \n" % (k, type(k)))

    print("attempting to set the jobComplete list to db.__dict__...")
    z = copy.deepcopy(db.__dict__)
    # The connection object has no evaluable text form; blank it out.
    z['_ActivitiesInterface__db'] = None
    db.setJobCompleteList(runID, str(z))
    print("...and read it back: %s" % (db.getJobCompleteList(runID)))
    q = db.getJobCompleteList(runID)
    k = eval(q)
    print("...and eval it: %s is of type %s \n" % (k, type(k)))

    # Lock test: the first connection locks the table and a timer releases
    # the lock after 5 seconds; an update through a second connection is
    # expected to block until then.
    db2 = ActivitiesInterface()
    unlocker = threading.Timer(5, db.unlockRow)
    unlocker.start()
    try:
        db.lockRow(runID)

        print("attempting to test a lock on runID %s..." % runID)
        start = time.time()
        print("...by setting the LDF file name to 'foo' with a second database connection")
        db2.setLdfFileName(runID, "foo")
        end = time.time()

        print("...table locked for %.1f seconds\n" % (end - start))
    except Exception:
        # Do not leave the table locked or a timer pending on failure.
        unlocker.cancel()
        db.unlockRow()
        raise

    print("attempting to update a row from the current db connection during a lock...")
    try:
        db.lockRow(runID)
        db.setLdfFileName(runID, LDFname)
    finally:
        # Always release the lock, whether or not the update succeeded.
        db.unlockRow()
    print("...Read back LDFfile name %s\n" % db.getLdfFileName(runID))
00605
00606