TEZ-4397 Open Tez Input splits asynchronously #195
💔 -1 overall
This message was automatically generated.
curReader = wrappedInputFormat.getRecordReader(
    groupedSplit.wrappedSplits.get(idx), job, reporter);
curReader = initedReaders.poll().get();
submitInitReaders(1);
From the next stage onwards (i.e., after the init part), this will execute sequentially, because it requests only 1 additional reader at a time. It would be good to initialize the next set of readers in parallel.
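One way to read this suggestion is to keep a fixed-size window of reader initializations in flight and top it up whenever a reader is consumed. The following is only a sketch under that assumption, using plain `java.util.concurrent`; `PrefetchWindow`, `openers`, and `window` are hypothetical names that do not appear in the patch, and each `Callable<T>` stands in for a `getRecordReader` call.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical helper: keeps up to `window` initializations in flight. */
final class PrefetchWindow<T> {
  private final ExecutorService pool;
  private final Iterator<Callable<T>> pending;
  private final Queue<Future<T>> inFlight = new ArrayDeque<>();
  private final int window;

  PrefetchWindow(ExecutorService pool, List<Callable<T>> openers, int window) {
    this.pool = pool;
    this.pending = openers.iterator();
    this.window = window;
    refill();
  }

  /** Submits tasks until the window is full or nothing is left to submit. */
  private void refill() {
    while (inFlight.size() < window && pending.hasNext()) {
      inFlight.add(pool.submit(pending.next()));
    }
  }

  /** Number of initializations submitted but not yet consumed. */
  int inFlightCount() {
    return inFlight.size();
  }

  /** Returns the next item in submission order, or null when exhausted. */
  T next() throws InterruptedException, ExecutionException {
    Future<T> head = inFlight.poll();
    if (head == null) {
      return null;
    }
    refill();           // top the window back up before we block
    return head.get();  // blocks only if this item is not ready yet
  }
}
```

With `window` greater than 1, the pool can work on several upcoming splits while the caller is still reading the current one.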
  RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter);
  LOG.debug("Init Thread processed reader number {} initialization", index);
  return reader;
} catch(IOException e) {
If the thread is interrupted, should it cancel the other pending tasks?
This should be done for all exceptions as well.
jfsii left a comment
Added two comments - I learned a bit reading about ThreadPools and garbage collection
  throw new RuntimeException(e);
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  for (Future<RecordReader<K,V>> f : initedReaders) {
We probably want to cancel the pending tasks on any exception, and re-interrupt the thread only on InterruptedException.
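A rough sketch of that split, assuming the pending work is held as a collection of `Future`s: cancellation runs in a `finally` block on every failure path, while the interrupt flag is restored only in the `InterruptedException` branch. `ReaderFutures`, `cancelAll`, and `getOrCancelOthers` are hypothetical names for illustration, not code from the patch.

```java
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Hypothetical helper illustrating the error handling discussed above. */
final class ReaderFutures {
  private ReaderFutures() {}

  /** Best-effort cancellation of every outstanding task. */
  static void cancelAll(Collection<? extends Future<?>> futures) {
    for (Future<?> f : futures) {
      f.cancel(true);  // has no effect on tasks that already completed
    }
  }

  /** Waits for `head`; on any failure, cancels `others` before rethrowing. */
  static <T> T getOrCancelOthers(Future<T> head,
                                 Collection<? extends Future<?>> others) {
    boolean succeeded = false;
    try {
      T value = head.get();
      succeeded = true;
      return value;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // restore the flag only here
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } finally {
      if (!succeeded) {
        cancelAll(others);  // runs for checked and unchecked failures alike
      }
    }
  }
}
```

Because the cleanup sits in `finally`, unchecked failures such as `CancellationException` also trigger cancellation of the remaining tasks.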
    TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT);
this.numReaders = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS,
    TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT);
this.initReaderExecService = Executors.newFixedThreadPool(numThreads,
I think we will want to either make this static (shared across all usages of TezGroupedSplitsRecordReader) or figure out how to call shutdown reliably. Otherwise, I think a long-lived process that uses multiple TezGroupedSplitsRecordReaders throughout its life will end up with a large number of unused threads, since the pools will not shut down automatically and thus will not be garbage collected.
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html
See the "Finalization" section.
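The Finalization section linked above notes that unused threads must eventually die for an unreferenced pool to be reclaimed, for example via `allowCoreThreadTimeOut`. A minimal sketch of that option follows; `InitPools`, `newReclaimablePool`, the thread-name prefix, and the keep-alive value are all assumptions for illustration, not what the patch does.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical factory for a fixed-size pool whose idle threads exit. */
final class InitPools {
  private InitPools() {}

  /** keepAliveSeconds must be greater than zero for core-thread timeout. */
  static ThreadPoolExecutor newReclaimablePool(int numThreads, long keepAliveSeconds) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory daemonFactory = r -> {
      Thread t = new Thread(r, "split-init-" + counter.incrementAndGet());
      t.setDaemon(true);  // never keep the JVM alive on its own
      return t;
    };
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        numThreads, numThreads,              // same bounds as a fixed pool
        keepAliveSeconds, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        daemonFactory);
    pool.allowCoreThreadTimeOut(true);       // idle core threads may now exit
    return pool;
  }
}
```

This does not replace an explicit `shutdown()` when the reader is closed; it only limits how long forgotten pools keep threads around.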
I'm not sure about the context, but if the goal is to properly shut down a global resource, it can be done by the shutdown handler, somewhere here: https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java#L954
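For the shared-pool variant, the general shape might look like the sketch below. It uses the plain JDK `Runtime.addShutdownHook` as a stand-in for whatever shutdown handler the project prefers; `SharedInitPool`, `NUM_THREADS`, and the 5-second wait are hypothetical choices for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical process-wide pool that is shut down from a JVM hook. */
final class SharedInitPool {
  // Assumed size; the patch reads the thread count from configuration.
  private static final int NUM_THREADS = 4;

  private static final ExecutorService POOL =
      Executors.newFixedThreadPool(NUM_THREADS, r -> {
        Thread t = new Thread(r, "shared-split-init");
        t.setDaemon(true);  // idle workers must not block JVM exit
        return t;
      });

  static {
    // Plain JDK hook, used here in place of a project-specific handler.
    Runtime.getRuntime().addShutdownHook(
        new Thread(SharedInitPool::shutdownQuietly, "shared-split-init-shutdown"));
  }

  private SharedInitPool() {}

  static ExecutorService get() {
    return POOL;
  }

  /** Stops accepting work, interrupts running tasks, and waits briefly. */
  static void shutdownQuietly() {
    POOL.shutdownNow();
    try {
      POOL.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Calling `shutdownQuietly()` more than once is harmless, so the hook can coexist with an explicit shutdown elsewhere.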
214e242 to 6229a16
Thanks @rbalamohan @jfsii and @abstractdog for the reviews.
Latest patch LGTM. +1
Tez input splits can be opened asynchronously. This reduces the time spent waiting for S3 to prepare the connection and open the object.