Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@
# include <azure/core/diagnostics/logger.hpp>
#endif

#if USE_PARQUET
# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif


#include <incbin.h>
/// A minimal file used when the server is run without installation
Expand Down Expand Up @@ -286,6 +290,7 @@ namespace ServerSetting
extern const ServerSettingsUInt64 primary_index_cache_size;
extern const ServerSettingsDouble primary_index_cache_size_ratio;
extern const ServerSettingsBool use_legacy_mongodb_integration;
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
}

}
Expand Down Expand Up @@ -2234,6 +2239,10 @@ try
if (dns_cache_updater)
dns_cache_updater->start();

#if USE_PARQUET
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
#endif

/// Set current database name before loading tables and databases because
/// system logs may copy global context.
std::string default_database = server_settings[ServerSetting::default_database].toString();
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class IInputFormat : public SourceWithKeyCondition
void needOnlyCount() { need_only_count = true; }

/// Set additional info/key/id related to underlying storage of the ReadBuffer
virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {}
virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
Expand Down
15 changes: 3 additions & 12 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <Common/FieldVisitorsAccurateComparison.h>
#include <Processors/Formats/Impl/Parquet/ParquetRecordReader.h>
#include <Processors/Formats/Impl/Parquet/parquetBloomFilterHash.h>
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#include <Interpreters/convertFieldToType.h>

namespace ProfileEvents
Expand Down Expand Up @@ -522,15 +523,6 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}

ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes)
: CacheBase(max_size_bytes) {}

ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes)
{
static ParquetFileMetaDataCache instance(max_size_bytes);
return &instance;
}

std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
{
createArrowFileIfNotCreated();
Expand All @@ -547,7 +539,7 @@ std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData(
return readMetadataFromFile();
}

auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes)->getOrSet(
auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
metadata_cache.key,
[&]()
{
Expand Down Expand Up @@ -834,11 +826,10 @@ void ParquetBlockInputFormat::initializeIfNeeded()
}
}

void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_)
void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size];
}

void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
Expand Down
18 changes: 4 additions & 14 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#include "config.h"
#if USE_PARQUET

#include <Common/CacheBase.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
Expand Down Expand Up @@ -73,7 +72,7 @@ class ParquetBlockInputFormat : public IInputFormat

size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override;
void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;

private:
Chunk read() override;
Expand Down Expand Up @@ -350,8 +349,9 @@ class ParquetBlockInputFormat : public IInputFormat
{
String key;
bool use_cache = false;
UInt64 max_size_bytes{0};
} metadata_cache;
};

Cache metadata_cache;
};

class ParquetSchemaReader : public ISchemaReader
Expand All @@ -370,16 +370,6 @@ class ParquetSchemaReader : public ISchemaReader
std::shared_ptr<parquet::FileMetaData> metadata;
};

class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes);
void clear() {}

private:
ParquetFileMetaDataCache(UInt64 max_size_bytes);
};

}

#endif
20 changes: 20 additions & 0 deletions src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>

#ifdef USE_PARQUET

namespace DB
{

ParquetFileMetaDataCache::ParquetFileMetaDataCache()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks too silly

Copy link
Collaborator

@zvonand zvonand Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this singleton here? Can't the cache be just a member somewhere?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used by two classes: ParquetBlockInputFormat and ParquetMetadataInputFormat

: CacheBase<String, parquet::FileMetaData>(0)
{}

ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance()
{
static ParquetFileMetaDataCache instance;
return &instance;
}

}

#endif
30 changes: 30 additions & 0 deletions src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "config.h"

#if USE_PARQUET

namespace parquet
{

class FileMetaData;

}

#include <Common/CacheBase.h>

namespace DB
{

class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
static ParquetFileMetaDataCache * instance();

private:
ParquetFileMetaDataCache();
};

}

#endif
53 changes: 49 additions & 4 deletions src/Processors/Formats/Impl/ParquetMetadataInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,17 @@
#include <parquet/statistics.h>
#include "ArrowBufferedStreams.h"
#include <DataTypes/NestedUtils.h>
#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>


namespace ProfileEvents
{
extern const Event ParquetMetaDataCacheHits;
extern const Event ParquetMetaDataCacheMisses;
}

namespace DB
{

Expand All @@ -32,6 +41,11 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}

namespace Setting
{
extern const SettingsBool input_format_parquet_use_metadata_cache;
}

static NamesAndTypesList getHeaderForParquetMetadata()
{
NamesAndTypesList names_and_types{
Expand Down Expand Up @@ -129,10 +143,35 @@ void checkHeader(const Block & header)
static std::shared_ptr<parquet::FileMetaData> getFileMetadata(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two implementations of getFileMetadata. One for ParquetBlockInputFormat.cpp and one for ParquetMetadataInputFormat. I am still not sure if I should leave it duplicated or put it somewhere.. The logic is pretty much the same, but requires a couple of arguments to be passed

Copy link
Collaborator

@zvonand zvonand Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see a lot of duplication (as code is really a bit different). But it depends on whether we are planning to keep this here or move it to upstream later.

If we are keeping it here, I would avoid unnecessary refactoring in this case -- this code does not look like it will change a lot, but changing code in more places may cause additional conflicts

ReadBuffer & in,
const FormatSettings & format_settings,
std::atomic<int> & is_stopped)
std::atomic<int> & is_stopped,
ParquetMetadataInputFormat::Cache metadata_cache)
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
return parquet::ReadMetaData(arrow_file);
// in-memory cache is not implemented for local file operations, only for remote files
// there is a chance the user sets `input_format_parquet_use_metadata_cache=1` for a local file operation
// and the cache_key won't be set. Therefore, we also need to check for metadata_cache.key
if (!metadata_cache.use_cache || metadata_cache.key.empty())
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
return parquet::ReadMetaData(arrow_file);
}

auto [parquet_file_metadata, loaded] = ParquetFileMetaDataCache::instance()->getOrSet(
metadata_cache.key,
[&]()
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
return parquet::ReadMetaData(arrow_file);
}
);

if (loaded)
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheMisses);
else
ProfileEvents::increment(ProfileEvents::ParquetMetaDataCacheHits);

return parquet_file_metadata;


}

ParquetMetadataInputFormat::ParquetMetadataInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
Expand All @@ -147,7 +186,7 @@ Chunk ParquetMetadataInputFormat::read()
if (done)
return res;

auto metadata = getFileMetadata(*in, format_settings, is_stopped);
auto metadata = getFileMetadata(*in, format_settings, is_stopped, metadata_cache);

const auto & header = getPort().getHeader();
auto names_and_types = getHeaderForParquetMetadata();
Expand Down Expand Up @@ -486,6 +525,12 @@ void ParquetMetadataInputFormat::resetParser()
done = false;
}

void ParquetMetadataInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
}

ParquetMetadataSchemaReader::ParquetMetadataSchemaReader(ReadBuffer & in_)
: ISchemaReader(in_)
{
Expand Down
10 changes: 10 additions & 0 deletions src/Processors/Formats/Impl/ParquetMetadataInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ class ParquetMetadataInputFormat : public IInputFormat

void resetParser() override;

void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;

struct Cache
{
String key;
bool use_cache = false;
};

private:
Chunk read() override;

Expand All @@ -78,6 +86,8 @@ class ParquetMetadataInputFormat : public IInputFormat
const FormatSettings format_settings;
bool done = false;
std::atomic<int> is_stopped{0};

Cache metadata_cache;
};

class ParquetMetadataSchemaReader : public ISchemaReader
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
input_format->needOnlyCount();

if (!object_info->getPath().empty())
input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);
input_format->setStorageRelatedUniqueKey(context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);

builder.init(Pipe(input_format));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
10
10
10
10
10
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1, optimize_count_from_files=0, log_comment='test_03262_parquet_metadata_cache';

SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = ParquetMetadata)
SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_format_metadata_cache';

SYSTEM FLUSH LOGS;

SELECT ProfileEvents['ParquetMetaDataCacheHits']
Expand All @@ -25,4 +29,11 @@ AND type = 'QueryFinish'
ORDER BY event_time desc
LIMIT 1;

SELECT ProfileEvents['ParquetMetaDataCacheHits']
FROM system.query_log
where log_comment = 'test_03262_parquet_metadata_format_metadata_cache'
AND type = 'QueryFinish'
ORDER BY event_time desc
LIMIT 1;

DROP TABLE t_parquet_03262;
Loading