diff --git a/pom.xml b/pom.xml index 77e87bafa9f5..dbcc9318e913 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,7 @@ 1.2.0 1.0-rc2 1.1 + 0.6.0 1.22.0 1.4.5 0.5.160304 @@ -524,6 +525,26 @@ + + com.google.auth + google-auth-library-credentials + ${google-auth.version} + + + + com.google.auth + google-auth-library-oauth2-http + ${google-auth.version} + + + + com.google.guava + guava-jdk5 + + + + com.google.apis google-api-services-bigquery diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 0a2c9f6a8873..f74e15f7ad9c 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -163,11 +163,6 @@ google-api-client - - com.google.oauth-client - google-oauth-client - - com.google.http-client google-http-client @@ -194,6 +189,16 @@ google-api-services-clouddebugger + + com.google.auth + google-auth-library-credentials + + + + com.google.auth + google-auth-library-oauth2-http + + com.google.cloud.bigdataoss util diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java index 0391594ed57e..e0026de0a4a9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTransport.java @@ -20,10 +20,11 @@ import static org.apache.beam.sdk.util.Transport.getJsonFactory; import static org.apache.beam.sdk.util.Transport.getTransport; -import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.clouddebugger.v2.Clouddebugger; import com.google.api.services.dataflow.Dataflow; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.common.collect.ImmutableList; import java.net.MalformedURLException; @@ -91,11 +92,13 @@ public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptio } private static HttpRequestInitializer chainHttpRequestInitializer( - Credential credential, HttpRequestInitializer httpRequestInitializer) { + Credentials credential, HttpRequestInitializer httpRequestInitializer) { if (credential == null) { return httpRequestInitializer; } else { - return new ChainingHttpRequestInitializer(credential, httpRequestInitializer); + return new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + httpRequestInitializer); } } } diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml index 7d4d9abb33f6..4dd02f32df04 100644 --- a/sdks/java/core/pom.xml +++ b/sdks/java/core/pom.xml @@ -248,18 +248,14 @@ runtime + + com.google.auth + google-auth-library-credentials + + com.google.auth google-auth-library-oauth2-http - 0.4.0 - - - - com.google.guava - guava-jdk5 - - @@ -321,11 +317,6 @@ runtime - - com.google.oauth-client - google-oauth-client-java6 - - com.google.oauth-client google-oauth-client diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java index 0ea6faf9ffe7..ffdab9838e05 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/GcpOptions.java @@ -18,8 +18,7 @@ package org.apache.beam.sdk.options; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants; +import com.google.auth.Credentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.io.Files; @@ -40,31 +39,14 @@ import org.slf4j.LoggerFactory; /** - * Options used to configure Google Cloud Platform project and credentials. + * Options used to configure Google Cloud Platform specific options such as the project + * and credentials. * - *

These options configure which of the following three different mechanisms for obtaining a - * credential are used: - *

    - *
  1. - * It can fetch the - * - * application default credentials. - *
  2. - *
  3. - * The user can specify a client secrets file and go through the OAuth2 - * webflow. The credential will then be cached in the user's home - * directory for reuse. - *
  4. - *
  5. - * The user can specify a file containing a service account private key along - * with the service account name. - *
  6. - *
- * - *

The default mechanism is to use the + *

These options defer to the * - * application default credentials. The other options can be - * used by setting the corresponding properties. + * application default credentials for authentication. See the + * Google Auth Library for + * alternative mechanisms for creating credentials. */ @Description("Options used to configure Google Cloud Platform project and credentials.") public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { @@ -77,79 +59,6 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { String getProject(); void setProject(String value); - /** - * This option controls which file to use when attempting to create the credentials using the - * service account method. - * - *

This option if specified, needs be combined with the - * {@link GcpOptions#getServiceAccountName() serviceAccountName}. - */ - @JsonIgnore - @Description("Controls which file to use when attempting to create the credentials " - + "using the service account method. This option if specified, needs to be combined with " - + "the serviceAccountName option.") - String getServiceAccountKeyfile(); - void setServiceAccountKeyfile(String value); - - /** - * This option controls which service account to use when attempting to create the credentials - * using the service account method. - * - *

This option if specified, needs be combined with the - * {@link GcpOptions#getServiceAccountKeyfile() serviceAccountKeyfile}. - */ - @JsonIgnore - @Description("Controls which service account to use when attempting to create the credentials " - + "using the service account method. This option if specified, needs to be combined with " - + "the serviceAccountKeyfile option.") - String getServiceAccountName(); - void setServiceAccountName(String value); - - /** - * This option controls which file to use when attempting to create the credentials - * using the OAuth 2 webflow. After the OAuth2 webflow, the credentials will be stored - * within credentialDir. - */ - @JsonIgnore - @Description("This option controls which file to use when attempting to create the credentials " - + "using the OAuth 2 webflow. After the OAuth2 webflow, the credentials will be stored " - + "within credentialDir.") - String getSecretsFile(); - void setSecretsFile(String value); - - /** - * This option controls which credential store to use when creating the credentials - * using the OAuth 2 webflow. - */ - @Description("This option controls which credential store to use when creating the credentials " - + "using the OAuth 2 webflow.") - @Default.String("cloud_dataflow") - String getCredentialId(); - void setCredentialId(String value); - - /** - * Directory for storing dataflow credentials after execution of the OAuth 2 webflow. Defaults - * to using the $HOME/.store/data-flow directory. - */ - @Description("Directory for storing dataflow credentials after execution of the OAuth 2 webflow. " - + "Defaults to using the $HOME/.store/data-flow directory.") - @Default.InstanceFactory(CredentialDirFactory.class) - String getCredentialDir(); - void setCredentialDir(String value); - - /** - * Returns the default credential directory of ${user.home}/.store/data-flow. - */ - class CredentialDirFactory implements DefaultValueFactory { - @Override - public String create(PipelineOptions options) { - File home = new File(System.getProperty("user.home")); - File store = new File(home, ".store"); - File dataflow = new File(store, "data-flow"); - return dataflow.getPath(); - } - } - /** * The class of the credential factory that should be created and used to create * credentials. If gcpCredential has not been set explicitly, an instance of this class will @@ -173,9 +82,8 @@ void setCredentialFactoryClass( + "If no credential has been set explicitly, the default is to use the instance factory " + "that constructs a credential based upon the currently set credentialFactoryClass.") @Default.InstanceFactory(GcpUserCredentialsFactory.class) - @Hidden - Credential getGcpCredential(); - void setGcpCredential(Credential value); + Credentials getGcpCredential(); + void setGcpCredential(Credentials value); /** * Attempts to infer the default project based upon the environment this application @@ -251,9 +159,9 @@ Map getEnvironment() { * Attempts to load the GCP credentials. See * {@link CredentialFactory#getCredential()} for more details. */ - class GcpUserCredentialsFactory implements DefaultValueFactory { + class GcpUserCredentialsFactory implements DefaultValueFactory { @Override - public Credential create(PipelineOptions options) { + public Credentials create(PipelineOptions options) { GcpOptions gcpOptions = options.as(GcpOptions.class); try { CredentialFactory factory = InstanceBuilder.ofType(CredentialFactory.class) @@ -268,28 +176,6 @@ public Credential create(PipelineOptions options) { } } - /** - * The token server URL to use for OAuth 2 authentication. Normally, the default is sufficient, - * but some specialized use cases may want to override this value. - */ - @Description("The token server URL to use for OAuth 2 authentication. Normally, the default " - + "is sufficient, but some specialized use cases may want to override this value.") - @Default.String(GoogleOAuthConstants.TOKEN_SERVER_URL) - @Hidden - String getTokenServerUrl(); - void setTokenServerUrl(String value); - - /** - * The authorization server URL to use for OAuth 2 authentication. Normally, the default is - * sufficient, but some specialized use cases may want to override this value. - */ - @Description("The authorization server URL to use for OAuth 2 authentication. Normally, the " - + "default is sufficient, but some specialized use cases may want to override this value.") - @Default.String(GoogleOAuthConstants.AUTHORIZATION_SERVER_URL) - @Hidden - String getAuthorizationServerEncodedUrl(); - void setAuthorizationServerEncodedUrl(String value); - /** * A GCS path for storing temporary files in GCP. * diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java index 95208cebac4f..9b8589a8b365 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java @@ -19,8 +19,6 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; import com.google.api.client.http.HttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.util.BackOff; @@ -32,6 +30,9 @@ import com.google.api.services.bigquery.model.QueryResponse; import com.google.api.services.bigquery.model.TableCell; import com.google.api.services.bigquery.model.TableRow; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auth.oauth2.GoogleCredentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -136,9 +137,9 @@ protected boolean matchesSafely(PipelineResult pipelineResult) { Bigquery newBigqueryClient(String applicationName) { HttpTransport transport = Transport.getTransport(); JsonFactory jsonFactory = Transport.getJsonFactory(); - Credential credential = getDefaultCredential(transport, jsonFactory); + Credentials credential = getDefaultCredential(); - return new Bigquery.Builder(transport, jsonFactory, credential) + return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential)) .setApplicationName(applicationName) .build(); } @@ -168,10 +169,10 @@ private void validateArgument(String name, String value) { !Strings.isNullOrEmpty(value), "Expected valid %s, but was %s", name, value); } - private Credential getDefaultCredential(HttpTransport transport, JsonFactory jsonFactory) { - GoogleCredential credential; + private Credentials getDefaultCredential() { + GoogleCredentials credential; try { - credential = GoogleCredential.getApplicationDefault(transport, jsonFactory); + credential = GoogleCredentials.getApplicationDefault(); } catch (IOException e) { throw new RuntimeException("Failed to get application default credential.", e); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java index 884a77a38466..622965087497 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CredentialFactory.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.util; -import com.google.api.client.auth.oauth2.Credential; +import com.google.auth.Credentials; import java.io.IOException; import java.security.GeneralSecurityException; @@ -25,5 +25,5 @@ * Construct an oauth credential to be used by the SDK and the SDK workers. */ public interface CredentialFactory { - Credential getCredential() throws IOException, GeneralSecurityException; + Credentials getCredential() throws IOException, GeneralSecurityException; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Credentials.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Credentials.java deleted file mode 100644 index 1e77f4dc324a..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Credentials.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.extensions.java6.auth.oauth2.AbstractPromptReceiver; -import com.google.api.client.extensions.java6.auth.oauth2.AuthorizationCodeInstalledApp; -import com.google.api.client.googleapis.auth.oauth2.GoogleAuthorizationCodeFlow; -import com.google.api.client.googleapis.auth.oauth2.GoogleClientSecrets; -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.googleapis.auth.oauth2.GoogleOAuthConstants; -import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; -import com.google.api.client.http.GenericUrl; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.util.store.FileDataStoreFactory; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import org.apache.beam.sdk.options.GcpOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides support for loading credentials. - */ -public class Credentials { - - private static final Logger LOG = LoggerFactory.getLogger(Credentials.class); - - /** - * OAuth 2.0 scopes used by a local worker (not on GCE). - * The scope cloud-platform provides access to all Cloud Platform resources. - * cloud-platform isn't sufficient yet for talking to datastore so we request - * those resources separately. - * - *

Note that trusted scope relationships don't apply to OAuth tokens, so for - * services we access directly (GCS) as opposed to through the backend - * (BigQuery, GCE), we need to explicitly request that scope. - */ - private static final List SCOPES = Arrays.asList( - "https://www.googleapis.com/auth/cloud-platform", - "https://www.googleapis.com/auth/devstorage.full_control", - "https://www.googleapis.com/auth/userinfo.email", - "https://www.googleapis.com/auth/datastore"); - - private static class PromptReceiver extends AbstractPromptReceiver { - @Override - public String getRedirectUri() { - return GoogleOAuthConstants.OOB_REDIRECT_URI; - } - } - - /** - * Initializes OAuth2 credentials. - * - *

This can use 3 different mechanisms for obtaining a credential: - *

    - *
  1. - * It can fetch the - * - * application default credentials. - *
  2. - *
  3. - * The user can specify a client secrets file and go through the OAuth2 - * webflow. The credential will then be cached in the user's home - * directory for reuse. Provide the property "secrets_file" to use this - * mechanism. - *
  4. - *
  5. - * The user can specify a file containing a service account. - * Provide the properties "service_account_keyfile" and - * "service_account_name" to use this mechanism. - *
  6. - *
- * The default mechanism is to use the - * - * application default credentials. The other options can be used by providing the - * corresponding properties. - */ - public static Credential getCredential(GcpOptions options) - throws IOException, GeneralSecurityException { - String keyFile = options.getServiceAccountKeyfile(); - String accountName = options.getServiceAccountName(); - - if (keyFile != null && accountName != null) { - try { - return getCredentialFromFile(keyFile, accountName, SCOPES); - } catch (GeneralSecurityException e) { - throw new IOException("Unable to obtain credentials from file", e); - } - } - - if (options.getSecretsFile() != null) { - return getCredentialFromClientSecrets(options, SCOPES); - } - - try { - return GoogleCredential.getApplicationDefault().createScoped(SCOPES); - } catch (IOException e) { - throw new RuntimeException("Unable to get application default credentials. Please see " - + "https://developers.google.com/accounts/docs/application-default-credentials " - + "for details on how to specify credentials. This version of the SDK is " - + "dependent on the gcloud core component version 2015.02.05 or newer to " - + "be able to get credentials from the currently authorized user via gcloud auth.", e); - } - } - - /** - * Loads OAuth2 credential from a local file. - */ - private static Credential getCredentialFromFile( - String keyFile, String accountId, Collection scopes) - throws IOException, GeneralSecurityException { - GoogleCredential credential = new GoogleCredential.Builder() - .setTransport(Transport.getTransport()) - .setJsonFactory(Transport.getJsonFactory()) - .setServiceAccountId(accountId) - .setServiceAccountScopes(scopes) - .setServiceAccountPrivateKeyFromP12File(new File(keyFile)) - .build(); - - LOG.info("Created credential from file {}", keyFile); - return credential; - } - - /** - * Loads OAuth2 credential from client secrets, which may require an - * interactive authorization prompt. - */ - private static Credential getCredentialFromClientSecrets( - GcpOptions options, Collection scopes) - throws IOException, GeneralSecurityException { - String clientSecretsFile = options.getSecretsFile(); - - checkArgument(clientSecretsFile != null); - HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); - - JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); - GoogleClientSecrets clientSecrets; - - try { - clientSecrets = GoogleClientSecrets.load(jsonFactory, - new FileReader(clientSecretsFile)); - } catch (IOException e) { - throw new RuntimeException( - "Could not read the client secrets from file: " + clientSecretsFile, - e); - } - - FileDataStoreFactory dataStoreFactory = - new FileDataStoreFactory(new java.io.File(options.getCredentialDir())); - - GoogleAuthorizationCodeFlow flow = new GoogleAuthorizationCodeFlow.Builder( - httpTransport, jsonFactory, clientSecrets, scopes) - .setDataStoreFactory(dataStoreFactory) - .setTokenServerUrl(new GenericUrl(options.getTokenServerUrl())) - .setAuthorizationServerEncodedUrl(options.getAuthorizationServerEncodedUrl()) - .build(); - - // The credentialId identifies the credential if we're using a persistent - // credential store. - Credential credential = - new AuthorizationCodeInstalledApp(flow, new PromptReceiver()) - .authorize(options.getCredentialId()); - - LOG.info("Got credential from client secret"); - return credential; - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java index 0497e750bbe1..feb93f7b6db3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpCredentialFactory.java @@ -17,10 +17,11 @@ */ package org.apache.beam.sdk.util; -import com.google.api.client.auth.oauth2.Credential; +import com.google.auth.Credentials; +import com.google.auth.oauth2.GoogleCredentials; import java.io.IOException; -import java.security.GeneralSecurityException; -import org.apache.beam.sdk.options.GcpOptions; +import java.util.Arrays; +import java.util.List; import org.apache.beam.sdk.options.PipelineOptions; /** @@ -28,18 +29,38 @@ * Returns a GCP credential. */ public class GcpCredentialFactory implements CredentialFactory { - private GcpOptions options; + /** + * The scope cloud-platform provides access to all Cloud Platform resources. + * cloud-platform isn't sufficient yet for talking to datastore so we request + * those resources separately. + * + *

Note that trusted scope relationships don't apply to OAuth tokens, so for + * services we access directly (GCS) as opposed to through the backend + * (BigQuery, GCE), we need to explicitly request that scope. + */ + private static final List SCOPES = Arrays.asList( + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/devstorage.full_control", + "https://www.googleapis.com/auth/userinfo.email", + "https://www.googleapis.com/auth/datastore", + "https://www.googleapis.com/auth/pubsub"); - private GcpCredentialFactory(GcpOptions options) { - this.options = options; - } + private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory(); public static GcpCredentialFactory fromOptions(PipelineOptions options) { - return new GcpCredentialFactory(options.as(GcpOptions.class)); + return INSTANCE; } @Override - public Credential getCredential() throws IOException, GeneralSecurityException { - return Credentials.getCredential(options); + public Credentials getCredential() throws IOException { + try { + return GoogleCredentials.getApplicationDefault().createScoped(SCOPES); + } catch (IOException e) { + throw new RuntimeException("Unable to get application default credentials. Please see " + + "https://developers.google.com/accounts/docs/application-default-credentials " + + "for details on how to specify credentials. This version of the SDK is " + + "dependent on the gcloud core component version 2015.02.05 or newer to " + + "be able to get credentials from the currently authorized user via gcloud auth.", e); + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java index 5d9255286a77..29c3e728e56b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopCredentialFactory.java @@ -17,9 +17,8 @@ */ package org.apache.beam.sdk.util; -import com.google.api.client.auth.oauth2.Credential; +import com.google.auth.Credentials; import java.io.IOException; -import java.security.GeneralSecurityException; import org.apache.beam.sdk.options.PipelineOptions; /** @@ -27,12 +26,14 @@ * Always returns a null Credential object. */ public class NoopCredentialFactory implements CredentialFactory { + private static final NoopCredentialFactory INSTANCE = new NoopCredentialFactory(); + public static NoopCredentialFactory fromOptions(PipelineOptions options) { - return new NoopCredentialFactory(); + return INSTANCE; } @Override - public Credential getCredential() throws IOException, GeneralSecurityException { + public Credentials getCredential() throws IOException { return null; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java index 02152ba49ac0..201877c031fa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubGrpcClient.java @@ -20,7 +20,7 @@ import static com.google.common.base.Preconditions.checkState; -import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.Credentials; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -56,7 +56,6 @@ import io.grpc.netty.NettyChannelBuilder; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; @@ -74,10 +73,6 @@ public class PubsubGrpcClient extends PubsubClient { private static final String PUBSUB_ADDRESS = "pubsub.googleapis.com"; private static final int PUBSUB_PORT = 443; - // Will be needed when credentials are correctly constructed and scoped. - @SuppressWarnings("unused") - private static final List PUBSUB_SCOPES = - Collections.singletonList("https://www.googleapis.com/auth/pubsub"); private static final int LIST_BATCH_SIZE = 1000; private static final int DEFAULT_TIMEOUT_S = 15; @@ -92,15 +87,12 @@ public PubsubClient newClient( .negotiationType(NegotiationType.TLS) .sslContext(GrpcSslContexts.forClient().ciphers(null).build()) .build(); - // TODO: GcpOptions needs to support building com.google.auth.oauth2.Credentials from the - // various command line options. It currently only supports the older - // com.google.api.client.auth.oauth2.Credentials. - GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + return new PubsubGrpcClient(timestampLabel, idLabel, DEFAULT_TIMEOUT_S, channel, - credentials); + options.getGcpCredential()); } @Override @@ -128,7 +120,7 @@ public String getKind() { /** * Credentials determined from options and environment. */ - private final GoogleCredentials credentials; + private final Credentials credentials; /** * Label to use for custom timestamps, or {@literal null} if should use Pubsub publish time @@ -157,7 +149,7 @@ public String getKind() { @Nullable String idLabel, int timeoutSec, ManagedChannel publisherChannel, - GoogleCredentials credentials) { + Credentials credentials) { this.timestampLabel = timestampLabel; this.idLabel = idLabel; this.timeoutSec = timeoutSec; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java index bdb5c04f8cbf..215a1364908d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubJsonClient.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkState; -import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Builder; @@ -36,6 +35,8 @@ import com.google.api.services.pubsub.model.ReceivedMessage; import com.google.api.services.pubsub.model.Subscription; import com.google.api.services.pubsub.model.Topic; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -55,11 +56,13 @@ public class PubsubJsonClient extends PubsubClient { private static class PubsubJsonClientFactory implements PubsubClientFactory { private static HttpRequestInitializer chainHttpRequestInitializer( - Credential credential, HttpRequestInitializer httpRequestInitializer) { + Credentials credential, HttpRequestInitializer httpRequestInitializer) { if (credential == null) { return httpRequestInitializer; } else { - return new ChainingHttpRequestInitializer(credential, httpRequestInitializer); + return new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + httpRequestInitializer); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TestCredential.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TestCredential.java index 4b81a0ef54f2..f34527e34c10 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TestCredential.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TestCredential.java @@ -17,35 +17,43 @@ */ package org.apache.beam.sdk.util; -import com.google.api.client.auth.oauth2.BearerToken; -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.auth.oauth2.TokenResponse; -import com.google.api.client.testing.http.MockHttpTransport; +import com.google.auth.Credentials; import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Map; /** * Fake credential, for use in testing. */ -public class TestCredential extends Credential { +public class TestCredential extends Credentials { + @Override + public String getAuthenticationType() { + return "Test"; + } - private final String token; + @Override + public Map> getRequestMetadata() throws IOException { + return Collections.emptyMap(); + } - public TestCredential() { - this("NULL"); + @Override + public Map> getRequestMetadata(URI uri) throws IOException { + return Collections.emptyMap(); } - public TestCredential(String token) { - super(new Builder( - BearerToken.authorizationHeaderAccessMethod()) - .setTransport(new MockHttpTransport())); - this.token = token; + @Override + public boolean hasRequestMetadata() { + return false; + } + + @Override + public boolean hasRequestMetadataOnly() { + return true; } @Override - protected TokenResponse executeRefreshToken() throws IOException { - TokenResponse response = new TokenResponse(); - response.setExpiresInSeconds(5L * 60); - response.setAccessToken(token); - return response; + public void refresh() throws IOException { } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java index 1f612991e9e1..38eecc212aaa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Transport.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.util; -import com.google.api.client.auth.oauth2.Credential; import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpTransport; @@ -27,6 +26,8 @@ import com.google.api.services.cloudresourcemanager.CloudResourceManager; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.storage.Storage; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -161,11 +162,13 @@ private static ApiComponents apiComponentsFromUrl(String urlString) { } private static HttpRequestInitializer chainHttpRequestInitializer( - Credential credential, HttpRequestInitializer httpRequestInitializer) { + Credentials credential, HttpRequestInitializer httpRequestInitializer) { if (credential == null) { return httpRequestInitializer; } else { - return new ChainingHttpRequestInitializer(credential, httpRequestInitializer); + return new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + httpRequestInitializer); } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java index cbdf5da298b9..9767cdee05f3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PubsubGrpcClientTest.java @@ -20,7 +20,7 @@ import static org.junit.Assert.assertEquals; -import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.Credentials; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -52,7 +52,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; /** * Tests for PubsubGrpcClient. @@ -60,7 +59,7 @@ @RunWith(JUnit4.class) public class PubsubGrpcClientTest { private ManagedChannel inProcessChannel; - private GoogleCredentials mockCredentials; + private Credentials testCredentials; private PubsubClient client; private String channelName; @@ -83,8 +82,8 @@ public void setup() { channelName = String.format("%s-%s", PubsubGrpcClientTest.class.getName(), ThreadLocalRandom.current().nextInt()); inProcessChannel = InProcessChannelBuilder.forName(channelName).directExecutor().build(); - mockCredentials = Mockito.mock(GoogleCredentials.class); - client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, mockCredentials); + testCredentials = new TestCredential(); + client = new PubsubGrpcClient(TIMESTAMP_LABEL, ID_LABEL, 10, inProcessChannel, testCredentials); } @After diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml index fa5107259ad4..d61ee64ac931 100644 --- a/sdks/java/io/google-cloud-platform/pom.xml +++ b/sdks/java/io/google-cloud-platform/pom.xml @@ -173,8 +173,13 @@ - com.google.oauth-client - google-oauth-client + com.google.auth + google-auth-library-credentials + + + + com.google.auth + google-auth-library-oauth2-http diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 1d1075c0cdbb..d1a9a67d2c1d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -31,11 +31,14 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert; import static com.google.datastore.v1.client.DatastoreHelper.makeValue; -import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; import com.google.auto.value.AutoValue; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; @@ -229,6 +232,7 @@ public abstract static class Read extends PTransform @Nullable public abstract String getNamespace(); public abstract int getNumQuerySplits(); + @Override public abstract String toString(); abstract Builder toBuilder(); @@ -1005,17 +1009,20 @@ static class V1DatastoreFactory implements Serializable { /** Builds a Cloud Datastore client for the given pipeline options and project. */ public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); + HttpRequestInitializer initializer; + if (credential != null) { + initializer = new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + new RetryHttpRequestInitializer()); + } else { + initializer = new RetryHttpRequestInitializer(); + } + DatastoreOptions.Builder builder = new DatastoreOptions.Builder() .projectId(projectId) - .initializer( - new RetryHttpRequestInitializer() - ); - - Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); - } + .initializer(initializer); return DatastoreFactory.get().create(builder.build()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java index b680a0e23e75..76a1fc87eb9f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java @@ -25,10 +25,13 @@ import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert; import static com.google.datastore.v1.client.DatastoreHelper.makeValue; -import com.google.api.client.auth.oauth2.Credential; +import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; +import com.google.auth.Credentials; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; import com.google.datastore.v1.CommitRequest; import com.google.datastore.v1.Entity; import com.google.datastore.v1.EntityResult; @@ -131,17 +134,21 @@ public void processElement(ProcessContext c) throws Exception { * Build a new datastore client. */ static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) { + Credentials credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); + HttpRequestInitializer initializer; + if (credential != null) { + initializer = new ChainingHttpRequestInitializer( + new HttpCredentialsAdapter(credential), + new RetryHttpRequestInitializer()); + } else { + initializer = new RetryHttpRequestInitializer(); + } + DatastoreOptions.Builder builder = new DatastoreOptions.Builder() .projectId(projectId) - .initializer( - new RetryHttpRequestInitializer() - ); + .initializer(initializer); - Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential(); - if (credential != null) { - builder.credential(credential); - } return DatastoreFactory.get().create(builder.build()); } @@ -209,6 +216,7 @@ interface MutationBuilder { *A MutationBuilder that performs upsert operation. */ static class UpsertMutationBuilder implements MutationBuilder { + @Override public Mutation.Builder apply(Entity entity) { return makeUpsert(entity); } @@ -218,6 +226,7 @@ public Mutation.Builder apply(Entity entity) { * A MutationBuilder that performs delete operation. */ static class DeleteMutationBuilder implements MutationBuilder { + @Override public Mutation.Builder apply(Entity entity) { return makeDelete(entity.getKey()); }