flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flume.sink.hdfs;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.TimeZone;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.annotations.VisibleForTesting;
import org.apache.flume.Channel;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.SystemClock;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class HDFSEventSink extends AbstractSink implements Configurable {
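  // Callback contract handed to BucketWriter: run() receives the bucket's
  // lookup path. In process() below it is used as the idle-timeout callback,
  // which evicts the idle writer from the sfWriters cache.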
  public interface WriterCallback {
    public void run(String filePath);
  }

  private static final Logger LOG = LoggerFactory
      .getLogger(HDFSEventSink.class);

  private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");

  private static final long defaultRollInterval = 30;
  private static final long defaultRollSize = 1024;
  private static final long defaultRollCount = 10;
  private static final String defaultFileName = "FlumeData";
  private static final String defaultSuffix = "";
  private static final String defaultInUsePrefix = "";
  private static final String defaultInUseSuffix = ".tmp";
  private static final long defaultBatchSize = 100;
  private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
  private static final int defaultMaxOpenFiles = 5000;

  /**
   * Default length of time we wait for blocking BucketWriter calls
   * before timing out the operation. Intended to prevent server hangs.
   */
  private static final long defaultCallTimeout = 10000;
  /**
   * Default number of threads available for tasks
   * such as append/open/close/flush with hdfs.
   * These tasks are done in a separate thread in case
   * they take too long; if they do, we create a new
   * file and move on.
   */
  private static final int defaultThreadPoolSize = 10;
  private static final int defaultRollTimerPoolSize = 1;

  /**
   * Singleton credential manager that manages static credentials for the
   * entire JVM
   */
  private static final AtomicReference<KerberosUser> staticLogin
      = new AtomicReference<KerberosUser>();

  private final HDFSWriterFactory writerFactory;
  private WriterLinkedHashMap sfWriters;

  private long rollInterval;
  private long rollSize;
  private long rollCount;
  private long batchSize;
  private int threadsPoolSize;
  private int rollTimerPoolSize;
  private CompressionCodec codeC;
  private CompressionType compType;
  private String fileType;
  private String filePath;
  private String fileName;
  private String suffix;
  private String inUsePrefix;
  private String inUseSuffix;
  private TimeZone timeZone;
  private int maxOpenFiles;
  private ExecutorService callTimeoutPool;
  private ScheduledExecutorService timedRollerPool;

  private String kerbConfPrincipal;
  private String kerbKeytab;
  private String proxyUserName;
  private UserGroupInformation proxyTicket;

  private boolean needRounding = false;
  private int roundUnit = Calendar.SECOND;
  private int roundValue = 1;
  private boolean useLocalTime = false;

  private long callTimeout;
  private Context context;
  private SinkCounter sinkCounter;

  private volatile int idleTimeout;
  private Clock clock;

  /*
   * Extended Java LinkedHashMap for open file handle LRU queue.
   * We want to clear the oldest file handle if there are too many open ones.
   */
  private static class WriterLinkedHashMap
      extends LinkedHashMap<String, BucketWriter> {

    private final int maxOpenFiles;

    public WriterLinkedHashMap(int maxOpenFiles) {
      super(16, 0.75f, true); // stock initial capacity/load, access ordering
      this.maxOpenFiles = maxOpenFiles;
    }

    @Override
    protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
      if (size() > maxOpenFiles) {
        // If we have more than maxOpenFiles open, close the eldest one and
        // return true so it is evicted from the map
        try {
          eldest.getValue().close();
        } catch (IOException e) {
          LOG.warn(eldest.getKey().toString(), e);
        } catch (InterruptedException e) {
          LOG.warn(eldest.getKey().toString(), e);
          Thread.currentThread().interrupt();
        }
        return true;
      } else {
        return false;
      }
    }
  }

  public HDFSEventSink() {
    this(new HDFSWriterFactory());
  }

  public HDFSEventSink(HDFSWriterFactory writerFactory) {
    this.writerFactory = writerFactory;
  }

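  /*
   * Illustrative sink configuration (a sketch, not taken from this file: the
   * agent name "a1" and sink name "k1" are arbitrary examples; the values
   * shown are the defaults defined above, and the property keys are the ones
   * read in configure() below):
   *
   *   a1.sinks.k1.type = hdfs
   *   a1.sinks.k1.hdfs.path = hdfs://namenode/flume/events/%Y-%m-%d
   *   a1.sinks.k1.hdfs.filePrefix = FlumeData
   *   a1.sinks.k1.hdfs.rollInterval = 30
   *   a1.sinks.k1.hdfs.rollSize = 1024
   *   a1.sinks.k1.hdfs.rollCount = 10
   *   a1.sinks.k1.hdfs.batchSize = 100
   *   a1.sinks.k1.hdfs.fileType = SequenceFile
   */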
  // read configuration and setup thresholds
  @Override
  public void configure(Context context) {
    this.context = context;

    filePath = Preconditions.checkNotNull(
        context.getString("hdfs.path"), "hdfs.path is required");
    fileName = context.getString("hdfs.filePrefix", defaultFileName);
    this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
    inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
    inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
    String tzName = context.getString("hdfs.timeZone");
    timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
    rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);
    rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
    rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
    batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
    idleTimeout = context.getInteger("hdfs.idleTimeout", 0);
    String codecName = context.getString("hdfs.codeC");
    fileType = context.getString("hdfs.fileType", defaultFileType);
    maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
    callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
    threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
        defaultThreadPoolSize);
    rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize",
        defaultRollTimerPoolSize);
    kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
    kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
    proxyUserName = context.getString("hdfs.proxyUser", "");

    Preconditions.checkArgument(batchSize > 0,
        "batchSize must be greater than 0");
    if (codecName == null) {
      codeC = null;
      compType = CompressionType.NONE;
    } else {
      codeC = getCodec(codecName);
      // TODO : set proper compression type
      compType = CompressionType.BLOCK;
    }

    // Do not allow user to set fileType DataStream with codeC together
    // To prevent output file with compress extension (like .snappy)
    if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)
        && codecName != null) {
      throw new IllegalArgumentException("fileType: " + fileType +
          " which does NOT support compressed output. Please don't set codeC" +
          " or change the fileType if compressed output is desired.");
    }

    if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
      Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
          + " when fileType is: " + fileType);
    }

    if (!authenticate()) {
      LOG.error("Failed to authenticate!");
    }
    needRounding = context.getBoolean("hdfs.round", false);

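    // When rounding is enabled, the timestamp used for bucketing is rounded
    // down to the nearest roundValue units of roundUnit (for example,
    // roundUnit=minute with roundValue=10 groups events into 10-minute
    // buckets). The checks below constrain roundValue to at most 60 for
    // second/minute units and 24 for hours.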
    if(needRounding) {
      String unit = context.getString("hdfs.roundUnit", "second");
      if (unit.equalsIgnoreCase("hour")) {
        this.roundUnit = Calendar.HOUR_OF_DAY;
      } else if (unit.equalsIgnoreCase("minute")) {
        this.roundUnit = Calendar.MINUTE;
      } else if (unit.equalsIgnoreCase("second")){
        this.roundUnit = Calendar.SECOND;
      } else {
        LOG.warn("Rounding unit is not valid, please set one of " +
            "minute, hour, or second. Rounding will be disabled");
        needRounding = false;
      }
      this.roundValue = context.getInteger("hdfs.roundValue", 1);
      if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
            "Round value must be > 0 and <= 60");
      } else if (roundUnit == Calendar.HOUR_OF_DAY){
        Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
            "Round value must be > 0 and <= 24");
      }
    }

    this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
    if(useLocalTime) {
      clock = new SystemClock();
    }

    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }
  }

  private static boolean codecMatches(Class<? extends CompressionCodec> cls,
      String codecName) {
    String simpleName = cls.getSimpleName();
    if (cls.getName().equals(codecName)
        || simpleName.equalsIgnoreCase(codecName)) {
      return true;
    }
    if (simpleName.endsWith("Codec")) {
      String prefix = simpleName.substring(0,
          simpleName.length() - "Codec".length());
      if (prefix.equalsIgnoreCase(codecName)) {
        return true;
      }
    }
    return false;
  }

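  // Resolves a codec by name. Matching (see codecMatches above) accepts the
  // fully-qualified class name, the simple class name, or the simple name with
  // the "Codec" suffix dropped, e.g. "GzipCodec" or "gzip".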
  @VisibleForTesting
  static CompressionCodec getCodec(String codecName) {
    Configuration conf = new Configuration();
    List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
        .getCodecClasses(conf);
    // Wish we could base this on DefaultCodec, but it appears not all codecs
    // extend DefaultCodec (e.g. Lzo)
    CompressionCodec codec = null;
    ArrayList<String> codecStrs = new ArrayList<String>();
    codecStrs.add("None");
    for (Class<? extends CompressionCodec> cls : codecs) {
      codecStrs.add(cls.getSimpleName());
      if (codecMatches(cls, codecName)) {
        try {
          codec = cls.newInstance();
        } catch (InstantiationException e) {
          LOG.error("Unable to instantiate " + cls + " class");
        } catch (IllegalAccessException e) {
          LOG.error("Unable to access " + cls + " class");
        }
      }
    }

    if (codec == null) {
      if (!codecName.equalsIgnoreCase("None")) {
        throw new IllegalArgumentException("Unsupported compression codec "
            + codecName + ". Please choose from: " + codecStrs);
      }
    } else if (codec instanceof org.apache.hadoop.conf.Configurable) {
      // Must check with instanceof, since BZip2Codec doesn't implement Configurable.
      // Must set the configuration for Configurable objects that may or do use
      // native libs
      ((org.apache.hadoop.conf.Configurable) codec).setConf(conf);
    }
    return codec;
  }


  /**
   * Pull events out of channel and send them to HDFS. Take at most batchSize
   * events per Transaction. Find the corresponding bucket for the event.
   * Ensure the file is open. Serialize the data and write it to the file on
   * HDFS. <br/>
   * This method is not thread safe.
   */
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    List<BucketWriter> writers = Lists.newArrayList();
    transaction.begin();
    try {
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        Event event = channel.take();
        if (event == null) {
          break;
        }

        // reconstruct the path name by substituting place holders
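        // (illustrative example, not a value from this file: a path such as
        // /flume/events/%Y-%m-%d or a header reference such as %{host} is
        // expanded per event by BucketPath)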
        String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime);
        String realName = BucketPath.escapeString(fileName, event.getHeaders(),
            timeZone, needRounding, roundUnit, roundValue, useLocalTime);

        String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
        BucketWriter bucketWriter = sfWriters.get(lookupPath);

        // we haven't seen this file yet, so open it and cache the handle
        if (bucketWriter == null) {
          HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);

          WriterCallback idleCallback = null;
          if(idleTimeout != 0) {
            idleCallback = new WriterCallback() {
              @Override
              public void run(String bucketPath) {
                sfWriters.remove(bucketPath);
              }
            };
          }
          bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
              batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
              suffix, codeC, compType, hdfsWriter, timedRollerPool,
              proxyTicket, sinkCounter, idleTimeout, idleCallback,
              lookupPath, callTimeout, callTimeoutPool);

          sfWriters.put(lookupPath, bucketWriter);
        }

        // track the buckets getting written in this transaction
        if (!writers.contains(bucketWriter)) {
          writers.add(bucketWriter);
        }

        // Write the data to HDFS
        bucketWriter.append(event);
      }

      if (txnEventCount == 0) {
        sinkCounter.incrementBatchEmptyCount();
      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
      } else {
        sinkCounter.incrementBatchUnderflowCount();
      }

      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
      }

      transaction.commit();

      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY;
      }
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      transaction.close();
    }
  }

  @Override
  public void stop() {
    // do not constrain close() calls with a timeout
    for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) {
      LOG.info("Closing {}", entry.getKey());

      try {
        entry.getValue().close();
      } catch (Exception ex) {
        LOG.warn("Exception while closing " + entry.getKey() + ". " +
            "Exception follows.", ex);
        if (ex instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
      }
    }

    // shut down all our thread pools
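    // awaitTermination is retried in a loop until each executor reports
    // isTerminated(), using the larger of the configured and default call
    // timeouts as the wait interval.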
    ExecutorService toShutdown[] = {callTimeoutPool, timedRollerPool};
    for (ExecutorService execService : toShutdown) {
      execService.shutdown();
      try {
        while (!execService.isTerminated()) {
          execService.awaitTermination(
              Math.max(defaultCallTimeout, callTimeout), TimeUnit.MILLISECONDS);
        }
      } catch (InterruptedException ex) {
        LOG.warn("shutdown interrupted on " + execService, ex);
      }
    }

    callTimeoutPool = null;
    timedRollerPool = null;

    sfWriters.clear();
    sfWriters = null;
    sinkCounter.stop();
    super.stop();
  }

  @Override
  public void start() {
    String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
        new ThreadFactoryBuilder().setNameFormat(timeoutName).build());

    String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
        new ThreadFactoryBuilder().setNameFormat(rollerName).build());

    this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);
    sinkCounter.start();
    super.start();
  }

  private boolean authenticate() {

    // logic for kerberos login
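    // In outline: when Hadoop security is enabled we sanity-check the
    // configured principal and keytab, resolve any _HOST token in the
    // principal, reuse the JVM-wide cached login if it matches, and otherwise
    // perform a fresh keytab login via kerberosLogin(). Independently,
    // hdfs.proxyUser (if set) wraps the login user in a proxy-user UGI that is
    // handed to the bucket writers.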
    boolean useSecurity = UserGroupInformation.isSecurityEnabled();

    LOG.info("Hadoop Security enabled: " + useSecurity);

    if (useSecurity) {

      // sanity checking
      if (kerbConfPrincipal.isEmpty()) {
        LOG.error("Hadoop running in secure mode, but Flume config doesn't "
            + "specify a principal to use for Kerberos auth.");
        return false;
      }
      if (kerbKeytab.isEmpty()) {
        LOG.error("Hadoop running in secure mode, but Flume config doesn't "
            + "specify a keytab to use for Kerberos auth.");
        return false;
      } else {
        // If a keytab is specified, the user intends for it to take effect;
        // HDFSEventSink will halt if the keytab file does not exist or is unreadable.
        File kfile = new File(kerbKeytab);
        if (!(kfile.isFile() && kfile.canRead())) {
          throw new IllegalArgumentException("The keyTab file: "
              + kerbKeytab + " is nonexistent or unreadable. "
              + "Please specify a readable keytab file for Kerberos auth.");
        }
      }

      String principal;
      try {
        // resolves _HOST pattern using standard Hadoop search/replace
        // via DNS lookup when 2nd argument is empty
        principal = SecurityUtil.getServerPrincipal(kerbConfPrincipal, "");
      } catch (IOException e) {
        LOG.error("Host lookup error resolving kerberos principal ("
            + kerbConfPrincipal + "). Exception follows.", e);
        return false;
      }

      Preconditions.checkNotNull(principal, "Principal must not be null");
      KerberosUser prevUser = staticLogin.get();
      KerberosUser newUser = new KerberosUser(principal, kerbKeytab);

      // be cruel and unusual when user tries to login as multiple principals
      // this isn't really valid with a reconfigure but this should be rare
      // enough to warrant a restart of the agent JVM
      // TODO: find a way to interrogate the entire current config state,
      // since we don't have to be unnecessarily protective if they switch all
      // HDFS sinks to use a different principal all at once.
      Preconditions.checkState(prevUser == null || prevUser.equals(newUser),
          "Cannot use multiple kerberos principals in the same agent. " +
          " Must restart agent to use new principal or keytab. " +
          "Previous = %s, New = %s", prevUser, newUser);

      // attempt to use cached credential if the user is the same
      // this is polite and should avoid flooding the KDC with auth requests
      UserGroupInformation curUser = null;
      if (prevUser != null && prevUser.equals(newUser)) {
        try {
          curUser = UserGroupInformation.getLoginUser();
        } catch (IOException e) {
          LOG.warn("User unexpectedly had no active login. Continuing with " +
              "authentication", e);
        }
      }

      if (curUser == null || !curUser.getUserName().equals(principal)) {
        try {
          // static login
          kerberosLogin(this, principal, kerbKeytab);
        } catch (IOException e) {
          LOG.error("Authentication or file read error while attempting to "
              + "login as kerberos principal (" + principal + ") using "
              + "keytab (" + kerbKeytab + "). Exception follows.", e);
          return false;
        }
      } else {
        LOG.debug("{}: Using existing principal login: {}", this, curUser);
      }

      // we supposedly got through this unscathed... so store the static user
      staticLogin.set(newUser);
    }

    // hadoop impersonation works with or without kerberos security
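    // If hdfs.proxyUser is set, createProxyUser() below wraps the current
    // login user so that writes are performed as proxyUserName. Note that the
    // Hadoop cluster must also allow the login user to impersonate that user
    // (hadoop.proxyuser.* settings; an assumption about the cluster side, not
    // something enforced here).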
    proxyTicket = null;
    if (!proxyUserName.isEmpty()) {
      try {
        proxyTicket = UserGroupInformation.createProxyUser(
            proxyUserName, UserGroupInformation.getLoginUser());
      } catch (IOException e) {
        LOG.error("Unable to login as proxy user. Exception follows.", e);
        return false;
      }
    }

    UserGroupInformation ugi = null;
    if (proxyTicket != null) {
      ugi = proxyTicket;
    } else if (useSecurity) {
      try {
        ugi = UserGroupInformation.getLoginUser();
      } catch (IOException e) {
        LOG.error("Unexpected error: Unable to get authenticated user after " +
            "apparent successful login! Exception follows.", e);
        return false;
      }
    }

    if (ugi != null) {
      // dump login information
      AuthenticationMethod authMethod = ugi.getAuthenticationMethod();
      LOG.info("Auth method: {}", authMethod);
      LOG.info(" User name: {}", ugi.getUserName());
      LOG.info(" Using keytab: {}", ugi.isFromKeytab());
      if (authMethod == AuthenticationMethod.PROXY) {
        UserGroupInformation superUser;
        try {
          superUser = UserGroupInformation.getLoginUser();
          LOG.info(" Superuser auth: {}", superUser.getAuthenticationMethod());
          LOG.info(" Superuser name: {}", superUser.getUserName());
          LOG.info(" Superuser using keytab: {}", superUser.isFromKeytab());
        } catch (IOException e) {
          LOG.error("Unexpected error: unknown superuser impersonating proxy.",
              e);
          return false;
        }
      }

      LOG.info("Logged in as user {}", ugi.getUserName());

      return true;
    }

    return true;
  }

  /**
   * Static synchronized method for static Kerberos login. <br/>
   * Static synchronized due to a thundering herd problem when multiple Sinks
   * attempt to log in using the same principal at the same time with the
   * intention of impersonating different users (or even the same user).
   * If this is not controlled, MIT Kerberos v5 believes it is seeing a replay
   * attack and it returns:
   * <blockquote>Request is a replay (34) - PROCESS_TGS</blockquote>
   * In addition, since the underlying Hadoop APIs we are using for
   * impersonation are static, we define this method as static as well.
   *
   * @param principal
   *          Fully-qualified principal to use for authentication.
   * @param keytab
   *          Location of keytab file containing credentials for principal.
   * @return Logged-in user
   * @throws IOException
   *           if login fails.
   */
  private static synchronized UserGroupInformation kerberosLogin(
      HDFSEventSink sink, String principal, String keytab) throws IOException {

    // if we are the 2nd user thru the lock, the login should already be
    // available statically if login was successful
    UserGroupInformation curUser = null;
    try {
      curUser = UserGroupInformation.getLoginUser();
    } catch (IOException e) {
      // not a big deal but this shouldn't typically happen because it will
      // generally fall back to the UNIX user
      LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
    }

    // we already have logged in successfully
    if (curUser != null && curUser.getUserName().equals(principal)) {
      LOG.debug("{}: Using existing principal ({}): {}",
          new Object[]{sink, principal, curUser});

      // no principal found
    } else {

      LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
          "file ({})", new Object[]{sink, principal, keytab});

      // attempt static kerberos login
      UserGroupInformation.loginUserFromKeytab(principal, keytab);
      curUser = UserGroupInformation.getLoginUser();
    }

    return curUser;
  }

  @Override
  public String toString() {
    return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
        " }";
  }

  @VisibleForTesting
  void setBucketClock(Clock clock) {
    BucketPath.setClock(clock);
  }
}
