2323import java .io .File ;
2424import java .io .IOException ;
2525import java .io .UncheckedIOException ;
26+ import java .net .URI ;
2627import java .nio .file .Files ;
2728import java .nio .file .Path ;
29+ import java .nio .file .Paths ;
2830import java .util .Collections ;
2931import java .util .concurrent .ExecutorService ;
3032import java .util .concurrent .Executors ;
4244
4345class ITRunSync {
4446
47+ @ Test
48+ void testSingleSyncMode (@ TempDir Path tempDir ) throws IOException {
49+ String tableName = "test-table" ;
50+ try (GenericTable table =
51+ TestJavaHudiTable .forStandardSchema (
52+ tableName , tempDir , null , HoodieTableType .COPY_ON_WRITE )) {
53+ table .insertRows (20 );
54+ File configFile = writeConfigFile (tempDir , table , tableName );
55+ String [] args = new String [] {"--datasetConfig" , configFile .getPath ()};
56+ RunSync .main (args );
57+ Path icebergMetadataPath = Paths .get (URI .create (table .getBasePath () + "/metadata" ));
58+ waitForNumIcebergCommits (icebergMetadataPath , 2 );
59+ }
60+ }
61+
4562 @ Test
4663 void testContinuousSyncMode (@ TempDir Path tempDir ) throws IOException {
4764 ExecutorService runner = Executors .newSingleThreadExecutor ();
@@ -50,19 +67,7 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
5067 TestJavaHudiTable .forStandardSchema (
5168 tableName , tempDir , null , HoodieTableType .COPY_ON_WRITE )) {
5269 table .insertRows (20 );
53- RunSync .DatasetConfig config =
54- RunSync .DatasetConfig .builder ()
55- .sourceFormat ("HUDI" )
56- .targetFormats (Collections .singletonList ("ICEBERG" ))
57- .datasets (
58- Collections .singletonList (
59- RunSync .DatasetConfig .Table .builder ()
60- .tableBasePath (table .getBasePath ())
61- .tableName (tableName )
62- .build ()))
63- .build ();
64- File configFile = new File (tempDir + "config.yaml" );
65- RunSync .YAML_MAPPER .writeValue (configFile , config );
70+ File configFile = writeConfigFile (tempDir , table , tableName );
6671 String [] args = new String [] {"--datasetConfig" , configFile .getPath (), "--continuousMode" };
6772 runner .submit (
6873 () -> {
@@ -72,17 +77,34 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
7277 throw new UncheckedIOException (ex );
7378 }
7479 });
75- Path metadataPath = tempDir . resolve ( tableName + "_v1" ). resolve ( " metadata" );
76- waitForNumIcebergCommits (metadataPath , 2 );
80+ Path icebergMetadataPath = Paths . get ( URI . create ( table . getBasePath () + "/ metadata" ) );
81+ waitForNumIcebergCommits (icebergMetadataPath , 2 );
7782 // write more data now that table is initialized and data is synced
7883 table .insertRows (20 );
79- waitForNumIcebergCommits (metadataPath , 3 );
80- assertEquals (3 , numIcebergMetadataJsonFiles (metadataPath ));
84+ waitForNumIcebergCommits (icebergMetadataPath , 3 );
85+ assertEquals (3 , numIcebergMetadataJsonFiles (icebergMetadataPath ));
8186 } finally {
8287 runner .shutdownNow ();
8388 }
8489 }
8590
91+ private static File writeConfigFile (Path tempDir , GenericTable table , String tableName ) throws IOException {
92+ RunSync .DatasetConfig config =
93+ RunSync .DatasetConfig .builder ()
94+ .sourceFormat ("HUDI" )
95+ .targetFormats (Collections .singletonList ("ICEBERG" ))
96+ .datasets (
97+ Collections .singletonList (
98+ RunSync .DatasetConfig .Table .builder ()
99+ .tableBasePath (table .getBasePath ())
100+ .tableName (tableName )
101+ .build ()))
102+ .build ();
103+ File configFile = new File (tempDir + "config.yaml" );
104+ RunSync .YAML_MAPPER .writeValue (configFile , config );
105+ return configFile ;
106+ }
107+
86108 @ SneakyThrows
87109 private static void waitForNumIcebergCommits (Path metadataPath , int count ) {
88110 long start = System .currentTimeMillis ();
0 commit comments