Skip to content

Commit e7c3ae4

Browse files
authored
Merge pull request #586 from Altinity/metadata_cache_for_parquet_24_12_2
Parquet File Metadata caching implementation
2 parents 2c1de21 + 242ebda commit e7c3ae4

9 files changed

Lines changed: 152 additions & 7 deletions

src/Common/ProfileEvents.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -914,8 +914,10 @@ The server successfully detected this situation and will download merged part fr
914914
M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background", ValueType::Number) \
915915
M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \
916916
\
917-
M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
918-
917+
M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
918+
\
919+
M(ParquetMetaDataCacheHits, "Number of times the read from filesystem cache hit the cache.", ValueType::Number) \
920+
M(ParquetMetaDataCacheMisses, "Number of times the read from filesystem cache miss the cache.", ValueType::Number) \
919921

920922
#ifdef APPLY_FOR_EXTERNAL_EVENTS
921923
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

src/Core/FormatFactorySettings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1251,7 +1251,7 @@ Set the quoting rule for identifiers in SHOW CREATE query
12511251
DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"(
12521252
Set the quoting style for identifiers in SHOW CREATE query
12531253
)", 0) \
1254-
1254+
DECLARE(Bool, input_format_parquet_use_metadata_cache, false, R"(Enable parquet file metadata caching)", 0) \
12551255
// End of FORMAT_FACTORY_SETTINGS
12561256

12571257
#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \

src/Core/ServerSettings.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,8 @@ namespace DB
212212
DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \
213213
DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of background pool for iceberg catalog", 0) \
214214
DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into iceberg catalog pool", 0) \
215-
DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \
216-
217-
215+
DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \
216+
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
218217
// clang-format on
219218

220219
/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below

src/Processors/Formats/IInputFormat.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ class IInputFormat : public SourceWithKeyCondition
6767

6868
void needOnlyCount() { need_only_count = true; }
6969

70+
/// Set additional info/key/id related to underlying storage of the ReadBuffer
71+
virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {}
72+
7073
protected:
7174
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
7275

src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
#if USE_PARQUET
55

6+
#include <Core/Settings.h>
7+
#include <Core/ServerSettings.h>
8+
#include <Common/ProfileEvents.h>
69
#include <Common/logger_useful.h>
710
#include <Common/ThreadPool.h>
811
#include <Formats/FormatFactory.h>
@@ -33,6 +36,8 @@
3336
namespace ProfileEvents
3437
{
3538
extern const Event ParquetFetchWaitTimeMicroseconds;
39+
extern const Event ParquetMetaDataCacheHits;
40+
extern const Event ParquetMetaDataCacheMisses;
3641
}
3742

3843
namespace CurrentMetrics
@@ -49,6 +54,16 @@ namespace CurrentMetrics
4954
namespace DB
5055
{
5156

57+
namespace Setting
58+
{
59+
extern const SettingsBool input_format_parquet_use_metadata_cache;
60+
}
61+
62+
namespace ServerSetting
63+
{
64+
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
65+
}
66+
5267
namespace ErrorCodes
5368
{
5469
extern const int BAD_ARGUMENTS;
@@ -507,6 +522,58 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
507522
return hyperrectangle;
508523
}
509524

525+
ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes)
526+
: CacheBase(max_size_bytes) {}
527+
528+
ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes)
529+
{
530+
static ParquetFileMetaDataCache instance(max_size_bytes);
531+
return &instance;
532+
}
533+
534+
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
535+
{
536+
createArrowFileIfNotCreated();
537+
return parquet::ReadMetaData(arrow_file);
538+
}
539+
540+
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
541+
{
542+
// in-memory cache is not implemented for local file operations, only for remote files
543+
// there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
544+
// and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key
545+
if (!metadata_cache.use_cache || metadata_cache.key.empty())
546+
{
547+
return readMetadataFromFile();
548+
}
549+
550+
auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet(
551+
metadata_cache.key,
552+
[&]()
553+
{
554+
return readMetadataFromFile();
555+
}
556+
);
557+
if (loaded)
558+
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
559+
else
560+
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);
561+
return parquet_file_metadata;
562+
}
563+
564+
void ParquetBlockInputFormat::createArrowFileIfNotCreated()
565+
{
566+
if (arrow_file)
567+
{
568+
return;
569+
}
570+
571+
// Create arrow file adapter.
572+
// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
573+
// we'll need to read (which we know in advance). Use max_download_threads for that.
574+
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
575+
}
576+
510577
std::unordered_set<std::size_t> getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn)
511578
{
512579
std::unordered_set<std::size_t> column_keys;
@@ -606,7 +673,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
606673
if (is_stopped)
607674
return;
608675

609-
metadata = parquet::ReadMetaData(arrow_file);
676+
metadata = getFileMetaData();
610677
const bool prefetch_group = supportPrefetch();
611678

612679
std::shared_ptr<arrow::Schema> schema;
@@ -706,6 +773,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
706773
}
707774
}
708775

776+
bool has_row_groups_to_read = false;
777+
709778
auto skip_row_group_based_on_filters = [&](int row_group)
710779
{
711780
if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)
@@ -755,9 +824,23 @@ void ParquetBlockInputFormat::initializeIfNeeded()
755824
row_group_batches.back().total_bytes_compressed += row_group_size;
756825
auto rows = adaptive_chunk_size(row_group);
757826
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;
827+
828+
has_row_groups_to_read = true;
829+
}
830+
831+
if (has_row_groups_to_read)
832+
{
833+
createArrowFileIfNotCreated();
758834
}
759835
}
760836

837+
void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_)
838+
{
839+
metadata_cache.key = key_;
840+
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
841+
metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size];
842+
}
843+
761844
void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
762845
{
763846
const bool row_group_prefetch = supportPrefetch();

src/Processors/Formats/Impl/ParquetBlockInputFormat.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "config.h"
33
#if USE_PARQUET
44

5+
#include <Common/CacheBase.h>
56
#include <Processors/Formats/IInputFormat.h>
67
#include <Processors/Formats/ISchemaReader.h>
78
#include <Formats/FormatSettings.h>
@@ -72,6 +73,8 @@ class ParquetBlockInputFormat : public IInputFormat
7273

7374
size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }
7475

76+
void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override;
77+
7578
private:
7679
Chunk read() override;
7780

@@ -90,6 +93,11 @@ class ParquetBlockInputFormat : public IInputFormat
9093

9194
void threadFunction(size_t row_group_batch_idx);
9295

96+
void createArrowFileIfNotCreated();
97+
std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();
98+
99+
std::shared_ptr<parquet::FileMetaData> getFileMetaData();
100+
93101
inline bool supportPrefetch() const;
94102

95103
// Data layout in the file:
@@ -338,6 +346,12 @@ class ParquetBlockInputFormat : public IInputFormat
338346
std::exception_ptr background_exception = nullptr;
339347
std::atomic<int> is_stopped{0};
340348
bool is_initialized = false;
349+
struct Cache
350+
{
351+
String key;
352+
bool use_cache = false;
353+
UInt64 max_size_bytes{0};
354+
} metadata_cache;
341355
};
342356

343357
class ParquetSchemaReader : public ISchemaReader
@@ -356,6 +370,16 @@ class ParquetSchemaReader : public ISchemaReader
356370
std::shared_ptr<parquet::FileMetaData> metadata;
357371
};
358372

373+
class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
374+
{
375+
public:
376+
static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes);
377+
void clear() {}
378+
379+
private:
380+
ParquetFileMetaDataCache(UInt64 max_size_bytes);
381+
};
382+
359383
}
360384

361385
#endif

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
414414
if (need_only_count)
415415
input_format->needOnlyCount();
416416

417+
if (!object_info->getPath().empty())
418+
input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);
419+
417420
builder.init(Pipe(input_format));
418421

419422
if (auto transformer = configuration->getSchemaTransformer(object_info->getPath()))
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
10
2+
10
3+
10
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
-- Tags: no-parallel, no-fasttest
2+
3+
DROP TABLE IF EXISTS t_parquet_03262;
4+
5+
CREATE TABLE t_parquet_03262 (a UInt64)
6+
ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
7+
PARTITION BY a;
8+
9+
INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;
10+
11+
SELECT COUNT(*)
12+
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
13+
SETTINGS input_format_parquet_use_metadata_cache=1;
14+
15+
SELECT COUNT(*)
16+
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
17+
SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_cache';
18+
19+
SYSTEM FLUSH LOGS;
20+
21+
SELECT ProfileEvents['ParquetMetaDataCacheHits']
22+
FROM system.query_log
23+
where log_comment = 'test_03262_parquet_metadata_cache'
24+
AND type = 'QueryFinish'
25+
ORDER BY event_time desc
26+
LIMIT 1;
27+
28+
DROP TABLE t_parquet_03262;

0 commit comments

Comments
 (0)