METAMODEL-1165: Fixed - added default_table alias table
[metamodel.git] / mongodb / mongo2 / src / main / java / org / apache / metamodel / mongodb / mongo2 / MongoDbDataContext.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.metamodel.mongodb.mongo2;
20
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateSummary;
import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.InMemoryDataSet;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
import org.apache.metamodel.mongodb.common.MongoDBUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.FromItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.OperatorType;
import org.apache.metamodel.query.Query;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.ColumnType;
import org.apache.metamodel.schema.ColumnTypeImpl;
import org.apache.metamodel.schema.MutableColumn;
import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
68
69 /**
70 * DataContext implementation for MongoDB.
71 *
72 * Since MongoDB has no schema, a virtual schema will be used in this
73 * DataContext. This implementation supports either automatic discovery of a
74 * schema or manual specification of a schema, through the
75 * {@link MongoDbTableDef} class.
76 */
77 public class MongoDbDataContext extends QueryPostprocessDataContext implements UpdateableDataContext {
78
79 private static final Logger logger = LoggerFactory.getLogger(MongoDbDataSet.class);
80
81 private final DB _mongoDb;
82 private final SimpleTableDef[] _tableDefs;
83 private WriteConcernAdvisor _writeConcernAdvisor;
84 private Schema _schema;
85
86 /**
87 * Constructs a {@link MongoDbDataContext}. This constructor accepts a
88 * custom array of {@link MongoDbTableDef}s which allows the user to define
89 * his own view on the collections in the database.
90 *
91 * @param mongoDb
92 * the mongo db connection
93 * @param tableDefs
94 * an array of {@link MongoDbTableDef}s, which define the table
95 * and column model of the mongo db collections. (consider using
96 * {@link #detectSchema(DB)} or {@link #detectTable(DB, String)}
97 * ).
98 */
99 public MongoDbDataContext(DB mongoDb, SimpleTableDef... tableDefs) {
100 super(false);
101 _mongoDb = mongoDb;
102 _tableDefs = tableDefs;
103 _schema = null;
104 }
105
106 /**
107 * Constructs a {@link MongoDbDataContext} and automatically detects the
108 * schema structure/view on all collections (see {@link #detectSchema(DB)}).
109 *
110 * @param mongoDb
111 * the mongo db connection
112 */
113 public MongoDbDataContext(DB mongoDb) {
114 this(mongoDb, detectSchema(mongoDb));
115 }
116
117 /**
118 * Performs an analysis of the available collections in a Mongo {@link DB}
119 * instance and tries to detect the table's structure based on the first
120 * 1000 documents in each collection.
121 *
122 * @param db
123 * the mongo db to inspect
124 * @return a mutable schema instance, useful for further fine tuning by the
125 * user.
126 * @see #detectTable(DB, String)
127 */
128 public static SimpleTableDef[] detectSchema(DB db) {
129 Set<String> collectionNames = db.getCollectionNames();
130 SimpleTableDef[] result = new SimpleTableDef[collectionNames.size()];
131 int i = 0;
132 for (String collectionName : collectionNames) {
133 SimpleTableDef table = detectTable(db, collectionName);
134 result[i] = table;
135 i++;
136 }
137 return result;
138 }
139
140 /**
141 * Performs an analysis of an available collection in a Mongo {@link DB}
142 * instance and tries to detect the table structure based on the first 1000
143 * documents in the collection.
144 *
145 * @param db
146 * the mongo DB
147 * @param collectionName
148 * the name of the collection
149 * @return a table definition for mongo db.
150 */
151 public static SimpleTableDef detectTable(DB db, String collectionName) {
152 final DBCollection collection = db.getCollection(collectionName);
153 final DBCursor cursor = collection.find().limit(1000);
154
155 final SortedMap<String, Set<Class<?>>> columnsAndTypes = new TreeMap<String, Set<Class<?>>>();
156 while (cursor.hasNext()) {
157 DBObject object = cursor.next();
158 Set<String> keysInObject = object.keySet();
159 for (String key : keysInObject) {
160 Set<Class<?>> types = columnsAndTypes.get(key);
161 if (types == null) {
162 types = new HashSet<Class<?>>();
163 columnsAndTypes.put(key, types);
164 }
165 Object value = object.get(key);
166 if (value != null) {
167 types.add(value.getClass());
168 }
169 }
170 }
171 cursor.close();
172
173 final String[] columnNames = new String[columnsAndTypes.size()];
174 final ColumnType[] columnTypes = new ColumnType[columnsAndTypes.size()];
175
176 int i = 0;
177 for (Entry<String, Set<Class<?>>> columnAndTypes : columnsAndTypes.entrySet()) {
178 final String columnName = columnAndTypes.getKey();
179 final Set<Class<?>> columnTypeSet = columnAndTypes.getValue();
180 final Class<?> columnType;
181 if (columnTypeSet.size() == 1) {
182 columnType = columnTypeSet.iterator().next();
183 } else {
184 columnType = Object.class;
185 }
186 columnNames[i] = columnName;
187 if (columnType == ObjectId.class) {
188 columnTypes[i] = ColumnType.ROWID;
189 } else {
190 columnTypes[i] = ColumnTypeImpl.convertColumnType(columnType);
191 }
192 i++;
193 }
194
195 return new SimpleTableDef(collectionName, columnNames, columnTypes);
196 }
197
198 @Override
199 protected Schema getMainSchema() throws MetaModelException {
200 if (_schema == null) {
201 MutableSchema schema = new MutableSchema(getMainSchemaName());
202 for (SimpleTableDef tableDef : _tableDefs) {
203
204 MutableTable table = tableDef.toTable().setSchema(schema);
205 for (Column column : table.getColumnsOfType(ColumnType.ROWID)) {
206 if (column instanceof MutableColumn) {
207 ((MutableColumn) column).setPrimaryKey(true);
208 }
209 }
210
211 schema.addTable(table);
212 }
213
214 _schema = schema;
215 }
216 return _schema;
217 }
218
219 @Override
220 protected String getMainSchemaName() throws MetaModelException {
221 return _mongoDb.getName();
222 }
223
224 @Override
225 protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
226 final DBCollection collection = _mongoDb.getCollection(table.getName());
227
228 final DBObject query = createMongoDbQuery(table, whereItems);
229
230 logger.info("Executing MongoDB 'count' query: {}", query);
231 final long count = collection.count(query);
232
233 return count;
234 }
235
236 @Override
237 protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
238 Object keyValue) {
239 final DBCollection collection = _mongoDb.getCollection(table.getName());
240
241 List<FilterItem> whereItems = new ArrayList<FilterItem>();
242 SelectItem selectItem = new SelectItem(primaryKeyColumn);
243 FilterItem primaryKeyWhereItem = new FilterItem(selectItem, OperatorType.EQUALS_TO, keyValue);
244 whereItems.add(primaryKeyWhereItem);
245 final DBObject query = createMongoDbQuery(table, whereItems);
246 final DBObject resultDBObject = collection.findOne(query);
247
248 DataSetHeader header = new SimpleDataSetHeader(selectItems);
249
250 Row row = MongoDBUtils.toRow(resultDBObject, header);
251
252 return row;
253 }
254
255 @Override
256 public DataSet executeQuery(Query query) {
257 // Check for queries containing only simple selects and where clauses,
258 // or if it is a COUNT(*) query.
259
260 // if from clause only contains a main schema table
261 List<FromItem> fromItems = query.getFromClause().getItems();
262 if (fromItems.size() == 1 && fromItems.get(0).getTable() != null
263 && fromItems.get(0).getTable().getSchema() == _schema) {
264 final Table table = fromItems.get(0).getTable();
265
266 // if GROUP BY, HAVING and ORDER BY clauses are not specified
267 if (query.getGroupByClause().isEmpty() && query.getHavingClause().isEmpty()
268 && query.getOrderByClause().isEmpty()) {
269
270 final List<FilterItem> whereItems = query.getWhereClause().getItems();
271
272 // if all of the select items are "pure" column selection
273 boolean allSelectItemsAreColumns = true;
274 List<SelectItem> selectItems = query.getSelectClause().getItems();
275
276 // if it is a
277 // "SELECT [columns] FROM [table] WHERE [conditions]"
278 // query.
279 for (SelectItem selectItem : selectItems) {
280 if (selectItem.hasFunction() || selectItem.getColumn() == null) {
281 allSelectItemsAreColumns = false;
282 break;
283 }
284 }
285
286 if (allSelectItemsAreColumns) {
287 logger.debug("Query can be expressed in full MongoDB, no post processing needed.");
288
289 // prepare for a non-post-processed query
290
291
292 // checking if the query is a primary key lookup query
293 if (whereItems.size() == 1) {
294 final FilterItem whereItem = whereItems.get(0);
295 final SelectItem selectItem = whereItem.getSelectItem();
296 if (!whereItem.isCompoundFilter() && selectItem != null && selectItem.getColumn() != null) {
297 final Column column = selectItem.getColumn();
298 if (column.isPrimaryKey() && OperatorType.EQUALS_TO.equals(whereItem.getOperator())) {
299 logger.debug("Query is a primary key lookup query. Trying executePrimaryKeyLookupQuery(...)");
300 final Object operand = whereItem.getOperand();
301 final Row row = executePrimaryKeyLookupQuery(table, selectItems, column, operand);
302 if (row == null) {
303 logger.debug("DataContext did not return any primary key lookup query results. Proceeding "
304 + "with manual lookup.");
305 } else {
306 final DataSetHeader header = new SimpleDataSetHeader(selectItems);
307 return new InMemoryDataSet(header, row);
308 }
309 }
310 }
311 }
312
313 int firstRow = (query.getFirstRow() == null ? 1 : query.getFirstRow());
314 int maxRows = (query.getMaxRows() == null ? -1 : query.getMaxRows());
315 boolean thereIsAtLeastOneAlias = false;
316
317 for (SelectItem selectItem : selectItems) {
318 if (selectItem.getAlias() != null) {
319 thereIsAtLeastOneAlias = true;
320 break;
321 }
322 }
323
324 if (thereIsAtLeastOneAlias) {
325 final DataSet dataSet = materializeMainSchemaTableInternal(
326 table,
327 selectItems,
328 whereItems,
329 firstRow,
330 maxRows, false);
331 return dataSet;
332 } else {
333 final DataSet dataSet = materializeMainSchemaTableInternal(table, selectItems, whereItems, firstRow,
334 maxRows, false);
335 return dataSet;
336 }
337 }
338 }
339 }
340
341 logger.debug("Query will be simplified for MongoDB and post processed.");
342 return super.executeQuery(query);
343 }
344
345
346
347 private DataSet materializeMainSchemaTableInternal(Table table, List<SelectItem> selectItems,
348 List<FilterItem> whereItems, int firstRow, int maxRows, boolean queryPostProcessed) {
349 DBCursor cursor = getCursor(table, whereItems, firstRow, maxRows);
350
351 return new MongoDbDataSet(cursor, selectItems, queryPostProcessed);
352 }
353
354 private DBCursor getCursor(Table table, List<FilterItem> whereItems, int firstRow, int maxRows) {
355 final DBCollection collection = _mongoDb.getCollection(table.getName());
356
357 final DBObject query = createMongoDbQuery(table, whereItems);
358
359 logger.info("Executing MongoDB 'find' query: {}", query);
360 DBCursor cursor = collection.find(query);
361
362 if (maxRows > 0) {
363 cursor = cursor.limit(maxRows);
364 }
365 if (firstRow > 1) {
366 final int skip = firstRow - 1;
367 cursor = cursor.skip(skip);
368 }
369 return cursor;
370 }
371
372 protected BasicDBObject createMongoDbQuery(Table table, List<FilterItem> whereItems) {
373 assert _schema == table.getSchema();
374
375 final BasicDBObject query = new BasicDBObject();
376 if (whereItems != null && !whereItems.isEmpty()) {
377 for (FilterItem item : whereItems) {
378 convertToCursorObject(query, item);
379 }
380 }
381
382 return query;
383 }
384
385 private void convertToCursorObject(BasicDBObject query, FilterItem item) {
386 if (item.isCompoundFilter()) {
387
388 BasicDBList orList = new BasicDBList();
389
390 final FilterItem[] childItems = item.getChildItems();
391 for (FilterItem childItem : childItems) {
392 BasicDBObject childObject = new BasicDBObject();
393 convertToCursorObject(childObject, childItem);
394 orList.add(childObject);
395 }
396
397 query.put("$or", orList);
398
399 } else {
400
401 final Column column = item.getSelectItem().getColumn();
402 final String columnName = column.getName();
403 final String operatorName = getOperatorName(item);
404
405 Object operand = item.getOperand();
406 if (ObjectId.isValid(String.valueOf(operand))) {
407 operand = new ObjectId(String.valueOf(operand));
408 }
409
410 final BasicDBObject existingFilterObject = (BasicDBObject) query.get(columnName);
411 if (existingFilterObject == null) {
412 if (operatorName == null) {
413 if (OperatorType.LIKE.equals(item.getOperator())) {
414 query.put(columnName, turnOperandIntoRegExp(operand));
415 } else {
416 query.put(columnName, operand);
417 }
418 } else {
419 query.put(columnName, new BasicDBObject(operatorName, operand));
420 }
421 } else {
422 if (operatorName == null) {
423 throw new IllegalStateException("Cannot retrieve records for a column with two EQUALS_TO operators");
424 } else {
425 existingFilterObject.append(operatorName, operand);
426 }
427 }
428 }
429 }
430
431 private String getOperatorName(FilterItem item) {
432 final OperatorType operator = item.getOperator();
433
434 if (OperatorType.EQUALS_TO.equals(operator)) {
435 return null;
436 }
437 if (OperatorType.LIKE.equals(operator)) {
438 return null;
439 }
440 if (OperatorType.LESS_THAN.equals(operator)) {
441 return "$lt";
442 }
443 if (OperatorType.LESS_THAN_OR_EQUAL.equals(operator)) {
444 return "$lte";
445 }
446 if (OperatorType.GREATER_THAN.equals(operator)) {
447 return "$gt";
448 }
449 if (OperatorType.GREATER_THAN_OR_EQUAL.equals(operator)) {
450 return "$gte";
451 }
452 if (OperatorType.DIFFERENT_FROM.equals(operator)) {
453 return "$ne";
454 }
455 if (OperatorType.IN.equals(operator)) {
456 return "$in";
457 }
458
459 throw new IllegalStateException("Unsupported operator type: " + operator);
460 }
461
462 private Pattern turnOperandIntoRegExp(Object operand) {
463 StringBuilder operandAsRegExp = new StringBuilder(replaceWildCardLikeChars(operand.toString()));
464 operandAsRegExp.insert(0, "^").append("$");
465 return Pattern.compile(operandAsRegExp.toString(), Pattern.CASE_INSENSITIVE);
466 }
467
468 private String replaceWildCardLikeChars(String operand) {
469 return operand.replaceAll("%", ".*");
470 }
471
472 @Override
473 protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
474 return materializeMainSchemaTableInternal(
475 table,
476 columns.stream().map(SelectItem::new).collect(Collectors.toList()),
477 null,
478 1,
479 maxRows,
480 true);
481 }
482
483 @Override
484 protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int firstRow, int maxRows) {
485 return materializeMainSchemaTableInternal(
486 table,
487 columns.stream().map(SelectItem::new).collect(Collectors.toList()),
488 null,
489 firstRow,
490 maxRows,
491 true);
492 }
493
494 /**
495 * Executes an update with a specific {@link WriteConcernAdvisor}.
496 */
497 public UpdateSummary executeUpdate(UpdateScript update, WriteConcernAdvisor writeConcernAdvisor) {
498 MongoDbUpdateCallback callback = new MongoDbUpdateCallback(this, writeConcernAdvisor);
499 try {
500 update.run(callback);
501 } finally {
502 callback.close();
503 }
504 return callback.getUpdateSummary();
505 }
506
507 /**
508 * Executes an update with a specific {@link WriteConcern}.
509 */
510 public UpdateSummary executeUpdate(UpdateScript update, WriteConcern writeConcern) {
511 return executeUpdate(update, new SimpleWriteConcernAdvisor(writeConcern));
512 }
513
514 @Override
515 public UpdateSummary executeUpdate(UpdateScript update) {
516 return executeUpdate(update, getWriteConcernAdvisor());
517 }
518
519 /**
520 * Gets the {@link WriteConcernAdvisor} to use on
521 * {@link #executeUpdate(UpdateScript)} calls.
522 */
523 public WriteConcernAdvisor getWriteConcernAdvisor() {
524 if (_writeConcernAdvisor == null) {
525 return new DefaultWriteConcernAdvisor();
526 }
527 return _writeConcernAdvisor;
528 }
529
530 /**
531 * Sets a global {@link WriteConcern} advisor to use on
532 * {@link #executeUpdate(UpdateScript)}.
533 */
534 public void setWriteConcernAdvisor(WriteConcernAdvisor writeConcernAdvisor) {
535 _writeConcernAdvisor = writeConcernAdvisor;
536 }
537
538 /**
539 * Gets the {@link DB} instance that this {@link DataContext} is backed by.
540 */
541 public DB getMongoDb() {
542 return _mongoDb;
543 }
544
545 protected void addTable(MutableTable table) {
546 if (_schema instanceof MutableSchema) {
547 MutableSchema mutableSchema = (MutableSchema) _schema;
548 mutableSchema.addTable(table);
549 } else {
550 throw new UnsupportedOperationException("Schema is not mutable");
551 }
552 }
553 }