[SPARK-24073][SQL] Rename DataReaderFactory to InputPartition.
author Ryan Blue <blue@apache.org>
Thu, 10 May 2018 04:48:54 +0000 (21:48 -0700)
committer gatorsmile <gatorsmile@gmail.com>
Thu, 10 May 2018 04:48:54 +0000 (21:48 -0700)
## What changes were proposed in this pull request?

Renames:
* `DataReaderFactory` to `InputPartition`
* `DataReader` to `InputPartitionReader`
* `createDataReaderFactories` to `planInputPartitions`
* `createUnsafeRowReaderFactories` to `planUnsafeInputPartitions`
* `createBatchDataReaderFactories` to `planBatchInputPartitions`

This fixes the naming from SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write APIs match (the write side uses DataWriterFactory), but the
underlying problem is that the two classes are not equivalent.

ReadTask/DataReader function as Iterable/Iterator. One InputPartition
describes a specific partition of the data to be read, in contrast to
DataWriterFactory, where the same factory instance is used in all write
tasks. InputPartition's purpose is to manage the lifecycle of the
associated reader, which is now called InputPartitionReader, with an
explicit create operation to mirror the close operation. This was no
longer clear from the API because DataReaderFactory appeared to be more
generic than it is, and it was not obvious why a set of factories is
produced for a single read.
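
For illustration only (this sketch is not part of the patch, and the
`RangeDataSourceV2`/`RangeInputPartition` names are hypothetical), a trivial
read-side source under the new names looks roughly like this, assuming the
v2 reader interfaces shown in the diff below:

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical source that scans the integer range [0, 10) as two partitions.
class RangeDataSourceV2 extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new DataSourceReader {
    override def readSchema(): StructType = StructType(StructField("i", IntegerType) :: Nil)

    // Planning happens on the driver: one InputPartition per RDD partition.
    override def planInputPartitions(): JList[InputPartition[Row]] =
      Seq[InputPartition[Row]](new RangeInputPartition(0, 5), new RangeInputPartition(5, 10)).asJava
  }
}

// The Iterable-like side: a serializable description of one partition of the
// data, responsible only for creating its reader.
class RangeInputPartition(start: Int, end: Int) extends InputPartition[Row] {
  override def createPartitionReader(): InputPartitionReader[Row] =
    new RangeInputPartitionReader(start, end)
}

// The Iterator-like side: instantiated on an executor, consumed via
// next()/get(), and closed when the task finishes; create mirrors close.
class RangeInputPartitionReader(start: Int, end: Int) extends InputPartitionReader[Row] {
  private var current = start - 1

  override def next(): Boolean = { current += 1; current < end }
  override def get(): Row = Row(current)
  override def close(): Unit = {}
}
```

Compare this with DataWriterFactory on the write side, where a single factory
instance is shared by all write tasks: here each InputPartition describes
exactly one partition and creates exactly one reader.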

## How was this patch tested?

Existing tests, which have been updated to use the new names.

Author: Ryan Blue <blue@apache.org>

Closes #21145 from rdblue/SPARK-24073-revert-data-reader-factory-rename.

39 files changed:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
sql/core/src/main/java/org/apache/spark/sql/sources/v2/MicroBatchReadSupport.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousInputPartition.java [moved from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ContinuousDataReaderFactory.java with 78% similarity]
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataSourceReader.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartition.java [moved from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReaderFactory.java with 81% similarity]
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/InputPartitionReader.java [moved from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/DataReader.java with 92% similarity]
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanColumnarBatch.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/ClusteredDistribution.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Distribution.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousInputPartitionReader.java [moved from sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java with 84% similarity]
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchReader.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/socket.scala
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaBatchDataSourceV2.java
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java
sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaUnsafeRowDataSourceV2.java
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousQueuedDataReaderSuite.scala
sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala

index f26c134..88abf8a 100644 (file)
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -86,7 +86,7 @@ class KafkaContinuousReader(
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
     import scala.collection.JavaConverters._
 
     val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
@@ -108,7 +108,7 @@ class KafkaContinuousReader(
       case (topicPartition, start) =>
         KafkaContinuousDataReaderFactory(
           topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
-          .asInstanceOf[DataReaderFactory[UnsafeRow]]
+          .asInstanceOf[InputPartition[UnsafeRow]]
     }.asJava
   }
 
@@ -161,18 +161,18 @@ case class KafkaContinuousDataReaderFactory(
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousInputPartition[UnsafeRow] {
 
-  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
+  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[UnsafeRow] = {
     val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
     require(kafkaOffset.topicPartition == topicPartition,
       s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
-    new KafkaContinuousDataReader(
+    new KafkaContinuousInputPartitionReader(
       topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
   }
 
-  override def createDataReader(): KafkaContinuousDataReader = {
-    new KafkaContinuousDataReader(
+  override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
+    new KafkaContinuousInputPartitionReader(
       topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
   }
 }
@@ -187,12 +187,12 @@ case class KafkaContinuousDataReaderFactory(
  * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
  *                       are skipped.
  */
-class KafkaContinuousDataReader(
+class KafkaContinuousInputPartitionReader(
     topicPartition: TopicPartition,
     startOffset: Long,
     kafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
+    failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[UnsafeRow] {
   private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
   private val converter = new KafkaRecordToUnsafeRowConverter
 
index cbe655f..8a37773 100644 (file)
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread
@@ -101,7 +101,7 @@ private[kafka010] class KafkaMicroBatchReader(
         }
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
     val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
@@ -146,7 +146,7 @@ private[kafka010] class KafkaMicroBatchReader(
       new KafkaMicroBatchDataReaderFactory(
         range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
     }
-    factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
+    factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
   }
 
   override def getStartOffset: Offset = {
@@ -299,27 +299,28 @@ private[kafka010] class KafkaMicroBatchReader(
   }
 }
 
-/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
+/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
 private[kafka010] case class KafkaMicroBatchDataReaderFactory(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
+    reuseKafkaConsumer: Boolean) extends InputPartition[UnsafeRow] {
 
   override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
 
-  override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
-    offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
+  override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
+    new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
+      failOnDataLoss, reuseKafkaConsumer)
 }
 
-/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchDataReader(
+/** A [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
+private[kafka010] case class KafkaMicroBatchInputPartitionReader(
     offsetRange: KafkaOffsetRange,
     executorKafkaParams: ju.Map[String, Object],
     pollTimeoutMs: Long,
     failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {
+    reuseKafkaConsumer: Boolean) extends InputPartitionReader[UnsafeRow] with Logging {
 
   private val consumer = KafkaDataConsumer.acquire(
     offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
index 36b9f04..d225c1e 100644 (file)
@@ -31,6 +31,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -149,7 +150,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }
 
   /**
-   * Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader]] to read
+   * Creates a [[ContinuousInputPartitionReader]] to read
    * Kafka data in a continuous streaming query.
    */
   override def createContinuousReader(
index d2d04b6..871f970 100644 (file)
@@ -678,7 +678,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
           Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
           Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
         )
-        val factories = reader.createUnsafeRowReaderFactories().asScala
+        val factories = reader.planUnsafeInputPartitions().asScala
           .map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
         withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
           assert(factories.size == numPartitionsGenerated)
index 209ffa7..7f4a2c9 100644 (file)
@@ -34,7 +34,7 @@ public interface MicroBatchReadSupport extends DataSourceV2 {
    * streaming query.
    *
    * The execution engine will create a micro-batch reader at the start of a streaming query,
-   * alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and
+   * alternate calls to setOffsetRange and planInputPartitions for each batch to process, and
    * then call stop() when the execution is complete. Note that a single query may have multiple
    * executions due to restart or failure recovery.
    *
@@ -21,15 +21,15 @@ import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
 
 /**
- * A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
- * implement this interface to provide creating {@link DataReader} with particular offset.
+ * A mix-in interface for {@link InputPartition}. Continuous input partitions can
+ * implement this interface to provide creating {@link InputPartitionReader} with particular offset.
  */
 @InterfaceStability.Evolving
-public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
+public interface ContinuousInputPartition<T> extends InputPartition<T> {
   /**
    * Create a DataReader with particular offset as its startOffset.
    *
    * @param offset offset want to set as the DataReader's startOffset.
    */
-  DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
+  InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
 }
index a470bcc..f898c29 100644 (file)
@@ -31,8 +31,8 @@ import org.apache.spark.sql.types.StructType;
  * {@link ReadSupport#createReader(DataSourceOptions)} or
  * {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
  * It can mix in various query optimization interfaces to speed up the data scan. The actual scan
- * logic is delegated to {@link DataReaderFactory}s that are returned by
- * {@link #createDataReaderFactories()}.
+ * logic is delegated to {@link InputPartition}s that are returned by
+ * {@link #planInputPartitions()}.
  *
  * There are mainly 3 kinds of query optimizations:
  *   1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
@@ -65,8 +65,8 @@ public interface DataSourceReader {
   StructType readSchema();
 
   /**
-   * Returns a list of reader factories. Each factory is responsible for creating a data reader to
-   * output data for one RDD partition. That means the number of factories returned here is same as
+   * Returns a list of read tasks. Each task is responsible for creating a data reader to
+   * output data for one RDD partition. That means the number of tasks returned here is same as
    * the number of RDD partitions this scan outputs.
    *
    * Note that, this may not be a full scan if the data source reader mixes in other optimization
@@ -76,5 +76,5 @@ public interface DataSourceReader {
    * If this method fails (by throwing an exception), the action would fail and no Spark job was
    * submitted.
    */
-  List<DataReaderFactory<Row>> createDataReaderFactories();
+  List<InputPartition<Row>> planInputPartitions();
 }
@@ -22,20 +22,20 @@ import java.io.Serializable;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
+ * An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
  * responsible for creating the actual data reader. The relationship between
- * {@link DataReaderFactory} and {@link DataReader}
+ * {@link InputPartition} and {@link InputPartitionReader}
  * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
  *
- * Note that, the reader factory will be serialized and sent to executors, then the data reader
- * will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
- * serializable and {@link DataReader} doesn't need to be.
+ * Note that input partitions will be serialized and sent to executors, then the partition reader
+ * will be created on executors and do the actual reading. So {@link InputPartition} must be
+ * serializable and {@link InputPartitionReader} doesn't need to be.
  */
 @InterfaceStability.Evolving
-public interface DataReaderFactory<T> extends Serializable {
+public interface InputPartition<T> extends Serializable {
 
   /**
-   * The preferred locations where the data reader returned by this reader factory can run faster,
+   * The preferred locations where the data reader returned by this partition can run faster,
    * but Spark does not guarantee to run the data reader on these locations.
    * The implementations should make sure that it can be run on any location.
    * The location is a string representing the host name.
@@ -57,5 +57,5 @@ public interface DataReaderFactory<T> extends Serializable {
    * If this method fails (by throwing an exception), the corresponding Spark task would fail and
    * get retried until hitting the maximum retry times.
    */
-  DataReader<T> createDataReader();
+  InputPartitionReader<T> createPartitionReader();
 }
@@ -23,7 +23,7 @@ import java.io.IOException;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data reader returned by {@link DataReaderFactory#createDataReader()} and is responsible for
+ * A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
  * outputting data for a RDD partition.
  *
  * Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
@@ -31,7 +31,7 @@ import org.apache.spark.annotation.InterfaceStability;
  * readers that mix in {@link SupportsScanUnsafeRow}.
  */
 @InterfaceStability.Evolving
-public interface DataReader<T> extends Closeable {
+public interface InputPartitionReader<T> extends Closeable {
 
   /**
    * Proceed to next record, returns false if there is no more records.
index 6076287..6b60da7 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
  * A mix in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to report data partitioning and try to avoid shuffle at Spark side.
  *
- * Note that, when the reader creates exactly one {@link DataReaderFactory}, Spark may avoid
+ * Note that, when the reader creates exactly one {@link InputPartition}, Spark may avoid
  * adding a shuffle even if the reader does not implement this interface.
  */
 @InterfaceStability.Evolving
index 2e5cfa7..0faf81d 100644 (file)
@@ -30,22 +30,22 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 @InterfaceStability.Evolving
 public interface SupportsScanColumnarBatch extends DataSourceReader {
   @Override
-  default List<DataReaderFactory<Row>> createDataReaderFactories() {
+  default List<InputPartition<Row>> planInputPartitions() {
     throw new IllegalStateException(
-      "createDataReaderFactories not supported by default within SupportsScanColumnarBatch.");
+      "planInputPartitions not supported by default within SupportsScanColumnarBatch.");
   }
 
   /**
-   * Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data
+   * Similar to {@link DataSourceReader#planInputPartitions()}, but returns columnar data
    * in batches.
    */
-  List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories();
+  List<InputPartition<ColumnarBatch>> planBatchInputPartitions();
 
   /**
    * Returns true if the concrete data source reader can read data in batch according to the scan
    * properties like required columns, pushes filters, etc. It's possible that the implementation
    * can only support some certain columns with certain types. Users can overwrite this method and
-   * {@link #createDataReaderFactories()} to fallback to normal read path under some conditions.
+   * {@link #planInputPartitions()} to fallback to normal read path under some conditions.
    */
   default boolean enableBatchRead() {
     return true;
index 9cd749e..f2220f6 100644 (file)
@@ -33,14 +33,14 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 public interface SupportsScanUnsafeRow extends DataSourceReader {
 
   @Override
-  default List<DataReaderFactory<Row>> createDataReaderFactories() {
+  default List<InputPartition<Row>> planInputPartitions() {
     throw new IllegalStateException(
-      "createDataReaderFactories not supported by default within SupportsScanUnsafeRow");
+      "planInputPartitions not supported by default within SupportsScanUnsafeRow");
   }
 
   /**
-   * Similar to {@link DataSourceReader#createDataReaderFactories()},
+   * Similar to {@link DataSourceReader#planInputPartitions()},
    * but returns data in unsafe row format.
    */
-  List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();
+  List<InputPartition<UnsafeRow>> planUnsafeInputPartitions();
 }
index 2d0ee50..38ca5fc 100644 (file)
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 /**
  * A concrete implementation of {@link Distribution}. Represents a distribution where records that
  * share the same values for the {@link #clusteredColumns} will be produced by the same
- * {@link DataReader}.
+ * {@link InputPartitionReader}.
  */
 @InterfaceStability.Evolving
 public class ClusteredDistribution implements Distribution {
index f6b111f..d2ee951 100644 (file)
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 /**
  * An interface to represent data distribution requirement, which specifies how the records should
- * be distributed among the data partitions(one {@link DataReader} outputs data for one partition).
+ * be distributed among the data partitions(one {@link InputPartitionReader} outputs data for one partition).
  * Note that this interface has nothing to do with the data ordering inside one
- * partition(the output records of a single {@link DataReader}).
+ * partition(the output records of a single {@link InputPartitionReader}).
  *
  * The instance of this interface is created and provided by Spark, then consumed by
  * {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to
index 309d9e5..f460f6b 100644 (file)
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
 
 /**
@@ -31,7 +31,7 @@ import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
 public interface Partitioning {
 
   /**
-   * Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs.
+   * Returns the number of partitions(i.e., {@link InputPartition}s) the data source outputs.
    */
   int numPartitions();
 
 package org.apache.spark.sql.sources.v2.reader.streaming;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 
 /**
- * A variation on {@link DataReader} for use with streaming in continuous processing mode.
+ * A variation on {@link InputPartitionReader} for use with streaming in continuous processing mode.
  */
 @InterfaceStability.Evolving
-public interface ContinuousDataReader<T> extends DataReader<T> {
+public interface ContinuousInputPartitionReader<T> extends InputPartitionReader<T> {
     /**
      * Get the offset of the current record, or the start offset if no records have been read.
      *
index 7fe7f00..716c5c0 100644 (file)
@@ -27,7 +27,7 @@ import java.util.Optional;
  * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
  * interface to allow reading in a continuous processing mode stream.
  *
- * Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
+ * Implementations must ensure each partition reader is a {@link ContinuousInputPartitionReader}.
  *
  * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
  * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
@@ -35,7 +35,7 @@ import java.util.Optional;
 @InterfaceStability.Evolving
 public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
     /**
-     * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
+     * Merge partitioned offsets coming from {@link ContinuousInputPartitionReader} instances for each
      * partition to a single global offset.
      */
     Offset mergeOffsets(PartitionOffset[] offsets);
@@ -47,7 +47,7 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
     Offset deserializeOffset(String json);
 
     /**
-     * Set the desired start offset for reader factories created from this reader. The scan will
+     * Set the desired start offset for partitions created from this reader. The scan will
      * start from the first record after the provided offset, or from an implementation-defined
      * inferred starting point if no offset is provided.
      */
@@ -61,8 +61,8 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
     Offset getStartOffset();
 
     /**
-     * The execution engine will call this method in every epoch to determine if new reader
-     * factories need to be generated, which may be required if for example the underlying
+     * The execution engine will call this method in every epoch to determine if new input
+     * partitions need to be generated, which may be required if for example the underlying
      * source system has had partitions added or removed.
      *
      * If true, the query will be shut down and restarted with a new reader.
index 67ebde3..0159c73 100644 (file)
@@ -33,7 +33,7 @@ import java.util.Optional;
 @InterfaceStability.Evolving
 public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
     /**
-     * Set the desired offset range for reader factories created from this reader. Reader factories
+     * Set the desired offset range for input partitions created from this reader. Partition readers
      * will generate only data within (`start`, `end`]; that is, from the first record after `start`
      * to the record with offset `end`.
      *
index f85971b..1a6b324 100644 (file)
@@ -22,14 +22,14 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 
-class DataSourceRDDPartition[T : ClassTag](val index: Int, val readerFactory: DataReaderFactory[T])
+class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: InputPartition[T])
   extends Partition with Serializable
 
 class DataSourceRDD[T: ClassTag](
     sc: SparkContext,
-    @transient private val readerFactories: Seq[DataReaderFactory[T]])
+    @transient private val readerFactories: Seq[InputPartition[T]])
   extends RDD[T](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
@@ -39,7 +39,8 @@ class DataSourceRDD[T: ClassTag](
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    val reader = split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.createDataReader()
+    val reader = split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition
+        .createPartitionReader()
     context.addTaskCompletionListener(_ => reader.close())
     val iter = new Iterator[T] {
       private[this] var valuePrepared = false
@@ -63,6 +64,6 @@ class DataSourceRDD[T: ClassTag](
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.preferredLocations()
+    split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
   }
 }
index 77cb707..c6a7684 100644 (file)
@@ -59,13 +59,13 @@ case class DataSourceV2ScanExec(
   }
 
   override def outputPartitioning: physical.Partitioning = reader match {
-    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchReaderFactories.size == 1 =>
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
       SinglePartition
 
-    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && readerFactories.size == 1 =>
+    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
       SinglePartition
 
-    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && readerFactories.size == 1 =>
+    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
       SinglePartition
 
     case s: SupportsReportPartitioning =>
@@ -75,19 +75,19 @@ case class DataSourceV2ScanExec(
     case _ => super.outputPartitioning
   }
 
-  private lazy val readerFactories: Seq[DataReaderFactory[UnsafeRow]] = reader match {
-    case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories().asScala
+  private lazy val partitions: Seq[InputPartition[UnsafeRow]] = reader match {
+    case r: SupportsScanUnsafeRow => r.planUnsafeInputPartitions().asScala
     case _ =>
-      reader.createDataReaderFactories().asScala.map {
-        new RowToUnsafeRowDataReaderFactory(_, reader.readSchema()): DataReaderFactory[UnsafeRow]
+      reader.planInputPartitions().asScala.map {
+        new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[UnsafeRow]
       }
   }
 
-  private lazy val batchReaderFactories: Seq[DataReaderFactory[ColumnarBatch]] = reader match {
+  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
     case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
       assert(!reader.isInstanceOf[ContinuousReader],
         "continuous stream reader does not support columnar read yet.")
-      r.createBatchDataReaderFactories().asScala
+      r.planBatchInputPartitions().asScala
   }
 
   private lazy val inputRDD: RDD[InternalRow] = reader match {
@@ -95,19 +95,18 @@ case class DataSourceV2ScanExec(
       EpochCoordinatorRef.get(
           sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
-        .askSync[Unit](SetReaderPartitions(readerFactories.size))
+        .askSync[Unit](SetReaderPartitions(partitions.size))
       new ContinuousDataSourceRDD(
         sparkContext,
         sqlContext.conf.continuousStreamingExecutorQueueSize,
         sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-        readerFactories)
-        .asInstanceOf[RDD[InternalRow]]
+        partitions).asInstanceOf[RDD[InternalRow]]
 
     case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
-      new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
+      new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]]
 
     case _ =>
-      new DataSourceRDD(sparkContext, readerFactories).asInstanceOf[RDD[InternalRow]]
+      new DataSourceRDD(sparkContext, partitions).asInstanceOf[RDD[InternalRow]]
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
@@ -132,19 +131,22 @@ case class DataSourceV2ScanExec(
   }
 }
 
-class RowToUnsafeRowDataReaderFactory(rowReaderFactory: DataReaderFactory[Row], schema: StructType)
-  extends DataReaderFactory[UnsafeRow] {
+class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType)
+  extends InputPartition[UnsafeRow] {
 
-  override def preferredLocations: Array[String] = rowReaderFactory.preferredLocations
+  override def preferredLocations: Array[String] = partition.preferredLocations
 
-  override def createDataReader: DataReader[UnsafeRow] = {
-    new RowToUnsafeDataReader(
-      rowReaderFactory.createDataReader, RowEncoder.apply(schema).resolveAndBind())
+  override def createPartitionReader: InputPartitionReader[UnsafeRow] = {
+    new RowToUnsafeInputPartitionReader(
+      partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind())
   }
 }
 
-class RowToUnsafeDataReader(val rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
-  extends DataReader[UnsafeRow] {
+class RowToUnsafeInputPartitionReader(
+    val rowReader: InputPartitionReader[Row],
+    encoder: ExpressionEncoder[Row])
+
+  extends InputPartitionReader[UnsafeRow] {
 
   override def next: Boolean = rowReader.next
 
index 0a3b9dc..a7ccce1 100644 (file)
@@ -21,14 +21,14 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeInputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}
 import org.apache.spark.util.{NextIterator, ThreadUtils}
 
 class ContinuousDataSourceRDDPartition(
     val index: Int,
-    val readerFactory: DataReaderFactory[UnsafeRow])
+    val inputPartition: InputPartition[UnsafeRow])
   extends Partition with Serializable {
 
   // This is semantically a lazy val - it's initialized once the first time a call to
@@ -51,12 +51,12 @@ class ContinuousDataSourceRDD(
     sc: SparkContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long,
-    @transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
+    @transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
   extends RDD[UnsafeRow](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
     readerFactories.zipWithIndex.map {
-      case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory)
+      case (inputPartition, index) => new ContinuousDataSourceRDDPartition(index, inputPartition)
     }.toArray
   }
 
@@ -75,7 +75,7 @@ class ContinuousDataSourceRDD(
       if (partition.queueReader == null) {
         partition.queueReader =
           new ContinuousQueuedDataReader(
-            partition.readerFactory, context, dataQueueSize, epochPollIntervalMs)
+            partition.inputPartition, context, dataQueueSize, epochPollIntervalMs)
       }
 
       partition.queueReader
@@ -96,17 +96,17 @@ class ContinuousDataSourceRDD(
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    split.asInstanceOf[ContinuousDataSourceRDDPartition].readerFactory.preferredLocations()
+    split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
   }
 }
 
 object ContinuousDataSourceRDD {
   private[continuous] def getContinuousReader(
-      reader: DataReader[UnsafeRow]): ContinuousDataReader[_] = {
+      reader: InputPartitionReader[UnsafeRow]): ContinuousInputPartitionReader[_] = {
     reader match {
-      case r: ContinuousDataReader[UnsafeRow] => r
-      case wrapped: RowToUnsafeDataReader =>
-        wrapped.rowReader.asInstanceOf[ContinuousDataReader[Row]]
+      case r: ContinuousInputPartitionReader[UnsafeRow] => r
+      case wrapped: RowToUnsafeInputPartitionReader =>
+        wrapped.rowReader.asInstanceOf[ContinuousInputPartitionReader[Row]]
       case _ =>
         throw new IllegalStateException(s"Unknown continuous reader type ${reader.getClass}")
     }
index 01a999f..d864557 100644 (file)
 package org.apache.spark.sql.execution.streaming.continuous
 
 import java.io.Closeable
-import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext}
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
 import org.apache.spark.util.ThreadUtils
 
@@ -38,11 +37,11 @@ import org.apache.spark.util.ThreadUtils
  * offsets across epochs. Each compute() should call the next() method here until null is returned.
  */
 class ContinuousQueuedDataReader(
-    factory: DataReaderFactory[UnsafeRow],
+    partition: InputPartition[UnsafeRow],
     context: TaskContext,
     dataQueueSize: Int,
     epochPollIntervalMs: Long) extends Closeable {
-  private val reader = factory.createDataReader()
+  private val reader = partition.createPartitionReader()
 
   // Important sequencing - we must get our starting point before the provider threads start running
   private var currentOffset: PartitionOffset =
@@ -132,7 +131,7 @@ class ContinuousQueuedDataReader(
 
   /**
    * The data component of [[ContinuousQueuedDataReader]]. Pushes (row, offset) to the queue when
-   * a new row arrives to the [[DataReader]].
+   * a new row arrives to the [[InputPartitionReader]].
    */
   class DataReaderThread extends Thread(
       s"continuous-reader--${context.partitionId()}--" +
index 2f0de26..8d25d9c 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeM
 import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 
 case class RateStreamPartitionOffset(
@@ -67,7 +67,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
   override def getStartOffset(): Offset = offset
 
-  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): java.util.List[InputPartition[Row]] = {
     val partitionStartMap = offset match {
       case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
       case off =>
@@ -91,7 +91,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
         i,
         numPartitions,
         perPartitionRate)
-        .asInstanceOf[DataReaderFactory[Row]]
+        .asInstanceOf[InputPartition[Row]]
     }.asJava
   }
 
@@ -119,13 +119,13 @@ case class RateStreamContinuousDataReaderFactory(
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends ContinuousDataReaderFactory[Row] {
+  extends ContinuousInputPartition[Row] {
 
-  override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = {
+  override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[Row] = {
     val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
     require(rateStreamOffset.partition == partitionIndex,
       s"Expected partitionIndex: $partitionIndex, but got: ${rateStreamOffset.partition}")
-    new RateStreamContinuousDataReader(
+    new RateStreamContinuousInputPartitionReader(
       rateStreamOffset.currentValue,
       rateStreamOffset.currentTimeMs,
       partitionIndex,
@@ -133,18 +133,18 @@ case class RateStreamContinuousDataReaderFactory(
       rowsPerSecond)
   }
 
-  override def createDataReader(): DataReader[Row] =
-    new RateStreamContinuousDataReader(
+  override def createPartitionReader(): InputPartitionReader[Row] =
+    new RateStreamContinuousInputPartitionReader(
       startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
 }
 
-class RateStreamContinuousDataReader(
+class RateStreamContinuousInputPartitionReader(
     startValue: Long,
     startTimeMs: Long,
     partitionIndex: Int,
     increment: Long,
     rowsPerSecond: Double)
-  extends ContinuousDataReader[Row] {
+  extends ContinuousInputPartitionReader[Row] {
   private var nextReadTime: Long = startTimeMs
   private val readTimeIncrement: Long = (1000 / rowsPerSecond).toLong
 
index 6720cdd..daa2963 100644 (file)
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -139,7 +139,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     if (endOffset.offset == -1) null else endOffset
   }
 
-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
     synchronized {
       // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
       val startOrdinal = startOffset.offset.toInt + 1
@@ -156,7 +156,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
 
       newBlocks.map { block =>
-        new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
+        new MemoryStreamDataReaderFactory(block).asInstanceOf[InputPartition[UnsafeRow]]
       }.asJava
     }
   }
@@ -202,9 +202,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
 
 
 class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
-  extends DataReaderFactory[UnsafeRow] {
-  override def createDataReader(): DataReader[UnsafeRow] = {
-    new DataReader[UnsafeRow] {
+  extends InputPartition[UnsafeRow] {
+  override def createPartitionReader(): InputPartitionReader[UnsafeRow] = {
+    new InputPartitionReader[UnsafeRow] {
       private var currentIndex = -1
 
       override def next(): Boolean = {
index a8fca3c..fef792e 100644 (file)
@@ -34,8 +34,8 @@ import org.apache.spark.sql.{Encoder, Row, SQLContext}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream.GetRecord
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions}
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.RpcUtils
 
@@ -99,7 +99,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     )
   }
 
-  override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): ju.List[InputPartition[Row]] = {
     synchronized {
       val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
       endpointRef =
@@ -108,7 +108,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
       startOffset.partitionNums.map {
         case (part, index) =>
           new ContinuousMemoryStreamDataReaderFactory(
-            endpointName, part, index): DataReaderFactory[Row]
+            endpointName, part, index): InputPartition[Row]
       }.toList.asJava
     }
   }
@@ -160,9 +160,9 @@ object ContinuousMemoryStream {
 class ContinuousMemoryStreamDataReaderFactory(
     driverEndpointName: String,
     partition: Int,
-    startOffset: Int) extends DataReaderFactory[Row] {
-  override def createDataReader: ContinuousMemoryStreamDataReader =
-    new ContinuousMemoryStreamDataReader(driverEndpointName, partition, startOffset)
+    startOffset: Int) extends InputPartition[Row] {
+  override def createPartitionReader: ContinuousMemoryStreamInputPartitionReader =
+    new ContinuousMemoryStreamInputPartitionReader(driverEndpointName, partition, startOffset)
 }
 
 /**
@@ -170,10 +170,10 @@ class ContinuousMemoryStreamDataReaderFactory(
  *
  * Polls the driver endpoint for new records.
  */
-class ContinuousMemoryStreamDataReader(
+class ContinuousMemoryStreamInputPartitionReader(
     driverEndpointName: String,
     partition: Int,
-    startOffset: Int) extends ContinuousDataReader[Row] {
+    startOffset: Int) extends ContinuousInputPartitionReader[Row] {
   private val endpoint = RpcUtils.makeDriverRef(
     driverEndpointName,
     SparkEnv.get.conf,
index f54291b..723cc3a 100644 (file)
@@ -134,7 +134,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     LongOffset(json.toLong)
   }
 
-  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): java.util.List[InputPartition[Row]] = {
     val startSeconds = LongOffset.convert(start).map(_.offset).getOrElse(0L)
     val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
     assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
@@ -169,7 +169,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
     (0 until numPartitions).map { p =>
       new RateStreamMicroBatchDataReaderFactory(
         p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
-        : DataReaderFactory[Row]
+        : InputPartition[Row]
     }.toList.asJava
   }
 
@@ -188,19 +188,20 @@ class RateStreamMicroBatchDataReaderFactory(
     rangeStart: Long,
     rangeEnd: Long,
     localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends DataReaderFactory[Row] {
+    relativeMsPerValue: Double) extends InputPartition[Row] {
 
-  override def createDataReader(): DataReader[Row] = new RateStreamMicroBatchDataReader(
-    partitionId, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
+  override def createPartitionReader(): InputPartitionReader[Row] =
+    new RateStreamMicroBatchInputPartitionReader(partitionId, numPartitions, rangeStart, rangeEnd,
+      localStartTimeMs, relativeMsPerValue)
 }
 
-class RateStreamMicroBatchDataReader(
+class RateStreamMicroBatchInputPartitionReader(
     partitionId: Int,
     numPartitions: Int,
     rangeStart: Long,
     rangeEnd: Long,
     localStartTimeMs: Long,
-    relativeMsPerValue: Double) extends DataReader[Row] {
+    relativeMsPerValue: Double) extends InputPartitionReader[Row] {
   private var count = 0
 
   override def next(): Boolean = {
index 90f4a5b..8240e06 100644 (file)
@@ -33,7 +33,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.LongOffset
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
@@ -140,7 +140,7 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR
     }
   }
 
-  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+  override def planInputPartitions(): JList[InputPartition[Row]] = {
     assert(startOffset != null && endOffset != null,
       "start offset and end offset should already be set before create read tasks.")
 
@@ -165,21 +165,22 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR
 
     (0 until numPartitions).map { i =>
       val slice = slices(i)
-      new DataReaderFactory[Row] {
-        override def createDataReader(): DataReader[Row] = new DataReader[Row] {
-          private var currentIdx = -1
+      new InputPartition[Row] {
+        override def createPartitionReader(): InputPartitionReader[Row] =
+          new InputPartitionReader[Row] {
+            private var currentIdx = -1
+
+            override def next(): Boolean = {
+              currentIdx += 1
+              currentIdx < slice.size
+            }
 
-          override def next(): Boolean = {
-            currentIdx += 1
-            currentIdx < slice.size
-          }
+            override def get(): Row = {
+              Row(slice(currentIdx)._1, slice(currentIdx)._2)
+            }
 
-          override def get(): Row = {
-            Row(slice(currentIdx)._1, slice(currentIdx)._2)
+            override def close(): Unit = {}
           }
-
-          override def close(): Unit = {}
-        }
       }
     }.toList.asJava
   }
index 172e5d5..714638e 100644 (file)
@@ -79,8 +79,8 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
-      List<DataReaderFactory<Row>> res = new ArrayList<>();
+    public List<InputPartition<Row>> planInputPartitions() {
+      List<InputPartition<Row>> res = new ArrayList<>();
 
       Integer lowerBound = null;
       for (Filter filter : filters) {
@@ -94,33 +94,33 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
       }
 
       if (lowerBound == null) {
-        res.add(new JavaAdvancedDataReaderFactory(0, 5, requiredSchema));
-        res.add(new JavaAdvancedDataReaderFactory(5, 10, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(0, 5, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(5, 10, requiredSchema));
       } else if (lowerBound < 4) {
-        res.add(new JavaAdvancedDataReaderFactory(lowerBound + 1, 5, requiredSchema));
-        res.add(new JavaAdvancedDataReaderFactory(5, 10, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(lowerBound + 1, 5, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(5, 10, requiredSchema));
       } else if (lowerBound < 9) {
-        res.add(new JavaAdvancedDataReaderFactory(lowerBound + 1, 10, requiredSchema));
+        res.add(new JavaAdvancedInputPartition(lowerBound + 1, 10, requiredSchema));
       }
 
       return res;
     }
   }
 
-  static class JavaAdvancedDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
+  static class JavaAdvancedInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
     private int start;
     private int end;
     private StructType requiredSchema;
 
-    JavaAdvancedDataReaderFactory(int start, int end, StructType requiredSchema) {
+    JavaAdvancedInputPartition(int start, int end, StructType requiredSchema) {
       this.start = start;
       this.end = end;
       this.requiredSchema = requiredSchema;
     }
 
     @Override
-    public DataReader<Row> createDataReader() {
-      return new JavaAdvancedDataReaderFactory(start - 1, end, requiredSchema);
+    public InputPartitionReader<Row> createPartitionReader() {
+      return new JavaAdvancedInputPartition(start - 1, end, requiredSchema);
     }
 
     @Override
index c550937..97d6176 100644 (file)
@@ -42,14 +42,14 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
+    public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
       return java.util.Arrays.asList(
-               new JavaBatchDataReaderFactory(0, 50), new JavaBatchDataReaderFactory(50, 90));
+               new JavaBatchInputPartition(0, 50), new JavaBatchInputPartition(50, 90));
     }
   }
 
-  static class JavaBatchDataReaderFactory
-      implements DataReaderFactory<ColumnarBatch>, DataReader<ColumnarBatch> {
+  static class JavaBatchInputPartition
+      implements InputPartition<ColumnarBatch>, InputPartitionReader<ColumnarBatch> {
     private int start;
     private int end;
 
@@ -59,13 +59,13 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
     private OnHeapColumnVector j;
     private ColumnarBatch batch;
 
-    JavaBatchDataReaderFactory(int start, int end) {
+    JavaBatchInputPartition(int start, int end) {
       this.start = start;
       this.end = end;
     }
 
     @Override
-    public DataReader<ColumnarBatch> createDataReader() {
+    public InputPartitionReader<ColumnarBatch> createPartitionReader() {
       this.i = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
       this.j = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
       ColumnVector[] vectors = new ColumnVector[2];
index 32fad59..e49c8cf 100644 (file)
@@ -43,10 +43,10 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+    public List<InputPartition<Row>> planInputPartitions() {
       return java.util.Arrays.asList(
-        new SpecificDataReaderFactory(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
-        new SpecificDataReaderFactory(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
+        new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
+        new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
     }
 
     @Override
@@ -73,12 +73,12 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
   }
 
-  static class SpecificDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
+  static class SpecificInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
     private int[] i;
     private int[] j;
     private int current = -1;
 
-    SpecificDataReaderFactory(int[] i, int[] j) {
+    SpecificInputPartition(int[] i, int[] j) {
       assert i.length == j.length;
       this.i = i;
       this.j = j;
@@ -101,7 +101,7 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public DataReader<Row> createDataReader() {
+    public InputPartitionReader<Row> createPartitionReader() {
       return this;
     }
   }
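
The test sources in this patch implement InputPartition and InputPartitionReader on a single class for brevity. A minimal sketch of the two roles kept separate, with illustrative class names that are not part of this patch:

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Illustrative: one partition covering the range [start, end). The partition only
// describes the split and creates its reader; the caller closes the reader when
// the scan of that partition ends.
class RangeInputPartition(start: Int, end: Int) extends InputPartition[Row] {
  override def createPartitionReader(): InputPartitionReader[Row] =
    new RangeInputPartitionReader(start, end)
}

class RangeInputPartitionReader(start: Int, end: Int) extends InputPartitionReader[Row] {
  private var current = start - 1

  override def next(): Boolean = {
    current += 1
    current < end
  }

  override def get(): Row = Row(current, -current)

  override def close(): Unit = {}
}
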
index 048d078..80eeffd 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.types.StructType;
 
 public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
@@ -42,7 +42,7 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+    public List<InputPartition<Row>> planInputPartitions() {
       return java.util.Collections.emptyList();
     }
   }
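
As exercised by the RateSourceSuite and DataSourceV2Suite changes further down, a scan under the renamed API plans InputPartitions and obtains one InputPartitionReader per partition. A minimal sketch of that driving loop (the object and helper names are illustrative, not part of this patch):

import scala.collection.JavaConverters._

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.DataSourceReader

object ScanSketch {
  // Illustrative: drain every planned partition of a DataSourceReader into rows,
  // closing each InputPartitionReader once it is exhausted.
  def collectRows(reader: DataSourceReader): Seq[Row] = {
    reader.planInputPartitions().asScala.flatMap { partition =>
      val partitionReader = partition.createPartitionReader()
      try {
        val rows = scala.collection.mutable.ListBuffer[Row]()
        while (partitionReader.next()) rows += partitionReader.get()
        rows
      } finally {
        partitionReader.close()
      }
    }
  }
}
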
index 96f55b8..8522a63 100644 (file)
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.ReadSupport;
-import org.apache.spark.sql.sources.v2.reader.DataReader;
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
+import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.types.StructType;
 
@@ -41,25 +41,25 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<Row>> createDataReaderFactories() {
+    public List<InputPartition<Row>> planInputPartitions() {
       return java.util.Arrays.asList(
-        new JavaSimpleDataReaderFactory(0, 5),
-        new JavaSimpleDataReaderFactory(5, 10));
+        new JavaSimpleInputPartition(0, 5),
+        new JavaSimpleInputPartition(5, 10));
     }
   }
 
-  static class JavaSimpleDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
+  static class JavaSimpleInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
     private int start;
     private int end;
 
-    JavaSimpleDataReaderFactory(int start, int end) {
+    JavaSimpleInputPartition(int start, int end) {
       this.start = start;
       this.end = end;
     }
 
     @Override
-    public DataReader<Row> createDataReader() {
-      return new JavaSimpleDataReaderFactory(start - 1, end);
+    public InputPartitionReader<Row> createPartitionReader() {
+      return new JavaSimpleInputPartition(start - 1, end);
     }
 
     @Override
index c3916e0..3ad8e7a 100644 (file)
@@ -38,20 +38,20 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories() {
+    public List<InputPartition<UnsafeRow>> planUnsafeInputPartitions() {
       return java.util.Arrays.asList(
-        new JavaUnsafeRowDataReaderFactory(0, 5),
-        new JavaUnsafeRowDataReaderFactory(5, 10));
+        new JavaUnsafeRowInputPartition(0, 5),
+        new JavaUnsafeRowInputPartition(5, 10));
     }
   }
 
-  static class JavaUnsafeRowDataReaderFactory
-      implements DataReaderFactory<UnsafeRow>, DataReader<UnsafeRow> {
+  static class JavaUnsafeRowInputPartition
+      implements InputPartition<UnsafeRow>, InputPartitionReader<UnsafeRow> {
     private int start;
     private int end;
     private UnsafeRow row;
 
-    JavaUnsafeRowDataReaderFactory(int start, int end) {
+    JavaUnsafeRowInputPartition(int start, int end) {
       this.start = start;
       this.end = end;
       this.row = new UnsafeRow(2);
@@ -59,8 +59,8 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
     }
 
     @Override
-    public DataReader<UnsafeRow> createDataReader() {
-      return new JavaUnsafeRowDataReaderFactory(start - 1, end);
+    public InputPartitionReader<UnsafeRow> createPartitionReader() {
+      return new JavaUnsafeRowInputPartition(start - 1, end);
     }
 
     @Override
index ff14ec3..39a010f 100644 (file)
@@ -142,9 +142,9 @@ class RateSourceSuite extends StreamTest {
     val startOffset = LongOffset(0L)
     val endOffset = LongOffset(1L)
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 1)
-    val dataReader = tasks.get(0).createDataReader()
+    val dataReader = tasks.get(0).createPartitionReader()
     val data = ArrayBuffer[Row]()
     while (dataReader.next()) {
       data.append(dataReader.get())
@@ -159,11 +159,11 @@ class RateSourceSuite extends StreamTest {
     val startOffset = LongOffset(0L)
     val endOffset = LongOffset(1L)
     reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 11)
 
     val readData = tasks.asScala
-      .map(_.createDataReader())
+      .map(_.createPartitionReader())
       .flatMap { reader =>
         val buf = scala.collection.mutable.ListBuffer[Row]()
         while (reader.next()) buf.append(reader.get())
@@ -304,7 +304,7 @@ class RateSourceSuite extends StreamTest {
     val reader = new RateStreamContinuousReader(
       new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
     reader.setStartOffset(Optional.empty())
-    val tasks = reader.createDataReaderFactories()
+    val tasks = reader.planInputPartitions()
     assert(tasks.size == 2)
 
     val data = scala.collection.mutable.ListBuffer[Row]()
@@ -314,7 +314,7 @@ class RateSourceSuite extends StreamTest {
           .asInstanceOf[RateStreamOffset]
           .partitionToValueAndRunTimeMs(t.partitionIndex)
           .runTimeMs
-        val r = t.createDataReader().asInstanceOf[RateStreamContinuousDataReader]
+        val r = t.createPartitionReader().asInstanceOf[RateStreamContinuousInputPartitionReader]
         for (rowIndex <- 0 to 9) {
           r.next()
           data.append(r.get())
index e0a5327..505a3f3 100644 (file)
@@ -346,8 +346,8 @@ class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5))
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
+      java.util.Arrays.asList(new SimpleInputPartition(0, 5))
     }
   }
 
@@ -359,20 +359,21 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
-      java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
+      java.util.Arrays.asList(new SimpleInputPartition(0, 5), new SimpleInputPartition(5, 10))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class SimpleDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[Row]
-  with DataReader[Row] {
+class SimpleInputPartition(start: Int, end: Int)
+  extends InputPartition[Row]
+  with InputPartitionReader[Row] {
   private var current = start - 1
 
-  override def createDataReader(): DataReader[Row] = new SimpleDataReaderFactory(start, end)
+  override def createPartitionReader(): InputPartitionReader[Row] =
+    new SimpleInputPartition(start, end)
 
   override def next(): Boolean = {
     current += 1
@@ -413,21 +414,21 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
       requiredSchema
     }
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       val lowerBound = filters.collect {
         case GreaterThan("i", v: Int) => v
       }.headOption
 
-      val res = new ArrayList[DataReaderFactory[Row]]
+      val res = new ArrayList[InputPartition[Row]]
 
       if (lowerBound.isEmpty) {
-        res.add(new AdvancedDataReaderFactory(0, 5, requiredSchema))
-        res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(0, 5, requiredSchema))
+        res.add(new AdvancedInputPartition(5, 10, requiredSchema))
       } else if (lowerBound.get < 4) {
-        res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 5, requiredSchema))
-        res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(lowerBound.get + 1, 5, requiredSchema))
+        res.add(new AdvancedInputPartition(5, 10, requiredSchema))
       } else if (lowerBound.get < 9) {
-        res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 10, requiredSchema))
+        res.add(new AdvancedInputPartition(lowerBound.get + 1, 10, requiredSchema))
       }
 
       res
@@ -437,13 +438,13 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
-  extends DataReaderFactory[Row] with DataReader[Row] {
+class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
+  extends InputPartition[Row] with InputPartitionReader[Row] {
 
   private var current = start - 1
 
-  override def createDataReader(): DataReader[Row] = {
-    new AdvancedDataReaderFactory(start, end, requiredSchema)
+  override def createPartitionReader(): InputPartitionReader[Row] = {
+    new AdvancedInputPartition(start, end, requiredSchema)
   }
 
   override def close(): Unit = {}
@@ -468,24 +469,24 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsScanUnsafeRow {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createUnsafeRowReaderFactories(): JList[DataReaderFactory[UnsafeRow]] = {
-      java.util.Arrays.asList(new UnsafeRowDataReaderFactory(0, 5),
-        new UnsafeRowDataReaderFactory(5, 10))
+    override def planUnsafeInputPartitions(): JList[InputPartition[UnsafeRow]] = {
+      java.util.Arrays.asList(new UnsafeRowInputPartitionReader(0, 5),
+        new UnsafeRowInputPartitionReader(5, 10))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class UnsafeRowDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[UnsafeRow] with DataReader[UnsafeRow] {
+class UnsafeRowInputPartitionReader(start: Int, end: Int)
+  extends InputPartition[UnsafeRow] with InputPartitionReader[UnsafeRow] {
 
   private val row = new UnsafeRow(2)
   row.pointTo(new Array[Byte](8 * 3), 8 * 3)
 
   private var current = start - 1
 
-  override def createDataReader(): DataReader[UnsafeRow] = this
+  override def createPartitionReader(): InputPartitionReader[UnsafeRow] = this
 
   override def next(): Boolean = {
     current += 1
@@ -503,7 +504,7 @@ class UnsafeRowDataReaderFactory(start: Int, end: Int)
 class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
 
   class Reader(val readSchema: StructType) extends DataSourceReader {
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
+    override def planInputPartitions(): JList[InputPartition[Row]] =
       java.util.Collections.emptyList()
   }
 
@@ -516,16 +517,17 @@ class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsScanColumnarBatch {
     override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
 
-    override def createBatchDataReaderFactories(): JList[DataReaderFactory[ColumnarBatch]] = {
-      java.util.Arrays.asList(new BatchDataReaderFactory(0, 50), new BatchDataReaderFactory(50, 90))
+    override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
+      java.util.Arrays.asList(
+        new BatchInputPartitionReader(0, 50), new BatchInputPartitionReader(50, 90))
     }
   }
 
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class BatchDataReaderFactory(start: Int, end: Int)
-  extends DataReaderFactory[ColumnarBatch] with DataReader[ColumnarBatch] {
+class BatchInputPartitionReader(start: Int, end: Int)
+  extends InputPartition[ColumnarBatch] with InputPartitionReader[ColumnarBatch] {
 
   private final val BATCH_SIZE = 20
   private lazy val i = new OnHeapColumnVector(BATCH_SIZE, IntegerType)
@@ -534,7 +536,7 @@ class BatchDataReaderFactory(start: Int, end: Int)
 
   private var current = start
 
-  override def createDataReader(): DataReader[ColumnarBatch] = this
+  override def createPartitionReader(): InputPartitionReader[ColumnarBatch] = this
 
   override def next(): Boolean = {
     i.reset()
@@ -568,11 +570,11 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
   class Reader extends DataSourceReader with SupportsReportPartitioning {
     override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       // Note that we don't have same value of column `a` across partitions.
       java.util.Arrays.asList(
-        new SpecificDataReaderFactory(Array(1, 1, 3), Array(4, 4, 6)),
-        new SpecificDataReaderFactory(Array(2, 4, 4), Array(6, 2, 2)))
+        new SpecificInputPartitionReader(Array(1, 1, 3), Array(4, 4, 6)),
+        new SpecificInputPartitionReader(Array(2, 4, 4), Array(6, 2, 2)))
     }
 
     override def outputPartitioning(): Partitioning = new MyPartitioning
@@ -590,14 +592,14 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
   override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
 }
 
-class SpecificDataReaderFactory(i: Array[Int], j: Array[Int])
-  extends DataReaderFactory[Row]
-  with DataReader[Row] {
+class SpecificInputPartitionReader(i: Array[Int], j: Array[Int])
+  extends InputPartition[Row]
+  with InputPartitionReader[Row] {
   assert(i.length == j.length)
 
   private var current = -1
 
-  override def createDataReader(): DataReader[Row] = this
+  override def createPartitionReader(): InputPartitionReader[Row] = this
 
   override def next(): Boolean = {
     current += 1
index a5007fa..694bb3b 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -45,7 +45,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   class Reader(path: String, conf: Configuration) extends DataSourceReader {
     override def readSchema(): StructType = schema
 
-    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
+    override def planInputPartitions(): JList[InputPartition[Row]] = {
       val dataPath = new Path(path)
       val fs = dataPath.getFileSystem(conf)
       if (fs.exists(dataPath)) {
@@ -54,9 +54,9 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
           name.startsWith("_") || name.startsWith(".")
         }.map { f =>
           val serializableConf = new SerializableConfiguration(conf)
-          new SimpleCSVDataReaderFactory(
+          new SimpleCSVInputPartitionReader(
             f.getPath.toUri.toString,
-            serializableConf): DataReaderFactory[Row]
+            serializableConf): InputPartition[Row]
         }.toList.asJava
       } else {
         Collections.emptyList()
@@ -156,14 +156,14 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
   }
 }
 
-class SimpleCSVDataReaderFactory(path: String, conf: SerializableConfiguration)
-  extends DataReaderFactory[Row] with DataReader[Row] {
+class SimpleCSVInputPartitionReader(path: String, conf: SerializableConfiguration)
+  extends InputPartition[Row] with InputPartitionReader[Row] {
 
   @transient private var lines: Iterator[String] = _
   @transient private var currentLine: String = _
   @transient private var inputStream: FSDataInputStream = _
 
-  override def createDataReader(): DataReader[Row] = {
+  override def createPartitionReader(): InputPartitionReader[Row] = {
     val filePath = new Path(path)
     val fs = filePath.getFileSystem(conf.value)
     inputStream = fs.open(filePath)
index 5798699..dcf6cb5 100644 (file)
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
 import org.apache.spark.sql.types.StructType
@@ -227,10 +227,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
 
       // getBatch should take 100 ms the first time it is called
-      override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+      override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
         synchronized {
           clock.waitTillTime(1350)
-          super.createUnsafeRowReaderFactories()
+          super.planUnsafeInputPartitions()
         }
       }
     }
@@ -290,13 +290,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
       AdvanceManualClock(100), // time = 1150 to unblock getEndOffset
       AssertClockTime(1150),
-      AssertStreamExecThreadIsWaitingForTime(1350), // will block on createReadTasks that needs 1350
+      // will block on planInputPartitions that needs 1350
+      AssertStreamExecThreadIsWaitingForTime(1350),
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
 
-      AdvanceManualClock(200), // time = 1350 to unblock createReadTasks
+      AdvanceManualClock(200), // time = 1350 to unblock planInputPartitions
       AssertClockTime(1350),
       AssertStreamExecThreadIsWaitingForTime(1500), // will block on map task that needs 1500
       AssertOnQuery(_.status.isDataAvailable === true),
index e755625..f47d3ec 100644 (file)
@@ -27,8 +27,8 @@ import org.apache.spark.{SparkEnv, SparkFunSuite, TaskContext}
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
-import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, PartitionOffset}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types.{DataType, IntegerType}
@@ -72,8 +72,8 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
    */
   private def setup(): (BlockingQueue[UnsafeRow], ContinuousQueuedDataReader) = {
     val queue = new ArrayBlockingQueue[UnsafeRow](1024)
-    val factory = new DataReaderFactory[UnsafeRow] {
-      override def createDataReader() = new ContinuousDataReader[UnsafeRow] {
+    val factory = new InputPartition[UnsafeRow] {
+      override def createPartitionReader() = new ContinuousInputPartitionReader[UnsafeRow] {
         var index = -1
         var curr: UnsafeRow = _
 
index af4618b..c1a28b9 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
-import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
@@ -44,7 +44,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
   def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
   def setStartOffset(start: Optional[Offset]): Unit = {}
 
-  def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
+  def planInputPartitions(): java.util.ArrayList[InputPartition[Row]] = {
     throw new IllegalStateException("fake source - cannot actually read")
   }
 }
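
For the continuous-processing path touched above, the reader returned by createPartitionReader() is a ContinuousInputPartitionReader. A minimal sketch, with illustrative names and assuming the getOffset() contract of the old ContinuousDataReader carries over unchanged:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}

// Illustrative: a per-partition offset type and a partition that emits the same
// row forever, counting rows produced so the current offset can be reported.
case class IndexOffset(index: Long) extends PartitionOffset

class RepeatingPartition(row: UnsafeRow) extends InputPartition[UnsafeRow] {
  override def createPartitionReader(): ContinuousInputPartitionReader[UnsafeRow] =
    new ContinuousInputPartitionReader[UnsafeRow] {
      private var index = -1L
      override def next(): Boolean = { index += 1; true }
      override def get(): UnsafeRow = row
      override def getOffset(): PartitionOffset = IndexOffset(index)
      override def close(): Unit = {}
    }
}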