00001 #!/usr/local/bin/python 00002 # 00003 # Copyright 2004 00004 # by 00005 # The Board of Trustees of the 00006 # Leland Stanford Junior University. 00007 # All rights reserved. 00008 # 00009 00010 __facility__ = "Online" 00011 __abstract__ = "Housekeeping receiver module" 00012 __author__ = "J. Panetta <panetta@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online" 00013 __date__ = "7/20/2004" 00014 __version__ = "$Revision: 1.2 $" 00015 __credits__ = "SLAC" 00016 00017 00018 import LATTE.copyright_SLAC 00019 00020 from LATTE.monitoring.Trendable import Trendable 00021 import logging 00022 00023 class Receiver(object): 00024 """\brief Abstract Receiver class 00025 00026 Receiver objects retrieve the trendable data from some location. 00027 """ 00028 def __init__(self): 00029 self.log=logging.getLogger('monitoring.Receiver') 00030 self.__loopStop=0 00031 self.__recThread=None 00032 00033 def isReceiving(self): 00034 if self.__monThread: return 1 00035 else: return 0 00036 00037 def setReceiving(self,bool): 00038 if bool: self.__startLoop() 00039 else: self.__stopLoop() 00040 00041 def shutdown(self): 00042 self.setReceiving(0) 00043 00044 def receive(self): 00045 raise NotImplementedError('receive()' \ 00046 +'must be implemented in a subclass.') 00047 00048 def process(self,trendables): 00049 for t in trendables: 00050 self.log.info(t) 00051 00052 ##private 00053 def __startLoop(self): 00054 if not self.__recThread: 00055 from threading import Thread 00056 self.__recThread=Thread(None,self.__recLoop,'ReceiverLoop') 00057 self.__recThread.start() 00058 else: 00059 self.log.warning('Already a ReceiverLoop Running') 00060 00061 def __stopLoop(self): 00062 if self.__recThread: 00063 self.__loopStop=1 00064 self.__recThread.join() 00065 self.loopStop=0 00066 self.__recThread=None 00067 00068 def __recLoop(self): 00069 self.log.info('Starting Receive Loop') 00070 while (not self.__loopStop): 00071 self.receive() 00072 self.log.info('Receive Loop Stopped') 00073 00074 class UDPReceiver(Receiver): 00075 """\brief UDPReceiver class 00076 """ 00077 tType = (Trendable('InvalidName', 0)) 00078 def __init__(self): 00079 Receiver.__init__(self) 00080 self.__ddc=None 00081 self.log=logging.getLogger('monitoring.UDPReceiver') 00082 00083 def receive(self): 00084 if self.__ddc: 00085 from cPickle import loads 00086 d,s=self.__ddc.receive() 00087 if not d: return 00088 trendables=loads(d) 00089 for t in trendables: 00090 if not isinstance(t,type(self.tType)): 00091 self.log.error('Data received is not a Trendable') 00092 trendables.remove(t) 00093 self.process(trendables) 00094 00095 def setUDPClient(self,name,server): 00096 from LATTE.tools.DataDistributor import DataDistributorClient 00097 self.log.info('Connecting to DataDistributorServer %s on %s', 00098 name,server) 00099 #self.__ddc=DataDistributorClient(name,length=40960) 00100 self.__ddc=DataDistributorClient(name) 00101 self.__ddc.connect(server) 00102 00103 def shutdown(self): 00104 if self.__ddc: 00105 self.__ddc.returnNone() 00106 self.setReceiving(0) 00107 if self.__ddc: 00108 self.log.info('Disconnecting from DataDistributorServer') 00109 self.__ddc.disconnect() 00110 self.__ddc=None 00111