|
13 | 13 | */ |
14 | 14 | package com.facebook.presto.plugin.postgresql; |
15 | 15 |
|
| 16 | +import com.esri.core.geometry.ogc.OGCGeometry; |
16 | 17 | import com.facebook.airlift.json.JsonObjectMapperProvider; |
17 | 18 | import com.facebook.presto.common.type.StandardTypes; |
18 | 19 | import com.facebook.presto.common.type.Type; |
|
21 | 22 | import com.facebook.presto.plugin.jdbc.BaseJdbcClient; |
22 | 23 | import com.facebook.presto.plugin.jdbc.BaseJdbcConfig; |
23 | 24 | import com.facebook.presto.plugin.jdbc.DriverConnectionFactory; |
| 25 | +import com.facebook.presto.plugin.jdbc.JdbcColumnHandle; |
24 | 26 | import com.facebook.presto.plugin.jdbc.JdbcConnectorId; |
25 | 27 | import com.facebook.presto.plugin.jdbc.JdbcIdentity; |
| 28 | +import com.facebook.presto.plugin.jdbc.JdbcSplit; |
26 | 29 | import com.facebook.presto.plugin.jdbc.JdbcTypeHandle; |
| 30 | +import com.facebook.presto.plugin.jdbc.QueryBuilder; |
27 | 31 | import com.facebook.presto.plugin.jdbc.ReadMapping; |
28 | 32 | import com.facebook.presto.spi.ConnectorSession; |
29 | 33 | import com.facebook.presto.spi.ConnectorTableMetadata; |
|
48 | 52 | import java.sql.PreparedStatement; |
49 | 53 | import java.sql.ResultSet; |
50 | 54 | import java.sql.SQLException; |
| 55 | +import java.util.List; |
| 56 | +import java.util.Map; |
51 | 57 | import java.util.Optional; |
52 | 58 | import java.util.UUID; |
53 | 59 |
|
| 60 | +import static com.esri.core.geometry.ogc.OGCGeometry.fromBinary; |
| 61 | +import static com.facebook.presto.common.type.StandardTypes.GEOMETRY; |
| 62 | +import static com.facebook.presto.common.type.StandardTypes.SPHERICAL_GEOGRAPHY; |
54 | 63 | import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; |
| 64 | +import static com.facebook.presto.common.type.VarcharType.VARCHAR; |
| 65 | +import static com.facebook.presto.geospatial.GeometryUtils.wktFromJtsGeometry; |
| 66 | +import static com.facebook.presto.geospatial.serde.EsriGeometrySerde.serialize; |
| 67 | +import static com.facebook.presto.geospatial.serde.JtsGeometrySerde.deserialize; |
55 | 68 | import static com.facebook.presto.plugin.jdbc.JdbcErrorCode.JDBC_ERROR; |
| 69 | +import static com.facebook.presto.plugin.jdbc.QueryBuilder.quote; |
| 70 | +import static com.facebook.presto.plugin.jdbc.ReadMapping.sliceReadMapping; |
56 | 71 | import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS; |
57 | 72 | import static com.facebook.presto.spi.StandardErrorCode.INVALID_FUNCTION_ARGUMENT; |
58 | 73 | import static com.fasterxml.jackson.core.JsonFactory.Feature.CANONICALIZE_FIELD_NAMES; |
59 | 74 | import static com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS; |
| 75 | +import static com.google.common.collect.ImmutableMap.toImmutableMap; |
60 | 76 | import static io.airlift.slice.Slices.utf8Slice; |
| 77 | +import static io.airlift.slice.Slices.wrappedBuffer; |
61 | 78 | import static io.airlift.slice.Slices.wrappedLongArray; |
62 | 79 | import static java.lang.Long.reverseBytes; |
63 | 80 | import static java.lang.String.format; |
64 | 81 | import static java.nio.charset.StandardCharsets.UTF_8; |
| 82 | +import static java.util.Objects.requireNonNull; |
| 83 | +import static java.util.function.Function.identity; |
65 | 84 |
|
66 | 85 | public class PostgreSqlClient |
67 | 86 | extends BaseJdbcClient |
@@ -114,19 +133,72 @@ protected String toSqlType(Type type) |
114 | 133 | return super.toSqlType(type); |
115 | 134 | } |
116 | 135 |
|
| 136 | + @Override |
| 137 | + public PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles) throws SQLException |
| 138 | + { |
| 139 | + Map<String, String> columnExpressions = columnHandles.stream() |
| 140 | + .filter(handle -> handle.getJdbcTypeHandle().getJdbcTypeName().equalsIgnoreCase(GEOMETRY)) |
| 141 | + .map(JdbcColumnHandle::getColumnName) |
| 142 | + .collect(toImmutableMap( |
| 143 | + identity(), |
| 144 | + columnName -> "ST_AsBinary(" + quote(identifierQuote, columnName) + ")")); |
| 145 | + |
| 146 | + return new QueryBuilder(identifierQuote).buildSql( |
| 147 | + this, |
| 148 | + session, |
| 149 | + connection, |
| 150 | + split.getCatalogName(), |
| 151 | + split.getSchemaName(), |
| 152 | + split.getTableName(), |
| 153 | + columnHandles, |
| 154 | + columnExpressions, |
| 155 | + split.getTupleDomain(), |
| 156 | + split.getAdditionalPredicate()); |
| 157 | + } |
| 158 | + |
117 | 159 | @Override |
118 | 160 | public Optional<ReadMapping> toPrestoType(ConnectorSession session, JdbcTypeHandle typeHandle) |
119 | 161 | { |
| 162 | + String typeName = typeHandle.getJdbcTypeName(); |
| 163 | + |
120 | 164 | if (typeHandle.getJdbcTypeName().equals("jsonb") || typeHandle.getJdbcTypeName().equals("json")) { |
121 | 165 | return Optional.of(jsonColumnMapping()); |
122 | 166 | } |
123 | 167 |
|
124 | 168 | else if (typeHandle.getJdbcTypeName().equals("uuid")) { |
125 | 169 | return Optional.of(uuidColumnMapping()); |
126 | 170 | } |
| 171 | + else if (typeName.equalsIgnoreCase(GEOMETRY) || typeName.equalsIgnoreCase(SPHERICAL_GEOGRAPHY)) { |
| 172 | + return Optional.of(geometryReadMapping()); |
| 173 | + } |
127 | 174 | return super.toPrestoType(session, typeHandle); |
128 | 175 | } |
129 | 176 |
|
| 177 | + protected static ReadMapping geometryReadMapping() |
| 178 | + { |
| 179 | + return sliceReadMapping(VARCHAR, |
| 180 | + (resultSet, columnIndex) -> getAsText(stGeomFromBinary(wrappedBuffer(resultSet.getBytes(columnIndex))))); |
| 181 | + } |
| 182 | + |
| 183 | + protected static Slice getAsText(Slice input) |
| 184 | + { |
| 185 | + return utf8Slice(wktFromJtsGeometry(deserialize(input))); |
| 186 | + } |
| 187 | + |
| 188 | + private static Slice stGeomFromBinary(Slice input) |
| 189 | + { |
| 190 | + requireNonNull(input, "input is null"); |
| 191 | + OGCGeometry geometry; |
| 192 | + try { |
| 193 | + geometry = fromBinary(input.toByteBuffer().slice()); |
| 194 | + } |
| 195 | + catch (IllegalArgumentException | IndexOutOfBoundsException e) { |
| 196 | + throw new PrestoException(INVALID_FUNCTION_ARGUMENT, "Invalid Well-Known Binary (WKB)", e); |
| 197 | + } |
| 198 | + geometry.setSpatialReference(null); |
| 199 | + return serialize(geometry); |
| 200 | + } |
| 201 | + |
130 | 202 | @Override |
131 | 203 | public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) |
132 | 204 | { |
@@ -159,7 +231,7 @@ protected void renameTable(JdbcIdentity identity, String catalogName, SchemaTabl |
159 | 231 |
|
160 | 232 | private ReadMapping jsonColumnMapping() |
161 | 233 | { |
162 | | - return ReadMapping.sliceReadMapping( |
| 234 | + return sliceReadMapping( |
163 | 235 | jsonType, |
164 | 236 | (resultSet, columnIndex) -> jsonParse(utf8Slice(resultSet.getString(columnIndex)))); |
165 | 237 | } |
@@ -190,7 +262,7 @@ public static JsonParser createJsonParser(JsonFactory factory, Slice json) |
190 | 262 |
|
191 | 263 | private ReadMapping uuidColumnMapping() |
192 | 264 | { |
193 | | - return ReadMapping.sliceReadMapping( |
| 265 | + return sliceReadMapping( |
194 | 266 | uuidType, |
195 | 267 | (resultSet, columnIndex) -> uuidSlice((UUID) resultSet.getObject(columnIndex))); |
196 | 268 | } |
|
0 commit comments