Skip to content

Commit d6aebe7

Browse files
committed
support stdin as source for zfile compression.
Signed-off-by: Yifan Yuan <tuji.yyf@alibaba-inc.com>
1 parent b33ab47 commit d6aebe7

File tree

3 files changed

+66
-26
lines changed

3 files changed

+66
-26
lines changed

src/overlaybd/zfile/test/test.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ TEST_F(ZFileTest, verify_compression) {
170170
opt.block_size = 1<<bs;
171171
CompressArgs args(opt);
172172
zfile_compress(fsrc.get(), nullptr, &args);
173+
fsrc->lseek(0, SEEK_SET);
173174
int ret = zfile_compress(fsrc.get(), fdst.get(), &args);
174175
auto fzfile = zfile_open_ro(fdst.get(), opt.verify);
175176
EXPECT_EQ(ret, 0);

src/overlaybd/zfile/zfile.cpp

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,8 +1106,6 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) {
11061106
if (ret < 0) {
11071107
LOG_ERRNO_RETURN(0, -1, "failed to write header");
11081108
}
1109-
auto raw_data_size = file->lseek(0, SEEK_END);
1110-
LOG_INFO("source data size: `", raw_data_size);
11111109
auto block_size = opt.block_size;
11121110
LOG_INFO("block size: `", block_size);
11131111
auto buf_size = block_size + BUF_SIZE;
@@ -1124,39 +1122,41 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) {
11241122
compressed_len.resize(nbatch);
11251123
raw_chunk_len.resize(nbatch);
11261124
LOG_INFO("compress with start....");
1127-
off_t i = 0;
1128-
while (i < raw_data_size) {
1125+
off_t infile_size = 0;
1126+
while (true) {
11291127
int n = 0;
1130-
auto step = std::min((ssize_t)block_size * nbatch, (ssize_t)(raw_data_size - i));
1131-
auto ret = file->pread(raw_data, step, i);
1132-
if (ret < step) {
1133-
LOG_ERRNO_RETURN(0, -1, "failed to read from source file. (readn: `)", ret);
1134-
}
1135-
i += step;
1136-
while (step > 0) {
1137-
if (step < block_size) {
1138-
raw_chunk_len[n++] = step;
1128+
auto readn = file->read(raw_data, block_size * nbatch);
1129+
if (readn == 0) {
1130+
break;
1131+
}
1132+
if (readn < 0) {
1133+
LOG_ERRNO_RETURN(0, -1, "failed to read from source file. (readn: `)", readn);
1134+
}
1135+
infile_size += readn;
1136+
while (readn > 0) {
1137+
if (readn < block_size) {
1138+
raw_chunk_len[n++] = readn;
11391139
break;
11401140
}
11411141
raw_chunk_len[n++] = block_size;
1142-
step -= block_size;
1142+
readn -= block_size;
11431143
}
1144-
ret = compressor->compress_batch(raw_data, &(raw_chunk_len[0]), compressed_data,
1144+
readn = compressor->compress_batch(raw_data, &(raw_chunk_len[0]), compressed_data,
11451145
n * buf_size, &(compressed_len[0]), n);
1146-
if (ret != 0)
1146+
if (readn != 0)
11471147
return -1;
11481148
for (off_t j = 0; j < n; j++) {
1149-
ret = as->write(&compressed_data[j * buf_size], compressed_len[j]);
1150-
if (ret < (ssize_t)compressed_len[j]) {
1149+
readn = as->write(&compressed_data[j * buf_size], compressed_len[j]);
1150+
if (readn < (ssize_t)compressed_len[j]) {
11511151
LOG_ERRNO_RETURN(0, -1, "failed to write compressed data.");
11521152
}
11531153
if (crc32_verify) {
11541154
auto crc32_code = crc32c_salt(&compressed_data[j * buf_size], compressed_len[j]);
11551155
LOG_DEBUG("append ` bytes crc32_code: {offset: `, count: `, crc32: `}",
11561156
sizeof(uint32_t), moffset, compressed_len[j], HEX(crc32_code).width(8));
11571157
compressed_len[j] += sizeof(uint32_t);
1158-
ret = as->write(&crc32_code, sizeof(uint32_t));
1159-
if (ret < (ssize_t)sizeof(uint32_t)) {
1158+
readn = as->write(&crc32_code, sizeof(uint32_t));
1159+
if (readn < (ssize_t)sizeof(uint32_t)) {
11601160
LOG_ERRNO_RETURN(0, -1, "failed to write crc32code, offset: `, crc32: `",
11611161
moffset, HEX(crc32_code).width(8));
11621162
}
@@ -1176,8 +1176,8 @@ int zfile_compress(IFile *file, IFile *as, const CompressArgs *args) {
11761176
LOG_INFO("index checksum: `", HEX(pht->index_crc).width(8));
11771177
pht->index_offset = index_offset;
11781178
pht->index_size = index_size;
1179-
pht->original_file_size = raw_data_size;
1180-
LOG_INFO("write trailer.");
1179+
pht->original_file_size = infile_size;
1180+
LOG_INFO("write trailer. (source file size: `)", infile_size);
11811181
ret = write_header_trailer(as, false, true, true, pht);
11821182
if (ret < 0)
11831183
LOG_ERRNO_RETURN(0, -1, "failed to write trailer");

src/tools/overlaybd-zfile.cpp

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616

1717
#include "../overlaybd/zfile/zfile.h"
1818
#include "../overlaybd/tar/tar_file.h"
19+
#include <cstdint>
1920
#include <photon/common/uuid.h>
2021
#include <photon/common/utility.h>
2122
#include <photon/fs/localfs.h>
2223
#include <photon/common/alog.h>
24+
#include <photon/net/basic_socket.h>
25+
#include <photon/fs/virtual-file.h>
2326
#include <cstdio>
2427
#include <cstdlib>
2528
#include <errno.h>
@@ -31,6 +34,7 @@
3134
#include <unistd.h>
3235
#include <photon/photon.h>
3336
#include "CLI11.hpp"
37+
#include "photon/net/basic_socket.h"
3438
#include "photon/fs/filesystem.h"
3539

3640
using namespace std;
@@ -39,6 +43,24 @@ using namespace ZFile;
3943

4044
IFileSystem *lfs = nullptr;
4145

46+
class IStreamFile : public VirtualReadOnlyFile{
47+
public:
48+
virtual ssize_t read(void *buf, size_t count) override {
49+
return photon::net::read(0, buf, count);
50+
}
51+
52+
virtual off_t lseek(off_t offset, int whence) override {
53+
return INT64_MAX;
54+
}
55+
56+
UNIMPLEMENTED(int fstat(struct stat *buf) override);
57+
UNIMPLEMENTED_POINTER(IFileSystem* filesystem() override);
58+
};
59+
60+
// Factory for the stdin-backed file: heap-allocates an IStreamFile and hands
// it back through the IFile interface. Nothing in this function releases the
// object.
IFile *new_streamFile() {
    IFile *stream = new IStreamFile;
    return stream;
}
63+
4264
int verify_crc(IFile* src_file) {
4365

4466
if (is_zfile(src_file) != 1) {
@@ -71,7 +93,7 @@ int main(int argc, char **argv) {
7193
->default_val(4);
7294
app.add_option("source_file", fn_src, "source file path")
7395
->type_name("FILEPATH")
74-
->check(CLI::ExistingFile)
96+
// ->check(CLI::ExistingFile)
7597
->required();
7698
app.add_option("target_file", fn_dst, "target file path")->type_name("FILEPATH");
7799
app.add_flag("--verbose", verbose, "output debug info")->default_val(false);
@@ -82,8 +104,16 @@ int main(int argc, char **argv) {
82104
DEFER({photon::fini();});
83105

84106
lfs = new_localfs_adaptor();
107+
108+
85109
if (verify) {
86-
auto file = lfs->open(fn_src.c_str(), O_RDONLY);
110+
IFile *file = nullptr;
111+
if (fn_src.empty()) {
112+
LOG_INFO("read source from STDIN");
113+
file = new_streamFile();
114+
} else {
115+
file = lfs->open(fn_src.c_str(), O_RDONLY);
116+
}
87117
if (!file) {
88118
fprintf(stderr, "failed to open file %s\n", fn_src.c_str());
89119
exit(-1);
@@ -95,6 +125,13 @@ int main(int argc, char **argv) {
95125
printf("%s is a valid zfile blob.\n", fn_src.c_str());
96126
return 0;
97127
}
128+
bool pipe = false;
129+
if (fn_dst == "") {
130+
LOG_INFO("read source from STDIN");
131+
pipe = true;
132+
fn_dst = fn_src;
133+
fn_src = "";
134+
}
98135

99136
CompressOptions opt;
100137
opt.verify = 1;
@@ -119,7 +156,7 @@ int main(int argc, char **argv) {
119156
CompressArgs args(opt);
120157
if (!extract) {
121158
printf("compress file %s as %s\n", fn_src.c_str(), fn_dst.c_str());
122-
IFile *infile = lfs->open(fn_src.c_str(), O_RDONLY);
159+
IFile *infile = (!pipe ? lfs->open(fn_src.c_str(), O_RDONLY) : new_streamFile() );
123160
if (infile == nullptr) {
124161
fprintf(stderr, "failed to open file %s\n", fn_src.c_str());
125162
exit(-1);
@@ -141,8 +178,10 @@ int main(int argc, char **argv) {
141178
printf("compress file done.\n");
142179
return ret;
143180
} else {
181+
if (pipe) {
182+
LOG_ERROR_RETURN(0, -1, "decompression can't use STDIN");
183+
}
144184
printf("decompress file %s as %s\n", fn_src.c_str(), fn_dst.c_str());
145-
146185
IFile *infile = fs->open(fn_src.c_str(), O_RDONLY);
147186
if (infile == nullptr) {
148187
fprintf(stderr, "failed to open file %s\n", fn_dst.c_str());

0 commit comments

Comments
 (0)