00001 #!/usr/local/bin/python
00002 #
00003 # Copyright 2003
00004 # by
00005 # The Board of Trustees of the
00006 # Leland Stanford Junior University.
00007 # All rights reserved.
00008 #
00009
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST LAT data distribution wrapper classes"
00012 __author__ = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__ = ("$Date: 2005/06/28 19:20:37 $").split(' ')[1]
00014 __version__ = "$Revision: 2.7 $"
00015 __release__ = "$Name: R04-12-00 $"
00016 __credits__ = "SLAC"
00017
00018 import LATTE.copyright_SLAC
00019
00020
00021 import socket
00022 import struct
00023
00024
class DataDistributorServer(object):
    r"""\brief Data Distributor Server

    Publishes data as UDP datagrams addressed to a multicast group.
    """

    def __init__(self, name = None, size = 32*1024):
        r"""Data Distributor server constructor.

        \param name  The name of the type of data to be served.
                     See the DataDistributorAddress class.
        \param size  The size in bytes of the buffer to allocate for sending
                     the data.  This is used to set the socket send buffer
                     size.
        """
        self.__socket = None
        self.__group = None
        # The port implied by the data type name is kept separately so that it
        # survives disconnect() and is available to a subsequent connect().
        self.__namePort = DataDistributorAddress.port(name)
        self.__port = self.__namePort
        self.__size = size

    # NOTE: the default for 'interface' is evaluated once, when the class is
    # defined; it is kept that way so the signature stays unchanged.
    def connect(self, server, interface = socket.gethostname()):
        r"""Method to set up a socket and connect to it.

        \param server  The address of a socket on which to serve the data.
                       This address must be the same for both server and
                       client.  The value can be in one of the following
                       forms:
                       - A number of which only the two lower decimal digits
                         are used, i.e., in the range 0-99
                       - <port>@<multicast address>, where <port> is a value
                         that will override what is found from the data type
                         name translation, and <multicast address> is a
                         multicast IP address (first octet 224-239) or a name
                         that the name server resolves to one.
                       - <multicast address>, as discussed in the previous
                         bullet.  The port value is taken to be the one found
                         from looking up the data type name in the port table.
        \param interface  The network interface to send multicast traffic on.
                       Defaults to 'the primary network interface', however
                       'primary' is defined.  See socket.gethostbyname_ex()
                       documentation.
        \exception RuntimeError  The server is already connected, no port
                       could be determined, or the group is not a multicast
                       address.
        """
        if self.__socket is not None:
            raise RuntimeError("Socket already in use")

        port, group = DataDistributorAddress.lookup(self.__namePort, server)
        if port is None:
            # Fail here rather than with an obscure error in sendto() later
            raise RuntimeError("No port found for server %s" % (server,))

        # Look up multicast group address in name server
        # (doesn't hurt if it is already in ddd.ddd.ddd.ddd format)
        address = socket.gethostbyname(group)

        # Any chance this sort of looks like a valid multicast group?
        firstByte = int(address.split(".", 1)[0])
        if firstByte < 224 or firstByte > 239:
            raise RuntimeError(
                "Group %s does not resolve to a multicast address" % group)

        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Make sure there is sufficient network buffering
            s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.__size)

            # Construct interface address: first IP address listed for the
            # given interface host name
            intf = socket.gethostbyname_ex(interface)[2][0]

            # Specify the interface for outgoing multicast datagrams
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
                         socket.inet_aton(intf) + socket.inet_aton('0.0.0.0'))
        except Exception:
            # Don't leak the socket or leave the object half connected
            s.close()
            raise

        # Commit the new state only once everything has succeeded
        self.__socket = s
        self.__port = port
        self.__group = group

    def disconnect(self):
        r"""Gracefully shut down the socket.

        \exception RuntimeError  The server is not connected.
        """
        s = self.__socket
        if s is None:
            raise RuntimeError("No socket found")
        self.__socket = None
        self.__group = None
        self.__port = None
        s.close()

    def publish(self, data):
        r"""Publish the data on the multicast socket.

        \param data  The data to be sent out on the multicast socket.
        \exception RuntimeError  The server is not connected.
        """
        if self.__socket is None:
            raise RuntimeError("No socket found")
        self.__socket.sendto(data, (self.__group, self.__port))
00114
00115
class DataDistributorClient(object):
    r"""\brief Data distributor client class.

    Receives UDP datagrams sent to a multicast group.
    """
    # Timeout in seconds of each select() call made by receive()
    __timeout = 1

    def __init__(self, name = None, size = 64*1024):
        r"""Data Distributor client constructor.

        \param name  The name of the type of data that the client wants to
                     receive.  See the DataDistributorAddress class.
        \param size  The size in bytes of the buffer to allocate for receiving
                     the data.  Defaults to 64 KB.
        """
        self.__socket = None
        self.__group = None
        # Resolved (dotted-quad) form of the group, needed to drop membership
        self.__address = None
        # The port implied by the data type name is kept separately so that it
        # survives disconnect() and is available to a subsequent connect().
        self.__namePort = DataDistributorAddress.port(name)
        self.__port = self.__namePort
        self.__size = size
        self.__returnNone = None

    def connect(self, server):
        r"""Open a UDP socket, bind it to a port and select a multicast group.

        \param server  The address the server is multicasting on.  This
                       address must be the same for both server and client.
                       The value can be in one of the following forms:
                       - A number of which only the two lower decimal digits
                         are used, i.e., in the range 0-99
                       - <port>@<multicast address>, where <port> is a value
                         that will override what is found from the data type
                         name translation, and <multicast address> is a
                         multicast IP address (first octet 224-239) or a name
                         that the name server resolves to one.
                       - <multicast address>, as discussed in the previous
                         bullet.  The port value is taken to be the one found
                         from looking up the data type name in the port table.
        \exception RuntimeError  The client is already connected, no port
                       could be determined, or the group is not a multicast
                       address.
        """
        if self.__socket is not None:
            raise RuntimeError("Socket already in use")

        port, group = DataDistributorAddress.lookup(self.__namePort, server)
        if port is None:
            raise RuntimeError("No port found for server %s" % (server,))

        # Look up multicast group address in name server
        # (doesn't hurt if it is already in ddd.ddd.ddd.ddd format)
        address = socket.gethostbyname(group)

        # Any chance this sort of looks like a valid multicast group?
        firstByte = int(address.split(".", 1)[0])
        if firstByte < 224 or firstByte > 239:
            raise RuntimeError(
                "Group %s does not resolve to a multicast address" % group)

        # Create a socket
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Allow multiple copies of this program on one machine
            # (not strictly needed)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            except (AttributeError, socket.error):
                # Some systems don't define SO_REUSEPORT, others define it
                # but reject it; either way this is only best effort
                pass

            # Bind it to the port
            s.bind(('', port))

            # Make sure there is sufficient network buffering to receive
            # stuff in
            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.__size)

            # Add group membership
            s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
                         socket.inet_aton(address) +
                         socket.inet_aton('0.0.0.0'))
        except Exception:
            # Don't leak the socket or leave the object half connected
            s.close()
            raise

        # Commit the new state only once everything has succeeded
        self.__socket = s
        self.__port = port
        self.__group = group
        self.__address = address

    def disconnect(self):
        r"""Gracefully shut down the socket.

        \exception RuntimeError  The client is not connected.
        """
        s = self.__socket
        if s is None:
            raise RuntimeError("No socket found")
        address = self.__address
        self.__socket = None
        self.__group = None
        self.__address = None
        self.__port = None
        try:
            # Use the resolved address: inet_aton() does not accept host names
            s.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP,
                         socket.inet_aton(address) +
                         socket.inet_aton('0.0.0.0'))
        finally:
            # Close the socket even if dropping the membership failed
            s.close()

    def returnNone(self):
        r"""Method to signal the receive loop to exit.

        The request is never cleared: once set, every receive() call returns
        (None, None).
        """
        self.__returnNone = 1

    def receive(self):
        r"""Receive multicast data and the sender's identification.

        Polls the socket with a timeout so that a returnNone() request is
        noticed even when no data arrives.

        \return  A tuple of the data and the sender's address as returned by
                 recvfrom(), or (None, None) when returnNone() was called or
                 select() reported an exceptional condition on the socket.
        \exception RuntimeError  The client is not connected.
        """
        if self.__socket is None:
            raise RuntimeError("No socket found")

        import select
        while self.__returnNone is None:
            rd, wr, ex = select.select([self.__socket], [], [self.__socket],
                                       self.__timeout)
            if rd:
                return self.__socket.recvfrom(self.__size)
            if ex:
                return None, None
        return None, None
00232
class DataDistributorAddress(object):
    r"""\brief Data distributor address class containing static methods that
    govern the picking of multicast ports and addresses.
    """
    # See http: (URL truncated in this copy; presumably the IANA port registry)
    # I randomly chose the following from the dynamic ports section:
    __PORT_TEMPLATE = 54321
    # See http: (URL truncated in this copy)
    # group BaBar uses 239.255.0.2- 239.255.0.130, so I chose the following:
    __GROUP_TEMPLATE = '239.255.1.%u'   # From "site-local" range (Stevens)

    # Data type name -> multicast port
    __ports = {"LAT_EBFdata" : __PORT_TEMPLATE + 0,
               "LAT_CMDdata" : __PORT_TEMPLATE + 1,
               "LAT_HSKdata" : __PORT_TEMPLATE + 2,
               "LAT_MsgLog"  : __PORT_TEMPLATE + 3}

    def port(name):
        r"""Static method to select a physical multicast port to use based on
        the type of data that is desired.

        \param name  The name of the type of data that is desired.  Choose
                     from 'LAT_EBFdata', 'LAT_CMDdata', 'LAT_HSKdata' and
                     'LAT_MsgLog'.
        \return  The port number, or None when the name is not known.
        """
        return DataDistributorAddress.__ports.get(name)

    def lookup(port, server):
        r"""Static method for resolving the multicast port and group values.

        \param port    An integer containing the multicast port to use, e.g.
                       from the port() method above, or None to indicate that
                       the value will come from the supplied server string.
        \param server  Either the machine ID (test stand instance number), or
                       a string of the form <port>@<group IP or name>, or
                       <group IP or name> when a port number is also supplied.
                       These last two can be used to override the values
                       chosen by this class.  The machine ID is a number from
                       0 - 999, where the hundreds digit represents the site
                       ID and the remainder (0-99) is the machine ID at the
                       site.  This latter portion is what's used.
        \return  A tuple made up of the resolved port and group values.
        \exception ValueError  The <port> part of <port>@<group> is not an
                       integer.
        """
        # Do a DNS like action to get the machine ID of the server
        # Until we get that built, let's do the following:
        try:
            machine = int(server)
        except ValueError:
            pass                        # Not a machine ID
        else:
            # Site machine IDs run from 0 to 99.  Remove the site ID to keep
            # the last octet of the group address from exceeding 255
            return port, DataDistributorAddress.__GROUP_TEMPLATE % (machine % 100)

        # Can only be <port>@<group> or <group>
        if '@' not in server:
            return port, server         # Must be <group>

        portText, group = server.split('@', 1)
        try:
            # Parse into the result only on success so that a malformed value
            # never leaks out as the port
            port = int(portText)
        except ValueError:
            raise ValueError("Invalid port %r in server address %r"
                             % (portText, server))
        return port, group

    port   = staticmethod(port)
    lookup = staticmethod(lookup)
00293
00294
if __name__ == "__main__":
    # Testing routine.
    #
    # Usage: Run DataDistributor (this file) without arguments in one session
    #        to start the client, and with any argument (e.g. -s) in another
    #        session to start the server.
    import sys
    import time

    def sender():
        # Publish five long time-stamped messages, one per second, then 'Quit'
        server = DataDistributorServer('test')
        # '54320@239.255.1.0' is "site-local" according to Stevens.
        # '54320@224.0.0.0' ("node-local") is an alternative.  Multicasting to
        # self via "localhost" didn't seem to work on XP with Checkpoint
        # enabled.
        server.connect('54320@239.255.1.0')
        for _ in range(5):
            # Time stamp followed by padding to make it a long string
            payload = repr(time.time()) + 50000*'-'
            server.publish(payload)
            time.sleep(1)
        server.publish('Quit')
        server.disconnect()

    def receiver():
        # Print a summary of every datagram received until 'Quit' arrives
        client = DataDistributorClient('test', 64*1024)
        # Must match the address used by sender().  sap.mcast.net:9875 and
        # ntp.mcast.net:123 were tried, but no activity was seen on them.
        client.connect('54320@239.255.1.0')

        while True:
            data, origin = client.receive()
            print("%s : %r %d" % (origin, data[:40], len(data)))
            if data == 'Quit':
                break

        client.disconnect()

    #---
    if sys.argv[1:]:
        sender()
    else:
        receiver()