Glue: Deduce Iceberg table metadata location if metadata_location not specified (#1070)
Changes from all commits: 4819149, 184963d, 7fc341c, 6d15034, c62b564, 3feb31a, 75e1599, 0402d8d
```diff
@@ -317,11 +317,31 @@ bool GlueCatalog::tryGetTableMetadata(
     {
         result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
     }
+    else if (table_outcome.GetStorageDescriptor().LocationHasBeenSet())
+    {
+        const auto & location = table_outcome.GetStorageDescriptor().GetLocation();
+
+        std::string location_with_slash = location;
+        if (!location_with_slash.ends_with('/'))
+            location_with_slash += '/';
+
+        // Resolve the actual metadata file path based on table location
+        std::string resolved_metadata_path = resolveMetadataPathFromTableLocation(location_with_slash, result);
+        if (resolved_metadata_path.empty())
+        {
+            result.setTableIsNotReadable(fmt::format("Could not determine metadata_location of table `{}`. ",
+                database_name + "." + table_name));
+        }
+        else
+        {
+            result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = resolved_metadata_path});
+        }
+    }
     else
     {
         result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
             "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
             database_name + "." + table_name));
     }
 };
```
```diff
@@ -415,37 +435,41 @@ bool GlueCatalog::empty() const
 bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
 {
     String metadata_path;
+    String metadata_uri;
     if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
         table_specific_properties.has_value())
     {
         metadata_path = table_specific_properties->iceberg_metadata_file_location;
+        metadata_uri = metadata_path;
         if (metadata_path.starts_with("s3:/"))
             metadata_path = metadata_path.substr(5);

-        // Delete bucket
+        // Delete bucket from path
         std::size_t pos = metadata_path.find('/');
         if (pos != std::string::npos)
             metadata_path = metadata_path.substr(pos + 1);
     }
     else
-        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to read table metadata, reason why table is unreadable: {}", table_metadata.getReasonWhyTableIsUnreadable());

-    if (!metadata_objects.get(metadata_path))
+    if (!metadata_objects.get(metadata_uri))
     {
         DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
         DB::ASTs args = storage->engine->arguments->children;

-        auto table_endpoint = settings.storage_endpoint;
+        String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : metadata_uri;

         if (args.empty())
-            args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
+            args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
         else
-            args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
+            args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);

-        if (args.size() == 1 && table_metadata.hasStorageCredentials())
+        if (args.size() == 1)
         {
-            auto storage_credentials = table_metadata.getStorageCredentials();
-            if (storage_credentials)
-                storage_credentials->addCredentialsToEngineArgs(args);
+            if (table_metadata.hasStorageCredentials())
+                table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
+            else if (!credentials.IsExpiredOrEmpty())
+                DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
         }

         auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
@@ -464,9 +488,9 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
         Poco::JSON::Parser parser;
         Poco::Dynamic::Var result = parser.parse(metadata_file);
         auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
-        metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
+        metadata_objects.set(metadata_uri, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
     }
-    auto metadata_object = *metadata_objects.get(metadata_path);
+    auto metadata_object = *metadata_objects.get(metadata_uri);
     auto current_schema_id = metadata_object->getValue<Int64>("current-schema-id");
     auto schemas = metadata_object->getArray(DB::Iceberg::f_schemas);
     for (size_t i = 0; i < schemas->size(); ++i)
```
```diff
@@ -487,6 +511,125 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
     return false;
 }

+/// This function tries to resolve the metadata file path by the following means:
+/// 1. Tries to read version-hint.text to get the latest version.
+/// 2. Lists all *.metadata.json files in the metadata directory and takes the most recent one.
+String GlueCatalog::resolveMetadataPathFromTableLocation(const String & table_location, const TableMetadata & table_metadata) const
+{
+    // Construct path to version-hint.text
+    String version_hint_path = table_location + "metadata/version-hint.text";
+
+    DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
```
**Collaborator:** This code seems to be duplicated with …

**Author:** Yes, I'd keep it as is for now. Later I will bring this PR to upstream, and then a small refactor will be done.

**Collaborator:** Btw, why is it not going into upstream?

**Author:** Because we want it in the next 25.8 antalya release ASAP.
```diff
+    DB::ASTs args = storage->engine->arguments->children;
+
+    String storage_endpoint = !settings.storage_endpoint.empty() ? settings.storage_endpoint : table_location;
+    if (args.empty())
+        args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
+    else
+        args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);
+
+    if (args.size() == 1 && table_metadata.hasStorageCredentials())
+    {
+        auto storage_credentials = table_metadata.getStorageCredentials();
+        if (storage_credentials)
+            storage_credentials->addCredentialsToEngineArgs(args);
+    }
+
+    auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
+    storage_settings->loadFromSettingsChanges(settings.allChanged());
+    auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
+    configuration->initialize(args, getContext(), false);
+
+    auto object_storage = configuration->createObjectStorage(getContext(), true);
+    const auto & read_settings = getContext()->getReadSettings();
+
+    try
+    {
+        // Try to read version-hint.text to get the latest version
+        String version_hint_object_path = version_hint_path;
+        if (version_hint_object_path.starts_with("s3://"))
+        {
+            version_hint_object_path = version_hint_object_path.substr(5);
+            // Remove bucket from path
+            std::size_t pos = version_hint_object_path.find('/');
+            if (pos != std::string::npos)
+                version_hint_object_path = version_hint_object_path.substr(pos + 1);
+        }
```
Comment on lines +549 to +557:

**Collaborator:** Consider using S3::URI, I think it offers the functionality you need. All you gotta do is to instantiate it passing the path.

**Author:** Here again, I am keeping the code that is similar to upstream. This is a good note, I will keep it in mind when making a PR to upstream.
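For reference, the reviewer's suggestion would presumably look something like the sketch below. This is an illustration, not code from the PR; it assumes ClickHouse's `S3::URI` helper (`IO/S3/URI.h`), which parses an `s3://` URI into endpoint, bucket, and key:

```cpp
#include <IO/S3/URI.h>

// Illustrative only: let S3::URI split the URI instead of the manual
// starts_with("s3://") / substr / find('/') handling above.
String version_hint_object_path = version_hint_path;
if (version_hint_object_path.starts_with("s3://"))
{
    const DB::S3::URI parsed(version_hint_object_path);
    version_hint_object_path = parsed.key; // path within the bucket
}
```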
```diff
+
+        DB::StoredObject version_hint_stored_object(version_hint_object_path);
+        auto version_hint_buf = object_storage->readObject(version_hint_stored_object, read_settings);
+        String version_str;
+        readString(version_str, *version_hint_buf);
+
+        boost::algorithm::trim(version_str);
+
+        LOG_TRACE(log, "Read version {} from version-hint.text for table location '{}'", version_str, table_location);
+
+        return table_location + "metadata/v" + version_str + "-metadata.json";
+    }
+    catch (...)
```
**Collaborator:** Instead of nested try-catch blocks, consider creating two auxiliary functions that return a …

**Author:** Here we are trying to read objects that may not exist at all, so we still need to have try/catch.

**Collaborator:** I know, but instead of having nested try-catch blocks, you can have two different functions that return …. Wouldn't something like the below work?
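(The reviewer's inline snippet did not survive the page export; the following is a minimal sketch of the idea with hypothetical names, not the reviewer's actual code.)

```cpp
#include <optional>
#include <string>

// Hypothetical sketch: each strategy catches its own failures and signals
// "not found" via std::nullopt, so the caller chains them without nesting.
std::optional<std::string> tryResolveFromVersionHint(const std::string & table_location)
{
    try
    {
        // ... read <table_location>metadata/version-hint.text ...
        std::string version = "3"; // placeholder for the value read from the hint
        return table_location + "metadata/v" + version + "-metadata.json";
    }
    catch (...)
    {
        return std::nullopt;
    }
}

std::optional<std::string> tryResolveByListing(const std::string & table_location)
{
    try
    {
        // ... list <table_location>metadata/*.metadata.json, return the newest ...
        return std::nullopt; // placeholder: nothing found
    }
    catch (...)
    {
        return std::nullopt;
    }
}

std::string resolveMetadataPathFromTableLocation(const std::string & table_location)
{
    if (auto path = tryResolveFromVersionHint(table_location))
        return *path;
    if (auto path = tryResolveByListing(table_location))
        return *path;
    return "";
}
```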
**Author:** Yeah, maybe. But I actually do not think it shall be done here (as it is a style and code beauty issue, though I agree that what you suggest looks better). I will probably refactor some of this code when submitting into upstream (e.g. as suggested in your other comments), so I do not see a good reason to spend time on it now. WDYT?

**Collaborator:** Sure
```diff
+    {
+        LOG_TRACE(log, "Could not read version-hint.text from '{}', trying to find latest metadata file", version_hint_path);
+
+        try
+        {
+            String bucket_with_prefix;
+            String metadata_dir = table_location + "metadata/";
+            String metadata_dir_path = metadata_dir;
+
+            if (metadata_dir_path.starts_with("s3://"))
+            {
+                metadata_dir_path = metadata_dir_path.substr(5);
+                // Remove bucket from path
+                std::size_t pos = metadata_dir_path.find('/');
+                if (pos != std::string::npos)
+                {
+                    metadata_dir_path = metadata_dir_path.substr(pos + 1);
+                    bucket_with_prefix = table_location.substr(0, pos + 6);
+                }
+            }
+            else
+                return "";
+
+            // List all files in metadata directory
+            DB::RelativePathsWithMetadata files;
+            object_storage->listObjects(metadata_dir_path, files, 0);
+
+            // Filter for .metadata.json files and find the most recent one
+            String latest_metadata_file;
+            std::optional<DB::ObjectMetadata> latest_metadata;
+
+            for (const auto & file : files)
+            {
+                if (file->getPath().ends_with(".metadata.json"))
+                {
+                    // Get file metadata to check last modified time
+                    if (!latest_metadata.has_value() ||
+                        (file->metadata->last_modified > latest_metadata->last_modified))
+                    {
+                        latest_metadata_file = file->getPath();
+                        latest_metadata = file->metadata;
+                    }
+                }
+            }
+
+            if (!latest_metadata_file.empty())
+            {
+                LOG_TRACE(log, "Found latest metadata file: {}", latest_metadata_file);
+                return bucket_with_prefix + latest_metadata_file;
+            }
+
+            LOG_TRACE(log, "No <...>.metadata.json files found.");
+            return "";
+        }
+        catch (...)
+        {
+            LOG_TRACE(log, "Failed to list metadata directory");
+            return "";
+        }
+    }
+}

 void GlueCatalog::createNamespaceIfNotExists(const String & namespace_name) const
 {
     Aws::Glue::Model::CreateDatabaseRequest create_request;
```
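For context (not part of the diff): the resolution logic above presumes the conventional Iceberg layout under the Glue table's storage location. The bucket and table names below are illustrative:

```text
s3://my-bucket/warehouse/db/my_table/     <- Glue StorageDescriptor location
    metadata/
        version-hint.text                 <- contains e.g. "3"
        v1-metadata.json
        v2-metadata.json
        v3-metadata.json                  <- resolved as <location>/metadata/v3-metadata.json
```

If `version-hint.text` is absent, the fallback lists the `metadata/` prefix and picks the `*.metadata.json` object with the latest `last_modified` time.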