|
Package json_to_relation ::
Module json_to_relation
|
|
1
2
3 '''
4 Created on Sep 14, 2013
5
6 @author: paepcke
7 '''
8
9 from collections import OrderedDict
10 import math
11 import os
12 import re
13 import shutil
14 import tempfile
15 import logging
16
17 from generic_json_parser import GenericJSONParser
18 from input_source import InputSource, InURI, InString, InMongoDB, InPipe
19 from col_data_type import ColDataType
20 from output_disposition import OutputDisposition, OutputMySQLTable, OutputFile
21
22
23
24
25
27 '''
28 Given a source with JSON structures, derive a schema, and construct
29 a relational table. Source can be a local file name, a URL, or an
30 StringIO pseudofile.
31
32 JSON structures in the source must be one per line. That is, each line in
33 the source must be a self contained JSON object. Pretty printed strings
34 won't work.
35 '''
36
37 MAX_SQL_INT = math.pow(2,31) - 1
38 MIN_SQL_INT = -math.pow(2,31)
39
40
41
42
43 LEGAL_MYSQL_ATTRIBUTE_PATTERN = re.compile("^[$\w]+$")
44
53 '''
54 Create a JSON-to-Relation converter. The JSON source can be
55 a file with JSON objects, a StringIO.StringIO string pseudo file,
56 stdin, or a MongoDB
57
58 The destination can be a file, where CSV is written in Excel-readable
59 form, stdout, or a MySQL table specification, where the ouput rows
60 will be inserted.
61
62 SchemaHints optionally specify the SQL types of particular columns.
63 By default the processJSONObs() method will be conservative, and
64 specify numeric columns as DOUBLE. Even though all encountered values
65 for one column could be examined, and a more appropriate type chosen,
66 such as INT when only 4-byte integers are ever seen, future additions
67 to the table might exceed the INT capacity for that column. Example
68
69 If schemaHints is provided, it is a Dict mapping column names to ColDataType.
70 The column names in schemaHints must match the corresponding (fully nested)
71 key names in the JSON objects::
72 schemaHints dict: {'msg.length' : ColDataType.INT,
73 'chunkSize' : ColDataType.INT}
74
75 For unit testing isolated methods in this class, set jsonSource and
76 destination to None.
77
78 @param jsonSource: subclass of InputSource that wraps containing JSON structures, or a URL to such a source
79 @type jsonSource: {InPipe | InString | InURI | InMongoDB}
80 @param destination: instruction to were resulting rows are to be directed
81 @type destination: {OutputPipe | OutputFile | OutputMySQLTable}
82 @param outputFormat: format of output. Can be CSV or SQL INSERT statements
83 @type outputFormat: OutputFormat
84 @param schemaHints: Dict mapping col names to data types (optional)
85 @type schemaHints: Map<String,ColDataTYpe>
86 @param jsonParserInstance: a parser that takes one JSON string, and returns a CSV row. Parser also must inform this
87 parent object of any generated column names.
88 @type jsonParserInstance: {GenericJSONParser | EdXTrackLogJSONParser | CourseraTrackLogJSONParser}
89 @param loggingLevel: level at which logging output is show.
90 @type loggingLevel: {logging.DEBUG | logging.WARN | logging.INFO | logging.ERROR | logging.CRITICAL}
91 @raise ValueErrer: when value of jsonParserInstance is neither None, nor an instance of GenericJSONParser,
92 nor one of its subclasses.
93 @raise ValueError: when jsonSource is not an instance of InPipe, InString, InURI, or InMongoDB
94 '''
95
96
97
98
99 if jsonSource is None and destination is None:
100 return
101 if not isinstance(jsonSource, InputSource):
102 raise ValueError("JSON source must be an instance of InPipe, InString, InURI, or InMongoDB")
103
104 self.jsonSource = jsonSource
105 self.destination = destination
106 self.outputFormat = outputFormat
107 self.schemaHints = schemaHints
108
109 if logFile is not None:
110 logging.basicConfig(filename=logFile, level=loggingLevel)
111 else:
112 logging.basicConfig(level=loggingLevel)
113
114 if jsonParserInstance is None:
115 self.jsonParserInstance = GenericJSONParser(self)
116 elif isinstance(jsonParserInstance, GenericJSONParser):
117 self.jsonParserInstance = jsonParserInstance
118 else:
119 raise ValueError("Parameter jsonParserInstance needs to be of class GenericJSONParser, or one of its subclasses.")
120
121
122 if self.outputFormat == OutputDisposition.OutputFormat.SQL_INSERT_STATEMENTS:
123 raise NotImplementedError("Output as MySQL statements not yet implemented")
124
125
126
127 self.cols = OrderedDict()
128
129
130
131 self.nextNewColPos = 0;
132
133 - def convert(self, prependColHeader=False):
134 '''
135 Main user-facing API method. Read from the JSON source establish
136 in the __init__() call. Create a MySQL schema as the JSON is read.
137 Convert each JSON object into the requested output format (e.g. CSV),
138 and deliver it to the destination (e.g. a file)
139 @param prependColHeader: If true, the final destination, if it is stdout or a file,
140 will have the column names prepended. Note that this option requires that
141 the output file is first written to a temp file, and then merged with the
142 completed column name header row to the final destination that was specified
143 by the client.
144 @type prependColHeader: Boolean
145 '''
146 savedFinalOutDest = None
147 if not isinstance(self.destination, OutputMySQLTable):
148 if prependColHeader:
149 savedFinalOutDest = self.destination
150 (tmpFd, tmpFileName) = tempfile.mkstemp(suffix='.csv',prefix='jsonToRelationTmp')
151 os.close(tmpFd)
152 self.destination = OutputFile(tmpFileName)
153
154 lineCounter = 0
155 with OutputDisposition(self.destination) as outFd, self.jsonSource as inFd:
156 for jsonStr in inFd:
157 jsonStr = self.jsonSource.decompress(jsonStr)
158 newRow = []
159 try:
160 newRow = self.jsonParserInstance.processOneJSONObject(jsonStr, newRow)
161 except Exception as e:
162 logging.warn('Line %d: bad JSON: %s' % (lineCounter, `e`))
163 self.processFinishedRow(newRow, outFd)
164 lineCounter += 1
165
166
167
168 if prependColHeader and savedFinalOutDest is not None:
169 try:
170 with open(self.destination.name, 'rb') as inFd, OutputDisposition(savedFinalOutDest) as finalOutFd:
171 colHeaders = self.getColHeaders()
172 self.processFinishedRow(colHeaders, finalOutFd)
173 shutil.copyfileobj(inFd, finalOutFd.fileHandle)
174 finally:
175 os.remove(tmpFileName)
176
178 '''
179 Given a column name and MySQL datatype name, check whether this
180 column has previously been encountered. If not, a column information
181 object is created, which will eventually be used to create the column
182 header, Django model, or SQL create statements.
183 @param colName: name of the column to consider
184 @type colName: String
185 @param colDataType: datatype of the column.
186 @type colDataType: ColDataType
187 '''
188 try:
189 self.cols[colName]
190 except KeyError:
191
192 self.cols[colName] = ColumnSpec(colName, colDataType, self)
193
195 '''
196 When a row is finished, this method processes the row as per
197 the user's disposition. The method writes the row to a CSV
198 file, inserts it into a MySQL table, and generates an SQL
199 insert statement for later.
200 @param filledNewRow: the list of values for one row, possibly including empty fields
201 @type filledNewRow: List<<any>>
202 @param outFd: an instance of a class that writes to the destination
203 @type outFd: OutputDisposition
204 '''
205
206
207 outFd.writerow(map(str,filledNewRow))
208
210 '''
211 Returns an ordered list of ColumnSpec instances.
212 Each such instance holds column name and SQL type.
213 @return: ordered list of column information
214 @rtype: (ColumnSpec)
215 '''
216 return self.cols.values()
217
219 '''
220 Returns a list of column header names collected so far.
221 '''
222 headers = []
223 for colSpec in self.cols.values():
224 headers.append(colSpec.colName)
225 return headers
226
228 '''
229 Returns the position of the next new column that
230 may need to be added when a previously unseen JSON
231 label is encountered.
232 '''
233 return self.nextNewColPos
234
236 self.nextNewColPos += 1
237
239 '''
240 Given a proposed MySQL identifier, such as a column name,
241 return a possibly modified name that will be acceptable to
242 a MySQL database. MySQL accepts alphanumeric, underscore,
243 and dollar sign. Identifiers with other chars must be quoted.
244 Quote characters embedded within the identifiers must be
245 doubled to be escaped.
246 @param proposedMySQLName: input name
247 @type proposedMySQLName: String
248 @return: the possibly modified, legal MySQL identifier
249 @rtype: String
250 '''
251 if JSONToRelation.LEGAL_MYSQL_ATTRIBUTE_PATTERN.match(proposedMySQLName) is not None:
252 return proposedMySQLName
253
254
255 quoteChar = '"'
256 if proposedMySQLName.find(quoteChar) > -1:
257 quoteChar = "'"
258 if proposedMySQLName.find(quoteChar) > -1:
259
260
261 charList = list(proposedMySQLName)
262 newName = ""
263 for letter in charList:
264 if letter == quoteChar:
265 letter = quoteChar + quoteChar
266 newName += letter
267 proposedMySQLName = newName
268 return quoteChar + proposedMySQLName + quoteChar
269
271 '''
272 Housekeeping class. Each instance represents the name,
273 position, and datatype of one column. These instances are
274 used to generate column name headers, Django models, and
275 SQL insert statements.
276 '''
277
278 - def __init__(self, colName, colDataType, jsonToRelationProcessor):
279 '''
280 Create a ColumnSpec instance.
281 @param colName: name of column
282 @type colName: String
283 @param colDataType: data type of column (an enum)
284 @type colDataType: ColumnSpec
285 @param jsonToRelationProcessor: associated JSON to relation JSONToRelation instance
286 @type jsonToRelationProcessor: JSONToRelation
287 '''
288 self.colName = colName
289 self.colDataType = colDataType
290 self.colPos = jsonToRelationProcessor.getNextNewColPos()
291 jsonToRelationProcessor.bumpNextNewColPos()
292
294 '''
295 Return column name
296 @return: name of column
297 @rtype: String
298 '''
299 return self.colName
300
302 '''
303 Return SQL type
304 @return: SQL type of colum in upper case
305 @rtype: String
306 '''
307 return ColDataType().toString(self.colDataType).upper()
308
310 '''
311 Return string snippet to use in SQL CREATE TABLE or ALTER TABLE
312 statement
313 '''
314 return "%s %s" % (self.getName(), self.getType())
315
317 return "<Col %s: %s (position %s)>" % (self.colName,
318 self.getType(),
319 self.colPos)
320
323