Skip to content

Commit 60095a4

Browse files
csringhofer authored and Impala Public Jenkins committed
IMPALA-5050: Add support to read TIMESTAMP_MILLIS and TIMESTAMP_MICROS from Parquet
Changes:
- parquet.thrift is updated to a newer version which contains the timestamp logical type.
- INT64 columns with converted types TIMESTAMP_MILLIS and TIMESTAMP_MICROS can be read as TIMESTAMP.
- If the logical type is timestamp, then the type will contain the information whether the UTC->local conversion is necessary. This feature is only supported for the new timestamp types, so INT96 timestamps must still use flag convert_legacy_hive_parquet_utc_timestamps.
- Min/max stat filtering is enabled again for columns that need UTC->local conversion. This was disabled in IMPALA-7559 because it could incorrectly drop column chunks.
- CREATE TABLE LIKE PARQUET converts these columns to TIMESTAMP - before the change, an error was returned instead.
- Bulk of the Parquet column stat logic was moved to a new class called "ColumnStatsReader".

Testing:
- Added unit tests for timezone conversion (this needed a new public function in timezone_db.h and adding CET to tzdb_tiny).
- Added parquet files (created with parquet-mr) with int64 timestamp columns.

Change-Id: I4c7c01fffa31b3d2ca3480adf6ff851137dadac3
Reviewed-on: http://gerrit.cloudera.org:8080/11057
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
1 parent c75b371 commit 60095a4

24 files changed

Lines changed: 927 additions & 209 deletions

be/src/exec/hdfs-parquet-scanner.cc

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -453,21 +453,6 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
453453

454454
if (!state_->query_options().parquet_read_statistics) return Status::OK();
455455

456-
// IMPALA-7559: if the values are converted from UTC to local time, then either the
457-
// stats need to be converted from UTC to local, or the predicate's min/max values
458-
// need to be converted from local to UTC. Doing this correctly is quite complex if
459-
// the timestamps fall into timezone rules changes (DST change or historical rule
460-
// change), so currently stat filtering is simply disabled for these columns.
461-
//
462-
// Note that parquet-mr only writes stats if min and max are equal, because it cannot
463-
// order timestamps correctly, so the only case affected here is when every value is
464-
// the same in the column chunk.
465-
// TODO: This topic needs more investigation related to IMPALA-5050, which will add
466-
// support for INT64 millisec/microsec timestamp columns, and also a metadata field
467-
// whether utc->local conversion is necessary. I am not sure how parquet-mr handles
468-
// stats for these types at the moment.
469-
bool disable_min_max_filter_for_timestamps = IsTimezoneConversionNeededForTimestamps();
470-
471456
const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
472457
if (!min_max_tuple_desc) return Status::OK();
473458

@@ -516,30 +501,36 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
516501
const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
517502
const ColumnType& col_type = slot_desc->type();
518503

504+
DCHECK(node->element != nullptr);
505+
506+
ColumnStatsReader stat_reader(col_chunk, col_type, col_order, *node->element);
507+
if (col_type.IsTimestampType()) {
508+
stat_reader.SetTimestampDecoder(CreateTimestampDecoder(*node->element));
509+
}
510+
519511
int64_t null_count = 0;
520-
bool null_count_result = ColumnStatsBase::ReadNullCountStat(col_chunk, &null_count);
512+
bool null_count_result = stat_reader.ReadNullCountStat(&null_count);
521513
if (null_count_result && null_count == col_chunk.meta_data.num_values) {
522514
*skip_row_group = true;
523515
break;
524516
}
525517

526-
if (col_type.IsTimestampType() && disable_min_max_filter_for_timestamps) continue;
527-
528-
bool stats_read = false;
529-
void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
530518
const string& fn_name = eval->root().function_name();
519+
ColumnStatsReader::StatsField stats_field;
531520
if (fn_name == "lt" || fn_name == "le") {
532521
// We need to get min stats.
533-
stats_read = ColumnStatsBase::ReadFromThrift(
534-
col_chunk, col_type, col_order, ColumnStatsBase::StatsField::MIN, slot);
522+
stats_field = ColumnStatsReader::StatsField::MIN;
535523
} else if (fn_name == "gt" || fn_name == "ge") {
536524
// We need to get max stats.
537-
stats_read = ColumnStatsBase::ReadFromThrift(
538-
col_chunk, col_type, col_order, ColumnStatsBase::StatsField::MAX, slot);
525+
stats_field = ColumnStatsReader::StatsField::MAX;
539526
} else {
540527
DCHECK(false) << "Unsupported function name for statistics evaluation: " << fn_name;
528+
continue;
541529
}
542530

531+
void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
532+
bool stats_read = stat_reader.ReadFromThrift(stats_field, slot);
533+
543534
if (stats_read) {
544535
TupleRow row;
545536
row.SetTuple(0, min_max_tuple_);
@@ -1677,9 +1668,13 @@ Status HdfsParquetScanner::ValidateEndOfRowGroup(
16771668
return Status::OK();
16781669
}
16791670

1680-
bool HdfsParquetScanner::IsTimezoneConversionNeededForTimestamps() {
1681-
return FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
1671+
ParquetTimestampDecoder HdfsParquetScanner::CreateTimestampDecoder(
1672+
const parquet::SchemaElement& element) {
1673+
bool timestamp_conversion_needed_for_int96_timestamps =
1674+
FLAGS_convert_legacy_hive_parquet_utc_timestamps &&
16821675
file_version_.application == "parquet-mr";
1683-
}
16841676

1677+
return ParquetTimestampDecoder(element, &state_->local_time_zone(),
1678+
timestamp_conversion_needed_for_int96_timestamps);
1679+
}
16851680
}

be/src/exec/hdfs-parquet-scanner.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,9 +343,9 @@ class HdfsParquetScanner : public HdfsScanner {
343343
llvm::Function** process_scratch_batch_fn)
344344
WARN_UNUSED_RESULT;
345345

346-
/// Returns true if the timestamps are expected to be in UTC and need to be
347-
/// converted to local time.
348-
bool IsTimezoneConversionNeededForTimestamps();
346+
/// Initializes a ParquetTimestampDecoder depending on writer, timezone, and the schema
347+
/// of the column.
348+
ParquetTimestampDecoder CreateTimestampDecoder(const parquet::SchemaElement& element);
349349

350350
/// The rep and def levels are set to this value to indicate the end of a row group.
351351
static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();

be/src/exec/parquet-column-readers.cc

Lines changed: 108 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -372,16 +372,15 @@ class ScalarColumnReader : public BaseScalarColumnReader {
372372
/// the max length for VARCHAR columns. Unused otherwise.
373373
int fixed_len_size_;
374374

375-
/// Query-global timezone used as local timezone when executing the query.
376-
const Timezone& local_time_zone_;
375+
/// Contains extra data needed for Timestamp decoding.
376+
ParquetTimestampDecoder timestamp_decoder_;
377377
};
378378

379379
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
380380
ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader(
381381
HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
382382
: BaseScalarColumnReader(parent, node, slot_desc),
383-
dict_decoder_(parent->scan_node_->mem_tracker()),
384-
local_time_zone_(parent->state_->local_time_zone()) {
383+
dict_decoder_(parent->scan_node_->mem_tracker()) {
385384
if (!MATERIALIZED) {
386385
// We're not materializing any values, just counting them. No need (or ability) to
387386
// initialize state used to materialize values.
@@ -399,9 +398,14 @@ ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ScalarColumnReader
399398
} else {
400399
fixed_len_size_ = -1;
401400
}
402-
needs_conversion_ = slot_desc_->type().type == TYPE_CHAR ||
403-
(slot_desc_->type().type == TYPE_TIMESTAMP &&
404-
parent->IsTimezoneConversionNeededForTimestamps());
401+
402+
needs_conversion_ = slot_desc_->type().type == TYPE_CHAR;
403+
404+
if (slot_desc_->type().type == TYPE_TIMESTAMP) {
405+
timestamp_decoder_ = parent->CreateTimestampDecoder(*node.element);
406+
dict_decoder_.SetTimestampHelper(timestamp_decoder_);
407+
needs_conversion_ = timestamp_decoder_.NeedsConversion();
408+
}
405409
}
406410

407411
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
@@ -641,6 +645,30 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
641645
return true;
642646
}
643647

648+
template <>
649+
template <Encoding::type ENCODING>
650+
bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::DecodeValue(
651+
uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
652+
TimestampValue* RESTRICT val) RESTRICT {
653+
DCHECK_EQ(page_encoding_, ENCODING);
654+
if (ENCODING == Encoding::PLAIN_DICTIONARY) {
655+
if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
656+
SetDictDecodeError();
657+
return false;
658+
}
659+
} else {
660+
DCHECK_EQ(ENCODING, Encoding::PLAIN);
661+
int encoded_len =
662+
timestamp_decoder_.Decode<parquet::Type::INT64>(*data, data_end, val);
663+
if (UNLIKELY(encoded_len < 0)) {
664+
SetPlainDecodeError();
665+
return false;
666+
}
667+
*data += encoded_len;
668+
}
669+
return true;
670+
}
671+
644672
template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
645673
void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>
646674
::ReadPositionBatched(int16_t rep_level, int64_t* pos) {
@@ -674,14 +702,31 @@ ::NeedsConversionInline() const {
674702
return needs_conversion_;
675703
}
676704

705+
template <>
706+
inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
707+
::NeedsConversionInline() const {
708+
return needs_conversion_;
709+
}
710+
677711
template <>
678712
bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ConvertSlot(
679713
const TimestampValue* src, void* slot) {
680714
// Conversion should only happen when this flag is enabled.
681715
DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
716+
DCHECK(timestamp_decoder_.NeedsConversion());
682717
TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
683718
*dst_ts = *src;
684-
if (dst_ts->HasDateAndTime()) dst_ts->UtcToLocal(local_time_zone_);
719+
timestamp_decoder_.ConvertToLocalTime(dst_ts);
720+
return true;
721+
}
722+
723+
template <>
724+
bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ConvertSlot(
725+
const TimestampValue* src, void* slot) {
726+
DCHECK(timestamp_decoder_.NeedsConversion());
727+
TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
728+
*dst_ts = *src;
729+
timestamp_decoder_.ConvertToLocalTime(static_cast<TimestampValue*>(dst_ts));
685730
return true;
686731
}
687732

@@ -691,6 +736,12 @@ ::NeedsValidationInline() const {
691736
return true;
692737
}
693738

739+
template <>
740+
inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
741+
::NeedsValidationInline() const {
742+
return true;
743+
}
744+
694745
template <>
695746
bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateValue(
696747
TimestampValue* val) const {
@@ -711,6 +762,23 @@ bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>::ValidateVal
711762
return true;
712763
}
713764

765+
template <>
766+
bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>::ValidateValue(
767+
TimestampValue* val) const {
768+
// The range was already checked during the int64_t->TimestampValue conversion, which
769+
// sets the date to invalid if it was out of range.
770+
if (UNLIKELY(!val->HasDate())) {
771+
ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
772+
filename(), node_.element->name);
773+
Status status = parent_->state_->LogOrReturnError(msg);
774+
if (!status.ok()) parent_->parse_status_ = status;
775+
return false;
776+
}
777+
DCHECK(TimestampValue::IsValidDate(val->date()));
778+
DCHECK(TimestampValue::IsValidTime(val->time()));
779+
return true;
780+
}
781+
714782
class BoolColumnReader : public BaseScalarColumnReader {
715783
public:
716784
BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
@@ -1507,7 +1575,7 @@ void CollectionColumnReader::UpdateDerivedState() {
15071575
}
15081576

15091577
/// Returns a column reader for decimal types based on its size and parquet type.
1510-
static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
1578+
static ParquetColumnReader* CreateDecimalColumnReader(const SchemaNode& node,
15111579
const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
15121580
switch (node.element->type) {
15131581
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
@@ -1554,84 +1622,82 @@ static ParquetColumnReader* GetDecimalColumnReader(const SchemaNode& node,
15541622
ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
15551623
bool is_collection_field, const SlotDescriptor* slot_desc,
15561624
HdfsParquetScanner* parent) {
1557-
ParquetColumnReader* reader = nullptr;
15581625
if (is_collection_field) {
15591626
// Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
1560-
reader = new CollectionColumnReader(parent, node, slot_desc);
1627+
return new CollectionColumnReader(parent, node, slot_desc);
15611628
} else if (slot_desc != nullptr) {
15621629
// Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
15631630
switch (slot_desc->type().type) {
15641631
case TYPE_BOOLEAN:
1565-
reader = new BoolColumnReader(parent, node, slot_desc);
1566-
break;
1632+
return new BoolColumnReader(parent, node, slot_desc);
15671633
case TYPE_TINYINT:
1568-
reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(parent, node,
1634+
return new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(parent, node,
15691635
slot_desc);
1570-
break;
15711636
case TYPE_SMALLINT:
1572-
reader = new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
1637+
return new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
15731638
slot_desc);
1574-
break;
15751639
case TYPE_INT:
1576-
reader = new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
1640+
return new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
15771641
slot_desc);
1578-
break;
15791642
case TYPE_BIGINT:
15801643
switch (node.element->type) {
15811644
case parquet::Type::INT32:
1582-
reader = new ScalarColumnReader<int64_t, parquet::Type::INT32, true>(parent,
1645+
return new ScalarColumnReader<int64_t, parquet::Type::INT32, true>(parent,
15831646
node, slot_desc);
1584-
break;
15851647
default:
1586-
reader = new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent,
1648+
return new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent,
15871649
node, slot_desc);
1588-
break;
15891650
}
1590-
break;
15911651
case TYPE_FLOAT:
1592-
reader = new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
1652+
return new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
15931653
slot_desc);
1594-
break;
15951654
case TYPE_DOUBLE:
15961655
switch (node.element->type) {
15971656
case parquet::Type::INT32:
1598-
reader = new ScalarColumnReader<double , parquet::Type::INT32, true>(parent,
1657+
return new ScalarColumnReader<double , parquet::Type::INT32, true>(parent,
15991658
node, slot_desc);
1600-
break;
16011659
case parquet::Type::FLOAT:
1602-
reader = new ScalarColumnReader<double, parquet::Type::FLOAT, true>(parent,
1660+
return new ScalarColumnReader<double, parquet::Type::FLOAT, true>(parent,
16031661
node, slot_desc);
1604-
break;
16051662
default:
1606-
reader = new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent,
1663+
return new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent,
16071664
node, slot_desc);
1608-
break;
16091665
}
1610-
break;
16111666
case TYPE_TIMESTAMP:
1612-
reader = new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
1613-
parent, node, slot_desc);
1614-
break;
1667+
return CreateTimestampColumnReader(node, slot_desc, parent);
16151668
case TYPE_STRING:
16161669
case TYPE_VARCHAR:
16171670
case TYPE_CHAR:
1618-
reader = new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
1671+
return new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
16191672
parent, node, slot_desc);
1620-
break;
16211673
case TYPE_DECIMAL:
1622-
reader = GetDecimalColumnReader(node, slot_desc, parent);
1623-
break;
1674+
return CreateDecimalColumnReader(node, slot_desc, parent);
16241675
default:
16251676
DCHECK(false) << slot_desc->type().DebugString();
1677+
return nullptr;
16261678
}
16271679
} else {
16281680
// Special case for counting scalar values (e.g. count(*), no materialized columns in
16291681
// the file, only materializing a position slot). We won't actually read any values,
16301682
// only the rep and def levels, so it doesn't matter what kind of reader we make.
1631-
reader = new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
1683+
return new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
16321684
slot_desc);
16331685
}
1634-
return parent->obj_pool_.Add(reader);
1686+
}
1687+
1688+
ParquetColumnReader* ParquetColumnReader::CreateTimestampColumnReader(
1689+
const SchemaNode& node, const SlotDescriptor* slot_desc,
1690+
HdfsParquetScanner* parent) {
1691+
if (node.element->type == parquet::Type::INT96) {
1692+
return new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
1693+
parent, node, slot_desc);
1694+
}
1695+
else if (node.element->type == parquet::Type::INT64) {
1696+
return new ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>(
1697+
parent, node, slot_desc);
1698+
}
1699+
DCHECK(false) << slot_desc->type().DebugString();
1700+
return nullptr;
16351701
}
16361702

16371703
}

be/src/exec/parquet-column-readers.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,12 @@ class ParquetLevelDecoder {
132132
/// level pair at a time. The current def and rep level are exposed to the user, and the
133133
/// corresponding value (if defined) can optionally be copied into a slot via
134134
/// ReadValue(). Can also write position slots.
135+
///
136+
/// The constructor adds the object to the obj_pool of the parent HdfsParquetScanner.
135137
class ParquetColumnReader {
136138
public:
137139
/// Creates a column reader for 'node' and associates it with the given parent scanner.
138-
/// Adds the new column reader to the parent's object pool.
140+
/// The constructor of column readers add the new object to the parent's object pool.
139141
/// 'slot_desc' may be NULL, in which case the returned column reader can only be used
140142
/// to read def/rep levels.
141143
/// 'is_collection_field' should be set to true if the returned reader is reading a
@@ -155,6 +157,9 @@ class ParquetColumnReader {
155157
static ParquetColumnReader* Create(const SchemaNode& node, bool is_collection_field,
156158
const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
157159

160+
static ParquetColumnReader* CreateTimestampColumnReader(const SchemaNode& node,
161+
const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
162+
158163
virtual ~ParquetColumnReader() { }
159164

160165
int def_level() const { return def_level_; }
@@ -303,6 +308,9 @@ class ParquetColumnReader {
303308
tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
304309
null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset() :
305310
slot_desc->null_indicator_offset()) {
311+
DCHECK(parent != nullptr);
312+
parent->obj_pool_.Add(this);
313+
306314
DCHECK_GE(node_.max_rep_level, 0);
307315
DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
308316
DCHECK_GE(node_.max_def_level, 0);

0 commit comments

Comments (0)