Skip to content

Commit 6c5038e

Browse files
committed
[HUDI-4025] Add Presto query node to validate presto integration
1 parent 5c4813f commit 6c5038e

9 files changed

Lines changed: 252 additions & 73 deletions

File tree

hudi-integ-test/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,11 @@
360360
<scope>test</scope>
361361
</dependency>
362362

363+
<dependency>
364+
<groupId>com.facebook.presto</groupId>
365+
<artifactId>presto-jdbc</artifactId>
366+
</dependency>
367+
363368
<dependency>
364369
<groupId>org.awaitility</groupId>
365370
<artifactId>awaitility</artifactId>

hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.Map;
6363

6464
import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
65+
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
6566

6667
/**
6768
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency
@@ -310,5 +311,16 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {
310311

311312
@Parameter(names = {"--use-hudi-data-to-generate-updates"}, description = "Use data from hudi to generate updates for new batches ")
312313
public Boolean useHudiToGenerateUpdates = false;
314+
315+
@Parameter(names = {"--presto-jdbc-url"}, description = "Presto JDBC URL in the format jdbc:presto://<host>:<port>/<catalog>/<schema> "
316+
+ "e.g. URL to connect to Presto running on localhost port 8080 with the catalog `hive` and the schema `sales`: "
317+
+ "jdbc:presto://localhost:8080/hive/sales")
318+
public String prestoJdbcUrl = EMPTY_STRING;
319+
320+
@Parameter(names = {"--presto-jdbc-username"}, description = "Username to use for authentication")
321+
public String prestoUsername = "test";
322+
323+
@Parameter(names = {"--presto-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
324+
public String prestoPassword;
313325
}
314326
}

hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class DeltaConfig implements Serializable {
4343
private final SerializableConfiguration configuration;
4444

4545
public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
46-
SerializableConfiguration configuration) {
46+
SerializableConfiguration configuration) {
4747
this.deltaOutputMode = deltaOutputMode;
4848
this.deltaInputType = deltaInputType;
4949
this.configuration = configuration;
@@ -74,6 +74,8 @@ public static class Config {
7474
public static final String CHILDREN = "children";
7575
public static final String HIVE_QUERIES = "hive_queries";
7676
public static final String HIVE_PROPERTIES = "hive_props";
77+
public static final String PRESTO_QUERIES = "presto_queries";
78+
public static final String PRESTO_PROPERTIES = "presto_props";
7779
private static String NUM_RECORDS_INSERT = "num_records_insert";
7880
private static String NUM_RECORDS_UPSERT = "num_records_upsert";
7981
private static String NUM_RECORDS_DELETE = "num_records_delete";
@@ -278,7 +280,7 @@ public Option<String> getPreCombineField() {
278280

279281
public Option<String> getPartitionField() {
280282
return !configsMap.containsKey(PARTITION_FIELD) ? Option.empty()
281-
: Option.of(configsMap.get(PARTITION_FIELD).toString());
283+
: Option.of(configsMap.get(PARTITION_FIELD).toString());
282284
}
283285

284286
public String getMergeCondition() {
@@ -314,7 +316,7 @@ public Map<String, Object> getOtherConfigs() {
314316

315317
public List<Pair<String, Integer>> getHiveQueries() {
316318
try {
317-
return (List<Pair<String, Integer>>) this.configsMap.getOrDefault("hive_queries", new ArrayList<>());
319+
return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(HIVE_QUERIES, new ArrayList<>());
318320
} catch (Exception e) {
319321
throw new RuntimeException("unable to get hive queries from configs");
320322
}
@@ -328,6 +330,18 @@ public List<String> getHiveProperties() {
328330
return (List<String>) this.configsMap.getOrDefault(HIVE_PROPERTIES, new ArrayList<>());
329331
}
330332

333+
public List<String> getPrestoProperties() {
334+
return (List<String>) this.configsMap.getOrDefault(PRESTO_PROPERTIES, new ArrayList<>());
335+
}
336+
337+
public List<Pair<String, Integer>> getPrestoQueries() {
338+
try {
339+
return (List<Pair<String, Integer>>) this.configsMap.getOrDefault(PRESTO_QUERIES, new ArrayList<>());
340+
} catch (Exception e) {
341+
throw new RuntimeException("unable to get presto queries from configs");
342+
}
343+
}
344+
331345
@Override
332346
public String toString() {
333347
try {
@@ -444,6 +458,16 @@ public Builder withHiveProperties(List<String> hiveProperties) {
444458
return this;
445459
}
446460

461+
public Builder withPrestoProperties(List<String> prestoProperties) {
462+
this.configsMap.put(PRESTO_PROPERTIES, prestoProperties);
463+
return this;
464+
}
465+
466+
public Builder withPrestoQueryAndResults(List<Pair<String, Integer>> prestoQueries) {
467+
this.configsMap.put(PRESTO_QUERIES, prestoQueries);
468+
return this;
469+
}
470+
447471
public Builder withConfigsMap(Map<String, Object> configsMap) {
448472
this.configsMap = configsMap;
449473
return this;

hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java

Lines changed: 63 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@
5353
import java.util.stream.Collectors;
5454

5555
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.CONFIG_NAME;
56+
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.HIVE_PROPERTIES;
57+
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.HIVE_QUERIES;
5658
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.NO_DEPENDENCY_VALUE;
59+
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.PRESTO_PROPERTIES;
60+
import static org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config.PRESTO_QUERIES;
5761

5862
/**
5963
* Utility class to SerDe workflow dag.
@@ -172,7 +176,8 @@ private static DagNode convertJsonToDagNode(JsonNode node, String type, String n
172176
DeltaConfig.Config config = DeltaConfig.Config.newBuilder().withConfigsMap(convertJsonNodeToMap(node))
173177
.withName(name).build();
174178
return (DagNode) ReflectionUtils.loadClass(generateFQN(type), config);
175-
} catch (ClassNotFoundException e) {
179+
}
180+
catch (ClassNotFoundException e) {
176181
throw new RuntimeException(e);
177182
}
178183
}
@@ -192,11 +197,17 @@ private static Map<String, Object> convertJsonNodeToMap(JsonNode node) {
192197
while (itr.hasNext()) {
193198
Entry<String, JsonNode> entry = itr.next();
194199
switch (entry.getKey()) {
195-
case DeltaConfig.Config.HIVE_QUERIES:
196-
configsMap.put(DeltaConfig.Config.HIVE_QUERIES, getHiveQueries(entry));
200+
case HIVE_QUERIES:
201+
configsMap.put(HIVE_QUERIES, getQueries(entry));
202+
break;
203+
case HIVE_PROPERTIES:
204+
configsMap.put(HIVE_PROPERTIES, getQuerySessionProperties(entry));
197205
break;
198-
case DeltaConfig.Config.HIVE_PROPERTIES:
199-
configsMap.put(DeltaConfig.Config.HIVE_PROPERTIES, getProperties(entry));
206+
case PRESTO_QUERIES:
207+
configsMap.put(PRESTO_QUERIES, getQueries(entry));
208+
break;
209+
case PRESTO_PROPERTIES:
210+
configsMap.put(PRESTO_PROPERTIES, getQuerySessionProperties(entry));
200211
break;
201212
default:
202213
configsMap.put(entry.getKey(), getValue(entry.getValue()));
@@ -206,25 +217,27 @@ private static Map<String, Object> convertJsonNodeToMap(JsonNode node) {
206217
return configsMap;
207218
}
208219

209-
private static List<Pair<String, Integer>> getHiveQueries(Entry<String, JsonNode> entry) {
220+
private static List<Pair<String, Integer>> getQueries(Entry<String, JsonNode> entry) {
210221
List<Pair<String, Integer>> queries = new ArrayList<>();
211222
try {
212223
List<JsonNode> flattened = new ArrayList<>();
213224
flattened.add(entry.getValue());
214-
queries = (List<Pair<String, Integer>>)getHiveQueryMapper().readValue(flattened.toString(), List.class);
215-
} catch (Exception e) {
225+
queries = (List<Pair<String, Integer>>) getQueryMapper().readValue(flattened.toString(), List.class);
226+
}
227+
catch (Exception e) {
216228
e.printStackTrace();
217229
}
218230
return queries;
219231
}
220232

221-
private static List<String> getProperties(Entry<String, JsonNode> entry) {
233+
private static List<String> getQuerySessionProperties(Entry<String, JsonNode> entry) {
222234
List<String> properties = new ArrayList<>();
223235
try {
224236
List<JsonNode> flattened = new ArrayList<>();
225237
flattened.add(entry.getValue());
226-
properties = (List<String>)getHivePropertyMapper().readValue(flattened.toString(), List.class);
227-
} catch (Exception e) {
238+
properties = (List<String>) getQueryEnginePropertyMapper().readValue(flattened.toString(), List.class);
239+
}
240+
catch (Exception e) {
228241
e.printStackTrace();
229242
}
230243
return properties;
@@ -233,15 +246,20 @@ private static List<String> getProperties(Entry<String, JsonNode> entry) {
233246
private static Object getValue(JsonNode node) {
234247
if (node.isInt()) {
235248
return node.asInt();
236-
} else if (node.isLong()) {
249+
}
250+
else if (node.isLong()) {
237251
return node.asLong();
238-
} else if (node.isShort()) {
252+
}
253+
else if (node.isShort()) {
239254
return node.asInt();
240-
} else if (node.isBoolean()) {
255+
}
256+
else if (node.isBoolean()) {
241257
return node.asBoolean();
242-
} else if (node.isDouble()) {
258+
}
259+
else if (node.isDouble()) {
243260
return node.asDouble();
244-
} else if (node.isFloat()) {
261+
}
262+
else if (node.isFloat()) {
245263
return node.asDouble();
246264
}
247265
return node.textValue();
@@ -254,13 +272,20 @@ private static JsonNode createJsonNode(DagNode node, String type) throws IOExcep
254272
while (itr.hasNext()) {
255273
Entry<String, JsonNode> entry = itr.next();
256274
switch (entry.getKey()) {
257-
case DeltaConfig.Config.HIVE_QUERIES:
258-
((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_QUERIES,
259-
MAPPER.readTree(getHiveQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
275+
case HIVE_QUERIES:
276+
((ObjectNode) configNode).put(HIVE_QUERIES,
277+
MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
278+
break;
279+
case HIVE_PROPERTIES:
280+
((ObjectNode) configNode).put(HIVE_PROPERTIES,
281+
MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
282+
case PRESTO_QUERIES:
283+
((ObjectNode) configNode).put(PRESTO_QUERIES,
284+
MAPPER.readTree(getQueryMapper().writeValueAsString(node.getConfig().getHiveQueries())));
260285
break;
261-
case DeltaConfig.Config.HIVE_PROPERTIES:
262-
((ObjectNode) configNode).put(DeltaConfig.Config.HIVE_PROPERTIES,
263-
MAPPER.readTree(getHivePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
286+
case PRESTO_PROPERTIES:
287+
((ObjectNode) configNode).put(PRESTO_PROPERTIES,
288+
MAPPER.readTree(getQueryEnginePropertyMapper().writeValueAsString(node.getConfig().getHiveProperties())));
264289
break;
265290
default:
266291
break;
@@ -293,21 +318,22 @@ public static String toString(InputStream inputStream) throws IOException {
293318
return result.toString("utf-8");
294319
}
295320

296-
private static ObjectMapper getHiveQueryMapper() {
321+
private static ObjectMapper getQueryMapper() {
297322
SimpleModule module = new SimpleModule();
298323
ObjectMapper queryMapper = new ObjectMapper();
299-
module.addSerializer(List.class, new HiveQuerySerializer());
300-
module.addDeserializer(List.class, new HiveQueryDeserializer());
324+
module.addSerializer(List.class, new QuerySerializer());
325+
module.addDeserializer(List.class, new QueryDeserializer());
301326
queryMapper.registerModule(module);
302327
return queryMapper;
303328
}
304329

305-
private static final class HiveQuerySerializer extends JsonSerializer<List> {
330+
private static final class QuerySerializer extends JsonSerializer<List> {
306331
Integer index = 0;
332+
307333
@Override
308334
public void serialize(List pairs, JsonGenerator gen, SerializerProvider serializers) throws IOException {
309335
gen.writeStartObject();
310-
for (Pair pair : (List<Pair>)pairs) {
336+
for (Pair pair : (List<Pair>) pairs) {
311337
gen.writeStringField("query" + index, pair.getLeft().toString());
312338
gen.writeNumberField("result" + index, Integer.parseInt(pair.getRight().toString()));
313339
index++;
@@ -316,7 +342,7 @@ public void serialize(List pairs, JsonGenerator gen, SerializerProvider serializ
316342
}
317343
}
318344

319-
private static final class HiveQueryDeserializer extends JsonDeserializer<List> {
345+
private static final class QueryDeserializer extends JsonDeserializer<List> {
320346
@Override
321347
public List deserialize(JsonParser parser, DeserializationContext context) throws IOException {
322348
List<Pair<String, Integer>> pairs = new ArrayList<>();
@@ -334,7 +360,8 @@ public List deserialize(JsonParser parser, DeserializationContext context) throw
334360

335361
if (fieldName.contains("query")) {
336362
query = parser.getValueAsString();
337-
} else if (fieldName.contains("result")) {
363+
}
364+
else if (fieldName.contains("result")) {
338365
result = parser.getValueAsInt();
339366
pairs.add(Pair.of(query, result));
340367
}
@@ -344,29 +371,30 @@ public List deserialize(JsonParser parser, DeserializationContext context) throw
344371
}
345372
}
346373

347-
private static ObjectMapper getHivePropertyMapper() {
374+
private static ObjectMapper getQueryEnginePropertyMapper() {
348375
SimpleModule module = new SimpleModule();
349376
ObjectMapper propMapper = new ObjectMapper();
350-
module.addSerializer(List.class, new HivePropertySerializer());
351-
module.addDeserializer(List.class, new HivePropertyDeserializer());
377+
module.addSerializer(List.class, new QueryEnginePropertySerializer());
378+
module.addDeserializer(List.class, new QueryEnginePropertyDeserializer());
352379
propMapper.registerModule(module);
353380
return propMapper;
354381
}
355382

356-
private static final class HivePropertySerializer extends JsonSerializer<List> {
383+
private static final class QueryEnginePropertySerializer extends JsonSerializer<List> {
357384
Integer index = 0;
385+
358386
@Override
359387
public void serialize(List props, JsonGenerator gen, SerializerProvider serializers) throws IOException {
360388
gen.writeStartObject();
361-
for (String prop : (List<String>)props) {
389+
for (String prop : (List<String>) props) {
362390
gen.writeStringField("prop" + index, prop);
363391
index++;
364392
}
365393
gen.writeEndObject();
366394
}
367395
}
368396

369-
private static final class HivePropertyDeserializer extends JsonDeserializer<List> {
397+
private static final class QueryEnginePropertyDeserializer extends JsonDeserializer<List> {
370398
@Override
371399
public List deserialize(JsonParser parser, DeserializationContext context) throws IOException {
372400
List<String> props = new ArrayList<>();
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.integ.testsuite.dag.nodes;
20+
21+
import org.apache.hudi.common.util.collection.Pair;
22+
23+
import java.sql.ResultSet;
24+
import java.sql.SQLException;
25+
import java.sql.Statement;
26+
import java.util.List;
27+
28+
public abstract class BaseQueryNode extends DagNode<Boolean> {
29+
30+
public void setSessionProperties(List<String> properties, Statement stmt) throws SQLException {
31+
for (String prop : properties) {
32+
executeStatement(prop, stmt);
33+
}
34+
}
35+
36+
public void executeAndValidateQueries(List<Pair<String, Integer>> queriesWithResult, Statement stmt) throws SQLException {
37+
for (Pair<String, Integer> queryAndResult : queriesWithResult) {
38+
log.info("Running {}", queryAndResult.getLeft());
39+
ResultSet res = stmt.executeQuery(queryAndResult.getLeft());
40+
if (!res.next()) {
41+
log.info("res.next() was False - typically this means the query returned no rows.");
42+
assert 0 == queryAndResult.getRight();
43+
}
44+
else {
45+
Integer result = res.getInt(1);
46+
if (!queryAndResult.getRight().equals(result)) {
47+
throw new AssertionError(
48+
"QUERY: " + queryAndResult.getLeft()
49+
+ " | EXPECTED RESULT = " + queryAndResult.getRight()
50+
+ " | ACTUAL RESULT = " + result
51+
);
52+
}
53+
}
54+
log.info("Successfully validated query!");
55+
}
56+
}
57+
58+
private void executeStatement(String query, Statement stmt) throws SQLException {
59+
log.info("Executing statement {}", stmt.toString());
60+
stmt.execute(query);
61+
}
62+
}

0 commit comments

Comments
 (0)