@@ -270,7 +270,7 @@ public long getCurrentSize() throws IOException {
     if (output == null) {
       return 0;
     }
-    return output.getPos();
+    return output.getPos() + logFile.getFileSize();
Contributor (@yihua):
@loukey-lj Could you explain how this affects the size calculation? Should output.getPos() already return the size written?

Contributor Author (@loukey-lj):

@yihua When appending data to an existing log file, the output stream returned by org.apache.hudi.common.table.log.HoodieLogFormatWriter#getOutputStream always starts at position 0. After a flush, org.apache.hudi.common.table.log.HoodieLogFormatWriter#getCurrentSize therefore returns only the size of the appended data, not the total size of the entire file. You can debug it with the following code.

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpoint every 20 seconds; each checkpoint flushes a small batch
    // that is appended to the same log file.
    env.enableCheckpointing(1000 * 20);
    env.setParallelism(1);

    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

    // Emit one record for the same key every 10 seconds.
    final DataStreamSource<Tuple3<String, Long, Long>> tuple3DataStreamSource =
        env.addSource(new SourceFunction<Tuple3<String, Long, Long>>() {
          @Override
          public void run(SourceContext<Tuple3<String, Long, Long>> ctx) throws Exception {
            while (!Thread.interrupted()) {
              ctx.collect(new Tuple3<>("1", System.currentTimeMillis(), System.currentTimeMillis()));
              Thread.sleep(1000 * 10);
            }
          }

          @Override
          public void cancel() {
          }
        });

    tableEnvironment.createTemporaryView("s", tuple3DataStreamSource);

    tableEnvironment.executeSql(
        "create table if not exists h(\n"
            + "  `id` string PRIMARY KEY NOT ENFORCED,\n"
            + "  `ts` bigint,\n"
            + "  `time` bigint\n"
            + ") with (\n"
            + "  'connector' = 'hudi',\n"
            + "  'write.bucket_assign.tasks' = '1',\n"
            + "  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.SimpleAvroKeyGenerator',\n"
            + "  'table.type' = 'MERGE_ON_READ',\n"
            + "  'hive_sync.enable' = 'false',\n"
            + "  'write.tasks' = '1',\n"
            + "  'path' = 'hdfs://xx',\n"
            + "  'hoodie.cleaner.commits.retained' = '1'\n"
            + ")");

    tableEnvironment.executeSql("insert into h SELECT * from s");

Contributor (@yihua):
@loukey-lj Got it. This is an HDFS-specific problem.

}

/**