FLUME-2043. JMS Source removed on failure to create configuration
[flume.git] / flume-ng-sinks / flume-hdfs-sink / src / main / java / org / apache / flume / sink / hdfs / HDFSEventSink.java
1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 package org.apache.flume.sink.hdfs;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Calendar;
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.TimeZone;
28 import java.util.Map.Entry;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicReference;
34
35 import com.google.common.annotations.VisibleForTesting;
36 import org.apache.flume.Channel;
37 import org.apache.flume.Clock;
38 import org.apache.flume.Context;
39 import org.apache.flume.Event;
40 import org.apache.flume.EventDeliveryException;
41 import org.apache.flume.SystemClock;
42 import org.apache.flume.Transaction;
43 import org.apache.flume.conf.Configurable;
44 import org.apache.flume.formatter.output.BucketPath;
45 import org.apache.flume.instrumentation.SinkCounter;
46 import org.apache.flume.sink.AbstractSink;
47 import org.apache.hadoop.conf.Configuration;
48 import org.apache.hadoop.io.SequenceFile.CompressionType;
49 import org.apache.hadoop.io.compress.CompressionCodec;
50 import org.apache.hadoop.io.compress.CompressionCodecFactory;
51 import org.apache.hadoop.security.SecurityUtil;
52 import org.apache.hadoop.security.UserGroupInformation;
53 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 import com.google.common.base.Preconditions;
58 import com.google.common.collect.Lists;
59 import com.google.common.util.concurrent.ThreadFactoryBuilder;
60
61 public class HDFSEventSink extends AbstractSink implements Configurable {
62   public interface WriterCallback {
63     public void run(String filePath);
64   }
65
66   private static final Logger LOG = LoggerFactory
67       .getLogger(HDFSEventSink.class);
68
69   private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
70
71   private static final long defaultRollInterval = 30;
72   private static final long defaultRollSize = 1024;
73   private static final long defaultRollCount = 10;
74   private static final String defaultFileName = "FlumeData";
75   private static final String defaultSuffix = "";
76   private static final String defaultInUsePrefix = "";
77   private static final String defaultInUseSuffix = ".tmp";
78   private static final long defaultBatchSize = 100;
79   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
80   private static final int defaultMaxOpenFiles = 5000;
81
82   /**
83    * Default length of time we wait for blocking BucketWriter calls
84    * before timing out the operation. Intended to prevent server hangs.
85    */
86   private static final long defaultCallTimeout = 10000;
87   /**
88    * Default number of threads available for tasks
89    * such as append/open/close/flush with hdfs.
90    * These tasks are done in a separate thread in
91    * the case that they take too long. In which
92    * case we create a new file and move on.
93    */
94   private static final int defaultThreadPoolSize = 10;
95   private static final int defaultRollTimerPoolSize = 1;
96
97   /**
98    * Singleton credential manager that manages static credentials for the
99    * entire JVM
100    */
101   private static final AtomicReference<KerberosUser> staticLogin
102       = new AtomicReference<KerberosUser>();
103
104   private final HDFSWriterFactory writerFactory;
105   private WriterLinkedHashMap sfWriters;
106
107   private long rollInterval;
108   private long rollSize;
109   private long rollCount;
110   private long batchSize;
111   private int threadsPoolSize;
112   private int rollTimerPoolSize;
113   private CompressionCodec codeC;
114   private CompressionType compType;
115   private String fileType;
116   private String filePath;
117   private String fileName;
118   private String suffix;
119   private String inUsePrefix;
120   private String inUseSuffix;
121   private TimeZone timeZone;
122   private int maxOpenFiles;
123   private ExecutorService callTimeoutPool;
124   private ScheduledExecutorService timedRollerPool;
125
126   private String kerbConfPrincipal;
127   private String kerbKeytab;
128   private String proxyUserName;
129   private UserGroupInformation proxyTicket;
130
131   private boolean needRounding = false;
132   private int roundUnit = Calendar.SECOND;
133   private int roundValue = 1;
134   private boolean useLocalTime = false;
135
136   private long callTimeout;
137   private Context context;
138   private SinkCounter sinkCounter;
139
140   private volatile int idleTimeout;
141   private Clock clock;
142
143   /*
144    * Extended Java LinkedHashMap for open file handle LRU queue.
145    * We want to clear the oldest file handle if there are too many open ones.
146    */
147   private static class WriterLinkedHashMap
148       extends LinkedHashMap<String, BucketWriter> {
149
150     private final int maxOpenFiles;
151
152     public WriterLinkedHashMap(int maxOpenFiles) {
153       super(16, 0.75f, true); // stock initial capacity/load, access ordering
154       this.maxOpenFiles = maxOpenFiles;
155     }
156
157     @Override
158     protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
159       if (size() > maxOpenFiles) {
160         // If we have more that max open files, then close the last one and
161         // return true
162         try {
163           eldest.getValue().close();
164         } catch (IOException e) {
165           LOG.warn(eldest.getKey().toString(), e);
166         } catch (InterruptedException e) {
167           LOG.warn(eldest.getKey().toString(), e);
168           Thread.currentThread().interrupt();
169         }
170         return true;
171       } else {
172         return false;
173       }
174     }
175   }
176
177   public HDFSEventSink() {
178     this(new HDFSWriterFactory());
179   }
180
181   public HDFSEventSink(HDFSWriterFactory writerFactory) {
182     this.writerFactory = writerFactory;
183   }
184
185   // read configuration and setup thresholds
186   @Override
187   public void configure(Context context) {
188     this.context = context;
189
190     filePath = Preconditions.checkNotNull(
191         context.getString("hdfs.path"), "hdfs.path is required");
192     fileName = context.getString("hdfs.filePrefix", defaultFileName);
193     this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
194     inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
195     inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
196     String tzName = context.getString("hdfs.timeZone");
197     timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
198     rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
199     rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
200     rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
201     batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
202     idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
203     String codecName = context.getString("hdfs.codeC");
204     fileType = context.getString("hdfs.fileType", defaultFileType);
205     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
206     callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
207     threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
208         defaultThreadPoolSize);
209     rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
210         defaultRollTimerPoolSize);
211     kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
212     kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
213     proxyUserName = context.getString("hdfs.proxyUser", "");
214
215     Preconditions.checkArgument(batchSize > 0,
216         "batchSize must be greater than 0");
217     if (codecName == null) {
218       codeC = null;
219       compType = CompressionType.NONE;
220     } else {
221       codeC = getCodec(codecName);
222       // TODO : set proper compression type
223       compType = CompressionType.BLOCK;
224     }
225
226     // Do not allow user to set fileType DataStream with codeC together
227     // To prevent output file with compress extension (like .snappy)
228     if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)
229         && codecName != null) {
230       throw new IllegalArgumentException("fileType: " + fileType +
231           " which does NOT support compressed output. Please don't set codeC" +
232           " or change the fileType if compressed output is desired.");
233     }
234
235     if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
236       Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
237           + " when fileType is: " + fileType);
238     }
239
240     if (!authenticate()) {
241       LOG.error("Failed to authenticate!");
242     }
243     needRounding = context.getBoolean("hdfs.round", false);
244
245     if(needRounding) {
246       String unit = context.getString("hdfs.roundUnit", "second");
247       if (unit.equalsIgnoreCase("hour")) {
248         this.roundUnit = Calendar.HOUR_OF_DAY;
249       } else if (unit.equalsIgnoreCase("minute")) {
250         this.roundUnit = Calendar.MINUTE;
251       } else if (unit.equalsIgnoreCase("second")){
252         this.roundUnit = Calendar.SECOND;
253       } else {
254         LOG.warn("Rounding unit is not valid, please set one of" +
255             "minute, hour, or second. Rounding will be disabled");
256         needRounding = false;
257       }
258       this.roundValue = context.getInteger("hdfs.roundValue", 1);
259       if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
260         Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
261             "Round value" +
262             "must be > 0 and <= 60");
263       } else if (roundUnit == Calendar.HOUR_OF_DAY){
264         Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
265             "Round value" +
266             "must be > 0 and <= 24");
267       }
268     }
269
270     this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
271     if(useLocalTime) {
272       clock = new SystemClock();
273     }
274
275     if (sinkCounter == null) {
276       sinkCounter = new SinkCounter(getName());
277     }
278   }
279
280   private static boolean codecMatches(Class<? extends CompressionCodec> cls,
281       String codecName) {
282     String simpleName = cls.getSimpleName();
283     if (cls.getName().equals(codecName)
284         || simpleName.equalsIgnoreCase(codecName)) {
285       return true;
286     }
287     if (simpleName.endsWith("Codec")) {
288       String prefix = simpleName.substring(0,
289           simpleName.length() - "Codec".length());
290       if (prefix.equalsIgnoreCase(codecName)) {
291         return true;
292       }
293     }
294     return false;
295   }
296
297   @VisibleForTesting
298   static CompressionCodec getCodec(String codecName) {
299     Configuration conf = new Configuration();
300     List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
301         .getCodecClasses(conf);
302     // Wish we could base this on DefaultCodec but appears not all codec's
303     // extend DefaultCodec(Lzo)
304     CompressionCodec codec = null;
305     ArrayList<String> codecStrs = new ArrayList<String>();
306     codecStrs.add("None");
307     for (Class<? extends CompressionCodec> cls : codecs) {
308       codecStrs.add(cls.getSimpleName());
309       if (codecMatches(cls, codecName)) {
310         try {
311           codec = cls.newInstance();
312         } catch (InstantiationException e) {
313           LOG.error("Unable to instantiate " + cls + " class");
314         } catch (IllegalAccessException e) {
315           LOG.error("Unable to access " + cls + " class");
316         }
317       }
318     }
319
320     if (codec == null) {
321       if (!codecName.equalsIgnoreCase("None")) {
322         throw new IllegalArgumentException("Unsupported compression codec "
323             + codecName + ".  Please choose from: " + codecStrs);
324       }
325     } else if (codec instanceof org.apache.hadoop.conf.Configurable) {
326       // Must check instanceof codec as BZip2Codec doesn't inherit Configurable
327       // Must set the configuration for Configurable objects that may or do use
328       // native libs
329       ((org.apache.hadoop.conf.Configurable) codec).setConf(conf);
330     }
331     return codec;
332   }
333
334
335   /**
336    * Pull events out of channel and send it to HDFS. Take at most batchSize
337    * events per Transaction. Find the corresponding bucket for the event.
338    * Ensure the file is open. Serialize the data and write it to the file on
339    * HDFS. <br/>
340    * This method is not thread safe.
341    */
342   public Status process() throws EventDeliveryException {
343     Channel channel = getChannel();
344     Transaction transaction = channel.getTransaction();
345     List<BucketWriter> writers = Lists.newArrayList();
346     transaction.begin();
347     try {
348       int txnEventCount = 0;
349       for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
350         Event event = channel.take();
351         if (event == null) {
352           break;
353         }
354
355         // reconstruct the path name by substituting place holders
356         String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
357             timeZone, needRounding, roundUnit, roundValue, useLocalTime);
358         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
359           timeZone, needRounding, roundUnit, roundValue, useLocalTime);
360
361         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
362         BucketWriter bucketWriter = sfWriters.get(lookupPath);
363
364         // we haven't seen this file yet, so open it and cache the handle
365         if (bucketWriter == null) {
366           HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
367
368           WriterCallback idleCallback = null;
369           if(idleTimeout != 0) {
370             idleCallback = new WriterCallback() {
371               @Override
372               public void run(String bucketPath) {
373                 sfWriters.remove(bucketPath);
374               }
375             };
376           }
377           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
378               batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
379               suffix, codeC, compType, hdfsWriter, timedRollerPool,
380               proxyTicket, sinkCounter, idleTimeout, idleCallback,
381               lookupPath, callTimeout, callTimeoutPool);
382
383           sfWriters.put(lookupPath, bucketWriter);
384         }
385
386         // track the buckets getting written in this transaction
387         if (!writers.contains(bucketWriter)) {
388           writers.add(bucketWriter);
389         }
390
391         // Write the data to HDFS
392         bucketWriter.append(event);
393       }
394
395       if (txnEventCount == 0) {
396         sinkCounter.incrementBatchEmptyCount();
397       } else if (txnEventCount == batchSize) {
398         sinkCounter.incrementBatchCompleteCount();
399       } else {
400         sinkCounter.incrementBatchUnderflowCount();
401       }
402
403       // flush all pending buckets before committing the transaction
404       for (BucketWriter bucketWriter : writers) {
405         bucketWriter.flush();
406       }
407
408       transaction.commit();
409
410       if (txnEventCount < 1) {
411         return Status.BACKOFF;
412       } else {
413         sinkCounter.addToEventDrainSuccessCount(txnEventCount);
414         return Status.READY;
415       }
416     } catch (IOException eIO) {
417       transaction.rollback();
418       LOG.warn("HDFS IO error", eIO);
419       return Status.BACKOFF;
420     } catch (Throwable th) {
421       transaction.rollback();
422       LOG.error("process failed", th);
423       if (th instanceof Error) {
424         throw (Error) th;
425       } else {
426         throw new EventDeliveryException(th);
427       }
428     } finally {
429       transaction.close();
430     }
431   }
432
433   @Override
434   public void stop() {
435     // do not constrain close() calls with a timeout
436     for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
437       LOG.info("Closing {}", entry.getKey());
438
439       try {
440         entry.getValue().close();
441       } catch (Exception ex) {
442         LOG.warn("Exception while closing " + entry.getKey() + ". " +
443                 "Exception follows.", ex);
444         if (ex instanceof InterruptedException) {
445           Thread.currentThread().interrupt();
446         }
447       }
448     }
449
450     // shut down all our thread pools
451     ExecutorService toShutdown[] = {callTimeoutPool, timedRollerPool};
452     for (ExecutorService execService : toShutdown) {
453       execService.shutdown();
454       try {
455         while (execService.isTerminated() == false) {
456           execService.awaitTermination(
457                   Math.max(defaultCallTimeout, callTimeout), TimeUnit.MILLISECONDS);
458         }
459       } catch (InterruptedException ex) {
460         LOG.warn("shutdown interrupted on " + execService, ex);
461       }
462     }
463
464     callTimeoutPool = null;
465     timedRollerPool = null;
466
467     sfWriters.clear();
468     sfWriters = null;
469     sinkCounter.stop();
470     super.stop();
471   }
472
473   @Override
474   public void start() {
475     String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
476     callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
477             new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
478
479     String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
480     timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
481             new ThreadFactoryBuilder().setNameFormat(rollerName).build());
482
483     this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);
484     sinkCounter.start();
485     super.start();
486   }
487
488   private boolean authenticate() {
489
490     // logic for kerberos login
491     boolean useSecurity = UserGroupInformation.isSecurityEnabled();
492
493     LOG.info("Hadoop Security enabled: " + useSecurity);
494
495     if (useSecurity) {
496
497       // sanity checking
498       if (kerbConfPrincipal.isEmpty()) {
499         LOG.error("Hadoop running in secure mode, but Flume config doesn't "
500                 + "specify a principal to use for Kerberos auth.");
501         return false;
502       }
503       if (kerbKeytab.isEmpty()) {
504         LOG.error("Hadoop running in secure mode, but Flume config doesn't "
505                 + "specify a keytab to use for Kerberos auth.");
506         return false;
507       } else {
508         //If keytab is specified, user should want it take effect.
509         //HDFSEventSink will halt when keytab file is non-exist or unreadable
510         File kfile = new File(kerbKeytab);
511         if (!(kfile.isFile() && kfile.canRead())) {
512           throw new IllegalArgumentException("The keyTab file: "
513                   + kerbKeytab + " is nonexistent or can't read. "
514                   + "Please specify a readable keytab file for Kerberos auth.");
515         }
516       }
517
518       String principal;
519       try {
520         // resolves _HOST pattern using standard Hadoop search/replace
521         // via DNS lookup when 2nd argument is empty
522         principal = SecurityUtil.getServerPrincipal(kerbConfPrincipal, "");
523       } catch (IOException e) {
524         LOG.error("Host lookup error resolving kerberos principal ("
525                 + kerbConfPrincipal + "). Exception follows.", e);
526         return false;
527       }
528
529       Preconditions.checkNotNull(principal, "Principal must not be null");
530       KerberosUser prevUser = staticLogin.get();
531       KerberosUser newUser = new KerberosUser(principal, kerbKeytab);
532
533       // be cruel and unusual when user tries to login as multiple principals
534       // this isn't really valid with a reconfigure but this should be rare
535       // enough to warrant a restart of the agent JVM
536       // TODO: find a way to interrogate the entire current config state,
537       // since we don't have to be unnecessarily protective if they switch all
538       // HDFS sinks to use a different principal all at once.
539       Preconditions.checkState(prevUser == null || prevUser.equals(newUser),
540               "Cannot use multiple kerberos principals in the same agent. " +
541                       " Must restart agent to use new principal or keytab. " +
542                       "Previous = %s, New = %s", prevUser, newUser);
543
544       // attempt to use cached credential if the user is the same
545       // this is polite and should avoid flooding the KDC with auth requests
546       UserGroupInformation curUser = null;
547       if (prevUser != null && prevUser.equals(newUser)) {
548         try {
549           curUser = UserGroupInformation.getLoginUser();
550         } catch (IOException e) {
551           LOG.warn("User unexpectedly had no active login. Continuing with " +
552                   "authentication", e);
553         }
554       }
555
556       if (curUser == null || !curUser.getUserName().equals(principal)) {
557         try {
558           // static login
559           kerberosLogin(this, principal, kerbKeytab);
560         } catch (IOException e) {
561           LOG.error("Authentication or file read error while attempting to "
562                   + "login as kerberos principal (" + principal + ") using "
563                   + "keytab (" + kerbKeytab + "). Exception follows.", e);
564           return false;
565         }
566       } else {
567         LOG.debug("{}: Using existing principal login: {}", this, curUser);
568       }
569
570       // we supposedly got through this unscathed... so store the static user
571       staticLogin.set(newUser);
572     }
573
574     // hadoop impersonation works with or without kerberos security
575     proxyTicket = null;
576     if (!proxyUserName.isEmpty()) {
577       try {
578         proxyTicket = UserGroupInformation.createProxyUser(
579                 proxyUserName, UserGroupInformation.getLoginUser());
580       } catch (IOException e) {
581         LOG.error("Unable to login as proxy user. Exception follows.", e);
582         return false;
583       }
584     }
585
586     UserGroupInformation ugi = null;
587     if (proxyTicket != null) {
588       ugi = proxyTicket;
589     } else if (useSecurity) {
590       try {
591         ugi = UserGroupInformation.getLoginUser();
592       } catch (IOException e) {
593         LOG.error("Unexpected error: Unable to get authenticated user after " +
594                 "apparent successful login! Exception follows.", e);
595         return false;
596       }
597     }
598
599     if (ugi != null) {
600       // dump login information
601       AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
602       LOG.info("Auth method: {}", authMethod);
603       LOG.info(" User name: {}", ugi.getUserName());
604       LOG.info(" Using keytab: {}", ugi.isFromKeytab());
605       if (authMethod == AuthenticationMethod.PROXY) {
606         UserGroupInformation superUser;
607         try {
608           superUser = UserGroupInformation.getLoginUser();
609           LOG.info(" Superuser auth: {}", superUser.getAuthenticationMethod());
610           LOG.info(" Superuser name: {}", superUser.getUserName());
611           LOG.info(" Superuser using keytab: {}", superUser.isFromKeytab());
612         } catch (IOException e) {
613           LOG.error("Unexpected error: unknown superuser impersonating proxy.",
614                   e);
615           return false;
616         }
617       }
618
619       LOG.info("Logged in as user {}", ugi.getUserName());
620
621       return true;
622     }
623
624     return true;
625   }
626
627   /**
628    * Static synchronized method for static Kerberos login. <br/>
629    * Static synchronized due to a thundering herd problem when multiple Sinks
630    * attempt to log in using the same principal at the same time with the
631    * intention of impersonating different users (or even the same user).
632    * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
633    * attach and it returns:
634    * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote>
635    * In addition, since the underlying Hadoop APIs we are using for
636    * impersonation are static, we define this method as static as well.
637    *
638    * @param principal
639    *         Fully-qualified principal to use for authentication.
640    * @param keytab
641    *         Location of keytab file containing credentials for principal.
642    * @return Logged-in user
643    * @throws IOException
644    *         if login fails.
645    */
646   private static synchronized UserGroupInformation kerberosLogin(
647           HDFSEventSink sink, String principal, String keytab) throws IOException {
648
649     // if we are the 2nd user thru the lock, the login should already be
650     // available statically if login was successful
651     UserGroupInformation curUser = null;
652     try {
653       curUser = UserGroupInformation.getLoginUser();
654     } catch (IOException e) {
655       // not a big deal but this shouldn't typically happen because it will
656       // generally fall back to the UNIX user
657       LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
658     }
659
660     // we already have logged in successfully
661     if (curUser != null && curUser.getUserName().equals(principal)) {
662       LOG.debug("{}: Using existing principal ({}): {}",
663               new Object[]{sink, principal, curUser});
664
665       // no principal found
666     } else {
667
668       LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
669               "file ({})", new Object[]{sink, principal, keytab});
670
671       // attempt static kerberos login
672       UserGroupInformation.loginUserFromKeytab(principal, keytab);
673       curUser = UserGroupInformation.getLoginUser();
674     }
675
676     return curUser;
677   }
678
679   @Override
680   public String toString() {
681     return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
682             " }";
683   }
684
685   @VisibleForTesting
686   void setBucketClock(Clock clock) {
687     BucketPath.setClock(clock);
688   }
689 }