@@ -19,6 +19,7 @@ use std::sync::Arc;
1919
2020use arrow:: util:: pretty:: pretty_format_batches;
2121use arrow_array:: RecordBatch ;
22+ use datafusion_common:: { DataFusionError , Result } ;
2223use rand:: { thread_rng, Rng } ;
2324use tokio:: task:: JoinSet ;
2425
@@ -132,7 +133,20 @@ struct QueryGroup {
132133}
133134
134135impl AggregationFuzzer {
136+ /// Run the fuzzer, printing an error and panicking if any of the tasks fail
135137 pub async fn run ( & self ) {
138+ let res = self . run_inner ( ) . await ;
139+
140+ if let Err ( e) = res {
141+ // Print the error via `Display` so that it displays nicely (the default `unwrap()`
142+ // prints using `Debug` which escapes newlines, and makes multi-line messages
143+ // hard to read
144+ println ! ( "{e}" ) ;
145+ panic ! ( "Error!" ) ;
146+ }
147+ }
148+
149+ async fn run_inner ( & self ) -> Result < ( ) > {
136150 let mut join_set = JoinSet :: new ( ) ;
137151 let mut rng = thread_rng ( ) ;
138152
@@ -157,16 +171,20 @@ impl AggregationFuzzer {
157171
158172 let tasks = self . generate_fuzz_tasks ( query_groups) . await ;
159173 for task in tasks {
160- join_set. spawn ( async move {
161- task. run ( ) . await ;
162- } ) ;
174+ join_set. spawn ( async move { task. run ( ) . await } ) ;
163175 }
164176 }
165177
166178 while let Some ( join_handle) = join_set. join_next ( ) . await {
167179 // propagate errors
168- join_handle. unwrap ( ) ;
180+ join_handle. map_err ( |e| {
181+ DataFusionError :: Internal ( format ! (
182+ "AggregationFuzzer task error: {:?}" ,
183+ e
184+ ) )
185+ } ) ??;
169186 }
187+ Ok ( ( ) )
170188 }
171189
172190 async fn generate_fuzz_tasks (
@@ -237,45 +255,53 @@ struct AggregationFuzzTestTask {
237255}
238256
239257impl AggregationFuzzTestTask {
240- async fn run ( & self ) {
258+ async fn run ( & self ) -> Result < ( ) > {
241259 let task_result = run_sql ( & self . sql , & self . ctx_with_params . ctx )
242260 . await
243- . expect ( "should success to run sql" ) ;
244- self . check_result ( & task_result, & self . expected_result ) ;
261+ . map_err ( |e| e . context ( self . context_error_report ( ) ) ) ? ;
262+ self . check_result ( & task_result, & self . expected_result )
245263 }
246264
247- // TODO: maybe we should persist the `expected_result` and `task_result`,
248- // because the readability is not so good if we just print it.
249- fn check_result ( & self , task_result : & [ RecordBatch ] , expected_result : & [ RecordBatch ] ) {
250- let result = check_equality_of_batches ( task_result, expected_result) ;
251- if let Err ( e) = result {
265+ fn check_result (
266+ & self ,
267+ task_result : & [ RecordBatch ] ,
268+ expected_result : & [ RecordBatch ] ,
269+ ) -> Result < ( ) > {
270+ check_equality_of_batches ( task_result, expected_result) . map_err ( |e| {
252271 // If we found inconsistent result, we print the test details for reproducing at first
253- println ! (
254- "##### AggregationFuzzer error report #####
255- ### Sql:\n {}\n \
256- ### Schema:\n {}\n \
257- ### Session context params:\n {:?}\n \
258- ### Inconsistent row:\n \
259- - row_idx:{}\n \
260- - task_row:{}\n \
261- - expected_row:{}\n \
262- ### Task total result:\n {}\n \
263- ### Expected total result:\n {}\n \
264- ### Input:\n {}\n \
265- ",
266- self . sql,
267- self . dataset_ref. batches[ 0 ] . schema_ref( ) ,
268- self . ctx_with_params. params,
272+ let message = format ! (
273+ "{}\n \
274+ ### Inconsistent row:\n \
275+ - row_idx:{}\n \
276+ - task_row:{}\n \
277+ - expected_row:{}\n \
278+ ### Task total result:\n {}\n \
279+ ### Expected total result:\n {}\n \
280+ ",
281+ self . context_error_report( ) ,
269282 e. row_idx,
270283 e. lhs_row,
271284 e. rhs_row,
272285 pretty_format_batches( task_result) . unwrap( ) ,
273286 pretty_format_batches( expected_result) . unwrap( ) ,
274- pretty_format_batches( & self . dataset_ref. batches) . unwrap( ) ,
275287 ) ;
288+ DataFusionError :: Internal ( message)
289+ } )
290+ }
276291
277- // Then we just panic
278- panic ! ( ) ;
279- }
292+ /// Returns a formatted error message
293+ fn context_error_report ( & self ) -> String {
294+ format ! (
295+ "##### AggregationFuzzer error report #####\n \
296+ ### Sql:\n {}\n \
297+ ### Schema:\n {}\n \
298+ ### Session context params:\n {:?}\n \
299+ ### Input:\n {}\n \
300+ ",
301+ self . sql,
302+ self . dataset_ref. batches[ 0 ] . schema_ref( ) ,
303+ self . ctx_with_params. params,
304+ pretty_format_batches( & self . dataset_ref. batches) . unwrap( ) ,
305+ )
280306 }
281307}
0 commit comments