36 changes: 36 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -279,6 +279,42 @@ String IMergeTreeDataPart::MinMaxIndex::getFileColumnName(const String & column_
    return stream_name;
}

Block IMergeTreeDataPart::MinMaxIndex::getBlock(const MergeTreeData & data) const
[Comment from the PR author]
Fun fact: I copied this method from my first ClickHouse PR ever (or my second, I don't quite recall). That PR took over a year to be reviewed and, once merged, was reverted within a week :).

ClickHouse#39507

{
    if (!initialized)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to get block from uninitialized MinMax index.");

    Block block;

    const auto metadata_snapshot = data.getInMemoryMetadataPtr();
    const auto & partition_key = metadata_snapshot->getPartitionKey();

    const auto minmax_column_names = data.getMinMaxColumnsNames(partition_key);
    const auto minmax_column_types = data.getMinMaxColumnsTypes(partition_key);
    const auto minmax_idx_size = minmax_column_types.size();

    for (size_t i = 0; i < minmax_idx_size; ++i)
    {
        const auto & data_type = minmax_column_types[i];
        const auto & column_name = minmax_column_names[i];

        const auto column = data_type->createColumn();

        auto range = hyperrectangle.at(i);
[Comment from the PR author on `auto range = hyperrectangle.at(i);`]
@Enmk maybe I'll undo this change then, so we can rely on references, since we have reached the conclusion that ranges will always be included for minmax? But maybe that's not true.
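        /// `range` is a copy of hyperrectangle.at(i) rather than a reference, because
        /// shrinkToIncludedIfPossible() below mutates the range in place.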

        range.shrinkToIncludedIfPossible();

        const auto & min_val = range.left;
        const auto & max_val = range.right;

        column->insert(min_val);
        column->insert(max_val);

        block.insert(ColumnWithTypeAndName(column->getPtr(), data_type, column_name));
    }

    return block;
}

void IMergeTreeDataPart::incrementStateMetric(MergeTreeDataPartState state_) const
{
    switch (state_)
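An aside, not part of the diff: a minimal sketch of how a caller can consume the block returned by getBlock. Here `part` (a loaded data part with an initialized minmax index), `data` (its MergeTreeData), and the `year` partition column are assumptions borrowed from the test tables further down.

    Block minmax_block = part->minmax_idx->getBlock(data); /// hypothetical caller, see assumptions above
    const auto & year_column = *minmax_block.getByName("year").column;
    Field min_year = year_column[0]; /// row 0 holds the minimum of `year` in the part
    Field max_year = year_column[1]; /// row 1 holds the maximum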
2 changes: 2 additions & 0 deletions src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -362,6 +362,8 @@ class IMergeTreeDataPart : public std::enable_shared_from_this<IMergeTreeDataPar
    static String getFileColumnName(const String & column_name, const MergeTreeSettingsPtr & storage_settings_);
    /// For Load
    static String getFileColumnName(const String & column_name, const Checksums & checksums_);

    Block getBlock(const MergeTreeData & data) const;
};

using MinMaxIndexPtr = std::shared_ptr<MinMaxIndex>;
9 changes: 3 additions & 6 deletions src/Storages/MergeTree/MergeTreeData.cpp
@@ -6268,16 +6268,13 @@ void MergeTreeData::exportPartToTableImpl(

    MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Export;

-    NamesAndTypesList partition_columns;
+    Block block_with_partition_values;
    if (metadata_snapshot->hasPartitionKey())
    {
-        const auto & partition_key = metadata_snapshot->getPartitionKey();
-        if (!partition_key.column_names.empty())
-            partition_columns = partition_key.expression->getRequiredColumnsWithTypes();
+        /// todo arthur do I need to init minmax_idx?
+        block_with_partition_values = manifest.data_part->minmax_idx->getBlock(*this);
    }

-    auto block_with_partition_values = manifest.data_part->partition.getBlockWithPartitionValues(partition_columns);

    auto destination_storage = DatabaseCatalog::instance().tryGetTable(manifest.destination_storage_id, getContext());
    if (!destination_storage)
    {
8 changes: 6 additions & 2 deletions src/Storages/ObjectStorage/ObjectStorageFilePathGenerator.h
@@ -4,6 +4,7 @@
#include <Storages/PartitionedSink.h>
#include <Poco/String.h>
#include <Functions/generateSnowflakeID.h>
+#include <boost/algorithm/string/replace.hpp>

namespace DB
{

@@ -19,12 +20,15 @@

struct ObjectStorageWildcardFilePathGenerator : ObjectStorageFilePathGenerator
{
+    static constexpr const char * FILE_WILDCARD = "{_file}";
    explicit ObjectStorageWildcardFilePathGenerator(const std::string & raw_path_) : raw_path(raw_path_) {}

    using ObjectStorageFilePathGenerator::getPathForWrite; // Bring base class overloads into scope
-    std::string getPathForWrite(const std::string & partition_id, const std::string & /* file_name_override */) const override
+    std::string getPathForWrite(const std::string & partition_id, const std::string & file_name_override) const override
    {
-        return PartitionedSink::replaceWildcards(raw_path, partition_id);
+        const auto partition_replaced_path = PartitionedSink::replaceWildcards(raw_path, partition_id);
+        const auto final_path = boost::replace_all_copy(partition_replaced_path, FILE_WILDCARD, file_name_override);
+        return final_path;
    }

    std::string getPathForRead() const override
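A standalone sketch of the substitution above, with made-up path, partition id, and file name; the real code delegates the {_partition_id} step to PartitionedSink::replaceWildcards, which is simplified here to a plain string replace:

    #include <boost/algorithm/string/replace.hpp>
    #include <iostream>
    #include <string>

    int main()
    {
        const std::string raw_path = "exports/{_partition_id}/{_file}.parquet";

        /// Partition wildcard first, then FILE_WILDCARD ({_file}) with the
        /// file name override, mirroring getPathForWrite.
        auto path = boost::replace_all_copy(raw_path, "{_partition_id}", "2020");
        path = boost::replace_all_copy(path, "{_file}", "2020_1_1_0");

        std::cout << path << '\n'; /// exports/2020/2020_1_1_0.parquet
    }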
8 changes: 7 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -468,7 +468,13 @@ bool StorageObjectStorage::optimize(

bool StorageObjectStorage::supportsImport() const
{
-    return configuration->partition_strategy != nullptr && configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE;
+    if (!configuration->partition_strategy)
+        return false;
+
+    if (configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::WILDCARD)
+        return configuration->getRawPath().hasExportFilenameWildcard();
+
+    return configuration->partition_strategy_type == PartitionStrategyFactory::StrategyType::HIVE;
}

SinkToStoragePtr StorageObjectStorage::import(
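In short: hive-partitioned destinations always support import, while a wildcard-partitioned destination supports it only when its raw path contains the {_file} wildcard; otherwise every exported part would resolve to the same object key. Presumably the per-part file name is also what makes re-exporting a part idempotent, which the test below checks.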
src/Storages/ObjectStorage/StorageObjectStorageConfiguration.cpp
@@ -159,6 +159,12 @@ bool StorageObjectStorageConfiguration::Path::hasPartitionWildcard() const
    return path.find(PARTITION_ID_WILDCARD) != String::npos;
}

bool StorageObjectStorageConfiguration::Path::hasExportFilenameWildcard() const
{
    return path.find(ObjectStorageWildcardFilePathGenerator::FILE_WILDCARD) != String::npos;
}


bool StorageObjectStorageConfiguration::Path::hasGlobsIgnorePartitionWildcard() const
{
    if (!hasPartitionWildcard())
src/Storages/ObjectStorage/StorageObjectStorageConfiguration.h
@@ -62,6 +62,7 @@ class StorageObjectStorageConfiguration
        std::string path;

        bool hasPartitionWildcard() const;
        bool hasExportFilenameWildcard() const;
        bool hasGlobsIgnorePartitionWildcard() const;
        bool hasGlobs() const;
        std::string cutGlobs(bool supports_partial_prefix) const;
(test .reference file)
@@ -14,3 +14,20 @@
2 2020
3 2020
4 2021
---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table
---- Both data parts should appear
1 2020
2 2020
3 2020
4 2021
---- Export the same part again, it should be idempotent
1 2020
2 2020
3 2020
4 2021
---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table with partition expression with function
---- Both data parts should appear
1 2020
2 2020
3 2020
4 2021
(test .sh script)
@@ -1,19 +1,21 @@
#!/usr/bin/env bash
# Tags: no-fasttest

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

mt_table="mt_table_${RANDOM}"
mt_table_partition_expression_with_function="mt_table_partition_expression_with_function_${RANDOM}"
s3_table="s3_table_${RANDOM}"
s3_table_wildcard="s3_table_wildcard_${RANDOM}"
s3_table_wildcard_partition_expression_with_function="s3_table_wildcard_partition_expression_with_function_${RANDOM}"
mt_table_roundtrip="mt_table_roundtrip_${RANDOM}"

query() {
$CLICKHOUSE_CLIENT --query "$1"
}

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip"
query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function"

query "CREATE TABLE $mt_table (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY year ORDER BY tuple()"
query "CREATE TABLE $s3_table (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table', format=Parquet, partition_strategy='hive') PARTITION BY year"
@@ -36,4 +38,31 @@ query "CREATE TABLE $mt_table_roundtrip ENGINE = MergeTree() PARTITION BY year O
echo "---- Data in roundtrip MergeTree table (should match s3_table)"
query "SELECT * FROM $s3_table ORDER BY id"

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip"
query "CREATE TABLE $s3_table_wildcard (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table_wildcard/{_partition_id}/{_file}.parquet', format=Parquet, partition_strategy='wildcard') PARTITION BY year"

echo "---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table"
query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table_wildcard SETTINGS allow_experimental_export_merge_tree_part = 1"
query "ALTER TABLE $mt_table EXPORT PART '2021_2_2_0' TO TABLE $s3_table_wildcard SETTINGS allow_experimental_export_merge_tree_part = 1"

echo "---- Both data parts should appear"
query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard/**.parquet') ORDER BY id"

echo "---- Export the same part again, it should be idempotent"
query "ALTER TABLE $mt_table EXPORT PART '2020_1_1_0' TO TABLE $s3_table_wildcard SETTINGS allow_experimental_export_merge_tree_part = 1"

query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard/**.parquet') ORDER BY id"

query "CREATE TABLE $mt_table_partition_expression_with_function (id UInt64, year UInt16) ENGINE = MergeTree() PARTITION BY toString(year) ORDER BY tuple()"
query "CREATE TABLE $s3_table_wildcard_partition_expression_with_function (id UInt64, year UInt16) ENGINE = S3(s3_conn, filename='$s3_table_wildcard_partition_expression_with_function/{_partition_id}/{_file}.parquet', format=Parquet, partition_strategy='wildcard') PARTITION BY toString(year)"

# Insert the same rows; PARTITION BY toString(year) produces hash-based partition IDs, hence the hashed part names below
query "INSERT INTO $mt_table_partition_expression_with_function VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021)"

echo "---- Export 2020_1_1_0 and 2021_2_2_0 to wildcard table with partition expression with function"
query "ALTER TABLE $mt_table_partition_expression_with_function EXPORT PART 'cb217c742dc7d143b61583011996a160_1_1_0' TO TABLE $s3_table_wildcard_partition_expression_with_function SETTINGS allow_experimental_export_merge_tree_part = 1"
query "ALTER TABLE $mt_table_partition_expression_with_function EXPORT PART '3be6d49ecf9749a383964bc6fab22d10_2_2_0' TO TABLE $s3_table_wildcard_partition_expression_with_function SETTINGS allow_experimental_export_merge_tree_part = 1"

echo "---- Both data parts should appear"
query "SELECT * FROM s3(s3_conn, filename='$s3_table_wildcard_partition_expression_with_function/**.parquet') ORDER BY id"

query "DROP TABLE IF EXISTS $mt_table, $s3_table, $mt_table_roundtrip, $s3_table_wildcard, $s3_table_wildcard_partition_expression_with_function, $mt_table_partition_expression_with_function"