DataDistributor.py

Go to the documentation of this file.
00001 #!/usr/local/bin/python
00002 #
00003 #                               Copyright 2005
00004 #                                     by
00005 #                        The Board of Trustees of the
00006 #                     Leland Stanford Junior University.
00007 #                            All rights reserved.
00008 #
00009 
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST LAT data distribution wrapper classes"
00012 __author__   = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__     = "2005/07/23 00:08:27"
00014 __updated__  = "$Date: 2005/09/22 00:29:31 $"
00015 __version__  = "$Revision: 1.6 $"
00016 __release__  = "$Name: HEAD $"
00017 __credits__  = "SLAC"
00018 
00019 import LICOS.copyright_SLAC
00020 
00021 
00022 import socket
00023 import struct
00024 
00025 
00026 class DataDistributorServer(object):
00027   """!\brief Data Distributor Server.
00028 
00029   """
00030   def __init__(self, name = None, size = 32*1024):
00031     """Data Distributor server constructor.
00032 
00033     \param name The name of the type of data to be served.
00034                 See the DataDistributorAddress class.
00035     \param size The size in bytes of the buffer to allocate for sending the
00036                 data.  This is used to set the socket send buffer size.
00037     """
00038     self.__socket = None
00039     self.__group  = None
00040     self.__port   = DataDistributorAddress.port(name)
00041     self.__size   = size
00042 
00043   def connect(self, server, interface = socket.gethostname()):
00044     """!\brief Method to set up a socket and connect to it.
00045 
00046     \param server    The address of a socket on which to serve the data.  This
00047                      address must be the same for both server and client.  The
00048                      value can be in one of the following forms:
00049                      - A number of which only the two lower decimal digits are
00050                        used, i.e., in the range 0-99
00051                      - <port>@<multicast address>, where <port> is a value that
00052                        will override what is found from the data type name
00053                        translation, and <multicast address> is some valid
00054                        multicast IP address starting with the value 239.  A name
00055                        may be used for the <multicast address> value as well, as
00056                        long as it can be found in the name server and translates
00057                        to a valid multicast IP address.
00058                      - <multicast address>, as discussed in the previous bullet.
00059                        The port value is taken to be the one found from looking up
00060                        the data type name in the port table.
00061     \param interface The network interface to send multicast traffic on.
00062                      Defaults to 'the primary network interface', however
00063                      'primary' is defined.  See socket.gethostbyname_ex()
00064                      documentation.
00065     """
00066     if self.__socket is None:
00067       self.__port, self.__group = DataDistributorAddress.lookup(self.__port,
00068                                                                 server)
00069 
00070       # Look up multicast group address in name server
00071       # (doesn't hurt if it is already in ddd.ddd.ddd.ddd format)
00072       group = socket.gethostbyname(self.__group)
00073 
00074       # Any chance this sort of looks like a valid multicast group?
00075       firstByte = int(group.split(".", 1)[0])
00076       if firstByte < 224 and firstByte > 239:
00077         raise RuntimeError, \
00078           "Group %s does not resolve to a multicast address" % self.__group
00079 
00080       s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
00081       self.__socket = s
00082       #
00083       # Make sure there is sufficient network buffering
00084       s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.__size)
00085       #
00086       # Construct interface address
00087       # Use gethostbyname and gethostname to force the connection onto the
00088       # primary network interface.
00089       intf = socket.gethostbyname_ex(interface)[2][0]
00090       #
00091       # Specify the interface for outgoing multicast datagrams
00092       s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
00093     else:
00094       raise RuntimeError, "Socket already in use"
00095 
00096   def disconnect(self):
00097     """!\brief Gracefully shut down the socket.
00098 
00099     """
00100     if self.__socket is not None:
00101       self.__socket.close()
00102       self.__socket = None
00103       self.__group  = None
00104       self.__port   = None
00105     else:
00106       raise RuntimeError, "No socket found"
00107 
00108   def publish(self, data):
00109     """!\brief Publish the data on the multicast socket.
00110 
00111     \param data The data to be sent out on the multicast socket.
00112 
00113     """
00114     if self.__socket is not None:
00115       self.__socket.sendto(data, (self.__group, self.__port))
00116     else:
00117       raise RuntimeError, "No socket found"
00118 
00119 
00120 class DataDistributorClient(object):
00121   """!\brief Data distributor client class.
00122 
00123   """
00124   __timeout = 1
00125 
00126   def __init__(self, name = None, size = 64*1024):
00127     """!\brief Data Distributor client constructor.
00128 
00129     \param name The name of the type of data that the client wants to receive.
00130                 See the DataDistributorAddress class.
00131     \param size The size in bytes of the buffer to allocate for receiving the
00132                 data.  Defaults to 64 KB.
00133     """
00134     self.__socket     = None
00135     self.__group      = None
00136     self.__port       = DataDistributorAddress.port(name)
00137     self.__size       = size
00138     self.__returnNone = None
00139 
00140   # Open a UDP socket, bind it to a port and select a multicast group
00141   def connect(self, server):
00142     """!\brief Open a UDP socket, bind it to a port and select a multicast group.
00143 
00144     \param server The address the server is multicasting on.  This
00145                   address must be the same for both server and client.  The
00146                   value can be in one of the following forms:
00147                   - A number of which only the two lower decimal digits are
00148                     used, i.e., in the range 0-99
00149                   - <port>@<multicast address>, where <port> is a value that
00150                     will override what is found from the data type name
00151                     translation, and <multicast address> is some valid multicast
00152                     IP address starting with the value 239.  A name may be used
00153                     for the <multicast address> value as well, as long as it can
00154                     be found in the name server and translates to a valid
00155                     multicast IP address.
00156                   - <multicast address>, as discussed in the previous bullet.
00157                     The port value is taken to be the one found from looking up
00158                     the data type name in the port table.
00159 
00160     """
00161     if self.__socket is None:
00162       self.__port, self.__group = DataDistributorAddress.lookup(self.__port,
00163                                                                 server)
00164       #print "port@group =", str(self.__port) + '@' + self.__group
00165       # Import modules used only here
00166       import string
00167       import struct
00168       #
00169       # Create a socket
00170       s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
00171       self.__socket = s
00172       #
00173       # Allow multiple copies of this program on one machine
00174       # (not strictly needed)
00175       s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
00176       try:
00177         s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
00178       except AttributeError:
00179         pass # Some systems don't support SO_REUSEPORT
00180       #
00181       # Bind it to the port
00182       s.bind(('', self.__port))
00183       #
00184       # Look up multicast group address in name server
00185       # (doesn't hurt if it is already in ddd.ddd.ddd.ddd format)
00186       group = socket.gethostbyname(self.__group)
00187       #
00188       # Any chance this sort of looks like a valid multicast group?
00189       bytes = map(int, group.split("."))
00190       if bytes[0] < 224 and bytes[0] > 239:
00191         raise RuntimeError, \
00192           "Group %s does not resolve to a multicast address" % self.__group
00193       #
00194       # Make sure there is sufficient network buffering to receive stuff in
00195       s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.__size)
00196       #
00197       # Add group membership
00198       s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
00199                    socket.inet_aton(group) + socket.inet_aton('0.0.0.0'))
00200     else:
00201       raise RuntimeError, "Socket already in use"
00202 
00203   def disconnect(self):
00204     """!\brief Gracefully shut down the socket.
00205 
00206     """
00207     s = self.__socket
00208     if s is not None:
00209       s.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP,
00210                    socket.inet_aton(self.__group) + socket.inet_aton('0.0.0.0'))
00211       s.close()
00212       self.__socket = None
00213       self.__group  = None
00214       self.__port   = None
00215     else:
00216       raise RuntimeError, "No socket found"
00217 
00218   def returnNone(self):
00219     """!\brief Method to signal the receive loop to exit.
00220 
00221     """
00222     self.__returnNone = 1
00223 
00224   def receive(self):
00225     """!\brief Receive multicast data and the sender's identification.
00226 
00227     \return A list of the data and the sender's identification string.
00228     """
00229     if self.__socket is not None:
00230       import select
00231       while self.__returnNone is None:
00232         rd, wr, ex = select.select([self.__socket], [], [], self.__timeout)
00233         if rd:
00234           data, sender = self.__socket.recvfrom(self.__size)
00235           return data, sender
00236         elif ex:
00237           return None, None
00238     else:
00239       raise RuntimeError, "No socket found"
00240     return None, None
00241 
00242 class DataDistributorAddress(object):
00243   """!\brief Data distributor address class containing static methods that
00244   govern the picking of multicast ports and addresses.
00245 
00246   """
00247   # See http://www.iana.org/assignments/port-numbers on how to pick a port
00248   # I randomly chose the following from the dynamic ports section:
00249   __PORT_TEMPLATE = 54321
00250   # See http://www.iana.org/assignments/multicast-addresses on how to pick a
00251   # group BaBar uses 239.255.0.2- 239.255.0.130, so I chose the following:
00252   __GROUP_TEMPLATE = '239.255.1.%u'     # From "site-local" range (Stevens)
00253 
00254   __ports = {"LAT_EBFdata" : __PORT_TEMPLATE + 0,
00255              "LAT_LDFdata" : __PORT_TEMPLATE + 0, # Alias for EBFdata
00256              "LAT_CMDdata" : __PORT_TEMPLATE + 1,
00257              "LAT_HSKdata" : __PORT_TEMPLATE + 2,
00258              "LAT_MsgLog"  : __PORT_TEMPLATE + 3}
00259 
00260   def port(name):
00261     """!\brief Static method to select a physical multicast port to use based on the
00262        type of data that is desired.
00263 
00264     \param name The name of the type of data that is desired.  Choose from
00265                 'LAT_EBFdata', 'LAT_CMDdata', 'LAT_HSKdata' and 'LAT_MsgLog'.
00266     """
00267     if name in DataDistributorAddress.__ports:
00268       return DataDistributorAddress.__ports[name]
00269     return None
00270 
00271   def lookup(port, server):
00272     """!\brief Static method for resolving the multicast port and group values.
00273 
00274     \param port   An integer containing the multicast port to use, e.g. from
00275                   the port() method above, or None to indicate that the value
00276                   will come from the supplied server string.
00277     \param server A string that contains either the machine ID (test stand
00278                   instance number), or <port>@<group IP or name>, or <group IP
00279                   or name> when a port number is also supplied.  These last two
00280                   can be used to override the values chosen by this class.  The
00281                   machine ID is a number from 0 - 999, where the hundreds
00282                   digit represents the site ID and remainder (0-99) is the
00283                   machine ID at the site.  This latter portion is what's used.
00284     \return A list made up of the resolved port and group values.
00285     """
00286     # Do a DNS like action to get the machine ID of the server
00287     # Until we get that built, let's do the following:
00288 
00289     # The 'server' argument is either a machine ID or a string of the form
00290     # port@<multicast group> (multicast group could be an IP address or a name)
00291     try:
00292       # Site machine IDs run from 0 to 99.  Remove the site ID to prevent > 255
00293       group = DataDistributorAddress.__GROUP_TEMPLATE % (int(server) % 100)
00294     except ValueError:                  # Can only be <port>@<group> or <group>
00295       try:
00296         port, group = server.split('@', 1)
00297         port = int(port)
00298       except ValueError:                # Must be <group>
00299         group = server
00300     return port, group
00301 
00302   port   = staticmethod(port)
00303   lookup = staticmethod(lookup)
00304 
00305 
00306 if __name__ == "__main__":
00307   """Testing routine.
00308 
00309   Usage: Run DataDistributor (this file) in one session to start up the client
00310          and DataDistributor -s in another session to start up the server.
00311   """
00312   import time
00313 
00314   def sender():
00315     dds = DataDistributorServer('test')
00316     #dds.connect('54320@224.0.0.0')      # "node-local" according to Stevens
00317     dds.connect('54320@239.255.1.0')    # "site-local" according to Stevens
00318     #dds.connect('54320@239.255.1.0', "localhost")  # mcast to self via primary
00319                                          # interface doesn't seem to work on XP
00320                                          # with Checkpoint enabled
00321     i = 5
00322     while i > 0:  #1:
00323       i -= 1
00324       data = `time.time()`
00325       data += 50000*'-'                 # Make it a long string
00326       #print "data =", data
00327       dds.publish(data)
00328       time.sleep(1)
00329     dds.publish('Quit')
00330     dds.disconnect()
00331 
00332   def receiver():
00333     dds = DataDistributorClient('test', 64*1024)
00334     #dds.connect('54320@224.0.0.0')      # "node-local" according to Stevens
00335     dds.connect('54320@239.255.1.0')    # "site-local" according to Stevens
00336     #dds.connect('9875@sap.mcast.net')   # Didn't see any activity
00337     #dds.connect('123@ntp.mcast.net')    # Didn't see any activity
00338 
00339     # Loop, printing any data we receive
00340     while 1:
00341       data, sender = dds.receive()
00342       print sender, ':', `data[:40]`, len(data)
00343       if data == 'Quit':  break
00344 
00345     dds.disconnect()
00346 
00347   #---
00348   import sys
00349 
00350   flags = sys.argv[1:]
00351 
00352   if flags:
00353     sender()
00354   else:
00355     receiver()

Generated on Thu Apr 27 20:52:41 2006 for LICOS L02-01-00 by doxygen 1.4.6-NO