Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | Related Pages

Collector.py

00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2004
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST LAT Housekeeping server infrastructure"
00012 __author__   = "Jim Panetta <panetta@slac.stanford.edu> SLAC - GLAST I&T"
00013 __date__     = "07/28/04"
00014 __version__  = "$Revision: 1.6 $"
00015 __release__  = "$Name: R04-12-00 $"
00016 __credits__  = "SLAC"
00017 
00018 import LATTE.copyright_SLAC
00019 
00020 import logging
00021 import threading
00022 import time
00023 
class Collector(object):
  """Poll a set of retrievers and forward each packet to every recorder.

  Retrievers are registered by name and must provide startup(),
  retrievePacket() and shutdown().  Recorders must provide
  updateTrends(data) and shutdown().  While acquiring, a background
  thread named 'Collector' calls collectData() and then waits for the
  configured delay before collecting again.
  """

  def __init__(self, *args):
    """Create an idle collector with a delay of 10 seconds.

    Positional arguments are accepted but not used.
    """
    self.__loopStop   = 0      # set to 1 to ask the monitor thread to exit
    self.__monDelay   = 0      # delay between collections, in __delayUnits
    self.__monThread  = None   # the running monitor thread, if any
    self.__retrievers = {}     # name -> retriever
    self.__recorders  = []
    self.__delayUnits = 0.01  # seconds
    self.__log        = logging.getLogger('collecting.Collector')
    self.setDelay(10)

  def addRetriever(self, name, retriever):
    """Register retriever under name and call its startup().

    A retriever already registered under the same name is replaced; its
    shutdown() is not called.
    """
    self.__retrievers[name] = retriever
    retriever.startup()

  def delRetriever(self, name):
    """Forget the retriever registered under name, if there is one.

    Unknown names are ignored.  The retriever's shutdown() is not called.
    """
    self.__retrievers.pop(name, None)

  def addRecorder(self, recorder):
    """Append recorder to the list of objects that receive every packet."""
    self.__recorders.append(recorder)

  def setDelay(self, delay):
    """Set the delay between collections, in seconds.

    If the delay is < 0, it is set to 0.  The value is truncated to a
    whole number of 0.01 second units.  The monitor thread is handed its
    delay when it starts, so a running loop is stopped and restarted to
    make the new value take effect.
    """
    if delay < 0:
      delay = 0
    acq = self.isAcquiring()
    self.setAcquiring(0)
    self.__monDelay = int(delay / self.__delayUnits)
    self.setAcquiring(acq)

  def getDelay(self):
    """Return the delay between collections, in seconds."""
    return self.__monDelay * self.__delayUnits

  def isAcquiring(self):
    """Return 1 if a monitor thread has been started and not stopped, else 0."""
    if self.__monThread:
      return 1
    return 0

  def setAcquiring(self, bool):
    """Start the monitor loop if bool is true, otherwise stop it."""
    if bool:
      self.__startLoop()
    else:
      self.__stopLoop()

  def collectData(self):
    """Fetch one packet from each retriever and pass it to every recorder."""
    # Iterate over a snapshot: retrievers may be added or removed from
    # another thread while the monitor thread is in this loop.
    for r in list(self.__retrievers.values()):
      data = r.retrievePacket()
      # Process the data
      for l in self.__recorders:
        l.updateTrends(data)

  def shutdown(self):
    """Stop the monitor loop, then shut down all retrievers and recorders."""
    # Stop collecting first so the monitor thread can no longer call into
    # a retriever or recorder that has already been shut down.
    self.__stopLoop()
    for r in list(self.__retrievers.values()) + self.__recorders:
      r.shutdown()

  def __startLoop(self):
    """Start the monitor thread unless one already exists."""
    if self.__monThread:
      self.__log.warning("There is already a Collector Loop")
    else:
      self.__monThread = threading.Thread(None,
                                          self.__monLoop,
                                          'Collector',
                                          (self.__monDelay,))
      self.__monThread.start()

  def __stopLoop(self):
    """Ask the monitor thread to exit and wait until it has finished."""
    if self.__monThread:
      self.__log.info("Shutting down Collector Loop")
      self.__loopStop  = 1
      self.__monThread.join()
      self.__monThread = None
      self.__loopStop  = 0

  def __monLoop(self, delay):
    """Body of the monitor thread.

    Collects once, then sleeps delay times __delayUnits seconds in
    __delayUnits steps so that a stop request is noticed within one step.
    """
    self.__log.info("Starting Collector Loop")
    while (not self.__loopStop):
      self.collectData()
      for i in range(delay):
        if (self.__loopStop):
          break
        time.sleep(self.__delayUnits)
    self.__log.info("Collector Loop Stopped")
00109 
def testCollector():
  """Run a Collector with dummy components until interrupted.

  Wires a DummyRetriever to a UDPRecorder and a DummyRecorder, starts
  acquisition and idles until Ctrl-C.  The collector is always shut down
  on the way out so that its monitor thread does not keep the process
  alive.
  """
  logging.basicConfig()
  logging.root.setLevel(logging.DEBUG)

  udpServerName = 'testTrendingServer'
  udpServerAddr = '54320@239.255.1.14'
  # Local imports: only this test routine uses these helpers.
  from Retriever import DummyRetriever
  from Recorder  import UDPRecorder, DummyRecorder

  c   = Collector()
  rec = UDPRecorder()
  rec.startUDPServer(udpServerName,udpServerAddr)
  c.addRetriever('dummy',DummyRetriever())
  c.addRecorder(rec)
  c.addRecorder(DummyRecorder())
  c.setAcquiring(True)

  try:
    try:
      while 1:
        time.sleep(.01)
    except KeyboardInterrupt:
      # Ctrl-C is the normal way to end the test; it is not an Exception
      # subclass on current Pythons, so it is caught explicitly.
      pass
  finally:
    c.shutdown()

if __name__ == '__main__':
  testCollector()

Generated on Fri Jul 21 13:26:27 2006 for LATTE R04-12-00 by doxygen 1.4.3