SAMZA-1622: avro writer to support generic record
authorHai Lu <halu@linkedin.com>
Fri, 16 Mar 2018 16:22:01 +0000 (09:22 -0700)
committerxiliu <xiliu@linkedin.com>
Fri, 16 Mar 2018 16:22:01 +0000 (09:22 -0700)
avro writer in HDFS system producer to support generic record

Author: Hai Lu <halu@linkedin.com>

Reviewers: Xinyu Liu <xinyuliu.us@gmail.com>

Closes #449 from lhaiesp/master

samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/AvroDataFileHdfsWriter.scala

index c3da611..f1868cd 100644 (file)
 package org.apache.samza.system.hdfs.writer
 
 import org.apache.avro.file.{CodecFactory, DataFileWriter}
+import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
 import org.apache.avro.reflect.{ReflectData, ReflectDatumWriter}
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.IOUtils
-import org.apache.hadoop.io.compress.{DefaultCodec, GzipCodec, SnappyCodec}
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.hdfs.HdfsConfig
 
@@ -67,11 +67,17 @@ class AvroDataFileHdfsWriter (dfs: FileSystem, systemName: String, config: HdfsC
 
   protected def getNextWriter(record: Object): Option[DataFileWriter[Object]] = {
     val path = bucketer.get.getNextWritePath(dfs)
-    val schema = ReflectData.get().getSchema(record.getClass)
-    val datumWriter = new ReflectDatumWriter[Object](schema)
+    val isGenericRecord = record.isInstanceOf[GenericRecord]
+    val schema = record match {
+      case genericRecord: GenericRecord => genericRecord.getSchema
+      case _ => ReflectData.get().getSchema(record.getClass)
+    }
+    val datumWriter = if (isGenericRecord)
+      new GenericDatumWriter[Object](schema)
+    else new ReflectDatumWriter[Object](schema)
     val fileWriter = new DataFileWriter[Object](datumWriter)
     val cn = config.getCompressionType(systemName)
-    if (cn != "none") fileWriter.setCodec(CodecFactory.fromString(cn))
+    if (!cn.equals("none")) fileWriter.setCodec(CodecFactory.fromString(cn))
     Some(fileWriter.create(schema, dfs.create(path)))
   }