00001 #!/usr/local/bin/python 00002 # 00003 # Copyright 2004 00004 # by 00005 # The Board of Trustees of the 00006 # Leland Stanford Junior University. 00007 # All rights reserved. 00008 # 00009 00010 __facility__ = "Online" 00011 __abstract__ = "GLAST LAT Housekeeping server infrastructure" 00012 __author__ = "Jim Panetta <panetta@slac.stanford.edu> SLAC - GLAST I&T" 00013 __date__ = "07/28/04" 00014 __version__ = "$Revision: 1.6 $" 00015 __release__ = "$Name: R04-12-00 $" 00016 __credits__ = "SLAC" 00017 00018 import LATTE.copyright_SLAC 00019 00020 import logging 00021 import threading 00022 import time 00023 00024 class Collector(object): 00025 def __init__(self, *args): 00026 self.__loopStop = 0 00027 self.__monDelay = 0 00028 self.__monThread = None 00029 self.__retrievers = {} 00030 self.__recorders = [] 00031 self.__delayUnits = 0.01 # seconds 00032 self.__log = logging.getLogger('collecting.Collector') 00033 self.setDelay(10) 00034 00035 def addRetriever(self,name,retriever): 00036 self.__retrievers[name]=retriever 00037 retriever.startup() 00038 00039 def delRetriever(self,name): 00040 if self.__retrievers.has_key(name): 00041 del self.__retrievers[name] 00042 00043 def addRecorder(self,recorder): 00044 self.__recorders.append(recorder) 00045 00046 def setDelay(self,delay): 00047 """Set the delay in units of seconds If the delay < 0, delay set to 0. 00048 """ 00049 if delay < 0: 00050 delay = 0 00051 acq = self.isAcquiring() 00052 self.setAcquiring(0) 00053 self.__monDelay = int(delay / self.__delayUnits) 00054 self.setAcquiring(acq) 00055 00056 def getDelay(self): 00057 return self.__monDelay * self.__delayUnits 00058 00059 def isAcquiring(self): 00060 if self.__monThread: 00061 return 1 00062 return 0 00063 00064 def setAcquiring(self,bool): 00065 if bool: 00066 self.__startLoop() 00067 else: 00068 self.__stopLoop() 00069 00070 def collectData(self): 00071 for r in self.__retrievers.values(): 00072 data = r.retrievePacket() 00073 # Process the data 00074 for l in self.__recorders: 00075 l.updateTrends(data) 00076 00077 def shutdown(self): 00078 for r in self.__retrievers.values() + self.__recorders: 00079 r.shutdown() 00080 self.__stopLoop() 00081 00082 def __startLoop(self): 00083 if self.__monThread: 00084 self.__log.warning("There is already a Collector Loop") 00085 else: 00086 self.__monThread = threading.Thread(None, 00087 self.__monLoop, 00088 'Collector', 00089 (self.__monDelay,)) 00090 self.__monThread.start() 00091 00092 def __stopLoop(self): 00093 if self.__monThread: 00094 self.__log.info("Shutting down Collector Loop") 00095 self.__loopStop = 1 00096 self.__monThread.join() 00097 self.__monThread = None 00098 self.__loopStop = 0 00099 00100 def __monLoop(self,delay): 00101 self.__log.info("Starting Collector Loop") 00102 while (not self.__loopStop): 00103 self.collectData() 00104 for i in xrange(delay): 00105 if (self.__loopStop): 00106 break; 00107 time.sleep(self.__delayUnits) 00108 self.__log.info("Collector Loop Stopped") 00109 00110 def testCollector(): 00111 logging.basicConfig() 00112 logging.root.setLevel(logging.DEBUG) 00113 00114 udpServerName = 'testTrendingServer' 00115 udpServerAddr = '54320@239.255.1.14' 00116 from Retriever import DummyRetriever 00117 from Recorder import UDPRecorder, DummyRecorder 00118 00119 c = Collector() 00120 rec = UDPRecorder() 00121 rec.startUDPServer(udpServerName,udpServerAddr) 00122 c.addRetriever('dummy',DummyRetriever()) 00123 c.addRecorder(rec) 00124 c.addRecorder(DummyRecorder()) 00125 c.setAcquiring(True) 00126 00127 try: 00128 while 1: 00129 time.sleep(.01) 00130 except Exception, e: 00131 c.shutdown() 00132 00133 if __name__ == '__main__': 00134 testCollector()