[SPARK-17528][SQL] data should be copied properly before saving into InternalRow
authorWenchen Fan <wenchen@databricks.com>
Sat, 1 Jul 2017 01:25:29 +0000 (09:25 +0800)
committerWenchen Fan <wenchen@databricks.com>
Sat, 1 Jul 2017 01:25:29 +0000 (09:25 +0800)
commit4eb41879ce774dec1d16b2281ab1fbf41f9d418a
tree1de4c0893a6e10339d71e4145e9ee1e143ba7f2c
parentfd1325522549937232f37215db53d6478f48644c
[SPARK-17528][SQL] data should be copied properly before saving into InternalRow

## What changes were proposed in this pull request?

For performance reasons, `UnsafeRow.getString`, `getStruct`, etc. return a "pointer" that points to a memory region of this unsafe row. This makes the unsafe projection a little dangerous, because all of its output rows share one instance.

When we implement SQL operators, we should be careful to not cache the input rows because they may be produced by unsafe projection from child operator and thus its content may change overtime.

However, when we updating values of InternalRow(e.g. in mutable projection and safe projection), we only copy UTF8String, we should also copy InternalRow, ArrayData and MapData. This PR fixes this, and also fixes the copy of vairous InternalRow, ArrayData and MapData implementations.

## How was this patch tested?

new regression tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18483 from cloud-fan/fix-copy.
18 files changed:
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala
sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MapDataSuite.scala [deleted file]
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ComplexDataSuite.scala [new file with mode: 0644]
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala