-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Upgrading to oauth2client 2.0. #1479
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+88
−121
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,47 +90,31 @@ def _callFUT(self, client_email, private_key_path, scope=None): | |
| def test_it(self): | ||
| from gcloud import credentials as MUT | ||
| from gcloud._testing import _Monkey | ||
| from gcloud._testing import _NamedTemporaryFile | ||
|
|
||
| CLIENT_EMAIL = '[email protected]' | ||
| PRIVATE_KEY = b'SEEkR1t' | ||
| client = _Client() | ||
| with _Monkey(MUT, client=client): | ||
| with _NamedTemporaryFile() as temp: | ||
| with open(temp.name, 'wb') as file_obj: | ||
| file_obj.write(PRIVATE_KEY) | ||
| found = self._callFUT(CLIENT_EMAIL, temp.name) | ||
| MOCK_FILENAME = 'foo.path' | ||
| MOCK_CRED_CLASS = _MockServiceAccountCredentials() | ||
| with _Monkey(MUT, ServiceAccountCredentials=MOCK_CRED_CLASS): | ||
| found = self._callFUT(CLIENT_EMAIL, MOCK_FILENAME) | ||
|
|
||
| self.assertTrue(found is client._signed) | ||
| expected_called_with = { | ||
| 'service_account_name': CLIENT_EMAIL, | ||
| 'private_key': PRIVATE_KEY, | ||
| 'scope': None, | ||
| } | ||
| self.assertEqual(client._called_with, expected_called_with) | ||
| self.assertTrue(found is MOCK_CRED_CLASS._result) | ||
| self.assertEqual(MOCK_CRED_CLASS.p12_called, | ||
| [(CLIENT_EMAIL, MOCK_FILENAME, None)]) | ||
|
|
||
| def test_it_with_scope(self): | ||
| from gcloud import credentials as MUT | ||
| from gcloud._testing import _Monkey | ||
| from gcloud._testing import _NamedTemporaryFile | ||
|
|
||
| CLIENT_EMAIL = '[email protected]' | ||
| PRIVATE_KEY = b'SEEkR1t' | ||
| SCOPE = 'SCOPE' | ||
| client = _Client() | ||
| with _Monkey(MUT, client=client): | ||
| with _NamedTemporaryFile() as temp: | ||
| with open(temp.name, 'wb') as file_obj: | ||
| file_obj.write(PRIVATE_KEY) | ||
| found = self._callFUT(CLIENT_EMAIL, temp.name, SCOPE) | ||
| MOCK_FILENAME = 'foo.path' | ||
| MOCK_CRED_CLASS = _MockServiceAccountCredentials() | ||
| with _Monkey(MUT, ServiceAccountCredentials=MOCK_CRED_CLASS): | ||
| found = self._callFUT(CLIENT_EMAIL, MOCK_FILENAME, SCOPE) | ||
|
|
||
| self.assertTrue(found is client._signed) | ||
| expected_called_with = { | ||
| 'service_account_name': CLIENT_EMAIL, | ||
| 'private_key': PRIVATE_KEY, | ||
| 'scope': SCOPE, | ||
| } | ||
| self.assertEqual(client._called_with, expected_called_with) | ||
| self.assertTrue(found is MOCK_CRED_CLASS._result) | ||
| self.assertEqual(MOCK_CRED_CLASS.p12_called, | ||
| [(CLIENT_EMAIL, MOCK_FILENAME, SCOPE)]) | ||
|
|
||
|
|
||
| class Test_get_for_service_account_json(unittest2.TestCase): | ||
|
|
@@ -279,8 +263,7 @@ def _run_with_fake_crypto(self, credentials, private_key_text, | |
| result = self._callFUT(credentials, string_to_sign) | ||
|
|
||
| if crypt._pkcs12_key_as_pem_called: | ||
| self.assertEqual(crypt._private_key_text, | ||
| base64.b64encode(private_key_text)) | ||
| self.assertEqual(crypt._private_key_text, private_key_text) | ||
| self.assertEqual(crypt._private_key_password, 'notasecret') | ||
| self.assertEqual(openssl_crypto._loaded, | ||
| [(openssl_crypto.FILETYPE_PEM, _Crypt._KEY)]) | ||
|
|
@@ -296,22 +279,28 @@ def _run_with_fake_crypto(self, credentials, private_key_text, | |
| self.assertEqual(result, sign_result) | ||
|
|
||
| def test_p12_type(self): | ||
| from oauth2client.client import SignedJwtAssertionCredentials | ||
| from oauth2client.service_account import ServiceAccountCredentials | ||
| ACCOUNT_NAME = 'dummy_service_account_name' | ||
| PRIVATE_KEY_TEXT = b'dummy_private_key_text' | ||
| STRING_TO_SIGN = b'dummy_signature' | ||
| CREDENTIALS = SignedJwtAssertionCredentials( | ||
| ACCOUNT_NAME, PRIVATE_KEY_TEXT, []) | ||
| SIGNER = object() | ||
| CREDENTIALS = ServiceAccountCredentials( | ||
| ACCOUNT_NAME, SIGNER) | ||
| CREDENTIALS._private_key_pkcs12 = PRIVATE_KEY_TEXT | ||
| CREDENTIALS._private_key_password = 'notasecret' | ||
| self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, | ||
| STRING_TO_SIGN) | ||
|
|
||
| def test_p12_type_non_bytes_to_sign(self): | ||
| from oauth2client.client import SignedJwtAssertionCredentials | ||
| from oauth2client.service_account import ServiceAccountCredentials | ||
| ACCOUNT_NAME = 'dummy_service_account_name' | ||
| PRIVATE_KEY_TEXT = b'dummy_private_key_text' | ||
| STRING_TO_SIGN = u'dummy_signature' | ||
| CREDENTIALS = SignedJwtAssertionCredentials( | ||
| ACCOUNT_NAME, PRIVATE_KEY_TEXT, []) | ||
| SIGNER = object() | ||
| CREDENTIALS = ServiceAccountCredentials( | ||
| ACCOUNT_NAME, SIGNER) | ||
| CREDENTIALS._private_key_pkcs12 = PRIVATE_KEY_TEXT | ||
| CREDENTIALS._private_key_password = 'notasecret' | ||
| self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, | ||
| STRING_TO_SIGN) | ||
|
|
||
|
|
@@ -321,21 +310,16 @@ def test_json_type(self): | |
|
|
||
| PRIVATE_KEY_TEXT = 'dummy_private_key_pkcs8_text' | ||
| STRING_TO_SIGN = b'dummy_signature' | ||
|
|
||
| def _get_private_key(private_key_pkcs8_text): | ||
| return private_key_pkcs8_text | ||
|
|
||
| with _Monkey(service_account, _get_private_key=_get_private_key): | ||
| CREDENTIALS = service_account._ServiceAccountCredentials( | ||
| 'dummy_service_account_id', 'dummy_service_account_email', | ||
| 'dummy_private_key_id', PRIVATE_KEY_TEXT, []) | ||
|
|
||
| SIGNER = object() | ||
| CREDENTIALS = service_account.ServiceAccountCredentials( | ||
| 'dummy_service_account_email', SIGNER) | ||
| CREDENTIALS._private_key_pkcs8_pem = PRIVATE_KEY_TEXT | ||
| self._run_with_fake_crypto(CREDENTIALS, PRIVATE_KEY_TEXT, | ||
| STRING_TO_SIGN) | ||
|
|
||
| def test_gae_type(self): | ||
| # Relies on setUp fixing up App Engine imports. | ||
| from oauth2client.appengine import AppAssertionCredentials | ||
| from oauth2client.contrib.appengine import AppAssertionCredentials | ||
| from gcloud._testing import _Monkey | ||
| from gcloud import credentials | ||
|
|
||
|
|
@@ -387,33 +371,20 @@ def test_bad_type(self): | |
| None, None, None) | ||
| self.assertRaises(ValueError, self._callFUT, CREDENTIALS) | ||
|
|
||
| def test_p12_type(self): | ||
| from oauth2client.client import SignedJwtAssertionCredentials | ||
| SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' | ||
| CREDENTIALS = SignedJwtAssertionCredentials(SERVICE_ACCOUNT_NAME, | ||
| b'bogus_key', []) | ||
| found = self._callFUT(CREDENTIALS) | ||
| self.assertEqual(found, SERVICE_ACCOUNT_NAME) | ||
|
|
||
| def test_json_type(self): | ||
| def test_service_account_type(self): | ||
| from oauth2client import service_account | ||
| from gcloud._testing import _Monkey | ||
|
|
||
| def _get_private_key(private_key_pkcs8_text): | ||
| return private_key_pkcs8_text | ||
|
|
||
| SERVICE_ACCOUNT_NAME = 'SERVICE_ACCOUNT_NAME' | ||
| with _Monkey(service_account, _get_private_key=_get_private_key): | ||
| CREDENTIALS = service_account._ServiceAccountCredentials( | ||
| 'bogus_id', SERVICE_ACCOUNT_NAME, 'bogus_id', | ||
| 'bogus_key_text', []) | ||
| SIGNER = object() | ||
| CREDENTIALS = service_account.ServiceAccountCredentials( | ||
| SERVICE_ACCOUNT_NAME, SIGNER) | ||
|
|
||
| found = self._callFUT(CREDENTIALS) | ||
| self.assertEqual(found, SERVICE_ACCOUNT_NAME) | ||
|
|
||
| def test_gae_type(self): | ||
| # Relies on setUp fixing up App Engine imports. | ||
| from oauth2client.appengine import AppAssertionCredentials | ||
| from oauth2client.contrib.appengine import AppAssertionCredentials | ||
| from gcloud._testing import _Monkey | ||
| from gcloud import credentials | ||
|
|
||
|
|
@@ -483,25 +454,26 @@ def test_bad_argument(self): | |
| self.assertRaises(TypeError, self._callFUT, None) | ||
|
|
||
| def test_signed_jwt_for_p12(self): | ||
| import base64 | ||
| from oauth2client import client | ||
| from oauth2client import service_account | ||
| from gcloud._testing import _Monkey | ||
| from gcloud import credentials as MUT | ||
|
|
||
| scopes = [] | ||
| PRIVATE_KEY = b'dummy_private_key_text' | ||
| credentials = client.SignedJwtAssertionCredentials( | ||
| 'dummy_service_account_name', PRIVATE_KEY, scopes) | ||
| SIGNER = object() | ||
| credentials = service_account.ServiceAccountCredentials( | ||
| 'dummy_service_account_email', SIGNER) | ||
| credentials._private_key_pkcs12 = PRIVATE_KEY | ||
| credentials._private_key_password = password = 'password-nope' | ||
|
|
||
| crypt = _Crypt() | ||
| load_result = object() | ||
| openssl_crypto = _OpenSSLCrypto(load_result, None) | ||
|
|
||
| with _Monkey(MUT, crypt=crypt, crypto=openssl_crypto): | ||
| result = self._callFUT(credentials) | ||
|
|
||
| self.assertEqual(crypt._private_key_text, | ||
| base64.b64encode(PRIVATE_KEY)) | ||
| self.assertEqual(crypt._private_key_password, 'notasecret') | ||
| self.assertEqual(crypt._private_key_text, PRIVATE_KEY) | ||
| self.assertEqual(crypt._private_key_password, password) | ||
| self.assertEqual(result, load_result) | ||
| self.assertEqual(openssl_crypto._loaded, | ||
| [(openssl_crypto.FILETYPE_PEM, _Crypt._KEY)]) | ||
|
|
@@ -515,14 +487,10 @@ def test_service_account_via_json_key(self): | |
| scopes = [] | ||
|
|
||
| PRIVATE_TEXT = 'dummy_private_key_pkcs8_text' | ||
|
|
||
| def _get_private_key(private_key_pkcs8_text): | ||
| return private_key_pkcs8_text | ||
|
|
||
| with _Monkey(service_account, _get_private_key=_get_private_key): | ||
| credentials = service_account._ServiceAccountCredentials( | ||
| 'dummy_service_account_id', 'dummy_service_account_email', | ||
| 'dummy_private_key_id', PRIVATE_TEXT, scopes) | ||
| SIGNER = object() | ||
| credentials = service_account.ServiceAccountCredentials( | ||
| 'dummy_service_account_email', SIGNER, scopes=scopes) | ||
| credentials._private_key_pkcs8_pem = PRIVATE_TEXT | ||
|
|
||
| load_result = object() | ||
| openssl_crypto = _OpenSSLCrypto(load_result, None) | ||
|
|
@@ -541,14 +509,11 @@ def test_without_pyopenssl(self): | |
| from gcloud import credentials as credentials_mod | ||
|
|
||
| PRIVATE_TEXT = 'dummy_private_key_pkcs8_text' | ||
| SIGNER = object() | ||
|
|
||
| def _get_private_key(private_key_pkcs8_text): | ||
| return private_key_pkcs8_text | ||
|
|
||
| with _Monkey(service_account, _get_private_key=_get_private_key): | ||
| credentials = service_account._ServiceAccountCredentials( | ||
| 'dummy_service_account_id', 'dummy_service_account_email', | ||
| 'dummy_private_key_id', PRIVATE_TEXT, '') | ||
| credentials = service_account.ServiceAccountCredentials( | ||
| 'dummy_service_account_email', SIGNER) | ||
| credentials._private_key_pkcs8_pem = PRIVATE_TEXT | ||
|
|
||
| with _Monkey(credentials_mod, crypto=None): | ||
| with self.assertRaises(EnvironmentError): | ||
|
|
@@ -648,6 +613,7 @@ def create_scoped(self, scopes): | |
|
|
||
|
|
||
| class _Client(object): | ||
|
|
||
| def __init__(self): | ||
| self._signed = _Credentials() | ||
|
|
||
|
|
@@ -659,10 +625,6 @@ def get_application_default(): | |
|
|
||
| self.GoogleCredentials = GoogleCredentials | ||
|
|
||
| def SignedJwtAssertionCredentials(self, **kw): | ||
| self._called_with = kw | ||
| return self._signed | ||
|
|
||
|
|
||
| class _Crypt(object): | ||
|
|
||
|
|
@@ -724,3 +686,14 @@ def non_transactional(*args, **kwargs): | |
| def do_nothing_wrapper(func): | ||
| return func | ||
| return do_nothing_wrapper | ||
|
|
||
|
|
||
| class _MockServiceAccountCredentials(object): | ||
|
|
||
| def __init__(self): | ||
| self.p12_called = [] | ||
| self._result = _Credentials() | ||
|
|
||
| def from_p12_keyfile(self, email, path, scopes=None): | ||
| self.p12_called.append((email, path, scopes)) | ||
| return self._result | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This comment was marked as spam.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.
This comment was marked as spam.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.