23 changes: 23 additions & 0 deletions docker/demo/trino-batch1.commands
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
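These batch-1 checks mirror the existing Presto command file: a max(ts)-per-symbol aggregation and a GOOG projection, each run against the copy-on-write table and the read-optimized view of the merge-on-read table. For orientation only, the sketch below assembles the Trino CLI call that the integration test (later in this diff) issues for this file; the coordinator address, catalog/schema, and staging path are taken from this change rather than invented.

// Illustrative sketch: mirrors getTrinoConsoleCommand(...) added to ITTestBase in this PR.
public class TrinoBatch1InvocationSketch {
  public static void main(String[] args) {
    String trinoCoordinatorUrl = "trino-coordinator-1:8091";            // TRINO_COORDINATOR_URL in ITTestBase
    String commandFile = "/usr/hive/data/input/trino-batch1.commands";  // HDFS_TRINO_INPUT_BATCH1_PATH in ITTestHoodieDemo
    String cmd = "trino --server " + trinoCoordinatorUrl
        + " --catalog hive --schema default"
        + " -f " + commandFile;
    // Prints: trino --server trino-coordinator-1:8091 --catalog hive --schema default -f /usr/hive/data/input/trino-batch1.commands
    System.out.println(cmd);
  }
}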
21 changes: 21 additions & 0 deletions docker/demo/trino-batch2-after-compaction.commands
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
20 changes: 20 additions & 0 deletions docker/demo/trino-table-check.commands
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

show tables;
@@ -62,6 +62,7 @@ public abstract class ITTestBase {
protected static final String ADHOC_2_CONTAINER = "/adhoc-2";
protected static final String HIVESERVER = "/hiveserver";
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
protected static final String TRINO_COORDINATOR = "/trino-coordinator-1";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_app.sh";
protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark-datasource/hudi-spark/run_hoodie_generate_app.sh";
@@ -76,6 +77,7 @@ public abstract class ITTestBase {
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar";
protected static final String HIVE_SERVER_JDBC_URL = "jdbc:hive2://hiveserver:10000";
protected static final String PRESTO_COORDINATOR_URL = "presto-coordinator-1:8090";
protected static final String TRINO_COORDINATOR_URL = "trino-coordinator-1:8091";
protected static final String HADOOP_CONF_DIR = "/etc/hadoop";

// Skip these lines when capturing output from hive
@@ -122,6 +124,12 @@ static String getPrestoConsoleCommand(String commandFile) {
.append(" -f " + commandFile).toString();
}

static String getTrinoConsoleCommand(String commandFile) {
return new StringBuilder().append("trino --server " + TRINO_COORDINATOR_URL)
.append(" --catalog hive --schema default")
.append(" -f " + commandFile).toString();
}

@BeforeEach
public void init() {
String dockerHost = (OVERRIDDEN_DOCKER_HOST != null) ? OVERRIDDEN_DOCKER_HOST : DEFAULT_DOCKER_HOST;
@@ -309,6 +317,20 @@ void executePrestoCopyCommand(String fromFile, String remotePath) {
.exec();
}

Pair<String, String> executeTrinoCommandFile(String commandFile) throws Exception {
String trinoCmd = getTrinoConsoleCommand(commandFile);
TestExecStartResultCallback callback = executeCommandStringInDocker(ADHOC_1_CONTAINER, trinoCmd, true);
return Pair.of(callback.getStdout().toString().trim(), callback.getStderr().toString().trim());
}

void executeTrinoCopyCommand(String fromFile, String remotePath) {
Container adhocContainer = runningContainers.get(ADHOC_1_CONTAINER);
dockerClient.copyArchiveToContainerCmd(adhocContainer.getId())
.withHostResource(fromFile)
.withRemotePath(remotePath)
.exec();
}

private void saveUpLogs() {
try {
// save up the Hive log files for introspection
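Taken together, the new constant, URL, and helpers give ITTestBase subclasses a copy-then-execute path against the Trino coordinator, parallel to the existing Presto support. Below is a minimal sketch (not part of this PR) of that usage; it assumes assertStdOutContains is the occurrence-counting assertion the demo test relies on, since its definition sits outside this diff. The next file in the change, ITTestHoodieDemo, follows the same pattern.

import org.apache.hudi.common.util.collection.Pair;

// Hypothetical subclass, for illustration only: stages a Trino command file into the
// adhoc-1 container and runs it through the Trino CLI, as ITTestHoodieDemo does below.
public class ITTestTrinoSmokeSketch extends ITTestBase {

  void checkTablesViaTrino() throws Exception {
    String stagingDir = "/usr/hive/data/input"; // HDFS_DATA_DIR in the demo test
    // copy the command file from the host workspace into the adhoc-1 container
    executeTrinoCopyCommand(
        System.getProperty("user.dir") + "/../docker/demo/trino-table-check.commands", stagingDir);
    // run it against trino-coordinator-1:8091 (catalog hive, schema default)
    Pair<String, String> stdOutErr = executeTrinoCommandFile(stagingDir + "/trino-table-check.commands");
    // same expectation the demo test asserts after the first batch
    assertStdOutContains(stdOutErr, "stock_ticks_cow", 2);
  }
}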
@@ -21,7 +21,6 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.hudi.keygen.SimpleKeyGenerator;

import org.junit.jupiter.api.AfterEach;
@@ -33,24 +32,34 @@

/**
* Goes through steps described in https://hudi.apache.org/docker_demo.html
*
* <p>
* To run this as a standalone test in the IDE or command line. First bring up the demo setup using
* `docker/setup_demo.sh` and then run the test class as you would do normally.
*/
public class ITTestHoodieDemo extends ITTestBase {

private static final String TRINO_TABLE_CHECK_FILENAME = "trino-table-check.commands";
private static final String TRINO_BATCH1_FILENAME = "trino-batch1.commands";
private static final String TRINO_BATCH2_FILENAME = "trino-batch2-after-compaction.commands";

private static final String HDFS_DATA_DIR = "/usr/hive/data/input";
private static final String HDFS_BATCH_PATH1 = HDFS_DATA_DIR + "/batch_1.json";
private static final String HDFS_BATCH_PATH2 = HDFS_DATA_DIR + "/batch_2.json";
private static final String HDFS_PRESTO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/presto-table-check.commands";
private static final String HDFS_PRESTO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/presto-batch1.commands";
private static final String HDFS_PRESTO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/presto-batch2-after-compaction.commands";
private static final String HDFS_TRINO_INPUT_TABLE_CHECK_PATH = HDFS_DATA_DIR + "/" + TRINO_TABLE_CHECK_FILENAME;
private static final String HDFS_TRINO_INPUT_BATCH1_PATH = HDFS_DATA_DIR + "/" + TRINO_BATCH1_FILENAME;
private static final String HDFS_TRINO_INPUT_BATCH2_PATH = HDFS_DATA_DIR + "/" + TRINO_BATCH2_FILENAME;

private static final String INPUT_BATCH_PATH1 = HOODIE_WS_ROOT + "/docker/demo/data/batch_1.json";
private static final String PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/presto-table-check.commands";
private static final String PRESTO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/presto-batch1.commands";
private static final String INPUT_BATCH_PATH2 = HOODIE_WS_ROOT + "/docker/demo/data/batch_2.json";
private static final String PRESTO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/presto-batch2-after-compaction.commands";
private static final String TRINO_INPUT_TABLE_CHECK_RELATIVE_PATH = "/docker/demo/" + TRINO_TABLE_CHECK_FILENAME;
private static final String TRINO_INPUT_BATCH1_RELATIVE_PATH = "/docker/demo/" + TRINO_BATCH1_FILENAME;
private static final String TRINO_INPUT_BATCH2_RELATIVE_PATH = "/docker/demo/" + TRINO_BATCH2_FILENAME;

private static final String COW_BASE_PATH = "/user/hive/warehouse/stock_ticks_cow";
private static final String MOR_BASE_PATH = "/user/hive/warehouse/stock_ticks_mor";
@@ -110,12 +119,14 @@ public void testParquetDemo() throws Exception {
ingestFirstBatchAndHiveSync();
testHiveAfterFirstBatch();
testPrestoAfterFirstBatch();
testTrinoAfterFirstBatch();
testSparkSQLAfterFirstBatch();

// batch 2
ingestSecondBatchAndHiveSync();
testHiveAfterSecondBatch();
testPrestoAfterSecondBatch();
testTrinoAfterSecondBatch();
testSparkSQLAfterSecondBatch();
testIncrementalHiveQueryBeforeCompaction();
testIncrementalSparkSQLQuery();
@@ -125,6 +136,7 @@

testHiveAfterSecondBatchAfterCompaction();
testPrestoAfterSecondBatchAfterCompaction();
testTrinoAfterSecondBatchAfterCompaction();
testIncrementalHiveQueryAfterCompaction();
}

@@ -133,20 +145,22 @@ public void testParquetDemo() throws Exception {
public void testHFileDemo() throws Exception {
baseFileFormat = HoodieFileFormat.HFILE;

// TODO: Preseto and SparkSQL support for HFile format
// TODO: Presto, Trino and SparkSQL support for HFile format

setupDemo();

// batch 1
ingestFirstBatchAndHiveSync();
testHiveAfterFirstBatch();
//testPrestoAfterFirstBatch();
//testTrinoAfterFirstBatch();
//testSparkSQLAfterFirstBatch();

// batch 2
ingestSecondBatchAndHiveSync();
testHiveAfterSecondBatch();
//testPrestoAfterSecondBatch();
//testTrinoAfterSecondBatch();
//testSparkSQLAfterSecondBatch();
testIncrementalHiveQueryBeforeCompaction();
//testIncrementalSparkSQLQuery();
@@ -155,14 +169,16 @@ public void testHFileDemo() throws Exception {
scheduleAndRunCompaction();
testHiveAfterSecondBatchAfterCompaction();
//testPrestoAfterSecondBatchAfterCompaction();
//testTrinoAfterSecondBatchAfterCompaction();
//testIncrementalHiveQueryAfterCompaction();
}

private void setupDemo() throws Exception {
List<String> cmds = CollectionUtils.createImmutableList("hdfs dfsadmin -safemode wait",
"hdfs dfs -mkdir -p " + HDFS_DATA_DIR,
"hdfs dfs -copyFromLocal -f " + INPUT_BATCH_PATH1 + " " + HDFS_BATCH_PATH1,
"/bin/bash " + DEMO_CONTAINER_SCRIPT);
"/bin/bash " + DEMO_CONTAINER_SCRIPT,
"mkdir -p " + HDFS_DATA_DIR);

executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);

@@ -174,6 +190,10 @@ private void setupDemo() throws Exception {
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR);
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR);
executePrestoCopyCommand(System.getProperty("user.dir") + "/.." + PRESTO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR);

executeTrinoCopyCommand(System.getProperty("user.dir") + "/.." + TRINO_INPUT_TABLE_CHECK_RELATIVE_PATH, HDFS_DATA_DIR);
executeTrinoCopyCommand(System.getProperty("user.dir") + "/.." + TRINO_INPUT_BATCH1_RELATIVE_PATH, HDFS_DATA_DIR);
executeTrinoCopyCommand(System.getProperty("user.dir") + "/.." + TRINO_INPUT_BATCH2_RELATIVE_PATH, HDFS_DATA_DIR);
}

private void ingestFirstBatchAndHiveSync() throws Exception {
@@ -335,6 +355,20 @@ private void testPrestoAfterFirstBatch() throws Exception {
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"", 2);
}

private void testTrinoAfterFirstBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeTrinoCommandFile(HDFS_TRINO_INPUT_TABLE_CHECK_PATH);
assertStdOutContains(stdOutErrPair, "stock_ticks_cow", 2);
assertStdOutContains(stdOutErrPair, "stock_ticks_mor", 4);

stdOutErrPair = executeTrinoCommandFile(HDFS_TRINO_INPUT_BATCH1_PATH);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\"", 4);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"", 2);
}
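The occurrence counts passed above (for example, 2 for the 09:59:00 row, presumably once from the copy-on-write query and once from the read-optimized query) only make sense if assertStdOutContains counts substring occurrences in the captured CLI output. That helper is defined outside this diff; the following is just a rough sketch of the behavior assumed here, to help read the assertions:

// Assumed shape of the assertion helper (the real assertStdOutContains lives in ITTestBase,
// outside this diff): count occurrences of the expected substring in captured stdout.
static void assertOccurrences(String stdout, String expected, int expectedTimes) {
  int count = 0;
  int idx = stdout.indexOf(expected);
  while (idx >= 0) {
    count++;
    idx = stdout.indexOf(expected, idx + expected.length());
  }
  if (count != expectedTimes) {
    throw new AssertionError(
        "Expected " + expectedTimes + " occurrence(s) of [" + expected + "] but found " + count);
  }
}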

private void testHiveAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeHiveCommandFile(HIVE_BATCH1_COMMANDS);
assertStdOutContains(stdOutErrPair, "| symbol | _c1 |\n+---------+----------------------+\n"
@@ -361,7 +395,21 @@ private void testPrestoAfterSecondBatch() throws Exception {
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"",2);
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"");
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
}

private void testTrinoAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeTrinoCommandFile(HDFS_TRINO_INPUT_BATCH1_PATH);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:29:00\",\"3391\",\"1230.1899\",\"1230.085\"");
assertStdOutContains(stdOutErrPair,
@@ -390,6 +438,16 @@ private void testPrestoAfterSecondBatchAfterCompaction() throws Exception {
"\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
}

private void testTrinoAfterSecondBatchAfterCompaction() throws Exception {
Pair<String, String> stdOutErrPair = executeTrinoCommandFile(HDFS_TRINO_INPUT_BATCH2_PATH);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\"", 2);
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 09:59:00\",\"6330\",\"1230.5\",\"1230.02\"");
assertStdOutContains(stdOutErrPair,
"\"GOOG\",\"2018-08-31 10:59:00\",\"9021\",\"1227.1993\",\"1227.215\"");
}

private void testSparkSQLAfterSecondBatch() throws Exception {
Pair<String, String> stdOutErrPair = executeSparkSQLCommand(SPARKSQL_BATCH2_COMMANDS, true);
assertStdOutContains(stdOutErrPair,