Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ IcebergMetadata::IcebergMetadata(
Int32 format_version_,
const Poco::JSON::Object::Ptr & metadata_object_,
IcebergMetadataFilesCachePtr cache_ptr)
: WithContext(context_)
, object_storage(std::move(object_storage_))
: object_storage(std::move(object_storage_))
, configuration(std::move(configuration_))
, schema_processor(IcebergSchemaProcessor())
, log(getLogger("IcebergMetadata"))
Expand All @@ -105,6 +104,7 @@ IcebergMetadata::IcebergMetadata(
, format_version(format_version_)
, relevant_snapshot_schema_id(-1)
, table_location(last_metadata_object->getValue<String>(TABLE_LOCATION_FIELD))
, context(context_)
{
updateState(context_, true);
}
Expand Down Expand Up @@ -537,8 +537,8 @@ ManifestListPtr IcebergMetadata::getManifestList(const String & filename) const
{
ManifestList manifest_list;
StorageObjectStorage::ObjectInfo object_info(filename);
auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, getContext(), log);
AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), filename, getFormatSettings(getContext()));
auto manifest_list_buf = StorageObjectStorageSource::createReadBuffer(object_info, object_storage, context, log);
AvroForIcebergDeserializer manifest_list_deserializer(std::move(manifest_list_buf), filename, getFormatSettings(context));

ManifestFileCacheKeys manifest_file_cache_keys;

Expand Down Expand Up @@ -584,8 +584,8 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
auto create_fn = [&]()
{
ObjectInfo manifest_object_info(filename);
auto buffer = StorageObjectStorageSource::createReadBuffer(manifest_object_info, object_storage, getContext(), log);
AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), filename, getFormatSettings(getContext()));
auto buffer = StorageObjectStorageSource::createReadBuffer(manifest_object_info, object_storage, context, log);
AvroForIcebergDeserializer manifest_file_deserializer(std::move(buffer), filename, getFormatSettings(context));
auto [schema_id, schema_object] = parseTableSchemaFromManifestFile(manifest_file_deserializer, filename);
schema_processor.addIcebergTableSchema(schema_object);
return std::make_shared<ManifestFileContent>(
Expand All @@ -597,7 +597,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
schema_processor,
inherited_sequence_number,
table_location,
getContext());
context);
};

if (manifest_cache)
Expand All @@ -614,7 +614,7 @@ Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const
if (!relevant_snapshot)
return {};

bool use_partition_pruning = filter_dag && getContext()->getSettingsRef()[Setting::use_iceberg_partition_pruning];
bool use_partition_pruning = filter_dag && context->getSettingsRef()[Setting::use_iceberg_partition_pruning];

if (!use_partition_pruning && cached_unprunned_files_for_last_processed_snapshot.has_value())
return cached_unprunned_files_for_last_processed_snapshot.value();
Expand All @@ -625,7 +625,7 @@ Strings IcebergMetadata::getDataFiles(const ActionsDAG * filter_dag) const
ManifestFilesPruner pruner(
schema_processor, relevant_snapshot_schema_id,
use_partition_pruning ? filter_dag : nullptr,
*manifest_file_ptr, getContext());
*manifest_file_ptr, context);
const auto & data_files_in_manifest = manifest_file_ptr->getFiles();
for (const auto & manifest_file_entry : data_files_in_manifest)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
namespace DB
{

class IcebergMetadata : public IDataLakeMetadata, private WithContext
class IcebergMetadata : public IDataLakeMetadata
{
public:
using ConfigurationObserverPtr = StorageObjectStorage::ConfigurationObserverPtr;
Expand Down Expand Up @@ -114,6 +114,8 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
Int64 relevant_snapshot_id{-1};
String table_location;

ContextPtr context;

mutable std::optional<Strings> cached_unprunned_files_for_last_processed_snapshot;

void updateState(const ContextPtr & local_context, bool metadata_file_changed);
Expand Down
Loading