METAMODEL-1165: Fixed - added default_table alias table
[metamodel.git] / elasticsearch / native / src / main / java / org / apache / metamodel / elasticsearch / nativeclient / ElasticSearchDataContext.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.metamodel.elasticsearch.nativeclient;
20
21 import java.lang.reflect.Method;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Comparator;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.stream.Collectors;
28
29 import org.apache.metamodel.DataContext;
30 import org.apache.metamodel.MetaModelException;
31 import org.apache.metamodel.QueryPostprocessDataContext;
32 import org.apache.metamodel.UpdateScript;
33 import org.apache.metamodel.UpdateSummary;
34 import org.apache.metamodel.UpdateableDataContext;
35 import org.apache.metamodel.data.DataSet;
36 import org.apache.metamodel.data.DataSetHeader;
37 import org.apache.metamodel.data.Row;
38 import org.apache.metamodel.data.SimpleDataSetHeader;
39 import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
40 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
41 import org.apache.metamodel.query.FilterItem;
42 import org.apache.metamodel.query.LogicalOperator;
43 import org.apache.metamodel.query.SelectItem;
44 import org.apache.metamodel.schema.Column;
45 import org.apache.metamodel.schema.MutableColumn;
46 import org.apache.metamodel.schema.MutableSchema;
47 import org.apache.metamodel.schema.MutableTable;
48 import org.apache.metamodel.schema.Schema;
49 import org.apache.metamodel.schema.Table;
50 import org.apache.metamodel.util.SimpleTableDef;
51 import org.elasticsearch.Version;
52 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
53 import org.elasticsearch.action.count.CountResponse;
54 import org.elasticsearch.action.get.GetResponse;
55 import org.elasticsearch.action.search.SearchRequestBuilder;
56 import org.elasticsearch.action.search.SearchResponse;
57 import org.elasticsearch.client.Client;
58 import org.elasticsearch.cluster.ClusterState;
59 import org.elasticsearch.cluster.metadata.IndexMetaData;
60 import org.elasticsearch.cluster.metadata.MappingMetaData;
61 import org.elasticsearch.common.collect.ImmutableOpenMap;
62 import org.elasticsearch.common.hppc.ObjectLookupContainer;
63 import org.elasticsearch.common.hppc.cursors.ObjectCursor;
64 import org.elasticsearch.common.unit.TimeValue;
65 import org.elasticsearch.index.query.QueryBuilder;
66 import org.elasticsearch.index.query.QueryBuilders;
67 import org.slf4j.Logger;
68 import org.slf4j.LoggerFactory;
69
70 /**
71 * DataContext implementation for ElasticSearch analytics engine.
72 *
73 * ElasticSearch has a data storage structure hierarchy that briefly goes like
74 * this:
75 * <ul>
76 * <li>Index</li>
77 * <li>Document type (short: Type) (within an index)</li>
78 * <li>Documents (of a particular type)</li>
79 * </ul>
80 *
81 * When instantiating this DataContext, an index name is provided. Within this
82 * index, each document type is represented as a table.
83 *
84 * This implementation supports either automatic discovery of a schema or manual
85 * specification of a schema, through the {@link SimpleTableDef} class.
86 */
87 public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext {
88
89 private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
90
91 public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
92
93 private final Client elasticSearchClient;
94 private final String indexName;
95 // Table definitions that are set from the beginning, not supposed to be
96 // changed.
97 private final List<SimpleTableDef> staticTableDefinitions;
98
99 // Table definitions that are discovered, these can change
100 private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
101
102 /**
103 * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
104 * custom array of {@link SimpleTableDef}s which allows the user to define
105 * his own view on the indexes in the engine.
106 *
107 * @param client
108 * the ElasticSearch client
109 * @param indexName
110 * the name of the ElasticSearch index to represent
111 * @param tableDefinitions
112 * an array of {@link SimpleTableDef}s, which define the table
113 * and column model of the ElasticSearch index.
114 */
115 public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) {
116 super(false);
117 if (client == null) {
118 throw new IllegalArgumentException("ElasticSearch Client cannot be null");
119 }
120 if (indexName == null || indexName.trim().length() == 0) {
121 throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
122 }
123 this.elasticSearchClient = client;
124 this.indexName = indexName;
125 this.staticTableDefinitions = Arrays.asList(tableDefinitions);
126 this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
127 }
128
129 /**
130 * Constructs a {@link ElasticSearchDataContext} and automatically detects
131 * the schema structure/view on all indexes (see
132 * {@link #detectTable(ClusterState, String, String)}).
133 *
134 * @param client
135 * the ElasticSearch client
136 * @param indexName
137 * the name of the ElasticSearch index to represent
138 */
139 public ElasticSearchDataContext(Client client, String indexName) {
140 this(client, indexName, new SimpleTableDef[0]);
141 }
142
143 /**
144 * Performs an analysis of the available indexes in an ElasticSearch cluster
145 * {@link Client} instance and detects the elasticsearch types structure
146 * based on the metadata provided by the ElasticSearch java client.
147 *
148 * @see {@link #detectTable(ClusterState, String, String)}
149 * @return a mutable schema instance, useful for further fine tuning by the
150 * user.
151 */
152 private SimpleTableDef[] detectSchema() {
153 logger.info("Detecting schema for index '{}'", indexName);
154
155 final ClusterState cs;
156 final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster()
157 .prepareState();
158
159 // different methods here to set the index name, so we have to use
160 // reflection :-/
161 try {
162 final byte majorVersion = Version.CURRENT.major;
163 final Object methodArgument = new String[] { indexName };
164 if (majorVersion == 0) {
165 final Method method = ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class);
166 method.invoke(clusterStateRequestBuilder, methodArgument);
167 } else {
168 final Method method = ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class);
169 method.invoke(clusterStateRequestBuilder, methodArgument);
170 }
171 } catch (Exception e) {
172 logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
173 throw new MetaModelException("Failed to create request for index information needed to detect schema", e);
174 }
175 cs = clusterStateRequestBuilder.execute().actionGet().getState();
176
177 final List<SimpleTableDef> result = new ArrayList<>();
178
179 final IndexMetaData imd = cs.getMetaData().index(indexName);
180 if (imd == null) {
181 // index does not exist
182 logger.warn("No metadata returned for index name '{}' - no tables will be detected.");
183 } else {
184 final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
185 final ObjectLookupContainer<String> documentTypes = mappings.keys();
186
187 for (final Object documentTypeCursor : documentTypes) {
188 final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString();
189 try {
190 final SimpleTableDef table = detectTable(cs, indexName, documentType);
191 result.add(table);
192 } catch (Exception e) {
193 logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
194 }
195 }
196 }
197 final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
198 Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
199 @Override
200 public int compare(SimpleTableDef o1, SimpleTableDef o2) {
201 return o1.getName().compareTo(o2.getName());
202 }
203 });
204
205 return tableDefArray;
206 }
207
208 /**
209 * Performs an analysis of an available index type in an ElasticSearch
210 * {@link Client} client and tries to detect the index structure based on
211 * the metadata provided by the java client.
212 *
213 * @param cs
214 * the ElasticSearch cluster
215 * @param indexName
216 * the name of the index
217 * @param documentType
218 * the name of the index type
219 * @return a table definition for ElasticSearch.
220 */
221 public static SimpleTableDef detectTable(ClusterState cs, String indexName, String documentType) throws Exception {
222 logger.debug("Detecting table for document type '{}' in index '{}'", documentType, indexName);
223 final IndexMetaData imd = cs.getMetaData().index(indexName);
224 if (imd == null) {
225 // index does not exist
226 throw new IllegalArgumentException("No such index: " + indexName);
227 }
228 final MappingMetaData mappingMetaData = imd.mapping(documentType);
229 if (mappingMetaData == null) {
230 throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
231 }
232 final Map<String, Object> mp = mappingMetaData.getSourceAsMap();
233 final Object metadataProperties = mp.get("properties");
234 if (metadataProperties != null && metadataProperties instanceof Map) {
235 @SuppressWarnings("unchecked")
236 final Map<String, ?> metadataPropertiesMap = (Map<String, ?>) metadataProperties;
237 final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataPropertiesMap);
238 final SimpleTableDef std = new SimpleTableDef(documentType, metaData.getColumnNames(),
239 metaData.getColumnTypes());
240 return std;
241 }
242 throw new IllegalArgumentException("No mapping properties defined for document type '" + documentType
243 + "' in index: " + indexName);
244 }
245
246 @Override
247 protected Schema getMainSchema() throws MetaModelException {
248 final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
249 for (final SimpleTableDef tableDef : staticTableDefinitions) {
250 addTable(theSchema, tableDef);
251 }
252
253 final SimpleTableDef[] tables = detectSchema();
254 synchronized (this) {
255 dynamicTableDefinitions.clear();
256 dynamicTableDefinitions.addAll(Arrays.asList(tables));
257 for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
258 final List<String> tableNames = theSchema.getTableNames();
259
260 if (!tableNames.contains(tableDef.getName())) {
261 addTable(theSchema, tableDef);
262 }
263 }
264 }
265
266 return theSchema;
267 }
268
269 private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
270 final MutableTable table = tableDef.toTable().setSchema(theSchema);
271 final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
272 if (idColumn != null && idColumn instanceof MutableColumn) {
273 final MutableColumn mutableColumn = (MutableColumn) idColumn;
274 mutableColumn.setPrimaryKey(true);
275 }
276 theSchema.addTable(table);
277 }
278
279 @Override
280 protected String getMainSchemaName() throws MetaModelException {
281 return indexName;
282 }
283
284 @Override
285 protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
286 List<FilterItem> whereItems, int firstRow, int maxRows) {
287 final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
288 LogicalOperator.AND);
289 if (queryBuilder != null) {
290 // where clause can be pushed down to an ElasticSearch query
291 final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
292 final SearchResponse response = searchRequest.execute().actionGet();
293 return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false);
294 }
295 return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
296 }
297
298 @Override
299 protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
300 final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
301 final SearchResponse response = searchRequest.execute().actionGet();
302 return new ElasticSearchDataSet(elasticSearchClient, response, columns.stream().map(SelectItem::new).collect(Collectors.toList()), false);
303 }
304
305 private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
306 final String documentType = table.getName();
307 final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
308 if (firstRow > 1) {
309 final int zeroBasedFrom = firstRow - 1;
310 searchRequest.setFrom(zeroBasedFrom);
311 }
312 if (limitMaxRowsIsSet(maxRows)) {
313 searchRequest.setSize(maxRows);
314 } else {
315 searchRequest.setScroll(TIMEOUT_SCROLL);
316 }
317
318 if (queryBuilder != null) {
319 searchRequest.setQuery(queryBuilder);
320 }
321
322 return searchRequest;
323 }
324
325 @Override
326 protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
327 Object keyValue) {
328 if (keyValue == null) {
329 return null;
330 }
331
332 final String documentType = table.getName();
333 final String id = keyValue.toString();
334
335 final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet();
336
337 if (!response.isExists()) {
338 return null;
339 }
340
341 final Map<String, Object> source = response.getSource();
342 final String documentId = response.getId();
343
344 final DataSetHeader header = new SimpleDataSetHeader(selectItems);
345
346 return NativeElasticSearchUtils.createRow(source, documentId, header);
347 }
348
349 @Override
350 protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
351 if (!whereItems.isEmpty()) {
352 // not supported - will have to be done by counting client-side
353 return null;
354 }
355 final String documentType = table.getName();
356 final CountResponse response = elasticSearchClient.prepareCount(indexName)
357 .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet();
358 return response.getCount();
359 }
360
361 private boolean limitMaxRowsIsSet(int maxRows) {
362 return (maxRows != -1);
363 }
364
365 @Override
366 public UpdateSummary executeUpdate(UpdateScript update) {
367 final ElasticSearchUpdateCallback callback = new ElasticSearchUpdateCallback(this);
368 update.run(callback);
369 callback.onExecuteUpdateFinished();
370 return callback.getUpdateSummary();
371 }
372
373 /**
374 * Gets the {@link Client} that this {@link DataContext} is wrapping.
375 *
376 * @return
377 */
378 public Client getElasticSearchClient() {
379 return elasticSearchClient;
380 }
381
382 /**
383 * Gets the name of the index that this {@link DataContext} is working on.
384 *
385 * @return
386 */
387 public String getIndexName() {
388 return indexName;
389 }
390 }