diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 259a500a53..aaafad9496 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -107,6 +107,21 @@ public Response getLineage( return Response.ok(lineageService.lineage(nodeId, depth, true)).build(); } + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Consumes(APPLICATION_JSON) + @Produces(APPLICATION_JSON) + @Path("/lineage/direct") + public Response getDirectLineage(@QueryParam("parentJobNodeId") @NotNull NodeId parentJobNodeId) { + if (!parentJobNodeId.isJobType()) { + throw new IllegalArgumentException("Only job expected, got " + parentJobNodeId.getValue()); + } + throwIfNotExists(parentJobNodeId); + return Response.ok(lineageService.parentDirectLineage(parentJobNodeId.asJobId())).build(); + } + @Timed @ResponseMetered @ExceptionMetered diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index dfdf67492a..a5df6a2c4b 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -183,7 +183,7 @@ public static String stringOrNull(final ResultSet results, final String column) public static String stringOrThrow(final ResultSet results, final String column) throws SQLException { if (results.getObject(column) == null) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException("no column found for " + column); } return results.getString(column); } diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index c45a06e5a9..3600db31c7 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -10,7 +10,10 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import marquez.common.models.DatasetId; +import marquez.common.models.JobId; import marquez.db.mappers.DatasetDataMapper; +import marquez.db.mappers.DirectLineageEdgeMapper; import marquez.db.mappers.JobDataMapper; import marquez.db.mappers.JobRowMapper; import marquez.db.mappers.RunMapper; @@ -25,8 +28,19 @@ @RegisterRowMapper(JobDataMapper.class) @RegisterRowMapper(RunMapper.class) @RegisterRowMapper(JobRowMapper.class) +@RegisterRowMapper(DirectLineageEdgeMapper.class) public interface LineageDao { + public record DirectLineage(Collection edges) {} + + public record DirectLineageEdge( + JobId job1, + String direction, + DatasetId dataset, + String direction2, + JobId job2, + JobId job2parent) {} + /** * Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the * input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have @@ -79,6 +93,34 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids """) Set getLineage(@BindList Set jobIds, int depth); + /** + * 1 level of lineage for all the children jobs of the given parent + * + * @param parentJobNamespace the namespace of the parent + * @param parentJobName the name of the parent + * @return edges form job to dataset to job + */ + @SqlQuery( + """ + SELECT + jobs.namespace_name AS job_namespace, jobs."name" AS job_name, + jvim.io_type AS io1, + d.namespace_name AS ds_namespace, d."name" AS ds_name, + jvim2.io_type AS io2, + jv2.namespace_name AS job2_namespace, jv2.job_name AS job2_name, + jv2.namespace_name AS job2_parent_namespace, j2.parent_job_name AS job2_parent_name + FROM jobs_view jobs + INNER JOIN job_versions jv ON jv.uuid = jobs.current_version_uuid + LEFT JOIN job_versions_io_mapping jvim ON jvim.job_version_uuid = jobs.current_version_uuid + LEFT JOIN datasets d ON d.uuid = jvim.dataset_uuid + LEFT JOIN job_versions_io_mapping jvim2 ON jvim2.dataset_uuid = d.uuid AND jvim2.job_version_uuid <> jvim.job_version_uuid AND jvim2.io_type <> jvim.io_type + LEFT JOIN job_versions jv2 ON jv2.uuid = jvim2.job_version_uuid + LEFT JOIN jobs_view j2 ON jv2.job_uuid = j2.uuid + WHERE jobs.namespace_name = :parentJobNamespace AND jobs.parent_job_name = :parentJobName ; + """) + Collection getDirectLineageFromParent( + String parentJobNamespace, String parentJobName); + @SqlQuery( """ SELECT ds.*, dv.fields, dv.lifecycle_state diff --git a/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java b/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java new file mode 100644 index 0000000000..1ffe70844f --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/DirectLineageEdgeMapper.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ +package marquez.db.mappers; + +import static marquez.db.Columns.stringOrNull; +import static marquez.db.Columns.stringOrThrow; + +import java.sql.ResultSet; +import java.sql.SQLException; +import lombok.NonNull; +import marquez.common.models.DatasetId; +import marquez.common.models.DatasetName; +import marquez.common.models.JobId; +import marquez.common.models.JobName; +import marquez.common.models.NamespaceName; +import marquez.db.LineageDao.DirectLineageEdge; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +/** Maps the result set of direct lineage to a DirectLineageEdge */ +public final class DirectLineageEdgeMapper implements RowMapper { + @Override + public DirectLineageEdge map(@NonNull ResultSet results, @NonNull StatementContext context) + throws SQLException { + JobId job1 = + JobId.of( + NamespaceName.of(stringOrThrow(results, "job_namespace")), + JobName.of(stringOrThrow(results, "job_name"))); + String io1 = stringOrNull(results, "io1"); + String ds_namespace = stringOrNull(results, "ds_namespace"); + DatasetId ds = + ds_namespace == null + ? null + : new DatasetId( + NamespaceName.of(ds_namespace), DatasetName.of(stringOrNull(results, "ds_name"))); + String io2 = stringOrNull(results, "io2"); + String job2_namespace = stringOrNull(results, "job2_namespace"); + JobId job2 = + job2_namespace == null + ? null + : JobId.of( + NamespaceName.of(job2_namespace), JobName.of(stringOrThrow(results, "job2_name"))); + String job2parent_namespace = stringOrNull(results, "job2_parent_namespace"); + JobId job2parent = + job2parent_namespace == null + ? null + : JobId.of( + NamespaceName.of(job2parent_namespace), + JobName.of(stringOrThrow(results, "job2_parent_name"))); + return new DirectLineageEdge(job1, io1, ds, io2, job2, job2parent); + } +} diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java index 1c2dc34a05..30bb3bcea7 100644 --- a/api/src/main/java/marquez/service/LineageService.java +++ b/api/src/main/java/marquez/service/LineageService.java @@ -5,10 +5,16 @@ package marquez.service; +import static java.util.stream.Collectors.filtering; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; + import com.google.common.base.Functions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Maps; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -41,6 +47,19 @@ @Slf4j public class LineageService extends DelegatingLineageDao { + + public record JobWithParent(JobId job, JobId parent) {} + + public record DatasetLineage( + DatasetId dataset, + Collection consumers, + Collection producers) {} + + public record ChildLineage( + JobId job, Collection inputs, Collection outputs) {} + + public record ParentLineage(JobId parent, Collection children) {} + private final JobDao jobDao; public LineageService(LineageDao delegate, JobDao jobDao) { @@ -48,6 +67,67 @@ public LineageService(LineageDao delegate, JobDao jobDao) { this.jobDao = jobDao; } + /** + * This method is specialized for returning one level of lineage from a parent job. It finds all + * the children of the provided parent node It then finds the input and output datasets those + * children write to. It finally returns the other jobs consuming or producing those datasets (and + * their parent). + * + * @param parentJobId the parent job + * @return 1 level of lineage for all the children jobs of the given parent + */ + public ParentLineage parentDirectLineage(JobId parentJobId) { + log.debug("Attempting to get lineage for parent job '{}'", parentJobId); + + Collection directLineageFromParent = + getDirectLineageFromParent( + parentJobId.getNamespace().getValue(), parentJobId.getName().getValue()); + + Map>>>> grouped = + directLineageFromParent.stream() + .collect( + groupingBy( + DirectLineageEdge::job1, + filtering( + e -> e.direction() != null, + groupingBy( + DirectLineageEdge::direction, + filtering( + e -> e.dataset() != null, + groupingBy( + DirectLineageEdge::dataset, + filtering( + e -> e.direction2() != null, + groupingBy( + DirectLineageEdge::direction2, + mapping( + e -> new JobWithParent(e.job2(), e.job2parent()), + toList()))))))))); + + List children = + grouped.entrySet().stream() + .map( + e -> + new ChildLineage( + e.getKey(), + toDatasetLineages(e.getValue().get("INPUT")), + toDatasetLineages(e.getValue().get("OUTPUT")))) + .collect(toList()); + return new ParentLineage(parentJobId, children); + } + + private Collection toDatasetLineages( + Map>> datasets) { + return datasets == null + ? null + : datasets.entrySet().stream() + .map( + e -> + new DatasetLineage( + e.getKey(), e.getValue().get("INPUT"), e.getValue().get("OUTPUT"))) + .collect(toList()); + } + // TODO make input parameters easily extendable if adding more options like 'withJobFacets' public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) { log.debug("Attempting to get lineage for node '{}' with depth '{}'", nodeId.getValue(), depth); diff --git a/api/src/main/java/marquez/service/models/EventTypeResolver.java b/api/src/main/java/marquez/service/models/EventTypeResolver.java index 15aab272a3..fa4f52adb8 100644 --- a/api/src/main/java/marquez/service/models/EventTypeResolver.java +++ b/api/src/main/java/marquez/service/models/EventTypeResolver.java @@ -79,6 +79,7 @@ public JavaType typeFromId(DatabindContext context, String id) throws IOExceptio .filter(s -> s.getName().equals(type)) .findAny() .map(EventSchemaURL::getSubType) + .map(p -> (Class) p) .orElse(LINEAGE_EVENT.subType); return context.constructSpecializedType(superType, subType); diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 4729a64971..e0793290c6 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -5,6 +5,7 @@ package marquez; +import static java.util.Arrays.asList; import static marquez.db.LineageTestUtils.PRODUCER_URL; import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.as; @@ -47,6 +48,7 @@ import lombok.extern.slf4j.Slf4j; import marquez.api.JdbiUtils; import marquez.client.MarquezClient; +import marquez.client.MarquezClient.ParentLineage; import marquez.client.models.Dataset; import marquez.client.models.DatasetVersion; import marquez.client.models.Job; @@ -316,6 +318,14 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentRunFacet() .hasFieldOrPropertyWithValue("parentJobName", null); List runsList = client.listRuns(NAMESPACE_NAME, dagName); assertThat(runsList).isNotEmpty().hasSize(1); + + ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName)); + assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME); + assertThat(directLineage.parent().getName()).isEqualTo(dagName); + assertThat(directLineage.children()).size().isEqualTo(2); + + assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList()) + .isEqualTo(asList("the_dag.task1", "the_dag.task2")); } @Test @@ -390,6 +400,14 @@ public void testOpenLineageJobHierarchyAirflowIntegrationWithParentAndParentRunF .hasFieldOrPropertyWithValue("parentJobName", null); List runsList = client.listRuns(NAMESPACE_NAME, dagName); assertThat(runsList).isNotEmpty().hasSize(1); + + ParentLineage directLineage = client.getDirectLineage(new JobId(NAMESPACE_NAME, dagName)); + assertThat(directLineage.parent().getNamespace()).isEqualTo(NAMESPACE_NAME); + assertThat(directLineage.parent().getName()).isEqualTo(dagName); + assertThat(directLineage.children()).size().isEqualTo(2); + + assertThat(directLineage.children().stream().map(c -> c.job().getName()).sorted().toList()) + .isEqualTo(asList("the_dag.task1", "the_dag.task2")); } @Test diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 354ab495dc..c0dad3ad71 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -12,11 +12,14 @@ import static marquez.db.LineageTestUtils.newDatasetFacet; import static marquez.db.LineageTestUtils.writeDownstreamLineage; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import com.google.common.base.Functions; import java.sql.SQLException; import java.time.Instant; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; @@ -29,6 +32,7 @@ import java.util.stream.Stream; import marquez.api.JdbiUtils; import marquez.common.models.JobType; +import marquez.db.LineageDao.DirectLineageEdge; import marquez.db.LineageTestUtils.DatasetConsumerJob; import marquez.db.LineageTestUtils.JobLineage; import marquez.db.models.JobRow; @@ -165,6 +169,12 @@ public void testGetLineage() { .containsAll( expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator); } + + Collection FromParent = + lineageDao.getDirectLineageFromParent( + disjointJob.getJob().getNamespaceName(), disjointJob.getJob().getName()); + assertNotNull(FromParent); + assertTrue(FromParent.toString(), FromParent.size() == 0); } @Test @@ -311,6 +321,14 @@ public void testGetLineageWithJobThatHasNoDownstreamConsumers() { assertThat(lineage).hasSize(1).contains(writeJob.getJob().getUuid()); } + @Test + public void testGetFromParent() { + FacetTestUtils.createLineageWithFacets(openLineageDao); + Collection FromParent = + lineageDao.getDirectLineageFromParent("namespace", "name"); + assertTrue(FromParent.toString(), FromParent.size() == 2); + } + @Test public void testGetLineageWithJobThatHasNoDatasets() { diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index 84dc6d18f1..94a6b25d98 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -20,6 +20,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import javax.validation.Valid; +import lombok.Builder; import lombok.Value; import marquez.common.Utils; import marquez.db.models.UpdateLineageRow; @@ -206,6 +207,28 @@ public static List writeDownstreamLineage( List downstream, JobFacet jobFacet, Dataset dataset) { + return writeDownstreamLineageWithParent(openLineageDao, downstream, jobFacet, dataset, null); + } + + /** + * Recursive function which supports writing a lineage graph by supplying an input dataset and a + * list of {@link DatasetConsumerJob}s. Each consumer may output up to one dataset, which will be + * consumed by the number of consumers specified by the {@link DatasetConsumerJob#numConsumers} + * property. + * + * @param openLineageDao + * @param downstream + * @param jobFacet + * @param dataset + * @param parentRunFacet + * @return + */ + public static List writeDownstreamLineageWithParent( + OpenLineageDao openLineageDao, + List downstream, + JobFacet jobFacet, + Dataset dataset, + @Valid LineageEvent.ParentRunFacet parentRunFacet) { DatasetConsumerJob consumer = downstream.get(0); return IntStream.range(0, consumer.getNumConsumers()) .mapToObj( @@ -230,17 +253,19 @@ public static List writeDownstreamLineage( "COMPLETE", jobFacet, Collections.singletonList(dataset), - outputs.stream().collect(Collectors.toList())); + outputs.stream().collect(Collectors.toList()), + parentRunFacet); List downstreamLineage = outputs.stream() .flatMap( out -> { if (consumer.numConsumers > 0) { - return writeDownstreamLineage( + return writeDownstreamLineageWithParent( openLineageDao, downstream.subList(1, downstream.size()), jobFacet, - out) + out, + parentRunFacet) .stream(); } else { return Stream.empty(); @@ -281,4 +306,16 @@ public static class DatasetConsumerJob { int numConsumers; Optional outputDatasetName; } + + @Value + @Builder + public static class CreateJobLineage { + String jobName; + String status; + JobFacet jobFacet; + List inputs; + List outputs; + @Valid LineageEvent.ParentRunFacet parentRunFacet; + ImmutableMap runFacets; + } } diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 7f6828dfa0..61be6eb605 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -6,14 +6,19 @@ package marquez.service; import static marquez.db.LineageTestUtils.NAMESPACE; +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; import static marquez.db.LineageTestUtils.newDatasetFacet; import static marquez.db.LineageTestUtils.writeDownstreamLineage; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.UUID; import marquez.api.JdbiUtils; import marquez.common.models.DatasetName; import marquez.common.models.InputDatasetVersion; @@ -30,11 +35,15 @@ import marquez.db.OpenLineageDao; import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; +import marquez.service.LineageService.ParentLineage; import marquez.service.models.Edge; import marquez.service.models.JobData; import marquez.service.models.Lineage; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.JobFacet; +import marquez.service.models.LineageEvent.JobLink; +import marquez.service.models.LineageEvent.ParentRunFacet; +import marquez.service.models.LineageEvent.RunLink; import marquez.service.models.LineageEvent.SchemaField; import marquez.service.models.Node; import marquez.service.models.NodeId; @@ -433,6 +442,74 @@ public void testLineageForOrphanedDataset() { .containsExactlyInAnyOrder(datasetNodeId); } + @Test + public void testParentLineage() { + String parentJobName1 = "parentJob1"; + String parentJobName2 = "parentJob2"; + ParentRunFacet parentRunFacet1 = + new ParentRunFacet( + PRODUCER_URL, + SCHEMA_URL, + new RunLink(UUID.randomUUID().toString()), + JobLink.builder().namespace(NAMESPACE).name(parentJobName1).build()); + ParentRunFacet parentRunFacet2 = + new ParentRunFacet( + PRODUCER_URL, + SCHEMA_URL, + new RunLink(UUID.randomUUID().toString()), + JobLink.builder().namespace(NAMESPACE).name(parentJobName2).build()); + UpdateLineageRow writeJob = + LineageTestUtils.createLineageRow( + openLineageDao, + "writeJob", + "COMPLETE", + jobFacet, + Arrays.asList(), + Arrays.asList(dataset), + parentRunFacet1); + List jobRows = + LineageTestUtils.writeDownstreamLineageWithParent( + openLineageDao, + new LinkedList<>( + Arrays.asList( + new DatasetConsumerJob("readJob", 2, Optional.of("outputData")), + new DatasetConsumerJob("downstreamJob", 1, Optional.of("outputData2")), + new DatasetConsumerJob("finalConsumer", 1, Optional.empty()))), + jobFacet, + dataset, + parentRunFacet2); + + ParentLineage parentLineage = + lineageService.parentDirectLineage( + JobId.of(new NamespaceName(NAMESPACE), new JobName(parentJobName1))); + assertEquals(NAMESPACE, parentLineage.parent().getNamespace().getValue()); + assertEquals(parentJobName1, parentLineage.parent().getName().getValue()); + assertEquals(1, parentLineage.children().size()); + parentLineage + .children() + .forEach( + c -> { + assertEquals("parentJob1.writeJob", c.job().getName().getValue()); + assertNull(c.inputs()); + c.outputs() + .forEach( + i -> { + assertEquals(dataset.getName(), i.dataset().getName().getValue()); + i.consumers() + .forEach( + co -> { + assertThat(co.job().getName().getValue()) + .matches("parentJob2.readJob.*<-commonDataset"); + assertThat(co.parent().getName().getValue()) + .isEqualTo("parentJob2"); + // we don't go further than one level and don't see downstreamJob + // and finalConsumer + }); + assertNull(i.producers()); + }); + }); + } + private boolean jobNameEquals(Node node, String writeJob) { return node.getId().asJobId().getName().getValue().equals(writeJob); } diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index 3d55eac28f..4155fb75b1 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -21,6 +21,7 @@ import java.net.URL; import java.time.Instant; import java.time.ZonedDateTime; +import java.util.Collection; import java.util.List; import java.util.Properties; import java.util.Set; @@ -35,9 +36,11 @@ import lombok.Value; import lombok.extern.slf4j.Slf4j; import marquez.client.models.Dataset; +import marquez.client.models.DatasetId; import marquez.client.models.DatasetMeta; import marquez.client.models.DatasetVersion; import marquez.client.models.Job; +import marquez.client.models.JobId; import marquez.client.models.JobMeta; import marquez.client.models.JobVersion; import marquez.client.models.LineageEvent; @@ -125,6 +128,11 @@ public Lineage getLineage(NodeId nodeId, int depth) { return Lineage.fromJson(bodyAsJson); } + public ParentLineage getDirectLineage(JobId parentJobId) { + final String bodyAsJson = http.get(url.toDirectLineageUrl(parentJobId)); + return ParentLineage.fromJson(bodyAsJson); + } + public Lineage getColumnLineage(NodeId nodeId) { return getColumnLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH, false); } @@ -703,4 +711,20 @@ String toJson() { return Utils.toJson(this); } } + + public record JobWithParent(JobId job, JobId parent) {} + + public record DatasetLineage( + DatasetId dataset, + Collection consumers, + Collection producers) {} + + public record ChildLineage( + JobId job, Collection inputs, Collection outputs) {} + + public record ParentLineage(JobId parent, Collection children) { + static ParentLineage fromJson(final String json) { + return Utils.fromJson(json, new TypeReference() {}); + } + } } diff --git a/clients/java/src/main/java/marquez/client/MarquezPathV1.java b/clients/java/src/main/java/marquez/client/MarquezPathV1.java index 2ceba79f9e..56fcdc0af2 100644 --- a/clients/java/src/main/java/marquez/client/MarquezPathV1.java +++ b/clients/java/src/main/java/marquez/client/MarquezPathV1.java @@ -182,6 +182,10 @@ static String lineagePath() { return path("/lineage/"); } + static String directLineagePath() { + return path("/lineage/direct"); + } + static String columnLineagePath() { return path("/column-lineage/"); } diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java index cc460c4a4f..73fed0a83b 100644 --- a/clients/java/src/main/java/marquez/client/MarquezUrl.java +++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java @@ -13,6 +13,7 @@ import static marquez.client.MarquezPathV1.datasetPath; import static marquez.client.MarquezPathV1.datasetTagPath; import static marquez.client.MarquezPathV1.datasetVersionPath; +import static marquez.client.MarquezPathV1.directLineagePath; import static marquez.client.MarquezPathV1.fieldTagPath; import static marquez.client.MarquezPathV1.jobPath; import static marquez.client.MarquezPathV1.jobVersionPath; @@ -31,8 +32,6 @@ import static marquez.client.MarquezPathV1.searchPath; import static marquez.client.MarquezPathV1.sourcePath; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; @@ -41,13 +40,20 @@ import java.time.ZonedDateTime; import java.util.HashMap; import java.util.Map; + import javax.annotation.Nullable; + +import org.apache.http.client.utils.URIBuilder; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; + import lombok.NonNull; +import marquez.client.models.JobId; import marquez.client.models.NodeId; import marquez.client.models.RunState; import marquez.client.models.SearchFilter; import marquez.client.models.SearchSort; -import org.apache.http.client.utils.URIBuilder; class MarquezUrl { @@ -197,7 +203,7 @@ URL toCreateTagsUrl(String name) { URL toSearchUrl( @NonNull String query, @Nullable SearchFilter filter, @Nullable SearchSort sort, int limit) { - final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); + final ImmutableMap.Builder queryParams = new ImmutableMap.Builder<>(); queryParams.put("q", query); if (filter != null) { queryParams.put("filter", filter); @@ -210,17 +216,24 @@ URL toSearchUrl( } URL toLineageUrl(NodeId nodeId, int depth) { - final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); + final ImmutableMap.Builder queryParams = new ImmutableMap.Builder<>(); queryParams.put("nodeId", nodeId.getValue()); queryParams.put("depth", String.valueOf(depth)); return from(lineagePath(), queryParams.build()); } URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) { - final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); + final ImmutableMap.Builder queryParams = new ImmutableMap.Builder<>(); queryParams.put("nodeId", nodeId.getValue()); queryParams.put("depth", String.valueOf(depth)); queryParams.put("withDownstream", String.valueOf(withDownstream)); return from(columnLineagePath(), queryParams.build()); } + + public URL toDirectLineageUrl(@NonNull JobId parentJobId) { + final ImmutableMap.Builder queryParams = new ImmutableMap.Builder<>(); + queryParams.put("parentJobNodeId", NodeId.of(parentJobId).getValue()); + return from(directLineagePath(), queryParams.build()); + } + }