TEZ-1023. Tez runtime configuration changes by users may not get propagated
[incubator-tez.git] / tez-runtime-library / src / main / java / org / apache / tez / runtime / library / input / ShuffledMergedInput.java
1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.tez.runtime.library.input;
19
20 import java.io.IOException;
21 import java.util.Collections;
22 import java.util.List;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.io.RawComparator;
28 import org.apache.tez.common.TezJobConfig;
29 import org.apache.tez.common.TezUtils;
30 import org.apache.tez.common.counters.TaskCounter;
31 import org.apache.tez.common.counters.TezCounter;
32 import org.apache.tez.runtime.api.Event;
33 import org.apache.tez.runtime.api.LogicalInput;
34 import org.apache.tez.runtime.api.TezInputContext;
35 import org.apache.tez.runtime.library.api.KeyValuesReader;
36 import org.apache.tez.runtime.library.common.ConfigUtils;
37 import org.apache.tez.runtime.library.common.ValuesIterator;
38 import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
39 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
40
41 /**
42  * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
43  * intermediate sorted data, merges them and provides key/<values> to the
44  * consumer.
45  *
46  * The Copy and Merge will be triggered by the initialization - which is handled
47  * by the Tez framework. Input is not consumable until the Copy and Merge are
48  * complete. Methods are provided to check for this, as well as to wait for
49  * completion. Attempting to get a reader on a non-complete input will block.
50  *
51  */
52 public class ShuffledMergedInput implements LogicalInput {
53
54   static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
55
56   protected TezInputContext inputContext;
57   protected TezRawKeyValueIterator rawIter = null;
58   protected Configuration conf;
59   protected int numInputs = 0;
60   protected Shuffle shuffle;
61   @SuppressWarnings("rawtypes")
62   protected ValuesIterator vIter;
63
64   private TezCounter inputKeyCounter;
65   private TezCounter inputValueCounter;
66
67   @Override
68   public List<Event> initialize(TezInputContext inputContext) throws IOException {
69     this.inputContext = inputContext;
70     this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
71
72     this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
73     this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
74     this.conf.setStrings(TezJobConfig.LOCAL_DIRS,
75         inputContext.getWorkDirs());
76
77     // Start the shuffle - copy and merge.
78     shuffle = new Shuffle(inputContext, this.conf, numInputs);
79     shuffle.run();
80
81     return Collections.emptyList();
82   }
83
84   /**
85    * Check if the input is ready for consumption
86    *
87    * @return true if the input is ready for consumption, or if an error occurred
88    *         processing fetching the input. false if the shuffle and merge are
89    *         still in progress
90    */
91   public boolean isInputReady() {
92     return shuffle.isInputReady();
93   }
94
95   /**
96    * Waits for the input to become ready for consumption
97    * @throws IOException
98    * @throws InterruptedException
99    */
100   public void waitForInputReady() throws IOException, InterruptedException {
101     rawIter = shuffle.waitForInput();
102     createValuesIterator();
103   }
104
105   @Override
106   public List<Event> close() throws IOException {
107     rawIter.close();
108     return Collections.emptyList();
109   }
110
111   /**
112    * Get a KVReader for the Input.</p> This method will block until the input is
113    * ready - i.e. the copy and merge stages are complete. Users can use the
114    * isInputReady method to check if the input is ready, which gives an
115    * indication of whether this method will block or not.
116    *
117    * NOTE: All values for the current K-V pair must be read prior to invoking
118    * moveToNext. Once moveToNext() is called, the valueIterator from the
119    * previous K-V pair will throw an Exception
120    *
121    * @return a KVReader over the sorted input.
122    */
123   @Override
124   public KeyValuesReader getReader() throws IOException {
125     if (rawIter == null) {
126       try {
127         waitForInputReady();
128       } catch (InterruptedException e) {
129         Thread.currentThread().interrupt();
130         throw new IOException("Interrupted while waiting for input ready", e);
131       }
132     }
133     return new KeyValuesReader() {
134
135       @Override
136       public boolean next() throws IOException {
137         return vIter.moveToNext();
138       }
139
140       public Object getCurrentKey() throws IOException {
141         return vIter.getKey();
142       }
143       
144       @SuppressWarnings("unchecked")
145       public Iterable<Object> getCurrentValues() throws IOException {
146         return vIter.getValues();
147       }
148     };
149   }
150
151   @Override
152   public void handleEvents(List<Event> inputEvents) {
153     shuffle.handleEvents(inputEvents);
154   }
155
156   @Override
157   public void setNumPhysicalInputs(int numInputs) {
158     this.numInputs = numInputs;
159   }
160
161   @SuppressWarnings({ "rawtypes", "unchecked" })
162   protected void createValuesIterator()
163       throws IOException {
164     vIter = new ValuesIterator(rawIter,
165         (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
166         ConfigUtils.getIntermediateInputKeyClass(conf),
167         ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
168
169   }
170
171   // This functionality is currently broken. If there's inputs which need to be
172   // written to disk, there's a possibility that inputs from the different
173   // sources could clobber each others' output. Also the current structures do
174   // not have adequate information to de-dupe these (vertex name)
175 //  public void mergeWith(ShuffledMergedInput other) {
176 //    this.numInputs += other.getNumPhysicalInputs();
177 //  }
178 //
179 //  public int getNumPhysicalInputs() {
180 //    return this.numInputs;
181 //  }
182 }