Package json_to_relation :: Module input_source
[hide private]
[frames] | no frames]

Source Code for Module json_to_relation.input_source

  1  ''' 
  2  Created on Sep 14, 2013 
  3   
  4  @author: paepcke 
  5  ''' 
  6  import StringIO 
  7  import bz2 
  8  import gzip 
  9  import os 
 10  import sys 
 11  from urllib import FancyURLopener 
 12  import urllib2 
 13  from urlparse import urlparse 
 14   
 15   
class COMPRESSION_TYPE:
    '''
    Enumeration of the compression schemes InURI can read.
    NO_COMPRESSION is 0, GZIP is 1 and BZIP2 is 2.
    '''
    NO_COMPRESSION, GZIP, BZIP2 = range(3)

class InputSource(object):
    '''
    Possible sources for JSON objects. These are
    files on the local file system, URLs linking to
    JSON strings, and MongoDB collections.

    Instances are context managers: the 'with' target is
    self.fileHandle, and self.close() is called on exit.
    Subclasses are expected to set self.fileHandle and
    to provide close().
    '''

    def __init__(self, inputSource):
        '''
        @param inputSource: description of the source; stored
            unchanged in self.inputSource.
        '''
        self.inputSource = inputSource

    def __enter__(self):
        # The 'with' body works directly on the underlying handle,
        # which a subclass constructor must have set.
        return self.fileHandle

    def __exit__(self, excType, excValue, excTraceback):
        '''
        Close the input source when leaving a 'with' block.

        If the 'with' body succeeded but close() fails, an IOError
        describing the close failure is raised. If the body itself
        raised, a close failure is ignored so that the body's original
        exception is the one that propagates.

        @return: False, so an exception from the 'with' body is re-raised.
        @raise IOError: if close() fails after a successful 'with' body.
        '''
        # Even if the conversion threw an error,
        # try to close the input source:
        try:
            self.close()
        except Exception as closeErr:
            # Only Exception subclasses are intercepted; KeyboardInterrupt
            # and SystemExit raised during close() propagate unchanged.
            # If the conversion itself went fine, then raise this exception
            # from the closing attempt. But if the conversion failed, then
            # let the system re-raise that earlier exception:
            if excValue is None:
                raise IOError("Could not close the input to the conversion: %s" % closeErr)
        # Return False to indicate that if the conversion
        # threw an error, the exception should now be re-raised.
        # If the conversion worked fine, then this return value
        # is ignored.
        return False

class InURI(InputSource):
    '''
    Input source backed by a local file path or a URL. Sources whose
    name ends in 'gz' or 'bz2' are decompressed while reading; remote
    compressed sources are first downloaded into a temporary local file,
    which close() deletes again.
    '''

    def __init__(self, inFilePathOrURL):
        '''
        Open the given file path or URL for reading.

        @param inFilePathOrURL: local file path (absolute or relative) or URL
        @type inFilePathOrURL: String
        @raise IOError: if a remote compressed file cannot be retrieved,
            or the source cannot be opened
        '''
        InputSource.__init__(self, inFilePathOrURL)
        if len(urlparse(inFilePathOrURL)[0]) == 0:
            # Plain path: make it absolute before turning it into a file URL.
            # A relative path such as 'data/x.json' would otherwise become
            # 'file://data/x.json', whose first component urlparse() treats
            # as a host name instead of a directory.
            inFilePathOrURL = 'file://' + os.path.abspath(inFilePathOrURL)
        self.inFilePathOrURL = inFilePathOrURL
        self.compression = self.determineCompression(self.inFilePathOrURL)

        # If file is compressed and remote, pull it into a temp file
        # so we can decompress locally. Sets self.localFilePath
        # so that urlopen(), gzip.open(), or bz2.BZ2File() will work.
        # Sets self.deleteTempFile if a tmp file was created:
        self.ensureFileLocal(inFilePathOrURL)

        opened = False
        try:
            if self.compression == COMPRESSION_TYPE.GZIP:
                self.fileHandle = gzip.open(self.localFilePath, 'rb')
            elif self.compression == COMPRESSION_TYPE.BZIP2:
                self.fileHandle = bz2.BZ2File(self.localFilePath, 'rb')
            else:
                # Uncompressed: localFilePath is still the (file) URL.
                self.fileHandle = urllib2.urlopen(self.localFilePath)
            opened = True
        finally:
            # Don't leak a downloaded temp file if opening it failed.
            if not opened:
                self._removeTempFile()

    def decompress(self, line):
        '''
        Return line unchanged. For gzip and bz2 the read() of
        self.fileHandle already decompresses; this method is an
        extension point for other compression schemes.
        @param line: one line read from self.fileHandle
        @type line: String
        '''
        return line

    def close(self):
        '''
        Close the underlying handle and, if one was created, delete the
        temporary local copy of a remote compressed file. The temp file
        is removed even if closing the handle raises.
        '''
        try:
            self.fileHandle.close()
        finally:
            self._removeTempFile()

    def determineCompression(self, fileURI):
        '''
        Given a file path, determine from the ending of its name whether
        the file is gzip or bzip2 compressed, or whether it is
        not compressed.
        @param fileURI: item that str() turns into a file path or URL
        @type fileURI: STRING
        @return: one of the COMPRESSION_TYPE constants
        '''
        name = str(fileURI)
        if name.endswith('bz2'):
            return COMPRESSION_TYPE.BZIP2
        elif name.endswith('gz'):
            return COMPRESSION_TYPE.GZIP
        else:
            return COMPRESSION_TYPE.NO_COMPRESSION

    def ensureFileLocal(self, inFilePathOrURL):
        '''
        Takes a file path or URL. Sets self.localFilePath
        to the same path if the file is uncompressed, or to
        the plain path component if the file is local (file: URL)
        and compressed. If a file is remote and compressed, downloads
        it into a local tmp file and sets self.localFilePath to that
        file's name. In this case the flag self.deleteTempFile
        is set to True.
        @param inFilePathOrURL: file path or URL to file
        @type inFilePathOrURL: String
        @raise IOError: (urllib2.URLError/HTTPError) if the remote
            file cannot be retrieved, e.g. on an HTTP 404
        '''
        import shutil
        import tempfile
        self.localFilePath = inFilePathOrURL
        self.deleteTempFile = False
        if self.compression == COMPRESSION_TYPE.NO_COMPRESSION:
            return
        # Got compressed file; is it local?
        parseResult = urlparse(inFilePathOrURL)
        if parseResult.scheme == 'file':
            self.localFilePath = parseResult.path
            return
        # urllib2 raises on HTTP errors, whereas FancyURLopener would
        # silently save the server's error page as the "compressed" file.
        tmpFile = tempfile.NamedTemporaryFile(delete=False)
        downloaded = False
        try:
            remote = urllib2.urlopen(inFilePathOrURL)
            try:
                shutil.copyfileobj(remote, tmpFile)
            finally:
                remote.close()
            downloaded = True
        finally:
            tmpFile.close()
            if not downloaded:
                os.remove(tmpFile.name)
        self.localFilePath = tmpFile.name
        self.deleteTempFile = True

    def _removeTempFile(self):
        # Best-effort removal of a downloaded temp file; safe to call twice.
        if getattr(self, 'deleteTempFile', False):
            try:
                os.remove(self.localFilePath)
            except OSError:
                pass
            self.deleteTempFile = False

class InString(InputSource):
    '''
    Input source that serves an in-memory string through a
    file-like StringIO object.
    '''

    def __init__(self, inputStr):
        '''
        @param inputStr: the text to read from
        @type inputStr: String
        '''
        stringReader = StringIO.StringIO(inputStr)
        self.fileHandle = stringReader

    def decompress(self, line):
        '''
        Strings are never compressed, so the line is handed back as-is.
        @param line: one line read from the string
        @type line: String
        '''
        return line

    def close(self):
        '''
        Intentionally a no-op: the StringIO object is not closed.
        '''

class InMongoDB(InputSource):
    '''
    Placeholder for reading JSON objects from a MongoDB collection.
    Not implemented yet: construction always raises NotImplementedError,
    because connect() does.
    '''

    # Shared message for every not-yet-implemented entry point.
    _NOT_IMPLEMENTED_MSG = "MongoDB connector not yet implemented"

    def __init__(self, server, pwd, dbName, collName):
        '''
        Store the connection parameters, then try to connect.
        @param server: server to connect to (format not yet defined)
        @param pwd: password for the connection
        @param dbName: name of the database
        @param collName: name of the collection to read
        @raise NotImplementedError: always, since connect() is not implemented
        '''
        self.server = server
        self.pwd = pwd
        self.dbName = dbName
        self.collName = collName
        self.fileHandle = self.connect()

    def connect(self):
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

    def decompress(self, line):
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

    def close(self):
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

class InPipe(InputSource):
    '''
    Input source that reads from the process's standard input.
    '''

    def __init__(self):
        # Read directly from the interpreter's stdin stream.
        self.fileHandle = sys.stdin

    def decompress(self, line):
        '''
        Lines from a pipe are passed through untouched; compressed
        input must be decompressed upstream (e.g. piped through gunzip).
        @param line: one line read from stdin
        @type line: String
        '''
        return line

    def close(self):
        '''
        Deliberately a no-op: stdin is not closed.
        '''
