00001 #!/usr/local/bin/python 00002 # 00003 # Copyright 2004 00004 # by 00005 # The Board of Trustees of the 00006 # Leland Stanford Junior University. 00007 # All rights reserved. 00008 # 00009 00010 __facility__ = "Online" 00011 __abstract__ = "GLAST LAT Housekeeping server infrastructure" 00012 __author__ = "Jim Panetta <panetta@slac.stanford.edu> SLAC - GLAST I&T" 00013 __date__ = "07/28/04" 00014 __version__ = "$Revision: 1.3 $" 00015 __release__ = "$Name: R04-12-00 $" 00016 __credits__ = "SLAC" 00017 00018 import LATTE.copyright_SLAC 00019 import logging 00020 import os 00021 00022 class Recorder(object): 00023 """\brief Recorder class 00024 00025 Base class for the Recorder portion of the monitoring system. 00026 Recorders send on the data that the Retriever class acquires. 00027 """ 00028 def __init__(self): 00029 self.__cachedValues={} 00030 self.__log=logging.getLogger('monitoring.Recorder') 00031 00032 def updateTrends(self, trendables): 00033 if trendables is None: return 00034 for t in trendables: 00035 self.__cachedValues[t.name]=t 00036 self.processTrends(trendables) 00037 00038 def processTrends(self, trendables): 00039 raise NotImplementedError('processTrends(trendables)'\ 00040 +'must be implemented in a subclass.') 00041 00042 def shutdown(self): 00043 pass 00044 00045 class DummyRecorder(Recorder): 00046 """\brief Example recorder that dumps the trendables to a logger 00047 """ 00048 def __init__(self): 00049 Recorder.__init__(self) 00050 self.__log=logging.getLogger('monitoring.Recorder') 00051 00052 def processTrends(self, trendables): 00053 for t in trendables: 00054 self.__log.info(t) 00055 00056 class FlatFileRecorder(Recorder): 00057 def __init__(self, flatFile): 00058 Recorder.__init__(self) 00059 self.__flatFile = open(flatFile, 'a+') 00060 00061 keyList = self.__flatFile.readline() 00062 if keyList == '': # No file, 00063 self.processTrends = self.__createFileHeader 00064 else: 00065 # create a list of keys from whitespace separated text blobs 00066 self.__keyList = keyList.split() 00067 self.processTrends = self.__processTrends 00068 self.__flatFile.seek(0,2) 00069 self.__pad = ' ' 00070 00071 def shutdown(self): 00072 self.__flatFile.close() 00073 00074 def __processTrends(self, trendables): 00075 trendMap = {} 00076 # Add the timestamp 00077 timeStamp = trendables[0].time 00078 trendMap['TimeStamp'] =timeStamp 00079 line = '' 00080 00081 for t in trendables: 00082 trendMap[t.name] = t.content['Raw Value'] 00083 00084 for k in self.__keyList: 00085 line += str(trendMap[k]) + self.__pad 00086 line += '\n' 00087 self.__flatFile.write(line) 00088 self.__flatFile.flush() 00089 00090 def __createFileHeader(self, trendables): 00091 # 1: reset function pointer to processTrends 00092 self.processTrends = self.__processTrends 00093 # 2: create the sorted key list 00094 self.__keyList = [] 00095 for t in trendables: 00096 self.__keyList.append(t.name) 00097 self.__keyList.sort() 00098 self.__keyList.insert(0, 'TimeStamp') 00099 line = '' 00100 # 3: write the index line of the file: 00101 for k in self.__keyList: 00102 line += (k + self.__pad) 00103 line += '\n' 00104 self.__flatFile.writelines([line]) 00105 # 4: Profit! 00106 self.processTrends(trendables) 00107 00108 00109 class UDPRecorder(Recorder): 00110 """\brief Example recorder that UDP broadcasts trendables 00111 00112 Uses LATTE.tools.DataDistributionServer 00113 """ 00114 def __init__(self): 00115 Recorder.__init__(self) 00116 self.__dds=None 00117 self.__log=logging.getLogger('monitoring.UDPRecorder') 00118 00119 def startUDPServer(self,name,server): 00120 from LATTE.tools.DataDistributor import DataDistributorServer 00121 self.__log.info("Starting DataDistributorServer %s on %s",name,server) 00122 self.__dds=DataDistributorServer(name) 00123 self.__dds.connect(server) 00124 00125 def stopUDPServer(self): 00126 if self.__dds: 00127 self.__log.info('Disconnecting Broadcast Server') 00128 self.__dds.disconnect() 00129 self.__dds=None 00130 00131 def processTrends(self,trendables): 00132 if self.__dds: 00133 from cPickle import dumps 00134 # Send to receiver as a binary pickled object 00135 self.__dds.publish(dumps(trendables,1)) 00136 00137 def shutdown(self): 00138 self.stopUDPServer()