@@ -50,42 +50,42 @@ public class AverageRecordSizeUtils {
5050 static long averageBytesPerRecord (HoodieTimeline commitTimeline , HoodieWriteConfig hoodieWriteConfig ) {
5151 long avgSize = hoodieWriteConfig .getCopyOnWriteRecordSizeEstimate ();
5252 long fileSizeThreshold = (long ) (hoodieWriteConfig .getRecordSizeEstimationThreshold () * hoodieWriteConfig .getParquetSmallFileLimit ());
53- if (!commitTimeline .empty ()) {
54- // Go over the reverse ordered commits to get a more recent estimate of average record size.
55- Iterator <HoodieInstant > instants = commitTimeline .getReverseOrderedInstants ().iterator ();
56- while (instants .hasNext ()) {
57- HoodieInstant instant = instants .next ();
58- try {
59- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
60- .fromBytes (commitTimeline .getInstantDetails (instant ).get (), HoodieCommitMetadata .class );
61- if (instant .getAction ().equals (COMMIT_ACTION ) || instant .getAction ().equals (REPLACE_COMMIT_ACTION )) {
62- long totalBytesWritten = commitMetadata .fetchTotalBytesWritten ();
63- long totalRecordsWritten = commitMetadata .fetchTotalRecordsWritten ();
64- if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0 ) {
65- avgSize = (long ) Math .ceil ((1.0 * totalBytesWritten ) / totalRecordsWritten );
66- break ;
67- }
68- } else if (instant .getAction ().equals (DELTA_COMMIT_ACTION )) {
69- // lets consider only base files in case of delta commits
70- AtomicLong totalBytesWritten = new AtomicLong (0L );
71- AtomicLong totalRecordsWritten = new AtomicLong (0L );
72- commitMetadata .getWriteStats ().stream ()
73- .filter (hoodieWriteStat -> FSUtils .isBaseFile (new Path (hoodieWriteStat .getPath ())))
74- .forEach (hoodieWriteStat -> {
75- totalBytesWritten .addAndGet (hoodieWriteStat .getTotalWriteBytes ());
76- totalRecordsWritten .addAndGet (hoodieWriteStat .getNumWrites ());
77- });
78- if (totalBytesWritten .get () > fileSizeThreshold && totalRecordsWritten .get () > 0 ) {
79- avgSize = (long ) Math .ceil ((1.0 * totalBytesWritten .get ()) / totalRecordsWritten .get ());
80- break ;
81- }
53+ if (!commitTimeline .empty ()) {
54+ // Go over the reverse ordered commits to get a more recent estimate of average record size.
55+ Iterator <HoodieInstant > instants = commitTimeline .getReverseOrderedInstants ().iterator ();
56+ while (instants .hasNext ()) {
57+ HoodieInstant instant = instants .next ();
58+ try {
59+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
60+ .fromBytes (commitTimeline .getInstantDetails (instant ).get (), HoodieCommitMetadata .class );
61+ if (instant .getAction ().equals (COMMIT_ACTION ) || instant .getAction ().equals (REPLACE_COMMIT_ACTION )) {
62+ long totalBytesWritten = commitMetadata .fetchTotalBytesWritten ();
63+ long totalRecordsWritten = commitMetadata .fetchTotalRecordsWritten ();
64+ if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0 ) {
65+ avgSize = (long ) Math .ceil ((1.0 * totalBytesWritten ) / totalRecordsWritten );
66+ break ;
67+ }
68+ } else if (instant .getAction ().equals (DELTA_COMMIT_ACTION )) {
69+ // lets consider only base files in case of delta commits
70+ AtomicLong totalBytesWritten = new AtomicLong (0L );
71+ AtomicLong totalRecordsWritten = new AtomicLong (0L );
72+ commitMetadata .getWriteStats ().stream ()
73+ .filter (hoodieWriteStat -> FSUtils .isBaseFile (new Path (hoodieWriteStat .getPath ())))
74+ .forEach (hoodieWriteStat -> {
75+ totalBytesWritten .addAndGet (hoodieWriteStat .getTotalWriteBytes ());
76+ totalRecordsWritten .addAndGet (hoodieWriteStat .getNumWrites ());
77+ });
78+ if (totalBytesWritten .get () > fileSizeThreshold && totalRecordsWritten .get () > 0 ) {
79+ avgSize = (long ) Math .ceil ((1.0 * totalBytesWritten .get ()) / totalRecordsWritten .get ());
80+ break ;
8281 }
83- } catch (IOException ioe ) {
84- // make this fail safe.
85- LOG .error ("Error trying to compute average bytes/record " , ioe );
8682 }
83+ } catch (IOException ioe ) {
84+ // make this fail safe.
85+ LOG .error ("Error trying to compute average bytes/record " , ioe );
8786 }
8887 }
88+ }
8989 return avgSize ;
9090 }
9191}
0 commit comments