2020
2121import java .io .IOException ;
2222import java .util .Objects ;
23+ import java .util .concurrent .BlockingDeque ;
24+ import java .util .concurrent .ExecutorService ;
25+ import java .util .concurrent .Executors ;
26+ import java .util .concurrent .Future ;
27+ import java .util .concurrent .LinkedBlockingDeque ;
28+ import java .util .concurrent .atomic .AtomicBoolean ;
29+ import java .util .concurrent .atomic .AtomicInteger ;
2330
31+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
32+ import org .apache .tez .mapreduce .grouper .TezSplitGrouper ;
2433import org .slf4j .Logger ;
2534import org .slf4j .LoggerFactory ;
2635import org .apache .hadoop .classification .InterfaceAudience .Public ;
@@ -129,14 +138,69 @@ public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
129138 int idx = 0 ;
130139 long progress ;
131140 RecordReader <K , V > curReader ;
132-
141+ private final AtomicInteger initIndex ;
142+ private final int numReaders ;
143+ private ExecutorService initReaderExecService ;
144+ private BlockingDeque <Future <RecordReader <K , V >>> initedReaders ;
145+ private AtomicBoolean failureOccurred = new AtomicBoolean (false );
146+
133147 public TezGroupedSplitsRecordReader (TezGroupedSplit split , JobConf job ,
134148 Reporter reporter ) throws IOException {
135149 this .groupedSplit = split ;
136150 this .job = job ;
137151 this .reporter = reporter ;
152+ this .initIndex = new AtomicInteger (0 );
153+ int numThreads = conf .getInt (TezSplitGrouper .TEZ_GROUPING_SPLIT_INIT_THREADS ,
154+ TezSplitGrouper .TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT );
155+ this .numReaders = Math .min (groupedSplit .wrappedSplits .size (),
156+ conf .getInt (TezSplitGrouper .TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS ,
157+ TezSplitGrouper .TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT ));
158+ // skip multi-threaded split opening when number of readers is less than 1
159+ if (numReaders > 1 ) {
160+ this .initReaderExecService = Executors .newFixedThreadPool (numThreads ,
161+ new ThreadFactoryBuilder ()
162+ .setDaemon (true )
163+ .setPriority (Thread .MAX_PRIORITY )
164+ .setNameFormat ("TEZ-Split-Init-Thread-%d" )
165+ .build ());
166+ this .initedReaders = new LinkedBlockingDeque <>();
167+ }
138168 initNextRecordReader ();
139169 }
170+
171+ private void preInitReaders () {
172+ if (initReaderExecService == null ) {
173+ return ;
174+ }
175+ for (int i = 0 ; i < numReaders ; i ++) {
176+ initedReaders .offer (this .initReaderExecService .submit (() -> {
177+ if (failureOccurred .get ()) {
178+ return null ;
179+ }
180+ try {
181+ int index = initIndex .getAndIncrement ();
182+ if (index >= groupedSplit .wrappedSplits .size ()) {
183+ return null ;
184+ }
185+ InputSplit s = groupedSplit .wrappedSplits .get (index );
186+ RecordReader <K , V > reader = wrappedInputFormat .getRecordReader (s , job , reporter );
187+ LOG .debug ("Init Thread processed reader number {} initialization" , index );
188+ return reader ;
189+ } catch (Exception e ) {
190+ failureOccurred .set (true );
191+ if (e instanceof InterruptedException ) {
192+ Thread .currentThread ().interrupt ();
193+ }
194+ cancelsFutures ();
195+ throw new RuntimeException (e );
196+ }
197+ }));
198+ }
199+ }
200+
201+ public RecordReader <K , V > getCurReader () {
202+ return curReader ;
203+ }
140204
141205 @ Override
142206 public boolean next (K key , V value ) throws IOException {
@@ -171,7 +235,7 @@ public void close() throws IOException {
171235 curReader = null ;
172236 }
173237 }
174-
238+
175239 protected boolean initNextRecordReader () throws IOException {
176240 if (curReader != null ) {
177241 curReader .close ();
@@ -183,23 +247,45 @@ protected boolean initNextRecordReader() throws IOException {
183247
184248 // if all chunks have been processed, nothing more to do.
185249 if (idx == groupedSplit .wrappedSplits .size ()) {
250+ if (initReaderExecService != null ) {
251+ LOG .info ("Shutting down the init record reader threadpool" );
252+ initReaderExecService .shutdownNow ();
253+ }
186254 return false ;
187255 }
188256
189257 if (LOG .isDebugEnabled ()) {
190- LOG .debug ("Init record reader for index " + idx + " of " +
258+ LOG .debug ("Init record reader for index " + idx + " of " +
191259 groupedSplit .wrappedSplits .size ());
192260 }
193261
194262 // get a record reader for the idx-th chunk
195263 try {
196- curReader = wrappedInputFormat .getRecordReader (
197- groupedSplit .wrappedSplits .get (idx ), job , reporter );
264+ // get the cur reader directly when async split opening is disabled
265+ if (initReaderExecService == null ) {
266+ curReader = wrappedInputFormat .getRecordReader (groupedSplit .wrappedSplits .get (idx ), job , reporter );
267+ } else {
268+ preInitReaders ();
269+ curReader = initedReaders .take ().get ();
270+ }
198271 } catch (Exception e ) {
199- throw new RuntimeException (e );
272+ failureOccurred .set (true );
273+ if (e instanceof InterruptedException ) {
274+ Thread .currentThread ().interrupt ();
275+ }
276+ if (initedReaders != null ) {
277+ cancelsFutures ();
278+ }
279+ throw new RuntimeException (e );
200280 }
201281 idx ++;
202- return true ;
282+ return curReader != null ;
283+ }
284+
285+ private void cancelsFutures () {
286+ for (Future <RecordReader <K , V >> f : initedReaders ) {
287+ f .cancel (true );
288+ }
203289 }
204290
205291 @ Override
0 commit comments