diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 4c050451c5a22..35a19d799cfeb 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.sync.datahub;
 
+import com.linkedin.common.Status;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
@@ -51,7 +52,6 @@
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.parquet.schema.MessageType;
-
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +61,7 @@ public class DataHubSyncClient extends HoodieSyncClient {
 
   protected final DataHubSyncConfig config;
   private final DatasetUrn datasetUrn;
+  private static final Status SOFT_DELETE_FALSE = new Status().setRemoved(false);
 
   public DataHubSyncClient(DataHubSyncConfig config) {
     super(config);
@@ -81,45 +82,33 @@ public void updateLastCommitTimeSynced(String tableName) {
   @Override
   public void updateTableProperties(String tableName, Map<String, String> tableProperties) {
     MetadataChangeProposalWrapper propertiesChangeProposal = MetadataChangeProposalWrapper.builder()
-        .entityType("dataset")
-        .entityUrn(datasetUrn)
-        .upsert()
-        .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
-        .build();
+            .entityType("dataset")
+            .entityUrn(datasetUrn)
+            .upsert()
+            .aspect(new DatasetProperties().setCustomProperties(new StringMap(tableProperties)))
+            .build();
+
+    DatahubResponseLogger responseLogger = new DatahubResponseLogger();
 
     try (RestEmitter emitter = config.getRestEmitter()) {
-      emitter.emit(propertiesChangeProposal, null).get();
+      emitter.emit(propertiesChangeProposal, responseLogger).get();
     } catch (Exception e) {
-      throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": " + tableProperties, e);
+      throw new HoodieDataHubSyncException("Fail to change properties for Dataset " + datasetUrn + ": "
+          + tableProperties, e);
     }
   }
 
   @Override
   public void updateTableSchema(String tableName, MessageType schema) {
-    Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
-    List<SchemaField> fields = avroSchema.getFields().stream().map(f -> new SchemaField()
-        .setFieldPath(f.name())
-        .setType(toSchemaFieldDataType(f.schema().getType()))
-        .setDescription(f.doc(), SetMode.IGNORE_NULL)
-        .setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList());
-
-    final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
-    platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
-    MetadataChangeProposalWrapper schemaChangeProposal = MetadataChangeProposalWrapper.builder()
-        .entityType("dataset")
-        .entityUrn(datasetUrn)
-        .upsert()
-        .aspect(new SchemaMetadata()
-            .setSchemaName(tableName)
-            .setVersion(0)
-            .setHash("")
-            .setPlatform(datasetUrn.getPlatformEntity())
-            .setPlatformSchema(platformSchema)
-            .setFields(new SchemaFieldArray(fields)))
-        .build();
-
     try (RestEmitter emitter = config.getRestEmitter()) {
-      emitter.emit(schemaChangeProposal, null).get();
+      DatahubResponseLogger responseLogger = new DatahubResponseLogger();
+      MetadataChangeProposalWrapper schemaChange = createSchemaMetadataUpdate(tableName);
+      emitter.emit(schemaChange, responseLogger).get();
+
+      // When updating an entity, it is necessary to set its soft-delete status to false, or else the update won't get
+      // reflected in the UI.
+      MetadataChangeProposalWrapper softDeleteUndoProposal = createUndoSoftDelete();
+      emitter.emit(softDeleteUndoProposal, responseLogger).get();
     } catch (Exception e) {
       throw new HoodieDataHubSyncException("Fail to change schema for Dataset " + datasetUrn, e);
     }
@@ -127,7 +116,7 @@ public void updateTableSchema(String tableName, MessageType schema) {
 
   @Override
   public Map<String, String> getMetastoreSchema(String tableName) {
-    throw new UnsupportedOperationException("Not supported: `getTableSchema`");
+    throw new UnsupportedOperationException("Not supported: `getMetastoreSchema`");
   }
 
   @Override
@@ -135,7 +124,43 @@ public void close() {
     // no op;
   }
 
-  static Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
+  private MetadataChangeProposalWrapper createUndoSoftDelete() {
+    MetadataChangeProposalWrapper softDeleteUndoProposal = MetadataChangeProposalWrapper.builder()
+        .entityType("dataset")
+        .entityUrn(datasetUrn)
+        .upsert()
+        .aspect(SOFT_DELETE_FALSE)
+        .aspectName("status")
+        .build();
+    return softDeleteUndoProposal;
+  }
+
+  private MetadataChangeProposalWrapper createSchemaMetadataUpdate(String tableName) {
+    Schema avroSchema = getAvroSchemaWithoutMetadataFields(metaClient);
+    List<SchemaField> fields = avroSchema.getFields().stream().map(f -> new SchemaField()
+        .setFieldPath(f.name())
+        .setType(toSchemaFieldDataType(f.schema().getType()))
+        .setDescription(f.doc(), SetMode.IGNORE_NULL)
+        .setNativeDataType(f.schema().getType().getName())).collect(Collectors.toList());
+
+    final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
+    platformSchema.setOtherSchema(new OtherSchema().setRawSchema(avroSchema.toString()));
+
+    return MetadataChangeProposalWrapper.builder()
+        .entityType("dataset")
+        .entityUrn(datasetUrn)
+        .upsert()
+        .aspect(new SchemaMetadata()
+            .setSchemaName(tableName)
+            .setVersion(0)
+            .setHash("")
+            .setPlatform(datasetUrn.getPlatformEntity())
+            .setPlatformSchema(platformSchema)
+            .setFields(new SchemaFieldArray(fields)))
+        .build();
+  }
+
+  Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
     try {
       return new TableSchemaResolver(metaClient).getTableAvroSchema(true);
     } catch (Exception e) {
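Note: in DataHub, a soft delete is recorded in the entity's "status" aspect; removed=true hides the dataset in the UI while leaving its other aspects writable, which is why a schema update alone would not bring a soft-deleted dataset back. Emitting Status().setRemoved(false), as the new createUndoSoftDelete() does, clears the flag. Below is a minimal standalone sketch of the same operation, assuming a default local emitter and a made-up dataset URN (neither comes from this patch).

// Sketch: undo a soft delete for one dataset. The URN below is an invented example.
import com.linkedin.common.Status;

import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;

public class UndoSoftDeleteExample {
  public static void main(String[] args) throws Exception {
    MetadataChangeProposalWrapper proposal = MetadataChangeProposalWrapper.builder()
        .entityType("dataset")
        .entityUrn("urn:li:dataset:(urn:li:dataPlatform:hudi,db.some_table,PROD)")
        .upsert()
        .aspect(new Status().setRemoved(false))
        .aspectName("status")
        .build();
    // Emitter pointing at a local DataHub GMS; Hudi's sync client gets its emitter
    // from config.getRestEmitter() instead.
    try (RestEmitter emitter = RestEmitter.createWithDefaults()) {
      emitter.emit(proposal, null).get(); // block until the write is acknowledged
    }
  }
}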
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
new file mode 100644
index 0000000000000..e99e7109e5eb7
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DatahubResponseLogger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import datahub.client.Callback;
+import datahub.client.MetadataWriteResponse;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles responses to requests to the DataHub metadata service by logging them.
+ */
+public class DatahubResponseLogger implements Callback {
+  private static final Logger LOG = LogManager.getLogger(DatahubResponseLogger.class);
+
+  @Override
+  public void onCompletion(MetadataWriteResponse response) {
+    LOG.info("Completed DataHub RestEmitter request. "
+        + "Status: " + (response.isSuccess() ? "succeeded" : "failed"));
+    if (!response.isSuccess()) {
+      LOG.error("Request failed. " + response);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Response details: " + response);
+    }
+  }
+
+  @Override
+  public void onFailure(Throwable e) {
+    LOG.error("Error during DataHub RestEmitter request", e);
+  }
+
+}
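Note: DatahubResponseLogger only observes. Errors still reach the sync client through the Future returned by emit(), which it blocks on with get(). If a caller instead fired many emits asynchronously and wanted to aggregate outcomes rather than just log them, a callback along these lines could be used. This is a hypothetical variant, not part of the patch; the class name and accessor are invented.

import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;

import java.util.concurrent.atomic.AtomicReference;

/** Remembers the first failure seen across any number of emit() calls. */
public class FirstFailureCallback implements Callback {
  private final AtomicReference<String> firstFailure = new AtomicReference<>();

  @Override
  public void onCompletion(MetadataWriteResponse response) {
    if (!response.isSuccess()) {
      firstFailure.compareAndSet(null, String.valueOf(response));
    }
  }

  @Override
  public void onFailure(Throwable e) {
    firstFailure.compareAndSet(null, e.toString());
  }

  /** Null if every request has succeeded so far. */
  public String getFirstFailure() {
    return firstFailure.get();
  }
}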
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
new file mode 100644
index 0000000000000..3c00e313a990e
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/DummyPartitionValueExtractor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DummyPartitionValueExtractor implements PartitionValueExtractor {
+
+  @Override
+  public List<String> extractPartitionValuesInPath(String partitionPath) {
+    return Collections.emptyList();
+  }
+
+}
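Note: this test-only extractor exists because the sync config, like Hudi's other meta sync configs, reflectively loads the class named by META_SYNC_PARTITION_EXTRACTOR_CLASS; the tests only need that lookup and construction to succeed. For contrast, a plausible extractor for a table partitioned by one date-shaped path segment is sketched below. It is a hypothetical example, not part of the patch.

import org.apache.hudi.sync.common.model.PartitionValueExtractor;

import java.util.Collections;
import java.util.List;

/** Treats a partition path such as "2023-01-31" as a single partition value. */
public class DatePartitionValueExtractor implements PartitionValueExtractor {

  @Override
  public List<String> extractPartitionValuesInPath(String partitionPath) {
    return Collections.singletonList(partitionPath);
  }
}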
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
new file mode 100644
index 0000000000000..279167fc0458a
--- /dev/null
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sync.datahub;
+
+import datahub.client.MetadataWriteResponse;
+import datahub.client.rest.RestEmitter;
+import datahub.event.MetadataChangeProposalWrapper;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.sync.datahub.config.DataHubSyncConfig;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestDataHubSyncClient {
+
+  @Mock
+  RestEmitter restEmitterMock;
+
+  @TempDir
+  static java.nio.file.Path tmpDir;
+
+  private static String TRIP_EXAMPLE_SCHEMA;
+  private static Schema avroSchema;
+  private static String tableBasePath;
+
+  @BeforeAll
+  public static void beforeAll() throws IOException {
+    TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
+        + "{\"name\": \"ts\",\"type\": \"long\"}]}";
+
+    avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
+
+    Properties props = new Properties();
+    props.put("hoodie.table.name", "some_table");
+    tableBasePath = Paths.get(tmpDir.toString(), "some_table").toString();
+    HoodieTableMetaClient.initTableAndGetMetaClient(new Configuration(),
+        tableBasePath, props);
+  }
+
+  @BeforeEach
+  public void beforeEach() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @AfterEach
+  public void afterEach() {
+  }
+
+  @Test
+  public void testUpdateTableSchemaInvokesRestEmitter() throws IOException {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+    Mockito.when(
+        restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any())
+    ).thenReturn(
+        CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())
+    );
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    dhClient.updateTableSchema("some_table", null);
+    verify(restEmitterMock, times(2)).emit(any(MetadataChangeProposalWrapper.class),
+        Mockito.any());
+  }
+
+  public class DataHubSyncClientStub extends DataHubSyncClient {
+
+    public DataHubSyncClientStub(DataHubSyncConfig config) {
+      super(config);
+    }
+
+    @Override
+    Schema getAvroSchemaWithoutMetadataFields(HoodieTableMetaClient metaClient) {
+      return avroSchema;
+    }
+
+  }
+
+  public class DatahubSyncConfigStub extends DataHubSyncConfig {
+
+    private final RestEmitter emitterMock;
+
+    public DatahubSyncConfigStub(Properties props, RestEmitter emitterMock) {
+      super(props);
+      this.emitterMock = emitterMock;
+    }
+
+    @Override
+    public RestEmitter getRestEmitter() {
+      return emitterMock;
+    }
+  }
+
+}
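Note: the two stubs keep the test hermetic. DatahubSyncConfigStub swaps the real REST emitter for a Mockito mock, and DataHubSyncClientStub avoids resolving a real table schema. The times(2) verification encodes the new contract of updateTableSchema: one emit for the schema metadata and one for the soft-delete undo. If the ordering of the two proposals ever needed pinning down, a follow-up test could capture them with org.mockito.ArgumentCaptor. A sketch, reusing the fields and stubs defined above (hypothetical, not part of the patch):

@Test
public void testUpdateTableSchemaEmitsSchemaThenStatus() throws IOException {
  Properties props = new Properties();
  props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), DummyPartitionValueExtractor.class.getName());
  props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
  Mockito.when(restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), Mockito.any()))
      .thenReturn(CompletableFuture.completedFuture(MetadataWriteResponse.builder().build()));

  DataHubSyncClientStub dhClient = new DataHubSyncClientStub(new DatahubSyncConfigStub(props, restEmitterMock));
  dhClient.updateTableSchema("some_table", null);

  // Capture both proposals; the first should carry the SchemaMetadata aspect,
  // the second the Status ("soft-delete undo") aspect.
  ArgumentCaptor<MetadataChangeProposalWrapper> captor =
      ArgumentCaptor.forClass(MetadataChangeProposalWrapper.class);
  verify(restEmitterMock, times(2)).emit(captor.capture(), Mockito.any());
}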