diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp
index de9b4f533f28..e76e81c6c3c3 100644
--- a/src/Databases/DataLake/GlueCatalog.cpp
+++ b/src/Databases/DataLake/GlueCatalog.cpp
@@ -317,11 +317,31 @@ bool GlueCatalog::tryGetTableMetadata(
         {
             result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
         }
+        else if (table_outcome.GetStorageDescriptor().LocationHasBeenSet())
+        {
+            const auto & location = table_outcome.GetStorageDescriptor().GetLocation();
+
+            std::string location_with_slash = location;
+            if (!location_with_slash.ends_with('/'))
+                location_with_slash += '/';
+
+            // Resolve the actual metadata file path based on the table location
+            std::string resolved_metadata_path = resolveMetadataPathFromTableLocation(location_with_slash, result);
+            if (resolved_metadata_path.empty())
+            {
+                result.setTableIsNotReadable(fmt::format("Could not determine metadata_location of table `{}`.",
+                    database_name + "." + table_name));
+            }
+            else
+            {
+                result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = resolved_metadata_path});
+            }
+        }
         else
         {
-            result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
-                "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
-                database_name + "." + table_name));
+            result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
+                "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
+                database_name + "." + table_name));
         }
     };
@@ -415,37 +435,41 @@ bool GlueCatalog::empty() const
 
 bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
 {
     String metadata_path;
+    String metadata_uri;
     if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties(); table_specific_properties.has_value())
     {
         metadata_path = table_specific_properties->iceberg_metadata_file_location;
+        metadata_uri = metadata_path;
         if (metadata_path.starts_with("s3:/"))
             metadata_path = metadata_path.substr(5);
-        // Delete bucket
+        // Delete bucket from path
         std::size_t pos = metadata_path.find('/');
         if (pos != std::string::npos)
             metadata_path = metadata_path.substr(pos + 1);
     }
     else
-        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to read table metadata, reason why table is unreadable: {}", table_metadata.getReasonWhyTableIsUnreadable());
 
-    if (!metadata_objects.get(metadata_path))
+    if (!metadata_objects.get(metadata_uri))
     {
         DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
         DB::ASTs args = storage->engine->arguments->children;
-        auto table_endpoint = settings.storage_endpoint;
+        String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : metadata_uri;
+
         if (args.empty())
-            args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
+            args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
         else
-            args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
-
-        if (args.size() == 1 && table_metadata.hasStorageCredentials())
+            args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);
+
+        if (args.size() == 1)
         {
-            auto storage_credentials = table_metadata.getStorageCredentials();
-            if (storage_credentials)
-                storage_credentials->addCredentialsToEngineArgs(args);
+            if (table_metadata.hasStorageCredentials())
+                table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
+            else if (!credentials.IsExpiredOrEmpty())
+                DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
         }
 
         auto storage_settings = std::make_shared<DB::StorageObjectStorageSettings>();
@@ -464,9 +488,9 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
         Poco::JSON::Parser parser;
         Poco::Dynamic::Var result = parser.parse(metadata_file);
         auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
-        metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
+        metadata_objects.set(metadata_uri, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
     }
-    auto metadata_object = *metadata_objects.get(metadata_path);
+    auto metadata_object = *metadata_objects.get(metadata_uri);
     auto current_schema_id = metadata_object->getValue<Int32>("current-schema-id");
     auto schemas = metadata_object->getArray(DB::Iceberg::f_schemas);
     for (size_t i = 0; i < schemas->size(); ++i)
@@ -487,6 +511,125 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
     return false;
 }
 
+/// This function tries to resolve the metadata file path by the following means:
+/// 1. Tries to read version-hint.text to get the latest version.
+/// 2. Lists all *.metadata.json files in the metadata directory and takes the most recent one.
+String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_location, const TableMetadata & table_metadata) const
+{
+    // Construct path to version-hint.text
+    String version_hint_path = table_location + "metadata/version-hint.text";
+
+    DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
+    DB::ASTs args = storage->engine->arguments->children;
+
+    String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : table_location;
+    if (args.empty())
+        args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
+    else
+        args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);
+
+    if (args.size() == 1 && table_metadata.hasStorageCredentials())
+    {
+        auto storage_credentials = table_metadata.getStorageCredentials();
+        if (storage_credentials)
+            storage_credentials->addCredentialsToEngineArgs(args);
+    }
+
+    auto storage_settings = std::make_shared<DB::StorageObjectStorageSettings>();
+    storage_settings->loadFromSettingsChanges(settings.allChanged());
+    auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
+    configuration->initialize(args, getContext(), false);
+
+    auto object_storage = configuration->createObjectStorage(getContext(), true);
+    const auto & read_settings = getContext()->getReadSettings();
+
+    try
+    {
+        // Try to read version-hint.text to get the latest version
+        String version_hint_object_path = version_hint_path;
+        if (version_hint_object_path.starts_with("s3://"))
+        {
+            version_hint_object_path = version_hint_object_path.substr(5);
+            // Remove bucket from path
+            std::size_t pos = version_hint_object_path.find('/');
+            if (pos != std::string::npos)
+                version_hint_object_path = version_hint_object_path.substr(pos + 1);
+        }
+
+        DB::StoredObject version_hint_stored_object(version_hint_object_path);
+        auto version_hint_buf = object_storage->readObject(version_hint_stored_object, read_settings);
+        String version_str;
+        readString(version_str, *version_hint_buf);
+
+        boost::algorithm::trim(version_str);
+
+        LOG_TRACE(log, "Read version {} from version-hint.text for table location '{}'", version_str, table_location);
+
+        return table_location + "metadata/v" + version_str + "-metadata.json";
+    }
+    catch (...)
+    {
+        LOG_TRACE(log, "Could not read version-hint.text from '{}', trying to find latest metadata file", version_hint_path);
+
+        try
+        {
+            String bucket_with_prefix;
+            String metadata_dir = table_location + "metadata/";
+            String metadata_dir_path = metadata_dir;
+
+            if (metadata_dir_path.starts_with("s3://"))
+            {
+                metadata_dir_path = metadata_dir_path.substr(5);
+                // Remove bucket from path
+                std::size_t pos = metadata_dir_path.find('/');
+                if (pos != std::string::npos)
+                {
+                    metadata_dir_path = metadata_dir_path.substr(pos + 1);
+                    bucket_with_prefix = table_location.substr(0, pos + 6);
+                }
+            }
+            else
+                return "";
+
+            // List all files in the metadata directory
+            DB::RelativePathsWithMetadata files;
+            object_storage->listObjects(metadata_dir_path, files, 0);
+
+            // Filter for .metadata.json files and find the most recent one
+            String latest_metadata_file;
+            std::optional<DB::ObjectMetadata> latest_metadata;
+
+            for (const auto & file : files)
+            {
+                if (file->getPath().ends_with(".metadata.json"))
+                {
+                    // Compare last modified times to keep the newest file
+                    if (!latest_metadata.has_value() || (file->metadata->last_modified > latest_metadata->last_modified))
+                    {
+                        latest_metadata_file = file->getPath();
+                        latest_metadata = file->metadata;
+                    }
+                }
+            }
+
+            if (!latest_metadata_file.empty())
+            {
+                LOG_TRACE(log, "Found latest metadata file: {}", latest_metadata_file);
+                return bucket_with_prefix + latest_metadata_file;
+            }
+
+            LOG_TRACE(log, "No <...>.metadata.json files found");
+            return "";
+        }
+        catch (...)
+        {
+            LOG_TRACE(log, "Failed to list metadata directory");
+            return "";
+        }
+    }
+}
+
 void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) const
 {
     Aws::Glue::Model::CreateDatabaseRequest create_request;
diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h
index bed6e93c5dcc..3bf1769ac83e 100644
--- a/src/Databases/DataLake/GlueCatalog.h
+++ b/src/Databases/DataLake/GlueCatalog.h
@@ -81,6 +81,8 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
     /// This method allows to clarify the actual type of the timestamp column.
     bool classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const;
 
+    String resolveMetadataPathFromTableLocation(const String & table_location, const TableMetadata & table_metadata) const;
+
     mutable DB::CacheBase<String, Poco::JSON::Object::Ptr> metadata_objects;
 };
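Note for reviewers: below is a minimal standalone sketch (not part of the patch) of the two resolution strategies resolveMetadataPathFromTableLocation applies. The helper names and the {path, mtime} listing shape are hypothetical stand-ins for ClickHouse's object storage listing; only the standard library is used.

#include <cassert>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Strategy 1: version-hint.text stores a bare version number; the metadata file
// is expected at <table_location>/metadata/v<version>-metadata.json.
std::string pathFromVersionHint(std::string location, const std::string & version)
{
    if (!location.ends_with('/'))
        location += '/';
    return location + "metadata/v" + version + "-metadata.json";
}

// Strategy 2: without a version hint, scan a directory listing and keep the
// *.metadata.json entry with the latest modification time.
std::optional<std::string> newestMetadataFile(const std::vector<std::pair<std::string, long>> & listing)
{
    std::optional<std::pair<std::string, long>> best; // {path, mtime}
    for (const auto & [path, mtime] : listing)
    {
        if (!path.ends_with(".metadata.json"))
            continue;
        if (!best.has_value() || mtime > best->second)
            best.emplace(path, mtime);
    }
    if (!best.has_value())
        return std::nullopt;
    return best->first;
}

int main()
{
    assert(pathFromVersionHint("s3://bucket/db/table", "3")
        == "s3://bucket/db/table/metadata/v3-metadata.json");

    const std::vector<std::pair<std::string, long>> listing = {
        {"metadata/00001-abc.metadata.json", 100},
        {"metadata/00002-def.metadata.json", 200},
        {"metadata/snap-123.avro", 300},
    };
    assert(newestMetadataFile(listing) == std::optional<std::string>("metadata/00002-def.metadata.json"));
}

As in the patch itself, the version hint yields a deterministic path, while the listing fallback relies on last-modified timestamps, which is only a heuristic when several writers have produced metadata files.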