7 changes: 3 additions & 4 deletions src/Databases/DataLake/DatabaseDataLake.cpp
@@ -154,11 +154,10 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
case DB::DatabaseDataLakeCatalogType::GLUE:
{
catalog_impl = std::make_shared<DataLake::GlueCatalog>(
settings[DatabaseDataLakeSetting::aws_access_key_id].value,
settings[DatabaseDataLakeSetting::aws_secret_access_key].value,
settings[DatabaseDataLakeSetting::region].value,
url,
Context::getGlobalContextInstance());
Context::getGlobalContextInstance(),
settings,
table_engine_definition);
break;
}
case DB::DatabaseDataLakeCatalogType::ICEBERG_HIVE:
166 changes: 140 additions & 26 deletions src/Databases/DataLake/GlueCatalog.cpp
@@ -1,4 +1,6 @@
#include <Databases/DataLake/GlueCatalog.h>
#include <Poco/JSON/Object.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>

#if USE_AWS_S3 && USE_AVRO

@@ -8,6 +10,7 @@
#include <aws/glue/model/GetDatabasesRequest.h>

#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Core/Settings.h>
#include <Interpreters/Context.h>

@@ -29,12 +32,19 @@
#include <IO/S3/Credentials.h>
#include <IO/S3/Client.h>
#include <IO/S3Settings.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <Databases/DataLake/Common.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeStorageSettings.h>
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>

namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int DATALAKE_DATABASE_ERROR;
}

@@ -54,20 +64,36 @@ namespace DB::StorageObjectStorageSetting
extern const StorageObjectStorageSettingsString iceberg_metadata_file_path;
}

namespace DB::DatabaseDataLakeSetting
{
extern const DatabaseDataLakeSettingsString storage_endpoint;
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
extern const DatabaseDataLakeSettingsString region;
}

namespace CurrentMetrics
{
extern const Metric MarkCacheBytes;
extern const Metric MarkCacheFiles;
}

namespace DataLake
{

GlueCatalog::GlueCatalog(
const String & access_key_id,
const String & secret_access_key,
const String & region_,
const String & endpoint,
DB::ContextPtr context_)
DB::ContextPtr context_,
const DB::DatabaseDataLakeSettings & settings_,
DB::ASTPtr table_engine_definition_)
: ICatalog("")
, DB::WithContext(context_)
, log(getLogger("GlueCatalog(" + region_ + ")"))
, credentials(access_key_id, secret_access_key)
, region(region_)
, log(getLogger("GlueCatalog(" + settings_[DB::DatabaseDataLakeSetting::region].value + ")"))
, credentials(settings_[DB::DatabaseDataLakeSetting::aws_access_key_id].value, settings_[DB::DatabaseDataLakeSetting::aws_secret_access_key].value)
, region(settings_[DB::DatabaseDataLakeSetting::region].value)
, settings(settings_)
, table_engine_definition(table_engine_definition_)
, metadata_objects(CurrentMetrics::MarkCacheBytes, CurrentMetrics::MarkCacheFiles, 1024)
{
DB::S3::CredentialsConfiguration creds_config;
creds_config.use_environment_credentials = true;
@@ -271,6 +297,27 @@ bool GlueCatalog::tryGetTableMetadata(
database_name + "." + table_name, message_part, "ICEBERG"));
}

if (result.requiresCredentials())
setCredentials(result);

auto setup_specific_properties = [&]
{
const auto & table_params = table_outcome.GetParameters();
if (table_params.contains("metadata_location"))
{
result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
}
else
{
result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
"It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
database_name + "." + table_name));
}
};

if (result.requiresDataLakeSpecificProperties())
setup_specific_properties();

if (result.requiresSchema())
{
DB::NamesAndTypesList schema;
@@ -286,27 +333,18 @@
if (column_params.contains("iceberg.field.current") && column_params.at("iceberg.field.current") == "false")
continue;

schema.push_back({column.GetName(), getType(column.GetType(), can_be_nullable)});
}
result.setSchema(schema);
}
String column_type = column.GetType();
if (column_type == "timestamp")
{
if (!result.requiresDataLakeSpecificProperties())
setup_specific_properties();
if (classifyTimestampTZ(column.GetName(), result))
column_type = "timestamptz";
}

if (result.requiresCredentials())
setCredentials(result);

if (result.requiresDataLakeSpecificProperties())
{
const auto & table_params = table_outcome.GetParameters();
if (table_params.contains("metadata_location"))
{
result.setDataLakeSpecificProperties(DataLakeSpecificProperties{.iceberg_metadata_file_location = table_params.at("metadata_location")});
}
else
{
result.setTableIsNotReadable(fmt::format("Cannot read table `{}` because it has no metadata_location. " \
"It means that it's unreadable with Glue catalog in ClickHouse, readable tables must have 'metadata_location' in table parameters",
database_name + "." + table_name));
schema.push_back({column.GetName(), getType(column_type, can_be_nullable)});
}
result.setSchema(schema);
}
}
else
@@ -364,6 +402,82 @@ bool GlueCatalog::empty() const
return true;
}

bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
{
String metadata_path;
if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
table_specific_properties.has_value())
{
metadata_path = table_specific_properties->iceberg_metadata_file_location;
if (metadata_path.starts_with("s3:/"))
metadata_path = metadata_path.substr(5);

// Strip the bucket name from the path
std::size_t pos = metadata_path.find('/');
if (pos != std::string::npos)
metadata_path = metadata_path.substr(pos + 1);
}
else
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");

if (!metadata_objects.get(metadata_path))
{
DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
DB::ASTs args = storage->engine->arguments->children;

auto table_endpoint = settings[DB::DatabaseDataLakeSetting::storage_endpoint].value;
if (args.empty())
args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
else
args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);

if (args.size() == 1 && table_metadata.hasStorageCredentials())
{
auto storage_credentials = table_metadata.getStorageCredentials();
if (storage_credentials)
storage_credentials->addCredentialsToEngineArgs(args);
}

auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
storage_settings->loadFromSettingsChanges(settings.allChanged());
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
DB::StorageObjectStorage::Configuration::initialize(*configuration, args, getContext(), false);

auto object_storage = configuration->createObjectStorage(getContext(), true);
const auto & read_settings = getContext()->getReadSettings();

DB::StoredObject metadata_stored_object(metadata_path);
auto read_buf = object_storage->readObject(metadata_stored_object, read_settings);
String metadata_file;
readString(metadata_file, *read_buf);

Poco::JSON::Parser parser;
Poco::Dynamic::Var result = parser.parse(metadata_file);
auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
}
auto metadata_object = *metadata_objects.get(metadata_path);
auto current_schema_id = metadata_object->getValue<Int64>("current-schema-id");
auto schemas = metadata_object->getArray(Iceberg::f_schemas);
for (size_t i = 0; i < schemas->size(); ++i)
{
auto schema = schemas->getObject(static_cast<UInt32>(i));
if (schema->getValue<Int64>("schema-id") == current_schema_id)
{
auto fields = schema->getArray(Iceberg::f_fields);
for (size_t j = 0; j < fields->size(); ++j)
{
auto field = fields->getObject(static_cast<UInt32>(j));
if (field->getValue<String>(Iceberg::f_name) == column_name)
return field->getValue<String>(Iceberg::f_type) == Iceberg::f_timestamptz;
}
}
}

return false;
}


}

#endif
19 changes: 15 additions & 4 deletions src/Databases/DataLake/GlueCatalog.h
@@ -7,6 +7,10 @@
#include <Databases/DataLake/ICatalog.h>
#include <Interpreters/Context_fwd.h>
#include <Poco/JSON/Object.h>
#include <Poco/LRUCache.h>

#include <Common/CacheBase.h>
#include <Databases/DataLake/DatabaseDataLakeSettings.h>

namespace Aws::Glue
{
@@ -20,11 +24,10 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
{
public:
GlueCatalog(
const String & access_key_id,
const String & secret_access_key,
const String & region,
const String & endpoint,
DB::ContextPtr context_);
DB::ContextPtr context_,
const DB::DatabaseDataLakeSettings & settings_,
DB::ASTPtr table_engine_definition_);

~GlueCatalog() override;

@@ -60,10 +63,18 @@ class GlueCatalog final : public ICatalog, private DB::WithContext
const LoggerPtr log;
Aws::Auth::AWSCredentials credentials;
std::string region;
DB::DatabaseDataLakeSettings settings;
DB::ASTPtr table_engine_definition;

DataLake::ICatalog::Namespaces getDatabases(const std::string & prefix, size_t limit = 0) const;
DB::Names getTablesForDatabase(const std::string & db_name, size_t limit = 0) const;
void setCredentials(TableMetadata & metadata) const;

/// The Glue catalog does not store detailed information about the types of timestamp columns, such as whether the column is timestamp or timestamptz.
/// This method allows clarifying the actual type of a timestamp column.
bool classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const;

mutable DB::CacheBase<String, Poco::JSON::Object::Ptr> metadata_objects;
};

}
49 changes: 49 additions & 0 deletions tests/integration/test_database_glue/test.py
@@ -7,6 +7,7 @@
import pyarrow as pa
import pytest
import urllib3
import pytz
from datetime import datetime, timedelta
from minio import Minio
from pyiceberg.catalog import load_catalog
@@ -22,6 +23,7 @@
StringType,
StructType,
TimestampType,
TimestamptzType,
MapType,
DecimalType,
)
@@ -429,3 +431,50 @@ def test_non_existing_tables(started_cluster):
except Exception as e:
assert "DB::Exception: Table" in str(e)
assert "doesn't exist" in str(e)


def test_timestamps(started_cluster):
node = started_cluster.instances["node1"]

test_ref = f"test_list_tables_{uuid.uuid4()}"
table_name = f"{test_ref}_table"
root_namespace = f"{test_ref}_namespace"

catalog = load_catalog_impl(started_cluster)
catalog.create_namespace(root_namespace)

schema = Schema(
NestedField(
field_id=1, name="timestamp", field_type=TimestampType(), required=False
),
NestedField(
field_id=2,
name="timestamptz",
field_type=TimestamptzType(),
required=False,
),
)
table = create_table(catalog, root_namespace, table_name, schema)

create_clickhouse_glue_database(started_cluster, node, CATALOG_NAME)

data = [
{
"timestamp": datetime(2024, 1, 1, hour=12, minute=0, second=0, microsecond=0),
"timestamptz": datetime(
2024,
1,
1,
hour=12,
minute=0,
second=0,
microsecond=0,
tzinfo=pytz.timezone("UTC"),
)
}
]
df = pa.Table.from_pylist(data)
table.append(df)

assert node.query(f"SHOW CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`") == f"CREATE TABLE {CATALOG_NAME}.`{root_namespace}.{table_name}`\\n(\\n `timestamp` Nullable(DateTime64(6)),\\n `timestamptz` Nullable(DateTime64(6, \\'UTC\\'))\\n)\\nENGINE = Iceberg(\\'http://minio:9000/warehouse-glue/data/\\', \\'minio\\', \\'[HIDDEN]\\')\n"
assert node.query(f"SELECT * FROM {CATALOG_NAME}.`{root_namespace}.{table_name}`") == "2024-01-01 12:00:00.000000\t2024-01-01 12:00:00.000000\n"