Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ bool applyPartitionFilter(
const TypePtr& type,
const std::string& partitionValue,
bool isPartitionDateDaysSinceEpoch,
common::Filter* filter) {
common::Filter* filter,
bool asLocalTime) {
if (type->isDate()) {
int32_t result = 0;
// days_since_epoch partition values are integers in string format. Eg.
Expand Down Expand Up @@ -666,7 +667,9 @@ bool applyPartitionFilter(
auto result = util::fromTimestampString(
StringView(partitionValue), util::TimestampParseMode::kPrestoCast);
VELOX_CHECK(!result.hasError());
result.value().toGMT(Timestamp::defaultTimezone());
if (asLocalTime) {
result.value().toGMT(Timestamp::defaultTimezone());
}
return applyFilter(*filter, result.value());
}
case TypeKind::VARCHAR: {
Expand All @@ -687,7 +690,8 @@ bool testFilters(
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKeys,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle) {
partitionKeysHandle,
bool asLocalTime) {
const auto totalRows = reader->numberOfRows();
const auto& fileTypeWithId = reader->typeWithId();
const auto& rowType = reader->rowType();
Expand All @@ -708,7 +712,8 @@ bool testFilters(
handlesIter->second->dataType(),
iter->second.value(),
handlesIter->second->isPartitionDateValueDaysSinceEpoch(),
child->filter());
child->filter(),
asLocalTime);
}
// Column is missing, most likely due to schema evolution. Or it's a
// partition key but the partition value is NULL.
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ bool testFilters(
const std::unordered_map<std::string, std::optional<std::string>>&
partitionKey,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
partitionKeysHandle);
partitionKeysHandle,
bool asLocalTime);

std::unique_ptr<dwio::common::BufferedInput> createBufferedInput(
const FileHandle& fileHandle,
Expand Down
4 changes: 3 additions & 1 deletion velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,9 @@ bool SplitReader::filterOnStats(
baseReader_.get(),
hiveSplit_->filePath,
hiveSplit_->partitionKeys,
*partitionKeys_)) {
*partitionKeys_,
hiveConfig_->readTimestampPartitionValueAsLocalTime(
connectorQueryCtx_->sessionProperties()))) {
return true;
}
++runtimeStats.skippedSplits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
baseFilePath_(baseFilePath),
fileHandleFactory_(fileHandleFactory),
executor_(executor),
connectorQueryCtx_(connectorQueryCtx),
hiveConfig_(hiveConfig),
ioStats_(ioStats),
fsStats_(fsStats),
Expand Down Expand Up @@ -114,7 +115,9 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
deleteReader.get(),
deleteSplit_->filePath,
deleteSplit_->partitionKeys,
{})) {
{},
hiveConfig_->readTimestampPartitionValueAsLocalTime(
connectorQueryCtx_->sessionProperties()))) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could add test for this change on the iceberg read.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kecookier are you planning to address this comment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rui-mo @majetideepak I am not familiar with Iceberg now, and I need some time to add this test case. This may be done in another PR. Perhaps this PR can be merged first?

// We only count the number of base splits skipped as skippedSplits runtime
// statistics in Velox. Skipped delta split is only counted as skipped
// bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class PositionalDeleteFileReader {
const std::string& baseFilePath_;
FileHandleFactory* const fileHandleFactory_;
folly::Executor* const executor_;
const ConnectorQueryCtx* connectorQueryCtx_;
const std::shared_ptr<const HiveConfig> hiveConfig_;
const std::shared_ptr<io::IoStatistics> ioStats_;
const std::shared_ptr<filesystems::File::IoStats> fsStats_;
Expand Down
228 changes: 178 additions & 50 deletions velox/exec/tests/TableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,8 @@ class TableScanTest : public virtual HiveConnectorTestBase {
.planNode();

std::string partitionValueStr;
if (partitionType->isTimestamp() && partitionValue.has_value()) {
auto t = util::fromTimestampString(
StringView(*partitionValue),
util::TimestampParseMode::kPrestoCast)
.thenOrThrow(folly::identity, [&](const Status& status) {
VELOX_USER_FAIL("{}", status.message());
});
t.toGMT(Timestamp::defaultTimezone());
partitionValueStr = "'" + t.toString() + "'";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure why this piece needs to be removed.

Copy link
Contributor Author

@kecookier kecookier Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure why this piece needs to be removed.

This code was introduced by https://github.com/facebookincubator/velox/pull/11754/files#diff-c62590f29a2bf22e50e276f2665e0a7666968880de73a6b4db745cbc0cfde6feL202 and is only called by TableScanTest.partitionedTableTimestampKey. To clarify the code, I moved these logics into TableScanTest.partitionedTableTimestampKey from L2043 to L2085.

} else {
partitionValueStr =
partitionValue.has_value() ? "'" + *partitionValue + "'" : "null";
}
partitionValueStr =
partitionValue.has_value() ? "'" + *partitionValue + "'" : "null";
assertQuery(
op, split, fmt::format("SELECT {}, * FROM tmp", partitionValueStr));

Expand Down Expand Up @@ -2001,48 +1990,187 @@ TEST_F(TableScanTest, partitionedTableTimestampKey) {
writeToFile(filePath->getPath(), vectors);
createDuckDbTable(vectors);
const std::string partitionValue = "2023-10-27 00:12:35";
testPartitionedTable(filePath->getPath(), TIMESTAMP(), partitionValue);

// Test partition filter on TIMESTAMP column.
auto partitionType = TIMESTAMP();
// Test partition value is null.
testPartitionedTable(filePath->getPath(), partitionType, std::nullopt);

auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("pkey", partitionValue)
.build();

ColumnHandleMap assignments = {
{"pkey", partitionKey("pkey", TIMESTAMP())},
{"c0", regularColumn("c0", BIGINT())},
{"c1", regularColumn("c1", DOUBLE())}};

Timestamp ts =
util::fromTimestampString(
StringView(partitionValue), util::TimestampParseMode::kPrestoCast)
.thenOrThrow(folly::identity, [&](const Status& status) {
VELOX_USER_FAIL("{}", status.message());
});
// Read timestamp partition value as UTC.
std::string tsValue = "'" + ts.toString() + "'";

Timestamp tsAsLocalTime = ts;
tsAsLocalTime.toGMT(Timestamp::defaultTimezone());
// Read timestamp partition value as local time.
std::string tsValueAsLocal = "'" + tsAsLocalTime.toString() + "'";

{
auto split = exec::test::HiveConnectorSplitBuilder(filePath->getPath())
.partitionKey("pkey", partitionValue)
.build();
auto outputType =
ROW({"pkey", "c0", "c1"}, {TIMESTAMP(), BIGINT(), DOUBLE()});
ColumnHandleMap assignments = {
{"pkey", partitionKey("pkey", TIMESTAMP())},
{"c0", regularColumn("c0", BIGINT())},
{"c1", regularColumn("c1", DOUBLE())}};
auto plan =
PlanBuilder()
.startTableScan()
.tableName("hive_table")
.outputType(
ROW({"pkey", "c0", "c1"}, {partitionType, BIGINT(), DOUBLE()}))
.assignments(assignments)
.endTableScan()
.planNode();

auto expect = [&](bool asLocalTime) {
AssertQueryBuilder(plan, duckDbQueryRunner_)
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::
kReadTimestampPartitionValueAsLocalTimeSession,
asLocalTime ? "true" : "false")
.splits({split})
.assertResults(fmt::format(
"SELECT {}, * FROM tmp", asLocalTime ? tsValueAsLocal : tsValue));
};

common::SubfieldFilters filters;
// pkey = 2023-10-27 00:12:35.
auto lower = util::fromTimestampString(
StringView("2023-10-27 00:12:35"),
util::TimestampParseMode::kPrestoCast)
.value();
lower.toGMT(Timestamp::defaultTimezone());
filters[common::Subfield("pkey")] =
std::make_unique<common::TimestampRange>(lower, lower, false);
expect(true);
expect(false);
}

auto tableHandle = std::make_shared<HiveTableHandle>(
"test-hive", "hive_table", true, std::move(filters), nullptr, nullptr);
auto op = std::make_shared<TableScanNode>(
"0",
std::move(outputType),
std::move(tableHandle),
std::move(assignments));
{
auto plan =
PlanBuilder()
.startTableScan()
.tableName("hive_table")
.outputType(
ROW({"c0", "pkey", "c1"}, {BIGINT(), partitionType, DOUBLE()}))
.assignments(assignments)
.endTableScan()
.planNode();

auto expect = [&](bool asLocalTime) {
AssertQueryBuilder(plan, duckDbQueryRunner_)
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::
kReadTimestampPartitionValueAsLocalTimeSession,
asLocalTime ? "true" : "false")
.splits({split})
.assertResults(fmt::format(
"SELECT c0, {}, c1 FROM tmp",
asLocalTime ? tsValueAsLocal : tsValue));
};
expect(true);
expect(false);
}

auto t =
util::fromTimestampString(
StringView(partitionValue), util::TimestampParseMode::kPrestoCast)
.thenOrThrow(folly::identity, [&](const Status& status) {
VELOX_USER_FAIL("{}", status.message());
});
t.toGMT(Timestamp::defaultTimezone());
std::string partitionValueStr = "'" + t.toString() + "'";
assertQuery(
op, split, fmt::format("SELECT {}, * FROM tmp", partitionValueStr));
{
auto plan =
PlanBuilder()
.startTableScan()
.tableName("hive_table")
.outputType(
ROW({"c0", "c1", "pkey"}, {BIGINT(), DOUBLE(), partitionType}))
.assignments(assignments)
.endTableScan()
.planNode();

auto expect = [&](bool asLocalTime) {
AssertQueryBuilder(plan, duckDbQueryRunner_)
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::
kReadTimestampPartitionValueAsLocalTimeSession,
asLocalTime ? "true" : "false")
.splits({split})
.assertResults(fmt::format(
"SELECT c0, c1, {} FROM tmp",
asLocalTime ? tsValueAsLocal : tsValue));
};
expect(true);
expect(false);
}

{
// Select only partition key.
auto plan =
PlanBuilder()
.startTableScan()
.tableName("hive_table")
.outputType(ROW({"pkey"}, {partitionType}))
.assignments({{"pkey", partitionKey("pkey", partitionType)}})
.endTableScan()
.planNode();

auto expect = [&](bool asLocalTime) {
AssertQueryBuilder(plan, duckDbQueryRunner_)
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::
kReadTimestampPartitionValueAsLocalTimeSession,
asLocalTime ? "true" : "false")
.splits({split})
.assertResults(fmt::format(
"SELECT {} FROM tmp", asLocalTime ? tsValueAsLocal : tsValue));
};
expect(true);
expect(false);
}

// Test partition filter on TIMESTAMP column.
{
auto planWithSubfilter = [&](bool asLocalTime) {
auto outputType =
ROW({"pkey", "c0", "c1"}, {TIMESTAMP(), BIGINT(), DOUBLE()});
common::SubfieldFilters filters;
// pkey = 2023-10-27 00:12:35.
auto lower =
util::fromTimestampString(
StringView(partitionValue), util::TimestampParseMode::kPrestoCast)
.value();
if (asLocalTime) {
lower.toGMT(Timestamp::defaultTimezone());
}
filters[common::Subfield("pkey")] =
std::make_unique<common::TimestampRange>(lower, lower, false);
auto tableHandle = std::make_shared<HiveTableHandle>(
"test-hive",
"hive_table",
true,
std::move(filters),
nullptr,
nullptr);

return PlanBuilder()
.startTableScan()
.tableHandle(tableHandle)
.outputType(outputType)
.assignments(assignments)
.endTableScan()
.planNode();
};

auto expect = [&](bool asLocalTime) {
AssertQueryBuilder(planWithSubfilter(asLocalTime), duckDbQueryRunner_)
.connectorSessionProperty(
kHiveConnectorId,
connector::hive::HiveConfig::
kReadTimestampPartitionValueAsLocalTimeSession,
asLocalTime ? "true" : "false")
.splits({split})
.assertResults(fmt::format(
"SELECT {}, * FROM tmp", asLocalTime ? tsValueAsLocal : tsValue));
};
expect(true);
expect(false);
}
}

Expand Down
Loading