Main Page | Packages | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | Related Pages

DataDistributor.py

00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2003
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST LAT data distribution wrapper classes"
00012 __author__   = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = ("$Date: 2005/06/28 19:20:37 $").split(' ')[1]
00014 __version__  = "$Revision: 2.7 $"
00015 __release__  = "$Name: R04-12-00 $"
00016 __credits__  = "SLAC"
00017 
00018 import LATTE.copyright_SLAC
00019 
00020 
00021 import socket
00022 import struct
00023 
00024 
00025 class DataDistributorServer(object):
00026   """\brief Data Distributor Server
00027   """
00028   def __init__(self, name = None, size = 32*1024):
00029     """Data Distributor server constructor.
00030 
00031     \param name The name of the type of data to be served.
00032                 See the DataDistributorAddress class.
00033     \param size The size in bytes of the buffer to allocate for sending the
00034                 data.  This is used to set the socket send buffer size.
00035     """
00036     self.__socket = None
00037     self.__group  = None
00038     self.__port   = DataDistributorAddress.port(name)
00039     self.__size   = size
00040 
00041   def connect(self, server, interface = socket.gethostname()):
00042     """Method to set up a socket and connect to it.
00043 
00044     \param server    The address of a socket on which to serve the data.  This
00045                      address must be the same for both server and client.  The
00046                      value can be in one of the following forms:
00047                      - A number of which only the two lower decimal digits are
00048                        used, i.e., in the range 0-99
00049                      - <port>@<multicast address>, where <port> is a value that
00050                        will override what is found from the data type name
00051                        translation, and <multicast address> is some valid
00052                        multicast IP address starting with the value 239.  A name
00053                        may be used for the <multicast address> value as well, as
00054                        long as it can be found in the name server and translates
00055                        to a valid multicast IP address.
00056                      - <multicast address>, as discussed in the previous bullet.
00057                        The port value is taken to be the one found from looking up
00058                        the data type name in the port table.
00059     \param interface The network interface to send multicast traffic on.
00060                      Defaults to 'the primary network interface', however
00061                      'primary' is defined.  See socket.gethostbyname_ex()
00062                      documentation.
00063     """
00064     if self.__socket is None:
00065       self.__port, self.__group = DataDistributorAddress.lookup(self.__port,
00066                                                                 server)
00067 
00068       # Look up multicast group address in name server
00069       # (doesn't hurt if it is already in ddd.ddd.ddd.ddd format)
00070       group = socket.gethostbyname(self.__group)
00071 
00072       # Any chance this sort of looks like a valid multicast group?
00073       firstByte = int(group.split(".", 1)[0])
00074       if firstByte < 224 and firstByte > 239:
00075         raise RuntimeError, \
00076           "Group %s does not resolve to a multicast address" % self.__group
00077 
00078       s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
00079       self.__socket = s
00080       #
00081       # Make sure there is sufficient network buffering
00082       s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.__size)
00083       #
00084       # Construct interface address
00085       # Use gethostbyname and gethostname to force the connection onto the
00086       # primary network interface.
00087       intf = socket.gethostbyname_ex(interface)[2][0]
00088       #
00089       # Specify the interface for outgoing multicast datagrams
00090       s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
00091     else:
00092       raise RuntimeError, "Socket already in use"
00093 
00094   def disconnect(self):
00095     """Gracefully shut down the socket."""
00096     if self.__socket is not None:
00097       self.__socket.close()
00098       self.__socket = None
00099       self.__group  = None
00100       self.__port   = None
00101     else:
00102       raise RuntimeError, "No socket found"
00103 
00104   def publish(self, data):
00105     """Publish the data on the multicast socket.
00106 
00107     \param data The data to be sent out on the multicast socket.
00108 
00109     """
00110     if self.__socket is not None:
00111       self.__socket.sendto(data, (self.__group, self.__port))
00112     else:
00113       raise RuntimeError, "No socket found"
00114 
00115 
00116 class DataDistributorClient(object):
00117   """\brief Data distributor client class.
00118   """
00119   __timeout = 1
00120 
00121   def __init__(self, name = None, size = 64*1024):
00122     """Data Distributor client constructor.
00123 
00124     \param name The name of the type of data that the client wants to receive.
00125                 See the DataDistributorAddress class.
00126     \param size The size in bytes of the buffer to allocate for receiving the
00127                 data.  Defaults to 64 KB.
00128     """
00129     self.__socket     = None
00130     self.__group      = None
00131     self.__port       = DataDistributorAddress.port(name)
00132     self.__size       = size
00133     self.__returnNone = None
00134 
00135   # Open a UDP socket, bind it to a port and select a multicast group
00136   def connect(self, server):
00137     """Open a UDP socket, bind it to a port and select a multicast group.
00138 
00139     \param server The address the server is multicasting on.  This
00140                   address must be the same for both server and client.  The
00141                   value can be in one of the following forms:
00142                   - A number of which only the two lower decimal digits are
00143                     used, i.e., in the range 0-99
00144                   - <port>@<multicast address>, where <port> is a value that
00145                     will override what is found from the data type name
00146                     translation, and <multicast address> is some valid multicast
00147                     IP address starting with the value 239.  A name may be used
00148                     for the <multicast address> value as well, as long as it can
00149                     be found in the name server and translates to a valid
00150                     multicast IP address.
00151                   - <multicast address>, as discussed in the previous bullet.
00152                     The port value is taken to be the one found from looking up
00153                     the data type name in the port table.
00154 
00155     """
00156     if self.__socket is None:
00157       self.__port, self.__group = DataDistributorAddress.lookup(self.__port,
00158                                                                 server)
00159       #print "port@group =", str(self.__port) + '@' + self.__group
00160       # Import modules used only here
00161       import string
00162       import struct
00163       #
00164       # Create a socket
00165       s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
00166       self.__socket = s
00167       #
00168       # Allow multiple copies of this program on one machine
00169       # (not strictly needed)
00170       s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00171       try:
00172         s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00173       except AttributeError:
00174         pass # Some systems don't support SO_REUSEPORT
00175       #
00176       # Bind it to the port
00177       s.bind(('', self.__port))
00178       #
00179       # Look up multicast group address in name server
00180       # (doesn't hurt if it is already in ddd.ddd.ddd.ddd format)
00181       group = socket.gethostbyname(self.__group)
00182       #
00183       # Any chance this sort of looks like a valid multicast group?
00184       bytes = map(int, group.split("."))
00185       if bytes[0] < 224 and bytes[0] > 239:
00186         raise RuntimeError, \
00187           "Group %s does not resolve to a multicast address" % self.__group
00188       #
00189       # Make sure there is sufficient network buffering to receive stuff in
00190       s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.__size)
00191       #
00192       # Add group membership
00193       s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
00194                    socket.inet_aton(group) + socket.inet_aton('0.0.0.0'))
00195     else:
00196       raise RuntimeError, "Socket already in use"
00197 
00198   def disconnect(self):
00199     """Gracefully shut down the socket."""
00200     s = self.__socket
00201     if s is not None:
00202       s.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP,
00203                    socket.inet_aton(self.__group) + socket.inet_aton('0.0.0.0'))
00204       s.close()
00205       self.__socket = None
00206       self.__group  = None
00207       self.__port   = None
00208     else:
00209       raise RuntimeError, "No socket found"
00210 
00211   def returnNone(self):
00212     """Method to signal the receive loop to exit."""
00213     self.__returnNone = 1
00214 
00215   def receive(self):
00216     """Receive multicast data and the sender's identification.
00217 
00218     \return A list of the data and the sender's identification string.
00219     """
00220     if self.__socket is not None:
00221       import select
00222       while self.__returnNone is None:
00223         rd, wr, ex = select.select([self.__socket], [], [], self.__timeout)
00224         if rd:
00225           data, sender = self.__socket.recvfrom(self.__size)
00226           return data, sender
00227         elif ex:
00228           return None, None
00229     else:
00230       raise RuntimeError, "No socket found"
00231     return None, None
00232 
00233 class DataDistributorAddress(object):
00234   """\brief Data distributor address class containing static methods that
00235   govern the picking of multicast ports and addresses.
00236   """
00237   # See http://www.iana.org/assignments/port-numbers on how to pick a port
00238   # I randomly chose the following from the dynamic ports section:
00239   __PORT_TEMPLATE = 54321
00240   # See http://www.iana.org/assignments/multicast-addresses on how to pick a
00241   # group BaBar uses 239.255.0.2- 239.255.0.130, so I chose the following:
00242   __GROUP_TEMPLATE = '239.255.1.%u'     # From "site-local" range (Stevens)
00243 
00244   __ports = {"LAT_EBFdata" : __PORT_TEMPLATE + 0,
00245              "LAT_CMDdata" : __PORT_TEMPLATE + 1,
00246              "LAT_HSKdata" : __PORT_TEMPLATE + 2,
00247              "LAT_MsgLog"  : __PORT_TEMPLATE + 3}
00248 
00249   def port(name):
00250     """Static method to select a physical multicast port to use based on the
00251        type of data that is desired.
00252 
00253     \param name The name of the type of data that is desired.  Choose from
00254                 'LAT_EBFdata', 'LAT_CMDdata', 'LAT_HSKdata' and 'LAT_MsgLog'.
00255     """
00256     if name in DataDistributorAddress.__ports:
00257       return DataDistributorAddress.__ports[name]
00258     return None
00259 
00260   def lookup(port, server):
00261     """Static method for resolving the multicast port and group values.
00262 
00263     \param port   An integer containing the multicast port to use, e.g. from
00264                   the port() method above, or None to indicate that the value
00265                   will come from the supplied server string.
00266     \param server A string that contains either the machine ID (test stand
00267                   instance number), or <port>@<group IP or name>, or <group IP
00268                   or name> when a port number is also supplied.  These last two
00269                   can be used to override the values chosen by this class.  The
00270                   machine ID is a number from 0 - 999, where the hundreds
00271                   digit represents the site ID and remainder (0-99) is the
00272                   machine ID at the site.  This latter portion is what's used.
00273     \return A list made up of the resolved port and group values.
00274     """
00275     # Do a DNS like action to get the machine ID of the server
00276     # Until we get that built, let's do the following:
00277 
00278     # The 'server' argument is either a machine ID or a string of the form
00279     # port@<multicast group> (multicast group could be an IP address or a name)
00280     try:
00281       # Site machine IDs run from 0 to 99.  Remove the site ID to prevent > 255
00282       group = DataDistributorAddress.__GROUP_TEMPLATE % (int(server) % 100)
00283     except ValueError:                  # Can only be <port>@<group> or <group>
00284       try:
00285         port, group = server.split('@', 1)
00286         port = int(port)
00287       except ValueError:                # Must be <group>
00288         group = server
00289     return port, group
00290 
00291   port   = staticmethod(port)
00292   lookup = staticmethod(lookup)
00293 
00294 
00295 if __name__ == "__main__":
00296   """Testing routine.
00297 
00298   Usage: Run DataDistributor (this file) in one session to start up the client
00299          and DataDistributor -s in another session to start up the server.
00300   """
00301   import time
00302 
00303   def sender():
00304     dds = DataDistributorServer('test')
00305     #dds.connect('54320@224.0.0.0')      # "node-local" according to Stevens
00306     dds.connect('54320@239.255.1.0')    # "site-local" according to Stevens
00307     #dds.connect('54320@239.255.1.0', "localhost")  # mcast to self via primary
00308                                          # interface doesn't seem to work on XP
00309                                          # with Checkpoint enabled
00310     i = 5
00311     while i > 0:  #1:
00312       i -= 1
00313       data = `time.time()`
00314       data += 50000*'-'                 # Make it a long string
00315       #print "data =", data
00316       dds.publish(data)
00317       time.sleep(1)
00318     dds.publish('Quit')
00319     dds.disconnect()
00320 
00321   def receiver():
00322     dds = DataDistributorClient('test', 64*1024)
00323     #dds.connect('54320@224.0.0.0')      # "node-local" according to Stevens
00324     dds.connect('54320@239.255.1.0')    # "site-local" according to Stevens
00325     #dds.connect('9875@sap.mcast.net')   # Didn't see any activity
00326     #dds.connect('123@ntp.mcast.net')    # Didn't see any activity
00327 
00328     # Loop, printing any data we receive
00329     while 1:
00330       data, sender = dds.receive()
00331       print sender, ':', `data[:40]`, len(data)
00332       if data == 'Quit':  break
00333 
00334     dds.disconnect()
00335 
00336   #---
00337   import sys
00338 
00339   flags = sys.argv[1:]
00340 
00341   if flags:
00342     sender()
00343   else:
00344     receiver()

Generated on Fri Jul 21 13:26:27 2006 for LATTE R04-12-00 by doxygen 1.4.3