rcActivitiesInterface.py

Go to the documentation of this file.
00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2005
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "Interface to the Run activities table"
00012 __author__   = "S. Tuvi <stuvi@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = "2005/07/23 00:08:27"
00014 __updated__  = "$Date: 2006/04/21 23:39:01 $"
00015 __version__  = "$Revision: 1.17 $"
00016 __release__  = "$Name: HEAD $"
00017 __credits__  = "SLAC"
00018 
00019 try:
00020   import LICOS.copyright_SLAC
00021 except:
00022   pass
00023 
00024 import MySQLdb
00025 import logging as log
00026 import os
00027 import time
00028 
00029 try:
00030   from LICOS.lib.LATconstants import *
00031 except:
00032   ONLINE_ROOT = os.environ['ONLINE_ROOT']
00033 
class ActivitiesInterface(object):
  """!@brief Interface to the activities database.

  Opens a MySQLdb connection on construction and exposes getters and
  setters for columns of the activities table, addressed by run ID.  The
  primary keys of the standard processing states are read from the states
  table during initialize() and published as attributes of the instance
  (self.ActivityCreated, self.NoData, ...), so they can be handed straight
  to setState().
  """
  __stdDbName = "elogbook"
  __stdDbUser = "elogbook"
  __stdDbPass = "elogbook"
  __stdDbPort = 3306

  # Class-level default so that unlockRow()/__del__ are safe on an instance
  # whose __init__ never ran or failed before a connection was made.
  __db = None

  ## Allowed tables
  TableStates     = "LICOS_states"
  TableActivities = "LICOS_activities"
  TableReport     = "elogreport"

  ## State categories
  CommandingState = "CommandingState_FK"
  DataState       = "DataState_FK"
  AnalysisState   = "AnalysisState_FK"

  ## Time rows
  DataStart       = "DataStart"
  DataStop        = "DataStop"

  ## Suite id
  SuiteIdentifier = "SuiteIdentifier"

  ## LDF file name
  LDFname         = "LdfFileName"

  ## script Output
  ScriptOutput    = "ScriptOutput"

  ## Job complete list
  JobComplete     = "JobCompleteList"

  ## Primary key
  PrimaryKey      = "RunID_FK"

  # Definition of standard processing states in the database
  # used when caching the foreign keys to reduce overhead
  __states    = [ "ActivityCreated",
                  "CommandingStarted",
                  "CommandingComplete",
                  "NoData",
                  "DataStarted",
                  "DataComplete",
                  "NoAnalysis",
                  "AnalysisStarted",
                  "AnalysisComplete", ]

  def __init__(self,
               dbName=__stdDbName,
               dbUser=__stdDbUser,
               dbPass=__stdDbPass,
               dbPort=__stdDbPort):
    """!@brief Store the connection settings and connect immediately.

    @param dbName  database name (connect() lets ELOGBOOK_DB override it)
    @param dbUser  database user
    @param dbPass  database password
    @param dbPort  TCP port of the MySQL server
    """
    self.__db     = None
    self.__dbName = dbName
    self.__dbUser = dbUser
    self.__dbPass = dbPass
    self.__dbPort = dbPort
    self.__stateKeys = {}   # state primary key -> state name, see stateStr()

    self.initialize()

  def __del__(self):
    """ Reimplementation of the destructor to make bloody well sure
        that the table ends up in an unlocked state...

    """
    # unlockRow() is a no-op when no connection exists, so a failed
    # constructor no longer triggers an AttributeError here.
    self.unlockRow()
    self.__db = None
    # no need to call super's __del__ as there is no super

  def initialize(self):
    """!@brief Initialize the object.  Do a database connect, etc.

    After connecting, the state_PK of every standard state is read from the
    states table, stored on this instance under the state's name and
    recorded in the reverse map used by stateStr().
    """
    self.connect()

    # Seed the definitions of states and make them attributes of the object
    for stateName in self.__states:
      sql = self.__selectSQL(table=self.TableStates,
                             column="state_PK",
                             constraint="stateName = '%s'" % stateName)
      key = self.__pull(sql)[0]
      setattr(self, stateName, key)
      self.__stateKeys[key] = stateName

  def connect(self):
    """!@brief connect to the database

    The host comes from the ELOGBOOK_HOST environment variable and falls
    back to "localhost".  ELOGBOOK_DB, when set, replaces the database name
    given to the constructor.  A MySQLdb.OperationalError raised by the
    connect call is logged and then re-raised unchanged.
    """
    import sys  # local import: the module header does not import sys

    host = os.environ.get("ELOGBOOK_HOST", "localhost")

    if 'ELOGBOOK_DB' in os.environ:
      self.__dbName = os.environ['ELOGBOOK_DB']

    try:
      self.__db = MySQLdb.connect(host=host,
                                  db     = self.__dbName,
                                  user   = self.__dbUser,
                                  passwd = self.__dbPass,
                                  port   = self.__dbPort)
    except MySQLdb.OperationalError:
      msg  = "Error encountered when attempting to connect to MySQL server on host: '%s'\n" % host
      msg += "  Actual exception was: %s" % sys.exc_info()[1]
      log.exception(MySQLdb.OperationalError(msg))
      raise

  def newActivity(self, runID, suiteID=None):
    """! Create a new activity in the activities database

    The row starts in the states ActivityCreated / NoData / NoAnalysis.
    runID is used directly as the RunID_FK value; it is not looked up in
    any other table first.

    @param runID   run identifier
    @param suiteID Suite linkage identifier  Note:  no check is made to see whether the suite exists.
    """
    if suiteID is None:
      suiteID = "Null"

    sql  = "insert into %s " % self.TableActivities
    sql += "(RunID_FK, SuiteIdentifier, CommandingState_FK, DataState_FK, AnalysisState_FK) values "
    sql += "(%s, %s, %s, %s, %s)" % \
           (runID, suiteID, self.ActivityCreated, self.NoData, self.NoAnalysis)
    self.__push(sql)

  def setState(self, runID, category, newState):
    """! Set the state of a run in the database

    @param runID     integer Run ID
    @param category  One of the possible state categories
    @param newState  New state
    """
    con = self.__runConstraint(runID, "set state")
    sql = self.__updateSQL(table=self.TableActivities,
                           column=category,
                           value=newState,
                           constraint=con)
    self.__push(sql)

  def getState(self, runID, category):
    """!@brief Retrieve the state of a run from the database

    @param runID     integer Run ID
    @param category  One of the possible state categories
    @return          the value stored in the category column for this run
    """
    return self.__getColumn(runID, "retreive state", category)

  def getRunsByState(self, category, state):
    """!@brief Get a set of runIDs which are in a particular state

    @param category  One of the State categories
    @param state     One of the possible states
    @return          A list of the runIDs which are in the state requested.
    """
    sql = self.__selectSQL(table=self.TableActivities,
                           column="runID_fk",
                           constraint="%s = %s" % (category, state))
    return self.__pull(sql)

  def setTime(self, runID, timeColumn, theTime):
    """!@brief Set a time tag row

    @param runID        integer Run ID
    @param timeColumn   Tag for the column.  e.g., DataStart, DataStop
    @param theTime      Unix timestamp
    """
    con = self.__runConstraint(runID, "set time")
    # The conversion from Unix time is done by the server.
    sql = self.__updateSQL(table=self.TableActivities,
                           column=timeColumn,
                           value="FROM_UNIXTIME(%s)" % theTime,
                           constraint=con)
    self.__push(sql)

  def getTime(self, runID, timeColumn):
    """!@brief Read a time tag row back as a Unix timestamp

    @param runID        integer Run ID
    @param timeColumn   Tag for the column.  e.g., DataStart, DataStop
    @return             UNIX_TIMESTAMP() of the stored value
    """
    return self.__getColumn(runID, "get time", "UNIX_TIMESTAMP(%s)" % timeColumn)

  def setLdfFileName(self, runID, ldfFileName):
    """!@brief set the output LDF/LSF format data file name for a run

    @param runID         integer Run ID
    @param ldfFileName   Fully qualified path name.  Variables will be expanded before insertion.

    """
    self.__setText(runID, "set LDF file name", self.LDFname,
                   os.path.expandvars(ldfFileName))

  def getLdfFileName(self, runID):
    """!@brief Retrieve the full path to the LDF file from the db

    @param runID    Integer Run ID
    """
    return self.__getColumn(runID, "get LDF file name", self.LDFname)

  def setScriptOutput(self, runID, blob):
    """!@brief Set the script output blob

    @param runID   Integer run ID
    @param blob    An arbitrary byte stream representation of the script output
    """
    self.__setText(runID, "set script output", self.ScriptOutput, blob)

  def getScriptOutput(self, runID):
    """!@brief Retrieve the script output

    @param runID   Integer run ID
    @return blob  an arbitrary byte stream representation of the script output
    """
    return self.__getColumn(runID, "get script output", self.ScriptOutput, blob=True)

  def setJobCompleteList(self, runID, jobListString):
    """!@brief Set the job complete list

    @param runID   Integer run ID
    @param jobListString  String representing the list of completed jobs
    """
    self.__setText(runID, "set jobCompleteList", self.JobComplete, jobListString)

  def getJobCompleteList(self, runID):
    """!@brief Retrieve the job complete list

    @param runID   Integer run ID
    @return a string (possibly empty) denoting which jobs are complete.
    """
    return self.__getColumn(runID, "get jobCompleteList", self.JobComplete, blob=True)

  def lockRow(self, runID_fk):
    """!@brief Take a write lock on the whole activities table

    @param runID_fk  accepted for interface compatibility; not used, because
                     the lock is taken on the table rather than on one row
    """
    # A per-row "select ... LOCK IN SHARE MODE" doesn't work for locking a
    # row because the next query/whatever will unlock.  I thought
    # autocommit=False would prevent this, but no.  So... Lock it down!!!
    sql = "lock tables %s write" % ( self.TableActivities )
    self.__db.cursor().execute(sql)

  def unlockRow(self):
    """!@brief Release any table lock held by this connection

    Does nothing when there is no connection.
    """
    if self.__db is None:
      return
    self.__db.cursor().execute("unlock tables")

  def stateStr(self, state):
    """! Convert an enumerated state to a string for printing
    @param state  an integer state

    @return  State string ("" for a state that is not known)
    """
    return self.__stateKeys.get(state, "")

  def getRunID_fk(self, runID):
    """!Retreive the runID_fk from the database for speed

    @param runID     integer Run ID
    @return the FK stored in the activities table for this run

    Raises RuntimeError when runID is None.  When the query returns no row
    the "[0]" lookup below raises IndexError; None is never returned.
    """
    if runID is None:
      raise RuntimeError("Cannot have a null run identifier.  Input runID = %s" % runID )

    sql = self.__selectSQL(table=self.TableActivities,
                           column="runid_fk",
                           constraint="runid_fk=%s" % runID)
    return self.__pull(sql)[0]

  def __runConstraint(self, runID, action):
    """Return the "runID_fk = <fk>" constraint for the row of runID.

    action is the phrase placed after "Cannot" in the RuntimeError that
    replaces a RuntimeError raised by getRunID_fk().  Other exceptions
    from the lookup propagate unchanged.
    """
    import sys  # local import: the module header does not import sys

    try:
      runID_fk = self.getRunID_fk(runID)
    except RuntimeError:
      raise RuntimeError("Cannot %s for runID %s.  Received an exception during query: %s"
                         % (action, runID, sys.exc_info()[1]))
    return "runID_fk = %s" % runID_fk

  def __setText(self, runID, action, column, text):
    """Store a string in one column of the row belonging to runID.

    The text travels as a bound parameter so that quotes and backslashes
    inside it are escaped by the driver instead of ending the SQL literal.
    """
    con = self.__runConstraint(runID, action)
    # "%s" is the driver's placeholder; the constraint holds only the key
    # read back from the table (presumably numeric, so no stray '%').
    sql = self.__updateSQL(table=self.TableActivities,
                           column=column,
                           value="%s",
                           constraint=con)
    self.__push(sql, (text,))

  def __getColumn(self, runID, action, column, blob=False):
    """Return one column of the row belonging to runID.

    With blob=True the value goes through __pullBlob().
    """
    con = self.__runConstraint(runID, action)
    sql = self.__selectSQL(table=self.TableActivities,
                           column=column,
                           constraint=con)
    if blob:
      return self.__pullBlob(sql)[0]
    return self.__pull(sql)[0]

  def __selectSQL(self, table, column, constraint):
    """Build "select <column> from <table> where <constraint>"."""
    return "select %s from %s where %s" % (column, table, constraint)

  def __updateSQL(self, table, column, value, constraint):
    """Build "update <table> set <column> = <value> where <constraint>"."""
    return "update %s set %s = %s where %s" % (table, column, value, constraint)

  def __pull(self, sql):
    """Run a single-column query and return the column values as a list.

    Each fetched row must contain exactly one column.
    """
    # The commit *before* the read is required: without it, getRunID_fk
    # won't return a recently created runID.  Presumably the open
    # transaction keeps reading from its earlier snapshot until it is
    # ended -- TODO confirm against the server's isolation level.
    self.__db.commit()
    cur = self.__db.cursor()
    try:
      cur.execute(sql)
    except Exception:
      log.error("Bad query: \n  %s", sql)
      raise   # bare raise keeps the original traceback
    return [column for (column,) in cur.fetchall()]

  def __pullBlob(self, sql):
    """Like __pull(), but convert each value with its tostring() method.

    If a value has no tostring() method the list from __pull() is returned
    unchanged.
    """
    fetched = self.__pull(sql)
    try:
      return tuple([ item.tostring() for item in fetched ])
    except AttributeError:
      return fetched

  def __push(self, sql, args=None):
    """Execute a modifying statement and commit it.

    args, when given, is the parameter sequence handed to the cursor for
    the placeholders in sql.
    """
    try:
      self.__db.cursor().execute(sql, args)
    except Exception:
      log.error("Bad query: \n  %s", sql)
      raise   # bare raise keeps the original traceback
    self.__db.commit()
00464 
00465 
00466 
00467 
if __name__ == '__main__':
  # Regression test for the activities database.
  #   Also a short usage and syntax guide.
  # Every step talks to the database; an exception in any step aborts the
  # script with a traceback.

  import copy
  import sys
  import threading

  def say(text):
    """Write one line of progress output to stdout."""
    sys.stdout.write(text + "\n")

  def readBack(getter):
    """Print the value returned by getter(runID), then eval and print it.

    NOTE(review): eval() is acceptable here only because the text was
    written by this script a moment ago.  Never eval database content of
    unknown origin.
    """
    say("...and read it back: %s" % (getter(runID),))
    k = eval(getter(runID))
    say("...and eval it: %s is of type %s \n" % (k, type(k)))

  try:
    db = ActivitiesInterface()
  except Exception:
    log.exception(sys.exc_info()[1])
    sys.exit(1)   # non-zero status: the test could not even start

  runID       = 999000999  # Can also use '999000999'
  suiteID     = 999      # Can also use '000999'
  LDFname     = os.path.join(ONLINE_ROOT, "Work/scratch/foo.ldf")

  # Delete this runID from the database so we can test it
  try:
    sql = "delete from %s where runID_fk = %s" % (db.TableActivities, runID)
    db._ActivitiesInterface__db.cursor().execute(sql)
  except Exception:
    # deliberately ignored: a failed clean-up delete must not stop the test
    pass

  say("attempting to create run id %s in elogbook.%s..." %
      ( runID, db.TableActivities ))
  db.newActivity(runID, suiteID)
  say("...Created it\n")

  # getRunID_fk() reads the activities table, so that is the table named here
  say("attempting to retreive run id %s from elogbook.%s..." %
      ( runID, db.TableActivities ))
  runID_fk = db.getRunID_fk(runID)
  say("...got it: %s\n" % runID_fk)

  say("attempting to set the commanding state to CommandingStarted...")
  db.setState(runID, db.CommandingState, db.CommandingStarted)
  say("...and read it back...")
  state = db.getState(runID, db.CommandingState)
  say("...read: state: %d == %s\n" %
      ( state, db.stateStr(state)))

  say("attempting to set the timestamp to now (%s)..." % time.time())
  db.setTime(runID, db.DataStart, time.time())
  say("...and read it back...")
  stamp = db.getTime(runID, db.DataStart)
  say("...read: time=%s\n" % stamp)

  say("attempting to set the LDFfile name to %s" %
      ( LDFname ))
  db.setLdfFileName(runID, LDFname)
  say("...and read it back, noticing variables were expanded...")
  readStr = db.getLdfFileName(runID)
  say("...LDFfile = %s\n" % readStr)

  say("attempting to set script output to range(10)...")
  db.setScriptOutput(runID, str(list(range(10))))
  readBack(db.getScriptOutput)

  say("attempting to set the jobComplete list to something with double quotes...")
  tmp = [ " a b c" , "d e f", 'zqxkjshdfadsy' ]
  db.setJobCompleteList(runID, str( tmp ) )
  readBack(db.getJobCompleteList)

  say("attempting to set the jobComplete list to db.__dict__...")
  z = copy.deepcopy(db.__dict__)
  # gotta remove an object (<mysql...>) from the dict, or it won't stringify
  # reversibly
  z['_ActivitiesInterface__db'] = None
  db.setJobCompleteList(runID, str( z ) )
  readBack(db.getJobCompleteList)

  try:
    db2 = ActivitiesInterface()

    # The first connection takes the lock and a timer releases it after
    # five seconds; the update issued through the second connection is
    # timed to show how long it was held up.
    timer = threading.Timer(5, db.unlockRow)
    timer.start()
    db.lockRow(runID)

    say("attempting to test a lock on runID %s..." % runID)
    start = time.time()
    say("...by setting the LDF file name to 'foo' with a second database connection")
    db2.setLdfFileName(runID, "foo")
    end = time.time()

    say("...table locked for %.1f seconds\n" % (end - start))
  except Exception:
    db.unlockRow()
    raise

  say("attempting to update a row from the current db connection during a lock...")
  try:
    db.lockRow(runID)
    db.setLdfFileName(runID, LDFname)
  finally:
    # always release the table, whether or not the update succeeded
    db.unlockRow()
  say("...Read back LDFfile name %s\n" % db.getLdfFileName(runID))
00605 
00606 

Generated on Thu Apr 27 20:52:43 2006 for LICOS L02-01-00 by doxygen 1.4.6-NO