Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | Related Pages

Receiver.py

00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2004
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "Housekeeping receiver module"
00012 __author__   = "J. Panetta <panetta@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = "7/20/2004"
00014 __version__  = "$Revision: 1.2 $"
00015 __credits__  = "SLAC"
00016 
00017 
00018 import LATTE.copyright_SLAC
00019 
00020 from LATTE.monitoring.Trendable import Trendable
00021 import logging
00022 
00023 class Receiver(object):
00024   """\brief Abstract Receiver class
00025 
00026   Receiver objects retrieve the trendable data from some location.
00027   """
00028   def __init__(self):
00029     self.log=logging.getLogger('monitoring.Receiver')
00030     self.__loopStop=0
00031     self.__recThread=None
00032 
00033   def isReceiving(self):
00034     if self.__monThread: return 1
00035     else: return 0
00036 
00037   def setReceiving(self,bool):
00038     if bool: self.__startLoop()
00039     else: self.__stopLoop()
00040 
00041   def shutdown(self):
00042     self.setReceiving(0)
00043 
00044   def receive(self):
00045     raise NotImplementedError('receive()' \
00046                               +'must be implemented in a subclass.')
00047 
00048   def process(self,trendables):
00049     for t in trendables:
00050       self.log.info(t)
00051 
00052 ##private
00053   def __startLoop(self):
00054     if not self.__recThread:
00055       from threading import Thread
00056       self.__recThread=Thread(None,self.__recLoop,'ReceiverLoop')
00057       self.__recThread.start()
00058     else:
00059       self.log.warning('Already a ReceiverLoop Running')
00060 
00061   def __stopLoop(self):
00062     if self.__recThread:
00063       self.__loopStop=1
00064       self.__recThread.join()
00065       self.loopStop=0
00066       self.__recThread=None
00067 
00068   def __recLoop(self):
00069     self.log.info('Starting Receive Loop')
00070     while (not self.__loopStop):
00071       self.receive()
00072     self.log.info('Receive Loop Stopped')
00073 
00074 class UDPReceiver(Receiver):
00075   """\brief UDPReceiver class
00076   """
00077   tType = (Trendable('InvalidName', 0))
00078   def __init__(self):
00079     Receiver.__init__(self)
00080     self.__ddc=None
00081     self.log=logging.getLogger('monitoring.UDPReceiver')
00082 
00083   def receive(self):
00084     if self.__ddc:
00085       from cPickle import loads
00086       d,s=self.__ddc.receive()
00087       if not d: return
00088       trendables=loads(d)
00089       for t in trendables:
00090         if not isinstance(t,type(self.tType)):
00091           self.log.error('Data received is not a Trendable')
00092           trendables.remove(t)
00093       self.process(trendables)
00094 
00095   def setUDPClient(self,name,server):
00096     from LATTE.tools.DataDistributor import DataDistributorClient
00097     self.log.info('Connecting to DataDistributorServer %s on %s',
00098                     name,server)
00099     #self.__ddc=DataDistributorClient(name,length=40960)
00100     self.__ddc=DataDistributorClient(name)
00101     self.__ddc.connect(server)
00102 
00103   def shutdown(self):
00104     if self.__ddc:
00105       self.__ddc.returnNone()
00106     self.setReceiving(0)
00107     if self.__ddc:
00108       self.log.info('Disconnecting from DataDistributorServer')
00109       self.__ddc.disconnect()
00110       self.__ddc=None
00111 

Generated on Fri Jul 21 13:26:32 2006 for LATTE R04-12-00 by doxygen 1.4.3