Skip to content

Commit c6d86b2

Browse files
authored
Merge branch 'main' into row_group_limit_pruning
2 parents 63cd878 + 5258352 commit c6d86b2

File tree

5 files changed

+183
-71
lines changed

5 files changed

+183
-71
lines changed

.github/workflows/audit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
steps:
4343
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
4444
- name: Install cargo-audit
45-
uses: taiki-e/install-action@763e3324d4fd026c9bd284c504378585777a87d5 # v2.62.57
45+
uses: taiki-e/install-action@1ee706eb04986370fc60419ba172594c51067f29 # v2.62.58
4646
with:
4747
tool: cargo-audit
4848
- name: Run audit check

.github/workflows/rust.yml

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ jobs:
5555
with:
5656
rust-version: stable
5757
- name: Rust Dependency Cache
58-
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
58+
uses: Swatinem/rust-cache@779680da715d629ac1d338a641029a2f4372abb5 # v2.8.2
5959
with:
6060
shared-key: "amd-ci-check" # this job uses it's own cache becase check has a separate cache and we need it to be fast as it blocks other jobs
6161
save-if: ${{ github.ref_name == 'main' }}
@@ -108,7 +108,7 @@ jobs:
108108
with:
109109
rust-version: stable
110110
- name: Rust Dependency Cache
111-
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
111+
uses: Swatinem/rust-cache@779680da715d629ac1d338a641029a2f4372abb5 # v2.8.2
112112
with:
113113
save-if: false # set in linux-test
114114
shared-key: "amd-ci"
@@ -176,7 +176,7 @@ jobs:
176176
with:
177177
rust-version: stable
178178
- name: Rust Dependency Cache
179-
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
179+
uses: Swatinem/rust-cache@779680da715d629ac1d338a641029a2f4372abb5 # v2.8.2
180180
with:
181181
save-if: false # set in linux-test
182182
shared-key: "amd-ci"
@@ -293,7 +293,7 @@ jobs:
293293
with:
294294
rust-version: stable
295295
- name: Rust Dependency Cache
296-
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
296+
uses: Swatinem/rust-cache@779680da715d629ac1d338a641029a2f4372abb5 # v2.8.2
297297
with:
298298
save-if: ${{ github.ref_name == 'main' }}
299299
shared-key: "amd-ci"
@@ -337,7 +337,7 @@ jobs:
337337
- name: Setup Rust toolchain
338338
run: rustup toolchain install stable
339339
- name: Rust Dependency Cache
340-
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
340+
uses: Swatinem/rust-cache@779680da715d629ac1d338a641029a2f4372abb5 # v2.8.2
341341
with:
342342
save-if: false # set in linux-test
343343
shared-key: "amd-ci"
@@ -370,7 +370,7 @@ jobs:
370370
with:
371371
rust-version: stable
372372
- name: Rust Dependency Cache
373-
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
373+
uses: Swatinem/rust-cache@779680da715d629ac1d338a641029a2f4372abb5 # v2.8.2
374374
with:
375375
save-if: ${{ github.ref_name == 'main' }}
376376
shared-key: "amd-ci-linux-test-example"
@@ -446,7 +446,7 @@ jobs:
446446
sudo apt-get update -qq
447447
sudo apt-get install -y -qq clang
448448
- name: Setup wasm-pack
449-
uses: taiki-e/install-action@763e3324d4fd026c9bd284c504378585777a87d5 # v2.62.57
449+
uses: taiki-e/install-action@1ee706eb04986370fc60419ba172594c51067f29 # v2.62.58
450450
with:
451451
tool: wasm-pack
452452
- name: Run tests with headless mode
@@ -677,7 +677,7 @@ jobs:
677677
- name: Install Clippy
678678
run: rustup component add clippy
679679
- name: Rust Dependency Cache
680-
uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # v2.8.1
680+
uses: Swatinem/rust-cache@779680da715d629ac1d338a641029a2f4372abb5 # v2.8.2
681681
with:
682682
save-if: ${{ github.ref_name == 'main' }}
683683
shared-key: "amd-ci-clippy"
@@ -749,7 +749,7 @@ jobs:
749749
- name: Setup Rust toolchain
750750
uses: ./.github/actions/setup-builder
751751
- name: Install cargo-msrv
752-
uses: taiki-e/install-action@763e3324d4fd026c9bd284c504378585777a87d5 # v2.62.57
752+
uses: taiki-e/install-action@1ee706eb04986370fc60419ba172594c51067f29 # v2.62.58
753753
with:
754754
tool: cargo-msrv
755755

@@ -794,4 +794,4 @@ jobs:
794794
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
795795
with:
796796
persist-credentials: false
797-
- uses: crate-ci/typos@626c4bedb751ce0b7f03262ca97ddda9a076ae1c # v1.39.2
797+
- uses: crate-ci/typos@2d0ce569feab1f8752f1dde43cc2f2aa53236e06 # v1.40.0

datafusion/physical-plan/src/joins/join_hash_map.rs

Lines changed: 68 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use std::fmt::{self, Debug};
2323
use std::ops::Sub;
2424

25+
use arrow::datatypes::ArrowNativeType;
2526
use hashbrown::hash_table::Entry::{Occupied, Vacant};
2627
use hashbrown::HashTable;
2728

@@ -254,39 +255,50 @@ impl JoinHashMapType for JoinHashMapU64 {
254255
// Type of offsets for obtaining indices from JoinHashMap.
255256
pub(crate) type JoinHashMapOffset = (usize, Option<u64>);
256257

257-
// Macro for traversing chained values with limit.
258-
// Early returns in case of reaching output tuples limit.
259-
macro_rules! chain_traverse {
260-
(
261-
$input_indices:ident, $match_indices:ident,
262-
$hash_values:ident, $next_chain:ident,
263-
$input_idx:ident, $chain_idx:ident, $remaining_output:ident, $one:ident, $zero:ident
264-
) => {{
265-
// now `one` and `zero` are in scope from the outer function
266-
let mut match_row_idx = $chain_idx - $one;
267-
loop {
268-
$match_indices.push(match_row_idx.into());
269-
$input_indices.push($input_idx as u32);
270-
$remaining_output -= 1;
271-
272-
let next = $next_chain[match_row_idx.into() as usize];
273-
274-
if $remaining_output == 0 {
275-
// we compare against `zero` (of type T) here too
276-
let next_offset = if $input_idx == $hash_values.len() - 1 && next == $zero
277-
{
278-
None
279-
} else {
280-
Some(($input_idx, Some(next.into())))
281-
};
282-
return ($input_indices, $match_indices, next_offset);
283-
}
284-
if next == $zero {
285-
break;
286-
}
287-
match_row_idx = next - $one;
258+
/// Traverses the chain of matching indices, collecting results up to the remaining limit.
259+
/// Returns `Some(offset)` if the limit was reached and there are more results to process,
260+
/// or `None` if the chain was fully traversed.
261+
#[inline(always)]
262+
fn traverse_chain<T>(
263+
next_chain: &[T],
264+
input_idx: usize,
265+
start_chain_idx: T,
266+
remaining: &mut usize,
267+
input_indices: &mut Vec<u32>,
268+
match_indices: &mut Vec<u64>,
269+
is_last_input: bool,
270+
) -> Option<JoinHashMapOffset>
271+
where
272+
T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
273+
<T as TryFrom<usize>>::Error: Debug,
274+
T: ArrowNativeType,
275+
{
276+
let zero = T::usize_as(0);
277+
let one = T::usize_as(1);
278+
let mut match_row_idx = start_chain_idx - one;
279+
280+
loop {
281+
match_indices.push(match_row_idx.into());
282+
input_indices.push(input_idx as u32);
283+
*remaining -= 1;
284+
285+
let next = next_chain[match_row_idx.into() as usize];
286+
287+
if *remaining == 0 {
288+
// Limit reached - return offset for next call
289+
return if is_last_input && next == zero {
290+
// Finished processing the last input row
291+
None
292+
} else {
293+
Some((input_idx, Some(next.into())))
294+
};
288295
}
289-
}};
296+
if next == zero {
297+
// End of chain
298+
return None;
299+
}
300+
match_row_idx = next - one;
301+
}
290302
}
291303

292304
pub fn update_from_iter<'a, T>(
@@ -380,10 +392,10 @@ pub fn get_matched_indices_with_limit_offset<T>(
380392
where
381393
T: Copy + TryFrom<usize> + PartialOrd + Into<u64> + Sub<Output = T>,
382394
<T as TryFrom<usize>>::Error: Debug,
395+
T: ArrowNativeType,
383396
{
384397
let mut input_indices = Vec::with_capacity(limit);
385398
let mut match_indices = Vec::with_capacity(limit);
386-
let zero = T::try_from(0).unwrap();
387399
let one = T::try_from(1).unwrap();
388400

389401
// Check if hashmap consists of unique values
@@ -409,47 +421,49 @@ where
409421

410422
// Calculate initial `hash_values` index before iterating
411423
let to_skip = match offset {
412-
// None `initial_next_idx` indicates that `initial_idx` processing has'n been started
424+
// None `initial_next_idx` indicates that `initial_idx` processing hasn't been started
413425
(idx, None) => idx,
414426
// Zero `initial_next_idx` indicates that `initial_idx` has been processed during
415427
// previous iteration, and it should be skipped
416428
(idx, Some(0)) => idx + 1,
417429
// Otherwise, process remaining `initial_idx` matches by traversing `next_chain`,
418430
// to start with the next index
419431
(idx, Some(next_idx)) => {
420-
let next_idx: T = T::try_from(next_idx as usize).unwrap();
421-
chain_traverse!(
422-
input_indices,
423-
match_indices,
424-
hash_values,
432+
let next_idx: T = T::usize_as(next_idx as usize);
433+
let is_last = idx == hash_values.len() - 1;
434+
if let Some(next_offset) = traverse_chain(
425435
next_chain,
426436
idx,
427437
next_idx,
428-
remaining_output,
429-
one,
430-
zero
431-
);
438+
&mut remaining_output,
439+
&mut input_indices,
440+
&mut match_indices,
441+
is_last,
442+
) {
443+
return (input_indices, match_indices, Some(next_offset));
444+
}
432445
idx + 1
433446
}
434447
};
435448

436-
let mut row_idx = to_skip;
437-
for &hash in &hash_values[to_skip..] {
449+
let hash_values_len = hash_values.len();
450+
for (i, &hash) in hash_values[to_skip..].iter().enumerate() {
451+
let row_idx = to_skip + i;
438452
if let Some((_, idx)) = map.find(hash, |(h, _)| hash == *h) {
439453
let idx: T = *idx;
440-
chain_traverse!(
441-
input_indices,
442-
match_indices,
443-
hash_values,
454+
let is_last = row_idx == hash_values_len - 1;
455+
if let Some(next_offset) = traverse_chain(
444456
next_chain,
445457
row_idx,
446458
idx,
447-
remaining_output,
448-
one,
449-
zero
450-
);
459+
&mut remaining_output,
460+
&mut input_indices,
461+
&mut match_indices,
462+
is_last,
463+
) {
464+
return (input_indices, match_indices, Some(next_offset));
465+
}
451466
}
452-
row_idx += 1;
453467
}
454468
(input_indices, match_indices, None)
455469
}

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -900,7 +900,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
900900
quote: proto_opts.quote[0],
901901
terminator: proto_opts.terminator.first().copied(),
902902
escape: proto_opts.escape.first().copied(),
903-
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
903+
double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
904904
newlines_in_values: proto_opts.newlines_in_values.first().map(|h| *h != 0),
905905
compression: proto_opts.compression().into(),
906906
schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),

0 commit comments

Comments
 (0)