[MINOR] Add logging for EventHubs configs
authorJagadish <jvenkatraman@linkedin.com>
Wed, 30 May 2018 22:51:08 +0000 (15:51 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Wed, 30 May 2018 22:51:08 +0000 (15:51 -0700)
prateekm for review

Author: Jagadish <jvenkatraman@linkedin.com>

Reviewers: Prateek M<pmaheshw@linkedin.com>

Closes #540 from vjagadish1989/eh-logging

samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java

index e26d47c..61f823c 100644 (file)
@@ -26,6 +26,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
 
 import java.time.Duration;
@@ -85,13 +87,21 @@ public class EventHubConfig extends MapConfig {
 
   private final Map<String, String> physcialToId = new HashMap<>();
 
+  private static final Logger LOG = LoggerFactory.getLogger(EventHubConfig.class);
+
   public EventHubConfig(Config config) {
     super(config);
 
     // Build reverse index for streamName -> streamId
     StreamConfig streamConfig = new StreamConfig(config);
+
+    LOG.info("Building mappings from physicalName to streamId");
     JavaConversions.asJavaCollection(streamConfig.getStreamIds())
-            .forEach((streamId) -> physcialToId.put(streamConfig.getPhysicalName(streamId), streamId));
+        .forEach((streamId) -> {
+            String physicalName = streamConfig.getPhysicalName(streamId);
+            LOG.info("Obtained physicalName: {} for streamId: {} ", physicalName, streamId);
+            physcialToId.put(physicalName, streamId);
+          });
   }
 
   private String getFromStreamIdOrName(String configName, String streamName, String defaultString) {
@@ -145,6 +155,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs namespace
    */
   public String getStreamNamespace(String systemName, String streamName) {
+    LOG.info("Obtaining name-space for system: {} physical name: {}", systemName, streamName);
     return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, streamName),
             "Namespace", systemName, streamName);
   }
@@ -157,6 +168,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs entity path
    */
   public String getStreamEntityPath(String systemName, String streamName) {
+    LOG.info("Obtaining entity-path for system: {} physical name: {}", systemName, streamName);
     return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, streamName),
             "EntityPath", systemName, streamName);
   }
index 9d8bd5f..48f939c 100644 (file)
@@ -164,7 +164,7 @@ public class ExecutionPlanner {
       // set the partitions of a stream to its StreamEdge
       streamToPartitionCount.forEach((stream, partitionCount) -> {
           streamToStreamEdge.get(stream).setPartitionCount(partitionCount);
-          log.debug("Partition count is {} for stream {}", partitionCount, stream);
+          log.info("Partition count is {} for stream {}", partitionCount, stream);
         });
     }
   }