diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index 4abdeb03a3a4..c46e5245bfb2 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -154,11 +154,10 @@ std::shared_ptr DatabaseDataLake::getCatalog() const
         case DB::DatabaseDataLakeCatalogType::GLUE:
         {
             catalog_impl = std::make_shared<DataLake::GlueCatalog>(
-                settings[DatabaseDataLakeSetting::aws_access_key_id].value,
-                settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
-                settings[DatabaseDataLakeSetting::region].value,
                 url,
-                Context::getGlobalContextInstance());
+                Context::getGlobalContextInstance(),
+                settings,
+                table_engine_definition);
             break;
         }
         case DB::DatabaseDataLakeCatalogType::ICEBERG_HIVE:
diff --git a/src/Databases/DataLake/GlueCatalog.cpp b/src/Databases/DataLake/GlueCatalog.cpp
index fde1019463c8..4a397a28e79d 100644
--- a/src/Databases/DataLake/GlueCatalog.cpp
+++ b/src/Databases/DataLake/GlueCatalog.cpp
@@ -1,4 +1,6 @@
 #include
+#include
+#include

 #if USE_AWS_S3 && USE_AVRO

@@ -8,6 +10,7 @@
 #include
 #include
+#include
 #include
 #include

@@ -29,12 +32,19 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
+#include
+#include
+#include
+#include

 namespace DB::ErrorCodes
 {
+    extern const int BAD_ARGUMENTS;
     extern const int DATALAKE_DATABASE_ERROR;
 }

@@ -54,20 +64,36 @@ namespace DB::StorageObjectStorageSetting
     extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
 }

+namespace DB::DatabaseDataLakeSetting
+{
+    extern const DatabaseDataLakeSettingsString storage_endpoint;
+    extern const DatabaseDataLakeSettingsString aws_access_key_id;
+    extern const DatabaseDataLakeSettingsString aws_secret_access_key;
+    extern const DatabaseDataLakeSettingsString region;
+}
+
+namespace CurrentMetrics
+{
+    extern const Metric MarkCacheBytes;
+    extern const Metric MarkCacheFiles;
+}
+
 namespace DataLake
 {

 GlueCatalog::GlueCatalog(
-    const String & access_key_id,
-    const String & secret_access_key,
-    const String & region_,
     const String & endpoint,
-    DB::ContextPtr context_)
+    DB::ContextPtr context_,
+    const DB::DatabaseDataLakeSettings & settings_,
+    DB::ASTPtr table_engine_definition_)
     : ICatalog("")
     , DB::WithContext(context_)
-    , log(getLogger("GlueCatalog(" + region_ + ")"))
-    , credentials(access_key_id, secret_access_key)
-    , region(region_)
+    , log(getLogger("GlueCatalog(" + settings_[DB::DatabaseDataLakeSetting::region].value + ")"))
+    , credentials(settings_[DB::DatabaseDataLakeSetting::aws_access_key_id].value, settings_[DB::DatabaseDataLakeSetting::aws_secret_access_key].value)
+    , region(settings_[DB::DatabaseDataLakeSetting::region].value)
+    , settings(settings_)
+    , table_engine_definition(table_engine_definition_)
+    , metadata_objects(CurrentMetrics::MarkCacheBytes, CurrentMetrics::MarkCacheFiles, 1024)
 {
     DB::S3::CredentialsConfiguration creds_config;
     creds_config.use_environment_credentials = true;
@@ -271,6 +297,27 @@ bool GlueCatalog::tryGetTableMetadata(
             database_name + "." + table_name, message_part, "ICEBERG"));
     }

+    if (result.requiresCredentials())
+        setCredentials(result);
+
+    auto setup_specific_properties = [&]
+    {
+        const auto & table_params = table_outcome.GetParameters();
+        if (table_params.contains("metadata_location"))
+        {
+            result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
+        }
+        else
+        {
+            result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. "
+                "Tables without 'metadata_location' in their parameters cannot be read through the Glue catalog in ClickHouse",
+                database_name + "." + table_name));
+        }
+    };
+
+    if (result.requiresDataLakeSpecificProperties())
+        setup_specific_properties();
+
     if (result.requiresSchema())
     {
         DB::NamesAndTypesList schema;
@@ -286,27 +333,18 @@
             if (column_params.contains("iceberg.field.current") && column_params.at("iceberg.field.current") == "false")
                 continue;

-            schema.push_back({column.GetName(), getType(column.GetType(), can_be_nullable)});
-        }
-        result.setSchema(schema);
-    }
+            String column_type = column.GetType();
+            if (column_type == "timestamp")
+            {
+                if (!result.requiresDataLakeSpecificProperties())
+                    setup_specific_properties();
+                if (classifyTimestampTZ(column.GetName(), result))
+                    column_type = "timestamptz";
+            }

-    if (result.requiresCredentials())
-        setCredentials(result);
-
-    if (result.requiresDataLakeSpecificProperties())
-    {
-        const auto & table_params = table_outcome.GetParameters();
-        if (table_params.contains("metadata_location"))
-        {
-            result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
-        }
-        else
-        {
-            result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
-                "It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
-                database_name + "." + table_name));
+            schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
         }
+        result.setSchema(schema);
     }
 }
 else
@@ -364,6 +402,82 @@ bool GlueCatalog::empty() const
     return true;
 }

+bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
+{
+    String metadata_path;
+    if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
+        table_specific_properties.has_value())
+    {
+        metadata_path = table_specific_properties->iceberg_metadata_file_location;
+        if (metadata_path.starts_with("s3:/"))
+            metadata_path = metadata_path.substr(5);
+
+        /// Drop the bucket name from the path.
+        std::size_t pos = metadata_path.find('/');
+        if (pos != std::string::npos)
+            metadata_path = metadata_path.substr(pos + 1);
+    }
+    else
+        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Data lake specific properties must be defined to locate the Iceberg metadata file");
+
+    if (!metadata_objects.get(metadata_path))
+    {
+        DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
+        DB::ASTs args = storage->engine->arguments->children;
+
+        auto table_endpoint = settings[DB::DatabaseDataLakeSetting::storage_endpoint].value;
+        if (args.empty())
+            args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
+        else
+            args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
+
+        if (args.size() == 1 && table_metadata.hasStorageCredentials())
+        {
+            auto storage_credentials = table_metadata.getStorageCredentials();
+            if (storage_credentials)
+                storage_credentials->addCredentialsToEngineArgs(args);
+        }
+
+        auto storage_settings = std::make_shared<DB::StorageObjectStorageSettings>();
+        storage_settings->loadFromSettingsChanges(settings.allChanged());
+        auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
+        DB::StorageObjectStorage::Configuration::initialize(*configuration, args, getContext(), false);
+
+        auto object_storage = configuration->createObjectStorage(getContext(), true);
+        const auto & read_settings = getContext()->getReadSettings();
+
+        DB::StoredObject metadata_stored_object(metadata_path);
+        auto read_buf = object_storage->readObject(metadata_stored_object, read_settings);
+        String metadata_file;
+        readString(metadata_file, *read_buf);
+
+        Poco::JSON::Parser parser;
+        Poco::Dynamic::Var result = parser.parse(metadata_file);
+        auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
+        metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
+    }
+    auto metadata_object = *metadata_objects.get(metadata_path);
+    auto current_schema_id = metadata_object->getValue<Int32>("current-schema-id");
+    auto schemas = metadata_object->getArray(Iceberg::f_schemas);
+    for (size_t i = 0; i < schemas->size(); ++i)
+    {
+        auto schema = schemas->getObject(static_cast<UInt32>(i));
+        if (schema->getValue<Int32>("schema-id") == current_schema_id)
+        {
+            auto fields = schema->getArray(Iceberg::f_fields);
+            for (size_t j = 0; j < fields->size(); ++j)
+            {
+                auto field = fields->getObject(static_cast<UInt32>(j));
+                if (field->getValue<String>(Iceberg::f_name) == column_name)
+                    return field->getValue<String>(Iceberg::f_type) == Iceberg::f_timestamptz;
+            }
+        }
+    }
+
+    return false;
+}
+
+
 }

 #endif
diff --git a/src/Databases/DataLake/GlueCatalog.h b/src/Databases/DataLake/GlueCatalog.h
index 259a22059b27..5e4b4f6a1def 100644
--- a/src/Databases/DataLake/GlueCatalog.h
+++ b/src/Databases/DataLake/GlueCatalog.h
@@ -7,6 +7,10 @@
 #include
 #include
 #include
+#include
+
+#include
+#include

 namespace Aws::Glue
 {
@@ -20,11 +24,10 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
 {
 public:
     GlueCatalog(
-        const String & access_key_id,
-        const String & secret_access_key,
-        const String & region,
         const String & endpoint,
-        DB::ContextPtr context_);
+        DB::ContextPtr context_,
+        const DB::DatabaseDataLakeSettings & settings_,
+        DB::ASTPtr table_engine_definition_);

     ~GlueCatalog() override;

@@ -60,10 +63,18 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
     const LoggerPtr log;
     Aws::Auth::AWSCredentials credentials;
     std::string region;
+    DB::DatabaseDataLakeSettings settings;
+    DB::ASTPtr table_engine_definition;

     DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const;
     DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const;
     void setCredentials(TableMetadata & metadata) const;
+
+    /// The Glue catalog does not store detailed type information for timestamp columns, i.e. whether a column is timestamp or timestamptz.
+    /// This method resolves the actual type of a timestamp column from the table's Iceberg metadata file.
+    bool classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const;
+
+    mutable DB::CacheBase<String, Poco::JSON::Object::Ptr> metadata_objects;
 };

 }
diff --git a/tests/integration/test_database_glue/test.py b/tests/integration/test_database_glue/test.py
index 25e314ea4784..a5df170ef6e1 100644
--- a/tests/integration/test_database_glue/test.py
+++ b/tests/integration/test_database_glue/test.py
@@ -7,6 +7,7 @@
 import pyarrow as pa
 import pytest
 import urllib3
+import pytz
 from datetime import datetime, timedelta
 from minio import Minio
 from pyiceberg.catalog import load_catalog
@@ -22,6 +23,7 @@
     StringType,
     StructType,
     TimestampType,
+    TimestamptzType,
     MapType,
     DecimalType,
 )
@@ -429,3 +431,50 @@ def test_non_existing_tables(started_cluster):
     except Exception as e:
         assert "DB::Exception: Table" in str(e)
         assert "doesn't exist" in str(e)
+
+
+def test_timestamps(started_cluster):
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_timestamps_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1, name="timestamp", field_type=TimestampType(), required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="timestamptz",
+            field_type=TimestamptzType(),
+            required=False,
+        ),
+    )
+    table = create_table(catalog, root_namespace, table_name, schema)
+
+    create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME)
+
+    data = [
+        {
+            "timestamp": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
+            "timestamptz": datetime(
+                2024,
+                1,
+                1,
+                hour=12,
+                minute=0,
+                second=0,
+                microsecond=0,
+                tzinfo=pytz.timezone("UTC"),
+            )
+        }
+    ]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n    `timestamp` Nullable(DateTime64(6)),\\n    `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-glue/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n"
diff --git a/tests/integration/test_database_iceberg/test.py b/tests/integration/test_database_iceberg/test.py
index c729f709654c..10800a99cb1b 100644
--- a/tests/integration/test_database_iceberg/test.py
+++ b/tests/integration/test_database_iceberg/test.py
@@ -11,6 +11,7 @@
 import pytest
 import requests
 import urllib3
+import pytz
 from minio import Minio
 from pyiceberg.catalog import load_catalog
 from pyiceberg.partitioning import PartitionField, PartitionSpec
@@ -24,6 +25,7 @@
     StringType,
     StructType,
     TimestampType,
+    TimestamptzType
 )

 from helpers.cluster import ClickHouseCluster, ClickHouseInstance, is_arm
@@ -375,6 +377,54 @@ def record(key):
     assert 'bbb\nbbb\nbbb' == node.query(f"SELECT symbol FROM {CATALOG_NAME}.`{namespace}.{table_name_2}`").strip()


+def test_timestamps(started_cluster):
+
+    node = started_cluster.instances["node1"]
+
+    test_ref = f"test_timestamps_{uuid.uuid4()}"
+    table_name = f"{test_ref}_table"
+    root_namespace = f"{test_ref}_namespace"
+
+    catalog = load_catalog_impl(started_cluster)
+    catalog.create_namespace(root_namespace)
+
+    schema = Schema(
+        NestedField(
+            field_id=1, name="timestamp", field_type=TimestampType(), required=False
+        ),
+        NestedField(
+            field_id=2,
+            name="timestamptz",
+            field_type=TimestamptzType(),
+            required=False,
+        ),
+    )
+    table = create_table(catalog, root_namespace, table_name, schema)
+
+    create_clickhouse_iceberg_database(started_cluster, node, CATALOG_NAME)
+
+    data = [
+        {
+            "timestamp": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
+            "timestamptz": datetime(
+                2024,
+                1,
+                1,
+                hour=12,
+                minute=0,
+                second=0,
+                microsecond=0,
+                tzinfo=pytz.timezone("UTC"),
+            )
+        }
+    ]
+    df = pa.Table.from_pylist(data)
+    table.append(df)
+
+    assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n    `timestamp` Nullable(DateTime64(6)),\\n    `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-rest/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
+    assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n"
+
+
 def test_non_existing_tables(started_cluster):
     node = started_cluster.instances["node1"]
@@ -425,3 +475,4 @@ def test_non_existing_tables(started_cluster):
     except Exception as e:
         assert "DB::Exception: Table" in str(e)
         assert "doesn't exist" in str(e)
+