00001 #!/usr/local/bin/python 00002 # 00003 # Copyright 2006 00004 # by 00005 # The Board of Trustees of the 00006 # Leland Stanford Junior University. 00007 # All rights reserved. 00008 # 00009 00010 __facility__ = "Online" 00011 __abstract__ = "Program to merge data from network sockets" 00012 __author__ = "R. Claus <Claus@SLAC.Stanford.edu> SLAC - GLAST LAT I&T/Online" 00013 __date__ = "3/30/2006" 00014 __updated__ = ("$Date: 2006/07/14 22:34:29 $").split(' ')[1] 00015 __version__ = "$Revision: 1.1 $" 00016 __release__ = "$Name: R04-12-00 $" 00017 __credits__ = "SLAC" 00018 00019 import LATTE.copyright_SLAC 00020 00021 import logging as log 00022 import sys 00023 import time 00024 import struct 00025 00026 import LDF 00027 00028 from LATTE.merger.geb import Geb 00029 00030 from ancillarySocket import AncillaryContributor 00031 from ldfSocket import LdfContributor 00032 00033 00034 class Merger(Geb): 00035 def __init__(self, contributors, filename): 00036 Geb.__init__(self, contributors) 00037 self.__file = file(filename, 'wb') 00038 00039 def dispose(self, (statuses, buffer)): 00040 """!\brief Method called by the base class to dispose of an event. 00041 """ 00042 self.__file.write(buffer) 00043 00044 def close(self): 00045 """!\brief Method called by the base class to close the output file. 00046 """ 00047 self.__file.close() 00048 00049 00050 def getHostPort(option): 00051 opts = option.split(':') 00052 host = opts[0] 00053 if len(opts) == 1: 00054 port = 2345 00055 else: 00056 port = int(opts[1]) 00057 return host, port 00058 00059 00060 def main(options): 00061 ancillaryContributor = AncillaryContributor(getHostPort(options.ancillaryHost)) 00062 ldfContributor = LdfContributor(getHostPort(options.ldfHost)) 00063 merger = Merger([ancillaryContributor, ldfContributor], options.outFile) 00064 #merger = Merger([ancillaryContributor], options.outFile) 00065 #merger = Merger([ldfContributor], options.outFile) 00066 00067 try: 00068 merger.start() 00069 00070 # Wait until an error occurs or the user kills us 00071 while(True): time.sleep(24*3600) 00072 00073 except KeyboardInterrupt: 00074 pass 00075 except Exception, e: 00076 log.exception(sys.argv[0] + ":\n" + str(e)) 00077 00078 # Shutdown the builder 00079 merger.shutdown() 00080 00081 00082 def usage(): 00083 """!\brief Return a program usage message 00084 """ 00085 return "\n\n%s merges, or builds, events from sockets\n" % (sys.argv[0]) 00086 00087 00088 if __name__ == "__main__": 00089 # Options, first list mandatory, second list optional 00090 from LATTE.tools.gOptions import Options 00091 00092 options = Options(["ancillaryHost", "ldfHost", "outFile"]) 00093 try: 00094 options.parse() 00095 except Exception, msg: 00096 options.usage(usage()) 00097 raise Exception, msg 00098 00099 main(options)