Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions src/DataTypes/Serializations/SerializationInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ void SerializationInfoByName::writeJSON(WriteBuffer & out) const
return writeString(oss.str(), out);
}

void SerializationInfoByName::readJSON(ReadBuffer & in)
SerializationInfoByName SerializationInfoByName::readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in)
{
String json_str;
readString(json_str, in);
Expand All @@ -262,22 +263,36 @@ void SerializationInfoByName::readJSON(ReadBuffer & in)
"Unknown version of serialization infos ({}). Should be less or equal than {}",
object->getValue<size_t>(KEY_VERSION), SERIALIZATION_INFO_VERSION);

SerializationInfoByName infos;
if (object->has(KEY_COLUMNS))
{
std::unordered_map<std::string_view, const IDataType *> column_type_by_name;
for (const auto & [name, type] : columns)
column_type_by_name.emplace(name, type.get());

auto array = object->getArray(KEY_COLUMNS);
for (const auto & elem : *array)
{
auto elem_object = elem.extract<Poco::JSON::Object::Ptr>();

if (!elem_object->has(KEY_NAME))
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Missed field '{}' in SerializationInfo of columns", KEY_NAME);
"Missed field '{}' in serialization infos", KEY_NAME);

auto name = elem_object->getValue<String>(KEY_NAME);
if (auto it = find(name); it != end())
it->second->fromJSON(*elem_object);
auto it = column_type_by_name.find(name);

if (it == column_type_by_name.end())
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Found unexpected column '{}' in serialization infos", name);

auto info = it->second->createSerializationInfo(settings);
info->fromJSON(*elem_object);
infos.emplace(name, std::move(info));
}
}

return infos;
}

}
8 changes: 6 additions & 2 deletions src/DataTypes/Serializations/SerializationInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ using MutableSerializationInfos = std::vector<MutableSerializationInfoPtr>;
class SerializationInfoByName : public std::map<String, MutableSerializationInfoPtr>
{
public:
using Settings = SerializationInfo::Settings;

SerializationInfoByName() = default;
SerializationInfoByName(const NamesAndTypesList & columns, const SerializationInfo::Settings & settings);
SerializationInfoByName(const NamesAndTypesList & columns, const Settings & settings);

void add(const Block & block);
void add(const SerializationInfoByName & other);
Expand All @@ -108,7 +110,9 @@ class SerializationInfoByName : public std::map<String, MutableSerializationInfo
void replaceData(const SerializationInfoByName & other);

void writeJSON(WriteBuffer & out) const;
void readJSON(ReadBuffer & in);

static SerializationInfoByName readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in);
};

}
7 changes: 3 additions & 4 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1333,12 +1333,11 @@ void IMergeTreeDataPart::loadColumns(bool require)
.choose_kind = false,
};

SerializationInfoByName infos(loaded_columns, settings);
exists = metadata_manager->exists(SERIALIZATION_FILE_NAME);
if (exists)
SerializationInfoByName infos;
if (metadata_manager->exists(SERIALIZATION_FILE_NAME))
{
auto in = metadata_manager->read(SERIALIZATION_FILE_NAME);
infos.readJSON(*in);
infos = SerializationInfoByName::readJSON(loaded_columns, settings, *in);
}

setColumns(loaded_columns, infos);
Expand Down
8 changes: 8 additions & 0 deletions src/Storages/MergeTree/MergeTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (!ctx->need_remove_expired_values)
{
size_t expired_columns = 0;
auto part_serialization_infos = global_ctx->new_data_part->getSerializationInfos();

for (auto & [column_name, ttl] : global_ctx->new_data_part->ttl_infos.columns_ttl)
{
Expand All @@ -335,6 +336,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
LOG_TRACE(ctx->log, "Adding expired column {} for part {}", column_name, global_ctx->new_data_part->name);
std::erase(global_ctx->gathering_column_names, column_name);
std::erase(global_ctx->merging_column_names, column_name);
std::erase(global_ctx->all_column_names, column_name);
part_serialization_infos.erase(column_name);
++expired_columns;
}
}
Expand All @@ -343,6 +346,11 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
{
global_ctx->gathering_columns = global_ctx->gathering_columns.filter(global_ctx->gathering_column_names);
global_ctx->merging_columns = global_ctx->merging_columns.filter(global_ctx->merging_column_names);
global_ctx->storage_columns = global_ctx->storage_columns.filter(global_ctx->all_column_names);

global_ctx->new_data_part->setColumns(
global_ctx->storage_columns,
part_serialization_infos);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/Storages/MergeTree/checkDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ IMergeTreeDataPart::Checksums checkDataPart(
};

auto ratio_of_defaults = data_part->storage.getSettings()->ratio_of_defaults_for_sparse_serialization;
SerializationInfoByName serialization_infos(columns_txt, SerializationInfo::Settings{ratio_of_defaults, false});
SerializationInfoByName serialization_infos;

if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, {}, std::nullopt, std::nullopt);
serialization_infos.readJSON(*serialization_file);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}

auto get_serialization = [&serialization_infos](const auto & column)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
100000
100000
18 changes: 18 additions & 0 deletions tests/queries/0_stateless/02733_sparse_columns_reload.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Checks that a MergeTree part written under one value of
-- ratio_of_defaults_for_sparse_serialization can still be read after the
-- setting is changed and the table is reloaded via DETACH/ATTACH.
-- The same count is selected before and after the reload.

DROP TABLE IF EXISTS t_sparse_reload;

-- Threshold of 0.95 at the time the part is written.
CREATE TABLE t_sparse_reload (id UInt64, v UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.95;

-- 100000 rows; `v` is 0 in every row.
-- NOTE(review): presumably this makes `v` qualify for sparse serialization
-- under the 0.95 threshold -- confirm against the setting's documentation.
INSERT INTO t_sparse_reload SELECT number, 0 FROM numbers(100000);

-- Baseline count before the setting change. The WHERE clause references every column.
-- NOTE(review): presumably `NOT ignore(*)` is there to force all columns to be
-- read while keeping every row -- confirm against the `ignore` function docs.
SELECT count() FROM t_sparse_reload WHERE NOT ignore(*);

-- Change the threshold after the part has already been written.
ALTER TABLE t_sparse_reload MODIFY SETTING ratio_of_defaults_for_sparse_serialization = 1.0;

-- Reload the table so the existing part is loaded again with the new setting in effect.
DETACH TABLE t_sparse_reload;
ATTACH TABLE t_sparse_reload;

-- Same query as the baseline, now against the reloaded part.
SELECT count() FROM t_sparse_reload WHERE NOT ignore(*);

DROP TABLE t_sparse_reload;