diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java index 4cdf7a88d6ace..8169175dccdc4 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java @@ -314,6 +314,9 @@ public static Object toFlinkObject(ObjectInspector inspector, Object data, HiveS if (inspector instanceof ListObjectInspector) { ListObjectInspector listInspector = (ListObjectInspector) inspector; List list = listInspector.getList(data); + if (list == null) { + return null; + } // flink expects a specific array type (e.g. Integer[] instead of Object[]), so we have // to get the element class @@ -332,6 +335,9 @@ public static Object toFlinkObject(ObjectInspector inspector, Object data, HiveS if (inspector instanceof MapObjectInspector) { MapObjectInspector mapInspector = (MapObjectInspector) inspector; Map map = mapInspector.getMap(data); + if (map == null) { + return null; + } Map result = new HashMap<>(map.size()); for (Map.Entry entry : map.entrySet()) { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java index 9af50b44ddf4e..fce1bbc369d03 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java @@ -531,6 +531,31 @@ public void testLocationWithComma() throws Exception { } } + @Test + public void 
testReadHiveDataWithEmptyMapForHiveShim20X() throws Exception { + Assume.assumeTrue(HiveShimLoader.getHiveVersion().compareTo("2.0.0") <= 0); + TableEnvironment tableEnv = getTableEnvWithHiveCatalog(); + try { + String format = "parquet"; + // test.parquet data: hehuiyuan {} [] + String folderURI = this.getClass().getResource("/parquet").getPath(); + + tableEnv.getConfig() + .getConfiguration() + .set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, true); + tableEnv.executeSql( + String.format( + "create external table src_t (a string, b map<string, string>, c array<string>) stored as %s location 'file://%s'", + format, folderURI)); + + List<Row> results = + CollectionUtil.iteratorToList( + tableEnv.sqlQuery("select * from src_t").execute().collect()); + } finally { + tableEnv.executeSql("drop table if exists src_t"); + } + } + private TableEnvironment getTableEnvWithHiveCatalog() { TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(SqlDialect.HIVE); diff --git a/flink-connectors/flink-connector-hive/src/test/resources/parquet/test.parquet b/flink-connectors/flink-connector-hive/src/test/resources/parquet/test.parquet new file mode 100644 index 0000000000000..deb0067b117cc Binary files /dev/null and b/flink-connectors/flink-connector-hive/src/test/resources/parquet/test.parquet differ