METAMODEL-1165: Fixed - added default_table alias table
[metamodel.git] / elasticsearch / rest / src / main / java / org / apache / metamodel / elasticsearch / rest / ElasticSearchRestDataContext.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.metamodel.elasticsearch.rest;
20
21 import java.util.ArrayList;
22 import java.util.Arrays;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.stream.Collectors;
29
30 import org.apache.metamodel.BatchUpdateScript;
31 import org.apache.metamodel.DataContext;
32 import org.apache.metamodel.MetaModelException;
33 import org.apache.metamodel.QueryPostprocessDataContext;
34 import org.apache.metamodel.UpdateScript;
35 import org.apache.metamodel.UpdateSummary;
36 import org.apache.metamodel.UpdateableDataContext;
37 import org.apache.metamodel.data.DataSet;
38 import org.apache.metamodel.data.DataSetHeader;
39 import org.apache.metamodel.data.Row;
40 import org.apache.metamodel.data.SimpleDataSetHeader;
41 import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
42 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
43 import org.apache.metamodel.query.FilterItem;
44 import org.apache.metamodel.query.LogicalOperator;
45 import org.apache.metamodel.query.SelectItem;
46 import org.apache.metamodel.schema.Column;
47 import org.apache.metamodel.schema.MutableColumn;
48 import org.apache.metamodel.schema.MutableSchema;
49 import org.apache.metamodel.schema.MutableTable;
50 import org.apache.metamodel.schema.Schema;
51 import org.apache.metamodel.schema.Table;
52 import org.apache.metamodel.util.SimpleTableDef;
53 import org.elasticsearch.index.query.QueryBuilder;
54 import org.elasticsearch.index.query.QueryBuilders;
55 import org.elasticsearch.search.builder.SearchSourceBuilder;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 import com.google.gson.JsonElement;
60 import com.google.gson.JsonObject;
61
62 import io.searchbox.client.JestClient;
63 import io.searchbox.client.JestResult;
64 import io.searchbox.core.Count;
65 import io.searchbox.core.CountResult;
66 import io.searchbox.core.Get;
67 import io.searchbox.core.Search;
68 import io.searchbox.core.SearchResult;
69 import io.searchbox.indices.mapping.GetMapping;
70 import io.searchbox.params.Parameters;
71
72 /**
73 * DataContext implementation for ElasticSearch analytics engine.
74 *
75 * ElasticSearch has a data storage structure hierarchy that briefly goes like
76 * this:
77 * <ul>
78 * <li>Index</li>
79 * <li>Document type (short: Type) (within an index)</li>
80 * <li>Documents (of a particular type)</li>
81 * </ul>
82 *
83 * When instantiating this DataContext, an index name is provided. Within this
84 * index, each document type is represented as a table.
85 *
86 * This implementation supports either automatic discovery of a schema or manual
87 * specification of a schema, through the {@link SimpleTableDef} class.
88 */
89 public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext,
90 UpdateableDataContext {
91
92 private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class);
93
94 public static final String FIELD_ID = "_id";
95
96 // 1 minute timeout
97 public static final String TIMEOUT_SCROLL = "1m";
98
99 // we scroll when more than 400 rows are expected
100 private static final int SCROLL_THRESHOLD = 400;
101
102 private final JestClient elasticSearchClient;
103
104 private final String indexName;
105 // Table definitions that are set from the beginning, not supposed to be
106 // changed.
107 private final List<SimpleTableDef> staticTableDefinitions;
108
109 // Table definitions that are discovered, these can change
110 private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
111
112 /**
113 * Constructs a {@link ElasticSearchRestDataContext}. This constructor
114 * accepts a custom array of {@link SimpleTableDef}s which allows the user
115 * to define his own view on the indexes in the engine.
116 *
117 * @param client
118 * the ElasticSearch client
119 * @param indexName
120 * the name of the ElasticSearch index to represent
121 * @param tableDefinitions
122 * an array of {@link SimpleTableDef}s, which define the table
123 * and column model of the ElasticSearch index.
124 */
125 public ElasticSearchRestDataContext(JestClient client, String indexName, SimpleTableDef... tableDefinitions) {
126 super(false);
127 if (client == null) {
128 throw new IllegalArgumentException("ElasticSearch Client cannot be null");
129 }
130 if (indexName == null || indexName.trim().length() == 0) {
131 throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
132 }
133 this.elasticSearchClient = client;
134 this.indexName = indexName;
135 this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
136 .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions));
137 this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
138 }
139
140 /**
141 * Constructs a {@link ElasticSearchRestDataContext} and automatically
142 * detects the schema structure/view on all indexes (see
143 * {@link #detectTable(JsonObject, String)}).
144 *
145 * @param client
146 * the ElasticSearch client
147 * @param indexName
148 * the name of the ElasticSearch index to represent
149 */
150 public ElasticSearchRestDataContext(JestClient client, String indexName) {
151 this(client, indexName, new SimpleTableDef[0]);
152 }
153
154 /**
155 * Performs an analysis of the available indexes in an ElasticSearch cluster
156 * {@link JestClient} instance and detects the elasticsearch types structure
157 * based on the metadata provided by the ElasticSearch java client.
158 *
159 * @see {@link #detectTable(JsonObject, String)}
160 * @return a mutable schema instance, useful for further fine tuning by the
161 * user.
162 */
163 private SimpleTableDef[] detectSchema() {
164 logger.info("Detecting schema for index '{}'", indexName);
165
166 final JestResult jestResult;
167 try {
168 final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build();
169 jestResult = elasticSearchClient.execute(getMapping);
170 } catch (Exception e) {
171 logger.error("Failed to retrieve mappings", e);
172 throw new MetaModelException("Failed to execute request for index information needed to detect schema", e);
173 }
174
175 if (!jestResult.isSucceeded()) {
176 logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage());
177 throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage());
178 }
179
180 final List<SimpleTableDef> result = new ArrayList<>();
181
182 final Set<Map.Entry<String, JsonElement>> mappings = jestResult.getJsonObject().getAsJsonObject(indexName)
183 .getAsJsonObject("mappings").entrySet();
184 if (mappings.size() == 0) {
185 logger.warn("No metadata returned for index name '{}' - no tables will be detected.");
186 } else {
187
188 for (Map.Entry<String, JsonElement> entry : mappings) {
189 final String documentType = entry.getKey();
190
191 try {
192 final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties")
193 .getAsJsonObject(), documentType);
194 result.add(table);
195 } catch (Exception e) {
196 logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
197 }
198 }
199 }
200 final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
201 Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
202 @Override
203 public int compare(SimpleTableDef o1, SimpleTableDef o2) {
204 return o1.getName().compareTo(o2.getName());
205 }
206 });
207
208 return tableDefArray;
209 }
210
211 /**
212 * Performs an analysis of an available index type in an ElasticSearch
213 * {@link JestClient} client and tries to detect the index structure based
214 * on the metadata provided by the java client.
215 *
216 * @param metadataProperties
217 * the ElasticSearch mapping
218 * @param documentType
219 * the name of the index type
220 * @return a table definition for ElasticSearch.
221 */
222 private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) {
223 final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties);
224 return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
225 }
226
227 @Override
228 protected Schema getMainSchema() throws MetaModelException {
229 final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
230 for (final SimpleTableDef tableDef : staticTableDefinitions) {
231 addTable(theSchema, tableDef);
232 }
233
234 final SimpleTableDef[] tables = detectSchema();
235 synchronized (this) {
236 dynamicTableDefinitions.clear();
237 dynamicTableDefinitions.addAll(Arrays.asList(tables));
238 for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
239 final List<String> tableNames = theSchema.getTableNames();
240
241 if (!tableNames.contains(tableDef.getName())) {
242 addTable(theSchema, tableDef);
243 }
244 }
245 }
246
247 return theSchema;
248 }
249
250 private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
251 final MutableTable table = tableDef.toTable().setSchema(theSchema);
252 final Column idColumn = table.getColumnByName(FIELD_ID);
253 if (idColumn != null && idColumn instanceof MutableColumn) {
254 final MutableColumn mutableColumn = (MutableColumn) idColumn;
255 mutableColumn.setPrimaryKey(true);
256 }
257 theSchema.addTable(table);
258 }
259
260 @Override
261 protected String getMainSchemaName() throws MetaModelException {
262 return indexName;
263 }
264
265 @Override
266 protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
267 List<FilterItem> whereItems, int firstRow, int maxRows) {
268 final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
269 LogicalOperator.AND);
270 if (queryBuilder != null) {
271 // where clause can be pushed down to an ElasticSearch query
272 SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder);
273 SearchResult result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
274
275 return new JestElasticSearchDataSet(elasticSearchClient, result, selectItems);
276 }
277 return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
278 }
279
280 private boolean scrollNeeded(int maxRows) {
281 // if either we don't know about max rows or max rows is set higher than threshold
282 return !limitMaxRowsIsSet(maxRows) || maxRows > SCROLL_THRESHOLD;
283 }
284
285 private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) {
286 Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(
287 table.getName());
288 if (scroll) {
289 builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL);
290 }
291
292 Search search = builder.build();
293 SearchResult result;
294 try {
295 result = elasticSearchClient.execute(search);
296 } catch (Exception e) {
297 logger.warn("Could not execute ElasticSearch query", e);
298 throw new MetaModelException("Could not execute ElasticSearch query", e);
299 }
300 return result;
301 }
302
303 @Override
304 protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
305 SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
306 maxRows));
307
308 return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns.stream().map(SelectItem::new).collect(Collectors.toList()));
309 }
310
311 private SearchSourceBuilder createSearchRequest(int firstRow, int maxRows, QueryBuilder queryBuilder) {
312 final SearchSourceBuilder searchRequest = new SearchSourceBuilder();
313 if (firstRow > 1) {
314 final int zeroBasedFrom = firstRow - 1;
315 searchRequest.from(zeroBasedFrom);
316 }
317 if (limitMaxRowsIsSet(maxRows)) {
318 searchRequest.size(maxRows);
319 } else {
320 searchRequest.size(Integer.MAX_VALUE);
321 }
322
323 if (queryBuilder != null) {
324 searchRequest.query(queryBuilder);
325 }
326
327 return searchRequest;
328 }
329
330 @Override
331 protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
332 Object keyValue) {
333 if (keyValue == null) {
334 return null;
335 }
336
337 final String documentType = table.getName();
338 final String id = keyValue.toString();
339
340 final Get get = new Get.Builder(indexName, id).type(documentType).build();
341 final JestResult getResult = JestClientExecutor.execute(elasticSearchClient, get);
342
343 final DataSetHeader header = new SimpleDataSetHeader(selectItems);
344
345 return JestElasticSearchUtils.createRow(getResult.getJsonObject().get("_source").getAsJsonObject(), id, header);
346 }
347
348 @Override
349 protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
350 if (!whereItems.isEmpty()) {
351 // not supported - will have to be done by counting client-side
352 return null;
353 }
354 final String documentType = table.getName();
355 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
356 sourceBuilder.query(QueryBuilders.termQuery("_type", documentType));
357
358 Count count = new Count.Builder().addIndex(indexName).query(sourceBuilder.toString()).build();
359
360 CountResult countResult;
361 try {
362 countResult = elasticSearchClient.execute(count);
363 } catch (Exception e) {
364 logger.warn("Could not execute ElasticSearch get query", e);
365 throw new MetaModelException("Could not execute ElasticSearch get query", e);
366 }
367
368 return countResult.getCount();
369 }
370
371 private boolean limitMaxRowsIsSet(int maxRows) {
372 return (maxRows != -1);
373 }
374
375 @Override
376 public UpdateSummary executeUpdate(UpdateScript update) {
377 final boolean isBatch = update instanceof BatchUpdateScript;
378 final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch);
379 update.run(callback);
380 callback.onExecuteUpdateFinished();
381 return callback.getUpdateSummary();
382 }
383
384 /**
385 * Gets the {@link JestClient} that this {@link DataContext} is wrapping.
386 */
387 public JestClient getElasticSearchClient() {
388 return elasticSearchClient;
389 }
390
391 /**
392 * Gets the name of the index that this {@link DataContext} is working on.
393 */
394 public String getIndexName() {
395 return indexName;
396 }
397 }