TEZ-873. Expose InputSplit via MRInputLegacy, and underlying splits via
[incubator-tez.git] / tez-runtime-library / src / main / java / org / apache / tez / runtime / library / output / OnFileSortedOutput.java
1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.tez.runtime.library.output;
19
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.util.Collections;
23 import java.util.List;
24
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.yarn.api.ApplicationConstants;
27 import org.apache.tez.common.TezJobConfig;
28 import org.apache.tez.common.TezUtils;
29 import org.apache.tez.common.counters.TaskCounter;
30 import org.apache.tez.runtime.api.Event;
31 import org.apache.tez.runtime.api.LogicalOutput;
32 import org.apache.tez.runtime.api.TezOutputContext;
33 import org.apache.tez.runtime.api.events.DataMovementEvent;
34 import org.apache.tez.runtime.api.events.VertexManagerEvent;
35 import org.apache.tez.runtime.library.api.KeyValueWriter;
36 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
37 import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
38 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
39 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
40 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
41
42 import com.google.common.collect.Lists;
43
44 /**
45  * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs 
46  * written to it and persists it to a file.
47  */
48 public class OnFileSortedOutput implements LogicalOutput {
49   
50   protected ExternalSorter sorter;
51   protected Configuration conf;
52   protected int numOutputs;
53   protected TezOutputContext outputContext;
54   private long startTime;
55   private long endTime;
56   
57   
58   @Override
59   public List<Event> initialize(TezOutputContext outputContext)
60       throws IOException {
61     this.startTime = System.nanoTime();
62     this.outputContext = outputContext;
63     sorter = new DefaultSorter();
64     this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
65     // Initializing this parametr in this conf since it is used in multiple
66     // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
67     // TezMerger, etc.
68     this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
69     sorter.initialize(outputContext, conf, numOutputs);
70     return Collections.emptyList();
71   }
72
73   @Override
74   public KeyValueWriter getWriter() throws IOException {
75     return new KeyValueWriter() {
76       @Override
77       public void write(Object key, Object value) throws IOException {
78         sorter.write(key, value);
79       }
80     };
81   }
82
83   @Override
84   public void handleEvents(List<Event> outputEvents) {
85     // Not expecting any events.
86   }
87
88   @Override
89   public void setNumPhysicalOutputs(int numOutputs) {
90     this.numOutputs = numOutputs;
91   }
92
93   @Override
94   public List<Event> close() throws IOException {
95     sorter.flush();
96     sorter.close();
97     this.endTime = System.nanoTime();
98
99    return generateEventsOnClose();
100   }
101   
102   protected List<Event> generateEventsOnClose() throws IOException {
103     String host = System.getenv(ApplicationConstants.Environment.NM_HOST
104         .toString());
105     ByteBuffer shuffleMetadata = outputContext
106         .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
107     int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
108
109     DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
110         .newBuilder();
111     payloadBuilder.setHost(host);
112     payloadBuilder.setPort(shufflePort);
113     payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
114     payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
115     DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
116     byte[] payloadBytes = payloadProto.toByteArray();
117
118     long outputSize = outputContext.getCounters()
119         .findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue();
120     VertexManagerEventPayloadProto.Builder vmBuilder = VertexManagerEventPayloadProto
121         .newBuilder();
122     vmBuilder.setOutputSize(outputSize);
123     VertexManagerEvent vmEvent = new VertexManagerEvent(
124         outputContext.getDestinationVertexName(), vmBuilder.build().toByteArray());    
125
126     List<Event> events = Lists.newArrayListWithCapacity(numOutputs+1);
127     events.add(vmEvent);
128     
129     for (int i = 0; i < numOutputs; i++) {
130       DataMovementEvent event = new DataMovementEvent(i, payloadBytes);
131       events.add(event);
132     }
133     return events;
134   }
135 }