2020
2121import java .io .IOException ;
2222import java .util .Objects ;
23+ import java .util .concurrent .BlockingDeque ;
24+ import java .util .concurrent .ExecutorService ;
25+ import java .util .concurrent .Executors ;
26+ import java .util .concurrent .Future ;
27+ import java .util .concurrent .LinkedBlockingDeque ;
28+ import java .util .concurrent .atomic .AtomicInteger ;
2329
30+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
31+ import org .apache .tez .mapreduce .grouper .TezSplitGrouper ;
2432import org .slf4j .Logger ;
2533import org .slf4j .LoggerFactory ;
2634import org .apache .hadoop .classification .InterfaceAudience .Public ;
@@ -129,14 +137,64 @@ public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
129137 int idx = 0 ;
130138 long progress ;
131139 RecordReader <K , V > curReader ;
132-
140+ private final AtomicInteger initIndex ;
141+ private final int numReaders ;
142+ private ExecutorService initReaderExecService ;
143+ private BlockingDeque <Future <RecordReader <K , V >>> initedReaders ;
144+
133145 public TezGroupedSplitsRecordReader (TezGroupedSplit split , JobConf job ,
134146 Reporter reporter ) throws IOException {
135147 this .groupedSplit = split ;
136148 this .job = job ;
137149 this .reporter = reporter ;
150+ this .initIndex = new AtomicInteger (0 );
151+ int numThreads = conf .getInt (TezSplitGrouper .TEZ_GROUPING_SPLIT_INIT_THREADS ,
152+ TezSplitGrouper .TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT );
153+ this .numReaders = Math .min (groupedSplit .wrappedSplits .size (),
154+ conf .getInt (TezSplitGrouper .TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS ,
155+ TezSplitGrouper .TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT ));
156+ // skip multi-threaded split opening when number of readers is less than 1
157+ if (numReaders > 1 ) {
158+ this .initReaderExecService = Executors .newFixedThreadPool (numThreads ,
159+ new ThreadFactoryBuilder ()
160+ .setDaemon (true )
161+ .setPriority (Thread .MAX_PRIORITY )
162+ .setNameFormat ("TEZ-Split-Init-Thread-%d" )
163+ .build ());
164+ this .initedReaders = new LinkedBlockingDeque <>();
165+ }
138166 initNextRecordReader ();
139167 }
168+
169+ private void preInitReaders () {
170+ if (initReaderExecService == null ) {
171+ return ;
172+ }
173+ for (int i = 0 ; i < numReaders ; i ++) {
174+ initedReaders .offer (this .initReaderExecService .submit (() -> {
175+ try {
176+ int index = initIndex .getAndIncrement ();
177+ if (index >= groupedSplit .wrappedSplits .size ()) {
178+ return null ;
179+ }
180+ InputSplit s = groupedSplit .wrappedSplits .get (index );
181+ RecordReader <K , V > reader = wrappedInputFormat .getRecordReader (s , job , reporter );
182+ LOG .debug ("Init Thread processed reader number {} initialization" , index );
183+ return reader ;
184+ } catch (Exception e ) {
185+ if (e instanceof InterruptedException ) {
186+ Thread .currentThread ().interrupt ();
187+ }
188+ cancelsFutures ();
189+ throw new RuntimeException (e );
190+ }
191+ }));
192+ }
193+ }
194+
195+ public RecordReader <K , V > getCurReader () {
196+ return curReader ;
197+ }
140198
141199 @ Override
142200 public boolean next (K key , V value ) throws IOException {
@@ -171,7 +229,7 @@ public void close() throws IOException {
171229 curReader = null ;
172230 }
173231 }
174-
232+
175233 protected boolean initNextRecordReader () throws IOException {
176234 if (curReader != null ) {
177235 curReader .close ();
@@ -183,23 +241,43 @@ protected boolean initNextRecordReader() throws IOException {
183241
184242 // if all chunks have been processed, nothing more to do.
185243 if (idx == groupedSplit .wrappedSplits .size ()) {
244+ if (initReaderExecService != null ) {
245+ LOG .info ("Shutting down the init record reader threadpool" );
246+ initReaderExecService .shutdownNow ();
247+ }
186248 return false ;
187249 }
188250
189251 if (LOG .isDebugEnabled ()) {
190- LOG .debug ("Init record reader for index " + idx + " of " +
191- groupedSplit .wrappedSplits .size ());
252+ LOG .debug ("Init record reader for index " + idx + " of " + groupedSplit .wrappedSplits .size ());
192253 }
193254
194255 // get a record reader for the idx-th chunk
195256 try {
196- curReader = wrappedInputFormat .getRecordReader (
197- groupedSplit .wrappedSplits .get (idx ), job , reporter );
257+ // get the cur reader directly when async split opening is disabled
258+ if (initReaderExecService == null ) {
259+ curReader = wrappedInputFormat .getRecordReader (groupedSplit .wrappedSplits .get (idx ), job , reporter );
260+ } else {
261+ preInitReaders ();
262+ curReader = initedReaders .take ().get ();
263+ }
198264 } catch (Exception e ) {
199- throw new RuntimeException (e );
265+ if (e instanceof InterruptedException ) {
266+ Thread .currentThread ().interrupt ();
267+ }
268+ if (initedReaders != null ) {
269+ cancelsFutures ();
270+ }
271+ throw new RuntimeException (e );
200272 }
201273 idx ++;
202- return true ;
274+ return curReader != null ;
275+ }
276+
277+ private void cancelsFutures () {
278+ for (Future <RecordReader <K , V >> f : initedReaders ) {
279+ f .cancel (true );
280+ }
203281 }
204282
205283 @ Override
0 commit comments