00001
00002
00003
00004
00005
00006
00007
00008
00009
00010 __facility__ = "Online"
00011 __abstract__ = "GLAST LAT data distribution wrapper classes"
00012 __author__ = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online"
00013 __date__ = "2005/07/23 00:08:27"
00014 __updated__ = "$Date: 2005/09/22 00:29:31 $"
00015 __version__ = "$Revision: 1.6 $"
00016 __release__ = "$Name: HEAD $"
00017 __credits__ = "SLAC"
00018
00019 import LICOS.copyright_SLAC
00020
00021
00022 import socket
00023 import struct
00024
00025
class DataDistributorServer(object):
    """!@brief Data Distributor Server.

    Publishes datagrams to a multicast group through a UDP socket.  Call
    connect() before publish(), and disconnect() when done.
    """

    def __init__(self, name = None, size = 32*1024):
        """!@brief Data Distributor server constructor.

        @param name  The name of the type of data to be served.
                     See the DataDistributorAddress class.
        @param size  The size in bytes of the buffer to allocate for sending
                     the data.  This is used to set the socket send buffer
                     size.
        """
        self.__socket = None
        self.__group = None
        self.__port = DataDistributorAddress.port(name)
        self.__size = size

    @staticmethod
    def _resolveGroup(group):
        """!@brief Resolve a group name and verify that it is a multicast address.

        @param group  A dotted-quad IP address or a host name.
        @return The dotted-quad IP address that the group resolves to.
        @exception RuntimeError  The address is outside 224.0.0.0 -
                                 239.255.255.255.
        """
        address = socket.gethostbyname(group)
        firstByte = int(address.split(".", 1)[0])
        # IPv4 multicast addresses have a first octet of 224 through 239.
        if not (224 <= firstByte <= 239):
            raise RuntimeError(
                "Group %s does not resolve to a multicast address" % group)
        return address

    def connect(self, server, interface = socket.gethostname()):
        """!@brief Method to set up a socket and connect to it.

        @param server  The address of a socket on which to serve the data.
                       This address must be the same for both server and
                       client.  The value can be in one of the following
                       forms:
                       - A number of which only the two lower decimal digits
                         are used, i.e., in the range 0-99
                       - <port>@<multicast address>, where <port> is a value
                         that will override what is found from the data type
                         name translation, and <multicast address> is some
                         valid multicast IP address.  A name may be used for
                         the <multicast address> value as well, as long as it
                         can be found in the name server and translates to a
                         valid multicast IP address.
                       - <multicast address>, as discussed in the previous
                         bullet.  The port value is taken to be the one found
                         from looking up the data type name in the port table.
        @param interface  The network interface to send multicast traffic on.
                          The default is the local host name as returned by
                          socket.gethostname() when this module was loaded.
                          The first address that socket.gethostbyname_ex()
                          lists for it is used.
        @exception RuntimeError  A socket is already open, or the group does
                                 not resolve to a multicast address.
        """
        if self.__socket is not None:
            raise RuntimeError("Socket already in use")

        port, group = DataDistributorAddress.lookup(self.__port, server)

        # Validate the group before any resources are allocated.
        self._resolveGroup(group)

        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.__size)

            # Send the multicast traffic out of the requested interface.
            # The option takes a single 4-byte in_addr; an 8-byte value is
            # interpreted by newer Linux kernels as an ip_mreq whose interface
            # field would then be 0.0.0.0, i.e. "use the default interface".
            intf = socket.gethostbyname_ex(interface)[2][0]
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF,
                         socket.inet_aton(intf))
        except Exception:
            # Don't leak the socket or leave this object half connected.
            s.close()
            raise

        # Commit the new state only once everything has succeeded.
        self.__socket = s
        self.__port = port
        self.__group = group

    def disconnect(self):
        """!@brief Gracefully shut down the socket.

        @exception RuntimeError  No socket is open.
        """
        if self.__socket is None:
            raise RuntimeError("No socket found")
        self.__socket.close()
        self.__socket = None
        self.__group = None
        self.__port = None

    def publish(self, data):
        """!@brief Publish the data on the multicast socket.

        @param data  The data to be sent out on the multicast socket.
        @exception RuntimeError  No socket is open.
        """
        if self.__socket is None:
            raise RuntimeError("No socket found")
        self.__socket.sendto(data, (self.__group, self.__port))
00118
00119
class DataDistributorClient(object):
    """!@brief Data distributor client class.

    Joins a multicast group on a UDP socket and receives the datagrams
    published to it.
    """
    # Timeout handed to select() in receive(); bounds how long receive()
    # waits before rechecking the returnNone() flag.
    __timeout = 1

    def __init__(self, name = None, size = 64*1024):
        """!@brief Data Distributor client constructor.

        @param name  The name of the type of data that the client wants to
                     receive.  See the DataDistributorAddress class.
        @param size  The size in bytes of the buffer to allocate for receiving
                     the data.  Defaults to 64 KB.
        """
        self.__socket = None
        self.__group = None
        self.__groupAddr = None   # Resolved dotted-quad form of __group
        self.__port = DataDistributorAddress.port(name)
        self.__size = size
        self.__returnNone = None

    @staticmethod
    def _resolveGroup(group):
        """!@brief Resolve a group name and verify that it is a multicast address.

        @param group  A dotted-quad IP address or a host name.
        @return The dotted-quad IP address that the group resolves to.
        @exception RuntimeError  The address is outside 224.0.0.0 -
                                 239.255.255.255.
        """
        address = socket.gethostbyname(group)
        firstByte = int(address.split(".", 1)[0])
        # IPv4 multicast addresses have a first octet of 224 through 239.
        if not (224 <= firstByte <= 239):
            raise RuntimeError(
                "Group %s does not resolve to a multicast address" % group)
        return address

    def connect(self, server):
        """!@brief Open a UDP socket, bind it to a port and select a multicast group.

        @param server  The address the server is multicasting on.  This
                       address must be the same for both server and client.
                       The value can be in one of the following forms:
                       - A number of which only the two lower decimal digits
                         are used, i.e., in the range 0-99
                       - <port>@<multicast address>, where <port> is a value
                         that will override what is found from the data type
                         name translation, and <multicast address> is some
                         valid multicast IP address.  A name may be used for
                         the <multicast address> value as well, as long as it
                         can be found in the name server and translates to a
                         valid multicast IP address.
                       - <multicast address>, as discussed in the previous
                         bullet.  The port value is taken to be the one found
                         from looking up the data type name in the port table.
        @exception RuntimeError  A socket is already open, or the group does
                                 not resolve to a multicast address.
        """
        if self.__socket is not None:
            raise RuntimeError("Socket already in use")

        port, group = DataDistributorAddress.lookup(self.__port, server)

        # Resolve and validate the group before any resources are allocated.
        address = self._resolveGroup(group)

        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            # Allow several clients on one host to bind the same port.
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            except AttributeError:
                pass              # SO_REUSEPORT is not defined on all platforms

            s.bind(('', port))
            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.__size)

            # Join the group: an ip_mreq of <group address><interface
            # address>, with 0.0.0.0 letting the system choose the interface.
            s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
                         socket.inet_aton(address) +
                         socket.inet_aton('0.0.0.0'))
        except Exception:
            # Don't leak the socket or leave this object half connected.
            s.close()
            raise

        # Commit the new state only once everything has succeeded.
        self.__socket = s
        self.__port = port
        self.__group = group
        self.__groupAddr = address

    def disconnect(self):
        """!@brief Gracefully shut down the socket.

        @exception RuntimeError  No socket is open.
        """
        s = self.__socket
        if s is None:
            raise RuntimeError("No socket found")
        try:
            # Use the address resolved in connect(); the group may have been
            # given as a host name, which inet_aton() cannot convert.
            s.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP,
                         socket.inet_aton(self.__groupAddr) +
                         socket.inet_aton('0.0.0.0'))
        finally:
            # Release the socket even if leaving the group failed.
            s.close()
            self.__socket = None
            self.__group = None
            self.__groupAddr = None
            self.__port = None

    def returnNone(self):
        """!@brief Method to signal the receive loop to exit.

        Once called, receive() returns (None, None) instead of waiting for
        data.  The flag is never cleared by this class.
        """
        self.__returnNone = 1

    def receive(self):
        """!@brief Receive multicast data and the sender's identification.

        Waits until a datagram arrives, an exceptional condition is reported
        on the socket, or returnNone() has been called.

        @return A tuple of the data and the sender's address as given by
                recvfrom(), or (None, None) when no data is returned.
        @exception RuntimeError  No socket is open.
        """
        s = self.__socket
        if s is None:
            raise RuntimeError("No socket found")

        import select
        while self.__returnNone is None:
            # The timeout makes the loop re-examine the returnNone() flag.
            rd, wr, ex = select.select([s], [], [s], self.__timeout)
            if rd:
                return s.recvfrom(self.__size)
            if ex:
                return None, None
        return None, None
00241
class DataDistributorAddress(object):
    """!@brief Data distributor address class containing static methods that
    govern the picking of multicast ports and addresses.

    """

    # Base port number; each data type is assigned an offset from it.
    __PORT_TEMPLATE = 54321

    # Multicast group address; the last octet is filled in from the machine ID.
    __GROUP_TEMPLATE = '239.255.1.%u'

    # Data type name -> multicast port.  LAT_EBFdata and LAT_LDFdata map to
    # the same port.
    __ports = {"LAT_EBFdata" : __PORT_TEMPLATE + 0,
               "LAT_LDFdata" : __PORT_TEMPLATE + 0,
               "LAT_CMDdata" : __PORT_TEMPLATE + 1,
               "LAT_HSKdata" : __PORT_TEMPLATE + 2,
               "LAT_MsgLog"  : __PORT_TEMPLATE + 3}

    @staticmethod
    def port(name):
        """!@brief Static method to select a physical multicast port to use
        based on the type of data that is desired.

        @param name  The name of the type of data that is desired.  Choose
                     from 'LAT_EBFdata', 'LAT_LDFdata', 'LAT_CMDdata',
                     'LAT_HSKdata' and 'LAT_MsgLog'.
        @return The port number for the data type, or None when the name is
                not in the port table.
        """
        return DataDistributorAddress.__ports.get(name)

    @staticmethod
    def lookup(port, server):
        """!@brief Static method for resolving the multicast port and group values.

        @param port    An integer containing the multicast port to use, e.g.
                       from the port() method above, or None to indicate that
                       the value will come from the supplied server string.
        @param server  A string that contains either the machine ID (test
                       stand instance number), or <port>@<group IP or name>,
                       or <group IP or name> when a port number is also
                       supplied.  These last two can be used to override the
                       values chosen by this class.  The machine ID is a
                       number from 0 - 999, where the hundreds digit
                       represents the site ID and remainder (0-99) is the
                       machine ID at the site.  This latter portion is what's
                       used.
        @return A tuple made up of the resolved port and group values.  When
                the server string is not of the form <integer>@<group>, the
                supplied port is returned unchanged and the whole string is
                taken to be the group.
        """
        # Form 1: a machine ID, of which only the two lower digits are used.
        try:
            machine = int(server)
        except ValueError:
            pass
        else:
            return port, DataDistributorAddress.__GROUP_TEMPLATE % (machine % 100)

        # Form 2: <port>@<group>.  Temporaries are used so that a malformed
        # port field cannot overwrite the caller's port value.
        try:
            portText, group = server.split('@', 1)
            return int(portText), group
        except ValueError:
            # Form 3: no '@', or a non-numeric port: a bare group.
            return port, server
00304
00305
if __name__ == "__main__":
    # Testing routine.
    #
    # Usage: Run DataDistributor (this file) in one session to start up the
    #        client and DataDistributor -s in another session to start up the
    #        server.  Any command line argument, not only -s, selects the
    #        server.
    import sys
    import time

    def sender():
        """Publish five timestamped datagrams, one per second, then 'Quit'."""
        dds = DataDistributorServer('test')
        dds.connect('54320@239.255.1.0')
        try:
            for _ in range(5):
                # Timestamp followed by padding to make a large datagram.
                data = repr(time.time()) + 50000*'-'
                dds.publish(data)
                time.sleep(1)
            dds.publish('Quit')
        finally:
            dds.disconnect()

    def receiver():
        """Print a summary of each datagram received until 'Quit' arrives."""
        dds = DataDistributorClient('test', 64*1024)
        dds.connect('54320@239.255.1.0')
        try:
            while 1:
                data, source = dds.receive()
                if data is None:
                    break         # receive() gave up without any data
                print("%s : %r %d" % (source, data[:40], len(data)))
                if data == 'Quit':
                    break
        finally:
            dds.disconnect()

    flags = sys.argv[1:]

    if flags:
        sender()
    else:
        receiver()