METAMODEL-1165: Fixed - added default_table alias table
[metamodel.git] / dynamodb / src / main / java / org / apache / metamodel / dynamodb / DynamoDbDataContext.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.metamodel.dynamodb;
20
21 import java.io.Closeable;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.stream.Collectors;
28
29 import org.apache.metamodel.MetaModelException;
30 import org.apache.metamodel.QueryPostprocessDataContext;
31 import org.apache.metamodel.UpdateScript;
32 import org.apache.metamodel.UpdateSummary;
33 import org.apache.metamodel.UpdateableDataContext;
34 import org.apache.metamodel.data.DataSet;
35 import org.apache.metamodel.data.DefaultRow;
36 import org.apache.metamodel.data.Row;
37 import org.apache.metamodel.data.SimpleDataSetHeader;
38 import org.apache.metamodel.query.FilterItem;
39 import org.apache.metamodel.query.SelectItem;
40 import org.apache.metamodel.schema.Column;
41 import org.apache.metamodel.schema.ColumnType;
42 import org.apache.metamodel.schema.MutableColumn;
43 import org.apache.metamodel.schema.MutableSchema;
44 import org.apache.metamodel.schema.MutableTable;
45 import org.apache.metamodel.schema.Schema;
46 import org.apache.metamodel.schema.Table;
47 import org.apache.metamodel.util.SimpleTableDef;
48
49 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
50 import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
51 import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
52 import com.amazonaws.services.dynamodbv2.model.AttributeValue;
53 import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
54 import com.amazonaws.services.dynamodbv2.model.GetItemRequest;
55 import com.amazonaws.services.dynamodbv2.model.GetItemResult;
56 import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription;
57 import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
58 import com.amazonaws.services.dynamodbv2.model.ListTablesResult;
59 import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription;
60 import com.amazonaws.services.dynamodbv2.model.ScanRequest;
61 import com.amazonaws.services.dynamodbv2.model.ScanResult;
62 import com.amazonaws.services.dynamodbv2.model.TableDescription;
63
64 /**
65 * DataContext implementation for Amazon DynamoDB.
66 */
67 public class DynamoDbDataContext extends QueryPostprocessDataContext implements UpdateableDataContext, Closeable {
68
69 /**
70 * System property key used for getting the read throughput capacity when
71 * creating new tables. Defaults to 5.
72 */
73 public static final String SYSTEM_PROPERTY_THROUGHPUT_READ_CAPACITY = "metamodel.dynamodb.throughput.capacity.read";
74
75 /**
76 * System property key used for getting the write throughput capacity when
77 * creating new tables. Defaults to 5.
78 */
79 public static final String SYSTEM_PROPERTY_THROUGHPUT_WRITE_CAPACITY = "metamodel.dynamodb.throughput.capacity.write";
80
81 /**
82 * The artificial schema name used by this DataContext.
83 */
84 public static final String SCHEMA_NAME = "public";
85
86 private final AmazonDynamoDB _dynamoDb;
87 private final boolean _shutdownOnClose;
88 private final SimpleTableDef[] _tableDefs;
89
90 public DynamoDbDataContext() {
91 this(AmazonDynamoDBClientBuilder.defaultClient(), null, true);
92 }
93
94 public DynamoDbDataContext(SimpleTableDef[] tableDefs) {
95 this(AmazonDynamoDBClientBuilder.defaultClient(), tableDefs, true);
96 }
97
98 public DynamoDbDataContext(AmazonDynamoDB client) {
99 this(client, null, false);
100 }
101
102 public DynamoDbDataContext(AmazonDynamoDB client, SimpleTableDef[] tableDefs) {
103 this(client, tableDefs, false);
104 }
105
106 private DynamoDbDataContext(AmazonDynamoDB client, SimpleTableDef[] tableDefs, boolean shutdownOnClose) {
107 super(false);
108 _dynamoDb = client;
109 _tableDefs = (tableDefs == null ? new SimpleTableDef[0] : tableDefs);
110 _shutdownOnClose = shutdownOnClose;
111 }
112
113 public AmazonDynamoDB getDynamoDb() {
114 return _dynamoDb;
115 }
116
117 @Override
118 public void close() {
119 if (_shutdownOnClose) {
120 _dynamoDb.shutdown();
121 }
122 }
123
124 @Override
125 protected Schema getMainSchema() throws MetaModelException {
126 final Map<String, SimpleTableDef> tableDefs = new HashMap<>();
127 for (final SimpleTableDef tableDef : _tableDefs) {
128 tableDefs.put(tableDef.getName(), tableDef);
129 }
130
131 final MutableSchema schema = new MutableSchema(getMainSchemaName());
132 final ListTablesResult tables = _dynamoDb.listTables();
133 final List<String> tableNames = tables.getTableNames();
134 for (final String tableName : tableNames) {
135 final MutableTable table = new MutableTable(tableName, schema);
136 schema.addTable(table);
137
138 final DescribeTableResult descripeTableResult = _dynamoDb.describeTable(tableName);
139 final TableDescription tableDescription = descripeTableResult.getTable();
140
141 // add primary keys
142 addColumnFromKeySchema("Primary index", tableDescription.getKeySchema(), table, true);
143
144 // add attributes from global and local indices
145 final List<GlobalSecondaryIndexDescription> globalSecondaryIndexes = tableDescription
146 .getGlobalSecondaryIndexes();
147 if (globalSecondaryIndexes != null) {
148 for (final GlobalSecondaryIndexDescription globalSecondaryIndex : globalSecondaryIndexes) {
149 addColumnFromKeySchema(globalSecondaryIndex.getIndexName(), globalSecondaryIndex.getKeySchema(),
150 table, false);
151 }
152 }
153 final List<LocalSecondaryIndexDescription> localSecondaryIndexes = tableDescription
154 .getLocalSecondaryIndexes();
155 if (localSecondaryIndexes != null) {
156 for (final LocalSecondaryIndexDescription localSecondaryIndex : localSecondaryIndexes) {
157 addColumnFromKeySchema(localSecondaryIndex.getIndexName(), localSecondaryIndex.getKeySchema(),
158 table, false);
159 }
160 }
161
162 // add top-level attribute definitions
163 final List<AttributeDefinition> attributeDefinitions = tableDescription.getAttributeDefinitions();
164 for (final AttributeDefinition attributeDefinition : attributeDefinitions) {
165 final String attributeName = attributeDefinition.getAttributeName();
166 MutableColumn column = (MutableColumn) table.getColumnByName(attributeName);
167 if (column == null) {
168 column = new MutableColumn(attributeName, table);
169 table.addColumn(column);
170 }
171 final String attributeType = attributeDefinition.getAttributeType();
172 column.setType(DynamoDbUtils.toColumnType(attributeName, attributeType));
173 column.setIndexed(true);
174 column.setNativeType(attributeType);
175 }
176
177 // add additional metadata from SimpleTableDefs if available
178 final SimpleTableDef tableDef = tableDefs.get(tableName);
179 if (tableDef != null) {
180 final String[] columnNames = tableDef.getColumnNames();
181 final ColumnType[] columnTypes = tableDef.getColumnTypes();
182 for (int i = 0; i < columnNames.length; i++) {
183 final String columnName = columnNames[i];
184 final ColumnType columnType = columnTypes[i];
185 MutableColumn column = (MutableColumn) table.getColumnByName(columnName);
186 if (column == null) {
187 column = new MutableColumn(columnName, table);
188 table.addColumn(column);
189 }
190 if (column.getType() == null && columnType != null) {
191 column.setType(columnType);
192 }
193 }
194 }
195
196 // add additional attributes based on global and local indices
197 if (globalSecondaryIndexes != null) {
198 for (final GlobalSecondaryIndexDescription globalSecondaryIndex : globalSecondaryIndexes) {
199 final List<String> nonKeyAttributes = globalSecondaryIndex.getProjection().getNonKeyAttributes();
200 for (final String attributeName : nonKeyAttributes) {
201 addColumnFromNonKeyAttribute(globalSecondaryIndex.getIndexName(), table, attributeName);
202 }
203 }
204 }
205 if (localSecondaryIndexes != null) {
206 for (final LocalSecondaryIndexDescription localSecondaryIndex : localSecondaryIndexes) {
207 final List<String> nonKeyAttributes = localSecondaryIndex.getProjection().getNonKeyAttributes();
208 for (final String attributeName : nonKeyAttributes) {
209 addColumnFromNonKeyAttribute(localSecondaryIndex.getIndexName(), table, attributeName);
210 }
211 }
212 }
213 }
214 return schema;
215 }
216
217 private void addColumnFromNonKeyAttribute(String indexName, MutableTable table, String attributeName) {
218 MutableColumn column = (MutableColumn) table.getColumnByName(attributeName);
219 if (column == null) {
220 column = new MutableColumn(attributeName, table);
221 table.addColumn(column);
222 }
223 appendRemarks(column, indexName + " non-key attribute");
224 }
225
226 private void addColumnFromKeySchema(String indexName, List<KeySchemaElement> keySchema, MutableTable table,
227 boolean primaryKey) {
228 for (final KeySchemaElement keySchemaElement : keySchema) {
229 final String attributeName = keySchemaElement.getAttributeName();
230 if (table.getColumnByName(attributeName) == null) {
231 final String keyType = keySchemaElement.getKeyType();
232 final MutableColumn column = new MutableColumn(attributeName, table).setPrimaryKey(primaryKey);
233 appendRemarks(column, indexName + " member ('" + keyType + "' type)");
234 table.addColumn(column);
235 }
236 }
237 }
238
239 private static void appendRemarks(MutableColumn column, String remarks) {
240 final String existingRemarks = column.getRemarks();
241 if (existingRemarks == null) {
242 column.setRemarks(remarks);
243 } else {
244 column.setRemarks(existingRemarks + ", " + remarks);
245 }
246 }
247
248 @Override
249 protected String getMainSchemaName() throws MetaModelException {
250 return SCHEMA_NAME;
251 }
252
253 @Override
254 protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
255 if (!whereItems.isEmpty()) {
256 return null;
257 }
258 return _dynamoDb.describeTable(table.getName()).getTable().getItemCount();
259 }
260
261 @Override
262 protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
263 final List<String> attributeNames = columns.stream().map(col-> col.getName()).collect(Collectors.toList());
264 final ScanRequest scanRequest = new ScanRequest(table.getName());
265 scanRequest.setAttributesToGet(attributeNames);
266 if (maxRows > 0) {
267 scanRequest.setLimit(maxRows);
268 }
269 final ScanResult result = _dynamoDb.scan(scanRequest);
270 return new DynamoDbDataSet(columns, result);
271 }
272
273 @Override
274 protected Row executePrimaryKeyLookupQuery(Table table, List<SelectItem> selectItems, Column primaryKeyColumn,
275 Object keyValue) {
276 final List<String> attributeNames = new ArrayList<>();
277 for (SelectItem selectItem : selectItems) {
278 attributeNames.add(selectItem.getColumn().getName());
279 }
280
281 final GetItemRequest getItemRequest = new GetItemRequest(table.getName(), Collections.singletonMap(
282 primaryKeyColumn.getName(), DynamoDbUtils.toAttributeValue(keyValue))).withAttributesToGet(
283 attributeNames);
284 final GetItemResult item = _dynamoDb.getItem(getItemRequest);
285
286 final Object[] values = new Object[selectItems.size()];
287 for (int i = 0; i < values.length; i++) {
288 final AttributeValue attributeValue = item.getItem().get(attributeNames.get(i));
289 values[i] = DynamoDbUtils.toValue(attributeValue);
290 }
291
292 return new DefaultRow(new SimpleDataSetHeader(selectItems), values);
293 }
294
295 @Override
296 public UpdateSummary executeUpdate(UpdateScript update) {
297 final DynamoDbUpdateCallback callback = new DynamoDbUpdateCallback(this);
298 try {
299 update.run(callback);
300 } finally {
301 if (callback.isInterrupted()) {
302 Thread.currentThread().interrupt();
303 }
304 }
305 return callback.getUpdateSummary();
306 }
307 }