1 change: 1 addition & 0 deletions src/Common/CurrentMetrics.cpp
@@ -10,6 +10,7 @@
M(Merge, "Number of executing background merges") \
M(MergeParts, "Number of source parts participating in current background merges") \
M(Move, "Number of currently executing moves") \
M(Export, "Number of currently executing exports") \
Member

I would probably add a few counters to ProfileEvents, something like:
PartsExports (counter of successful exports)
PartsExportFailures (counter of exports that ended with an exception)
PartsExportDuplicated (when the target already exists)
PartsExportBytes (sum of the bytes written by exports)
PartsExportTotalMilliseconds (overall time spent on exports)

(Names: if you have better ideas, feel free :) )

Collaborator Author

Added these except for PartsExportBytes because I still don't have the writing stats integrated.

M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
M(ReplicatedSend, "Number of data parts being sent to replicas") \
4 changes: 4 additions & 0 deletions src/Common/ProfileEvents.cpp
@@ -28,6 +28,10 @@
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.", ValueType::Bytes) \
M(AsyncInsertRows, "Number of rows inserted by asynchronous INSERT queries.", ValueType::Number) \
M(AsyncInsertCacheHits, "Number of times a duplicate hash id has been found in asynchronous INSERT hash id cache.", ValueType::Number) \
M(PartsExports, "Number of successful part exports.", ValueType::Number) \
M(PartsExportFailures, "Number of failed part exports.", ValueType::Number) \
M(PartsExportDuplicated, "Number of part exports that failed because target already exists.", ValueType::Number) \
M(PartsExportTotalMilliseconds, "Total time spent on part export operations.", ValueType::Milliseconds) \
M(FailedQuery, "Number of failed queries.", ValueType::Number) \
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.", ValueType::Number) \
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.", ValueType::Number) \
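Following up on the review thread above, here is a minimal sketch of how the events declared in this hunk might be recorded around the export work; the wrapper function and its call site are assumptions, not part of this change:

#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <functional>

namespace ProfileEvents
{
    extern const Event PartsExports;
    extern const Event PartsExportFailures;
    extern const Event PartsExportTotalMilliseconds;
}

/// Hypothetical wrapper around the actual export work (the real call site may differ).
void runExportWithCounters(const std::function<void()> & do_export)
{
    Stopwatch watch;
    try
    {
        do_export();
    }
    catch (...)
    {
        ProfileEvents::increment(ProfileEvents::PartsExportFailures);
        ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, watch.elapsedMilliseconds());
        throw;
    }
    ProfileEvents::increment(ProfileEvents::PartsExports);
    ProfileEvents::increment(ProfileEvents::PartsExportTotalMilliseconds, watch.elapsedMilliseconds());
}

PartsExportDuplicated would presumably be incremented where the target file is found to already exist and overwriting is disabled.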
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
@@ -6703,6 +6703,9 @@ Allows to change the behaviour of the result type of `dateTrunc` function.
Possible values:
- 0 - When the second argument is `DateTime64/Date32` the return type will be `DateTime64/Date32` regardless of the time unit in the first argument.
- 1 - For `Date32` the result is always `Date`. For `DateTime64` the result is `DateTime` for time units `second` and higher.
)", 0) \
DECLARE(Bool, export_merge_tree_part_overwrite_file_if_exists, false, R"(
Overwrite file if it already exists when exporting a merge tree part
)", 0) \
\
/* ####################################################### */ \
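For illustration, a brief sketch of how the export code might consult the new setting; the helper function is hypothetical, and only the setting name comes from this change:

#include <Core/Settings.h>
#include <Interpreters/Context.h>

namespace DB
{

namespace Setting
{
    extern const SettingsBool export_merge_tree_part_overwrite_file_if_exists;
}

/// Hypothetical helper deciding whether an existing destination file may be replaced.
static bool shouldOverwriteExistingFile(const ContextPtr & context)
{
    return context->getSettingsRef()[Setting::export_merge_tree_part_overwrite_file_if_exists];
}

}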
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
@@ -79,6 +79,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, false, "New setting."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
});
addSettingsChanges(settings_changes_history, "25.6",
{
4 changes: 4 additions & 0 deletions src/Interpreters/Context.cpp
@@ -34,6 +34,7 @@
#include <Storages/MarkCache.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MovesList.h>
#include <Storages/MergeTree/ExportList.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@@ -465,6 +466,7 @@ struct ContextSharedPart : boost::noncopyable
GlobalOvercommitTracker global_overcommit_tracker;
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
MovesList moves_list; /// The list of executing moves (for (Replicated)?MergeTree)
ExportsList exports_list; /// The list of executing exports (for (Replicated)?MergeTree)
ReplicatedFetchList replicated_fetch_list;
RefreshSet refresh_set; /// The list of active refreshes (for MaterializedView)
ConfigurationPtr users_config TSA_GUARDED_BY(mutex); /// Config with the users, profiles and quotas sections.
@@ -1158,6 +1160,8 @@ MergeList & Context::getMergeList() { return shared->merge_list; }
const MergeList & Context::getMergeList() const { return shared->merge_list; }
MovesList & Context::getMovesList() { return shared->moves_list; }
const MovesList & Context::getMovesList() const { return shared->moves_list; }
ExportsList & Context::getExportsList() { return shared->exports_list; }
const ExportsList & Context::getExportsList() const { return shared->exports_list; }
ReplicatedFetchList & Context::getReplicatedFetchList() { return shared->replicated_fetch_list; }
const ReplicatedFetchList & Context::getReplicatedFetchList() const { return shared->replicated_fetch_list; }
RefreshSet & Context::getRefreshSet() { return shared->refresh_set; }
4 changes: 4 additions & 0 deletions src/Interpreters/Context.h
@@ -88,6 +88,7 @@ class AsynchronousMetrics;
class BackgroundSchedulePool;
class MergeList;
class MovesList;
class ExportsList;
class ReplicatedFetchList;
class RefreshSet;
class Cluster;
@@ -1141,6 +1142,9 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
MovesList & getMovesList();
const MovesList & getMovesList() const;

ExportsList & getExportsList();
const ExportsList & getExportsList() const;

ReplicatedFetchList & getReplicatedFetchList();
const ReplicatedFetchList & getReplicatedFetchList() const;

26 changes: 11 additions & 15 deletions src/Storages/IStorage.h
@@ -444,24 +444,20 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
return false;
}

struct ImportStats
{
ExecutionStatus status;
std::size_t elapsed_ns = 0;
std::size_t bytes_on_disk = 0;
std::size_t read_rows = 0;
std::size_t read_bytes = 0;
std::string file_path = "";
};

/*
It is currently only implemented in StorageObjectStorage.
It is meant to be used to import merge tree data parts into object storage. It is similar to the write API,
but it won't re-partition the data and should allow the filename to be set by the caller.
*/
virtual SinkToStoragePtr import(
const std::string & /* file_name */,
Block & /* block_with_partition_values */,
ContextPtr /* context */,
std::function<void(ImportStats)> /* stats_log */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
}
std::string & /* destination_file_path */,
bool /* overwrite_if_exists */,
ContextPtr /* context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
}


/** Writes the data to a table in distributed manner.
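For context, a rough sketch of a caller of this entry point; the parameter list is inferred from the flattened diff above, so treat the exact signature as an assumption:

#include <Storages/IStorage.h>

namespace DB
{

/// Hypothetical helper in the export task (not part of this diff) that obtains a sink
/// for a single exported part from the destination storage.
SinkToStoragePtr makeExportSink(
    const StoragePtr & destination_storage,
    const std::string & file_name,
    Block & block_with_partition_values,
    std::string & destination_file_path,
    bool overwrite_if_exists,
    ContextPtr context)
{
    /// The export pipeline would then push the part's blocks into the returned sink.
    return destination_storage->import(
        file_name, block_with_partition_values, destination_file_path, overwrite_if_exists, context);
}

}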
66 changes: 66 additions & 0 deletions src/Storages/MergeTree/ExportList.cpp
@@ -0,0 +1,66 @@
#include <Storages/MergeTree/ExportList.h>

namespace DB
{

ExportsListElement::ExportsListElement(
const StorageID & source_table_id_,
const StorageID & destination_table_id_,
UInt64 part_size_,
const String & part_name_,
const String & target_file_name_,
UInt64 total_rows_to_read_,
UInt64 total_size_bytes_compressed_,
UInt64 total_size_bytes_uncompressed_,
time_t create_time_,
const ContextPtr & context)
: source_table_id(source_table_id_)
, destination_table_id(destination_table_id_)
, part_size(part_size_)
, part_name(part_name_)
, destination_file_path(target_file_name_)
, total_rows_to_read(total_rows_to_read_)
, total_size_bytes_compressed(total_size_bytes_compressed_)
, total_size_bytes_uncompressed(total_size_bytes_uncompressed_)
, create_time(create_time_)
{
thread_group = ThreadGroup::createForBackgroundProcess(context);
}

ExportsListElement::~ExportsListElement()
{
background_memory_tracker.adjustOnBackgroundTaskEnd(&thread_group->memory_tracker);
}

ExportInfo ExportsListElement::getInfo() const
{
ExportInfo res;
res.source_database = source_table_id.database_name;
res.source_table = source_table_id.table_name;
res.destination_database = destination_table_id.database_name;
res.destination_table = destination_table_id.table_name;
res.part_name = part_name;
res.destination_file_path = destination_file_path;
res.rows_read = rows_read;
res.total_rows_to_read = total_rows_to_read;
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_bytes_uncompressed = total_size_bytes_uncompressed;
res.bytes_read_uncompressed = bytes_read_uncompressed;
res.memory_usage = getMemoryUsage();
res.peak_memory_usage = getPeakMemoryUsage();
res.create_time = create_time;
res.elapsed = elapsed;
return res;
}

UInt64 ExportsListElement::getMemoryUsage() const
{
return thread_group->memory_tracker.get();
}

UInt64 ExportsListElement::getPeakMemoryUsage() const
{
return thread_group->memory_tracker.getPeak();
}

}
90 changes: 90 additions & 0 deletions src/Storages/MergeTree/ExportList.h
@@ -0,0 +1,90 @@
#pragma once

#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Interpreters/StorageID.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadStatus.h>
#include <Poco/URI.h>
#include <boost/noncopyable.hpp>

namespace CurrentMetrics
{
extern const Metric Export;
}

namespace DB
{

struct ExportInfo
{
String source_database;
String source_table;
String destination_database;
String destination_table;
String part_name;
String destination_file_path;
UInt64 rows_read;
UInt64 total_rows_to_read;
UInt64 total_size_bytes_compressed;
UInt64 total_size_bytes_uncompressed;
UInt64 bytes_read_uncompressed;
UInt64 memory_usage;
UInt64 peak_memory_usage;
time_t create_time = 0;
Float64 elapsed;
};

struct ExportsListElement : private boost::noncopyable
{
const StorageID source_table_id;
const StorageID destination_table_id;
const UInt64 part_size;
const String part_name;
const String destination_file_path;
UInt64 rows_read {0};
UInt64 total_rows_to_read {0};
UInt64 total_size_bytes_compressed {0};
UInt64 total_size_bytes_uncompressed {0};
UInt64 bytes_read_uncompressed {0};
time_t create_time {0};
Float64 elapsed {0};

Stopwatch watch;
ThreadGroupPtr thread_group;

ExportsListElement(
const StorageID & source_table_id_,
const StorageID & destination_table_id_,
UInt64 part_size_,
const String & part_name_,
const String & destination_file_path_,
UInt64 total_rows_to_read_,
UInt64 total_size_bytes_compressed_,
UInt64 total_size_bytes_uncompressed_,
time_t create_time_,
const ContextPtr & context);

~ExportsListElement();

ExportInfo getInfo() const;

UInt64 getMemoryUsage() const;
UInt64 getPeakMemoryUsage() const;
};


class ExportsList final : public BackgroundProcessList<ExportsListElement, ExportInfo>
{
private:
using Parent = BackgroundProcessList<ExportsListElement, ExportInfo>;

public:
ExportsList()
: Parent(CurrentMetrics::Export)
{}
};

using ExportsListEntry = BackgroundProcessListEntry<ExportsListElement, ExportInfo>;

}
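To tie this together, a minimal sketch of how an export could register itself in this list; the helper and its argument values are assumptions based on how MergeList/MovesList entries are typically used, and only the constructor's parameter order comes from this file:

#include <Storages/MergeTree/ExportList.h>
#include <Interpreters/Context.h>
#include <ctime>

namespace DB
{

/// Hypothetical helper (not part of this diff) registering an export in the list;
/// the argument order follows the ExportsListElement constructor above.
auto trackExport(
    const ContextPtr & context,
    const StorageID & source_table_id,
    const StorageID & destination_table_id,
    const String & part_name,
    const String & destination_file_path,
    UInt64 part_bytes_on_disk,
    UInt64 part_rows,
    UInt64 part_bytes_uncompressed)
{
    return context->getExportsList().insert(
        source_table_id,
        destination_table_id,
        part_bytes_on_disk,        /// part_size
        part_name,
        destination_file_path,
        part_rows,                 /// total_rows_to_read
        part_bytes_on_disk,        /// total_size_bytes_compressed
        part_bytes_uncompressed,   /// total_size_bytes_uncompressed
        std::time(nullptr),        /// create_time
        context);
}

}

The returned entry removes the element from the list (and releases the CurrentMetrics::Export increment) when it goes out of scope; progress fields such as rows_read and elapsed would be updated through it while the part is streamed out.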