TEZ-698. Make it easy to create and configure MRInput/MROutput and other inputs/outpu...
[incubator-tez.git] / tez-runtime-library / src / main / java / org / apache / tez / runtime / library / output / OnFileUnorderedKVOutput.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.tez.runtime.library.output;
20
21 import java.nio.ByteBuffer;
22 import java.util.Collections;
23 import java.util.List;
24
25 import org.apache.hadoop.classification.InterfaceAudience.Private;
26 import org.apache.hadoop.yarn.api.ApplicationConstants;
27 import org.apache.tez.dag.api.TezUncheckedException;
28 import org.apache.tez.runtime.api.Event;
29 import org.apache.tez.runtime.api.LogicalOutput;
30 import org.apache.tez.runtime.api.TezOutputContext;
31 import org.apache.tez.runtime.api.events.DataMovementEvent;
32 import org.apache.tez.runtime.library.api.KeyValueWriter;
33 import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
34 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
35 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
36
37 import com.google.common.annotations.VisibleForTesting;
38 import com.google.common.base.Preconditions;
39 import com.google.common.collect.Lists;
40
41 public class OnFileUnorderedKVOutput implements LogicalOutput {
42
43   private TezOutputContext outputContext;
44   private FileBasedKVWriter kvWriter;
45
46   public OnFileUnorderedKVOutput() {
47   }
48
49   @Override
50   public List<Event> initialize(TezOutputContext outputContext)
51       throws Exception {
52     this.outputContext = outputContext;
53     this.kvWriter = new FileBasedKVWriter(outputContext);
54     return Collections.emptyList();
55   }
56
57   @Override
58   public KeyValueWriter getWriter() throws Exception {
59     return kvWriter;
60   }
61
62   @Override
63   public void handleEvents(List<Event> outputEvents) {
64     throw new TezUncheckedException("Not expecting any events");
65   }
66
67   @Override
68   public List<Event> close() throws Exception {
69     boolean outputGenerated = this.kvWriter.close();
70     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
71         .newBuilder();
72
73     String host = getHost();
74     ByteBuffer shuffleMetadata = outputContext
75         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
76     int shufflePort = ShuffleUtils
77         .deserializeShuffleProviderMetaData(shuffleMetadata);
78     payloadBuilder.setOutputGenerated(outputGenerated);
79     if (outputGenerated) {
80       payloadBuilder.setHost(host);
81       payloadBuilder.setPort(shufflePort);
82       payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
83     }
84     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
85
86     DataMovementEvent dmEvent = new DataMovementEvent(0,
87         payloadProto.toByteArray());
88     List<Event> events = Lists.newArrayListWithCapacity(1);
89     events.add(dmEvent);
90     return events;
91   }
92
93   @Override
94   public void setNumPhysicalOutputs(int numOutputs) {
95     Preconditions.checkArgument(numOutputs == 1,
96         "Number of outputs can only be 1 for " + this.getClass().getName());
97   }
98   
99   @VisibleForTesting
100   @Private
101   String getHost() {
102     return System.getenv(ApplicationConstants.Environment.NM_HOST.toString());
103   }
104
105 }