2929import org .slf4j .Logger ;
3030import org .slf4j .LoggerFactory ;
3131
32+ import java .io .IOException ;
3233import java .util .Iterator ;
3334import java .util .concurrent .atomic .AtomicLong ;
3435
@@ -49,42 +50,42 @@ public class AverageRecordSizeUtils {
4950 static long averageBytesPerRecord (HoodieTimeline commitTimeline , HoodieWriteConfig hoodieWriteConfig ) {
5051 long avgSize = hoodieWriteConfig .getCopyOnWriteRecordSizeEstimate ();
5152 long fileSizeThreshold = (long ) (hoodieWriteConfig .getRecordSizeEstimationThreshold () * hoodieWriteConfig .getParquetSmallFileLimit ());
52- try {
5353 if (!commitTimeline .empty ()) {
5454 // Go over the reverse ordered commits to get a more recent estimate of average record size.
5555 Iterator <HoodieInstant > instants = commitTimeline .getReverseOrderedInstants ().iterator ();
5656 while (instants .hasNext ()) {
5757 HoodieInstant instant = instants .next ();
58- HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
59- .fromBytes (commitTimeline .getInstantDetails (instant ).get (), HoodieCommitMetadata .class );
60- if (instant .getAction ().equals (COMMIT_ACTION ) || instant .getAction ().equals (REPLACE_COMMIT_ACTION )) {
61- long totalBytesWritten = commitMetadata .fetchTotalBytesWritten ();
62- long totalRecordsWritten = commitMetadata .fetchTotalRecordsWritten ();
63- if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0 ) {
64- avgSize = (long ) Math .ceil ((1.0 * totalBytesWritten ) / totalRecordsWritten );
65- break ;
66- }
67- } else if (instant .getAction ().equals (DELTA_COMMIT_ACTION )) {
68- // lets consider only base files in case of delta commits
69- AtomicLong totalBytesWritten = new AtomicLong (0L );
70- AtomicLong totalRecordsWritten = new AtomicLong (0L );
71- commitMetadata .getWriteStats ().stream ()
72- .filter (hoodieWriteStat -> FSUtils .isBaseFile (new Path (hoodieWriteStat .getPath ())))
73- .forEach (hoodieWriteStat -> {
74- totalBytesWritten .addAndGet (hoodieWriteStat .getTotalWriteBytes ());
75- totalRecordsWritten .addAndGet (hoodieWriteStat .getNumWrites ());
76- });
77- if (totalBytesWritten .get () > fileSizeThreshold && totalRecordsWritten .get () > 0 ) {
78- avgSize = (long ) Math .ceil ((1.0 * totalBytesWritten .get ()) / totalRecordsWritten .get ());
79- break ;
58+ try {
59+ HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
60+ .fromBytes (commitTimeline .getInstantDetails (instant ).get (), HoodieCommitMetadata .class );
61+ if (instant .getAction ().equals (COMMIT_ACTION ) || instant .getAction ().equals (REPLACE_COMMIT_ACTION )) {
62+ long totalBytesWritten = commitMetadata .fetchTotalBytesWritten ();
63+ long totalRecordsWritten = commitMetadata .fetchTotalRecordsWritten ();
64+ if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0 ) {
65+ avgSize = (long ) Math .ceil ((1.0 * totalBytesWritten ) / totalRecordsWritten );
66+ break ;
67+ }
68+ } else if (instant .getAction ().equals (DELTA_COMMIT_ACTION )) {
69+ // lets consider only base files in case of delta commits
70+ AtomicLong totalBytesWritten = new AtomicLong (0L );
71+ AtomicLong totalRecordsWritten = new AtomicLong (0L );
72+ commitMetadata .getWriteStats ().stream ()
73+ .filter (hoodieWriteStat -> FSUtils .isBaseFile (new Path (hoodieWriteStat .getPath ())))
74+ .forEach (hoodieWriteStat -> {
75+ totalBytesWritten .addAndGet (hoodieWriteStat .getTotalWriteBytes ());
76+ totalRecordsWritten .addAndGet (hoodieWriteStat .getNumWrites ());
77+ });
78+ if (totalBytesWritten .get () > fileSizeThreshold && totalRecordsWritten .get () > 0 ) {
79+ avgSize = (long ) Math .ceil ((1.0 * totalBytesWritten .get ()) / totalRecordsWritten .get ());
80+ break ;
81+ }
8082 }
83+ } catch (IOException ioe ) {
84+ // make this fail safe.
85+ LOG .error ("Error trying to compute average bytes/record " , ioe );
8186 }
8287 }
8388 }
84- } catch (Throwable t ) {
85- // make this fail safe.
86- LOG .error ("Error trying to compute average bytes/record " , t );
87- }
8889 return avgSize ;
8990 }
9091}
0 commit comments