Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/Databases/DataLake/ICatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,19 @@ void TableMetadata::setLocation(const std::string & location_)
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be easier to just instantiate S3::URI? AFAIK, it should support all types of S3 endpoints and its constructor takes a URI string.

https://github.com/ClickHouse/ClickHouse/blob/fbd99df81d18fac4c1e26f665d3bba316775bfd4/src/IO/S3/URI.cpp#L114

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, Here location is just cut on two pieces, with Poco::URI come makes full analysis with splitting in all parts and later need to concat back.


if (pos_to_path == std::string::npos)
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);

pos_to_path = pos_to_bucket + pos_to_path;
{ // empty path
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to have this comment, I suggest moving it into a variable instead:

bool empty_path = pos_to_path == std::string::npos

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this better? It's an additional variable filled in runtime, and makes no logic changes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a very opionated comment. I usually prefer named variables over comments because variables compile and we tend to forget to update comments.

In any case, I should have deleted or mentioned this is not a requirement. Don't worry about it.

location_without_path = location_;
path.clear();
bucket = location_.substr(pos_to_bucket);
}
else
{
pos_to_path = pos_to_bucket + pos_to_path;

location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
location_without_path = location_.substr(0, pos_to_path);
path = location_.substr(pos_to_path + 1);
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
}

LOG_TEST(getLogger("TableMetadata"),
"Parsed location without path: {}, path: {}",
Expand Down
14 changes: 8 additions & 6 deletions src/IO/S3/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
auto bucket_uri = getURIForBucket(bucket);
if (!bucket_uri)
{
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
return *maybe_error;

if (auto region = getRegionForBucket(bucket); !region.empty())
Expand Down Expand Up @@ -578,7 +578,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
if (auto uri = getURIForBucket(bucket); uri.has_value())
request.overrideURI(std::move(*uri));


bool found_new_endpoint = false;
// if we found correct endpoint after 301 responses, update the cache for future requests
SCOPE_EXIT(
Expand Down Expand Up @@ -813,12 +812,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
}

// Do a list request because head requests don't have body in response
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using GetObjects with very small byte range 1-2.

std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this method? The name is updateURIForBucketForHead, but it doesn't seem to update anything

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic inside AWS SDK. To get proper URI we must make some request to get 301 redirect. In response body AWS sends proper endpoint, and it extracted somewhere inside SDK. After that ClickHouse extracts it in method Client::getURIFromError.
But for this response must have body, while response on HeadObject request does not have one.
This is a reason for workaround with calling ListObjects instead, see this comment by Antonio (initial author of this code).
And ListObjests request is not supported by S3 Tables. That's why I changes it to GetObject for now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a short comment about this and even add a link to this discussion? I find it surprising they did not do it.

{
ListObjectsV2Request req;
GetObjectRequest req;
req.SetBucket(bucket);
req.SetMaxKeys(1);
auto result = ListObjectsV2(req);
req.SetKey(key);
req.SetRange("bytes=0-1");
auto result = GetObject(req);

if (result.IsSuccess())
return std::nullopt;
return result.GetError();
Expand Down
2 changes: 1 addition & 1 deletion src/IO/S3/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class Client : private Aws::S3::S3Client

void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;

std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;

Expand Down
62 changes: 62 additions & 0 deletions src/IO/S3/URI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
}
}

bool URI::isAWSRegion(std::string_view region)
{
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
static const std::unordered_set<std::string_view> regions = {
"us-east-2",
"us-east-1",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-2",
"ap-southeast-3",
"ap-southeast-5",
"ap-southeast-4",
"ap-south-1",
"ap-northeast-3",
"ap-northeast-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-east-2",
"ap-southeast-7",
"ap-northeast-1",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-west-1",
"eu-west-2",
"eu-south-1",
"eu-west-3",
"eu-south-2",
"eu-north-1",
"eu-central-2",
"il-central-1",
"mx-central-1",
"me-south-1",
"me-central-1",
"sa-east-1",
"us-gov-east-1",
"us-gov-west-1"
};

/// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2'
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
if (region.substr(0, 3) == "s3-")
region = region.substr(3);

return regions.contains(region);
}

void URI::addRegionToURI(const std::string &region)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed during our call, the current use of this method is already "checking" if region is in the endpoint, but it's probably doing it wrong. Please correct the call site, add docs and examples.

It might be a good idea to rename this method to be addRegionToURIIfNeeded or something of the sort and do the checks inside.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand code in Client.cpp.
Checks initial_endpoint, and add region to new_uri.
initial_endpoint is https://s3.amazonaws.com, when new_uri is from error response:

<?xml version="1.0"?>
<?xml version="1.0" encoding="UTF-8"?>
<Error>
    <Code>PermanentRedirect</Code>
    <Message>The bucket you are attempting to access must be addressed using the specified endpoint. Please send all future requests to this endpoint.</Message>
    <Endpoint>91f4f060-1a63-4ff0-oy49hiiqy45utsdypodf5p1j7awk4usw2b--table-s3.s3-us-west-2.amazonaws.com</Endpoint>
    <Bucket>91f4f060-1a63-4ff0-oy49hiiqy45utsdypodf5p1j7awk4usw2b--table-s3</Bucket>
    <RequestId>DCPGXTJ8PA0HVQ27</RequestId>
    <HostId>ANM0Ek0N5xhfUCDwE2bxeVl2MhOl381L8SJz1JjA4lqcKJnZ1ySwT3c3fH1JGjMoCD1uYuTYa3SmEubEU14nx5IHvYGSjXM1</HostId>
</Error>

In response url with region.
What the hell is here? Why initial_endpoint???

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial error:

2025.06.19 11:39:14.915513 [ 85 ] {14de1b7e-5a8f-4b3a-af2d-463e0bda9ba8} <Information> AWSClient: Failed to make request to: https://91f4f060-1a63-4ff0-oy49hiiqy45utsdypodf5p1j7awk4usw2b--table-s3.s3-us-west-2.us-west-2.amazonaws.com/metadata/00001-9c6d55c7-3a32-4bd8-a25d-dd2bbdad06ba.metadata.json: Code: 198. DB::NetException: Not found address of host: 91f4f060-1a63-4ff0-oy49hiiqy45utsdypodf5p1j7awk4usw2b--table-s3.s3-us-west-2.us-west-2.amazonaws.com. (DNS_ERROR), Stack trace (when copying this message, always include the lines below):

{
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
{
if (pos > 0)
{ /// Check if region is already in endpoint to avoid add it second time
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
if (prev_pos == std::string::npos)
prev_pos = 0;
else
++prev_pos;
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
if (isAWSRegion(endpoint_region))
return;
}
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
}
}

void URI::validateBucket(const String & bucket, const Poco::URI & uri)
Expand Down
4 changes: 4 additions & 0 deletions src/IO/S3/URI.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ struct URI

static void validateBucket(const std::string & bucket, const Poco::URI & uri);

/// Returns true if 'region' string is an AWS S3 region
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
static bool isAWSRegion(std::string_view region);

private:
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ void IcebergMetadata::updateSnapshot()

relevant_snapshot = IcebergSnapshot{
getManifestList(getProperFilePathFromMetadataInfo(
snapshot->getValue<String>(MANIFEST_LIST_PATH_FIELD), configuration_ptr->getPath(), table_location)),
snapshot->getValue<String>(MANIFEST_LIST_PATH_FIELD), configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace())),
relevant_snapshot_id, total_rows, total_bytes};

if (!snapshot->has("schema-id"))
Expand Down Expand Up @@ -654,7 +654,7 @@ ManifestListPtr IcebergMetadata::getManifestList(const String & filename) const
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
{
const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, MANIFEST_FILE_PATH_COLUMN, TypeIndex::String).safeGet<std::string>();
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location);
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace());
Int64 added_sequence_number = 0;
if (format_version > 1)
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, SEQUENCE_NUMBER_COLUMN, TypeIndex::Int64).safeGet<Int64>();
Expand Down Expand Up @@ -706,6 +706,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
schema_processor,
inherited_sequence_number,
table_location,
configuration_ptr->getNamespace(),
context);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ ManifestFileContent::ManifestFileContent(
const IcebergSchemaProcessor & schema_processor,
Int64 inherited_sequence_number,
const String & table_location,
const String & common_namespace,
DB::ContextPtr context)
{
this->schema_id = schema_id_;
Expand Down Expand Up @@ -205,7 +206,11 @@ ManifestFileContent::ManifestFileContent(
}
const auto status = ManifestEntryStatus(manifest_file_deserializer.getValueFromRowByName(i, COLUMN_STATUS_NAME, TypeIndex::Int32).safeGet<UInt64>());

const auto file_path = getProperFilePathFromMetadataInfo(manifest_file_deserializer.getValueFromRowByName(i, SUBCOLUMN_FILE_PATH_NAME, TypeIndex::String).safeGet<String>(), common_path, table_location);
const auto file_path = getProperFilePathFromMetadataInfo(
manifest_file_deserializer.getValueFromRowByName(i, SUBCOLUMN_FILE_PATH_NAME, TypeIndex::String).safeGet<String>(),
common_path,
table_location,
common_namespace);

/// NOTE: This is weird, because in manifest file partition looks like this:
/// {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class ManifestFileContent
const DB::IcebergSchemaProcessor & schema_processor,
Int64 inherited_sequence_number,
const std::string & table_location,
const std::string & common_namespace,
DB::ContextPtr context);

const std::vector<ManifestFileEntry> & getFiles() const;
Expand Down
21 changes: 19 additions & 2 deletions src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ using namespace DB;
// This function is used to get the file path inside the directory which corresponds to iceberg table from the full blob path which is written in manifest and metadata files.
// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro
// Common path should end with "<table_name>" or "<table_name>/".
std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location)
std::string getProperFilePathFromMetadataInfo(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've spent quite some time reviewing this function together and haven't understood it fully. Please document the possible values and examples for all the arguments here, where they come from and the scenarios.

That'll make reviewing this much easier.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data_path s3://aws-st-2-fs5vug37-iceberg/aws-public-blockchain/btc/metadata/snap-2539904009313210382-1-4f2e6056-d08e-4420-9bc9-47bc0dcbd6f9.avro
common_path aws-public-blockchain/btc
table_location s3://aws-st-2-fs5vug37-iceberg/aws-public-blockchain/btc
common_namespace aws-st-2-fs5vug37-iceberg
data_path s3://aws-st-2-fs5vug37-iceberg/ssb/lineorder_wide/data/199603/data.parquet
common_path ssb/lineorder_wide
table_location s3://aws-st-2-fs5vug37-iceberg/ssb/lineorder_wide
common_namespace aws-st-2-fs5vug37-iceberg
data_path s3://aws-st-2-fs5vug37-iceberg/nyc/test/metadata/snap-7890808452220287820-1-960673bb-b315-4df9-946e-fd34c44b98f7.avro
common_path nyc/test
table_location s3://aws-st-2-fs5vug37-iceberg/nyc/test
common_namespace aws-st-2-fs5vug37-iceberg

std::string_view data_path,
std::string_view common_path,
std::string_view table_location,
std::string_view common_namespace)
{
auto trim_backward_slash = [](std::string_view str) -> std::string_view
{
Expand Down Expand Up @@ -84,7 +88,20 @@ std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::s
}
else
{
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Expected to find '{}' in data path: '{}'", common_path, data_path);
/// Data files can have different path
pos = data_path.find("://");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method docs say:

// For example, if the full blob path is s3://bucket/table_name/data/00000-1-1234567890.avro, the function will return table_name/data/00000-1-1234567890.avro

Which means the bucket shall not be included in the return value as far as I understand.

As far as I understand your exception handling,you skip the initial "://" and then copy everything after the next "/". But what if you are using Path-style URIs (i.e, https://s3.region-code.amazonaws.com/bucket-name/key-name)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to remove bucket

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same S3/URI comment here

if (pos == std::string::npos)
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
pos = data_path.find("/", pos + 3);
if (pos == std::string::npos)
throw ::DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected data path: '{}'", data_path);
if (data_path.substr(pos + 1).starts_with(common_namespace))
{
auto new_pos = data_path.find("/", pos + 1);
if (new_pos - pos == common_namespace.length() + 1) /// bucket in the path
pos = new_pos;
}
return std::string(data_path.substr(pos));
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
namespace Iceberg
{

std::string getProperFilePathFromMetadataInfo(std::string_view data_path, std::string_view common_path, std::string_view table_location);
std::string getProperFilePathFromMetadataInfo(
std::string_view data_path,
std::string_view common_path,
std::string_view table_location,
std::string_view common_namespace);

}

Expand Down
Loading