# ---------------------------------------------------------------------------
# tests/integration/test_integration_vespa_cloud.py
#
# Only the imports introduced together with this test are listed here; the
# remaining names (unittest, Schema, Document, Field, ApplicationPackage,
# VespaCloud, ...) are expected to be provided by the module's existing imports.
# ---------------------------------------------------------------------------
import uuid

from vespa.package import ServicesConfiguration
from vespa.configuration.services import (
    container,
    content,
    document,
    documents,
    node,
    nodes,
    redundancy,
    search,
    secrets,
    services,
)
from vespa.configuration.vt import vt


@unittest.skip(
    "Requires interactive (Auth0) login — API key auth cannot set vault access rules. "
    "To run manually: comment out this @unittest.skip decorator."
)
class TestDeployAddsVaultAccessCloud(unittest.TestCase):
    """
    End-to-end test: deploy an app with secrets to verify vault access rule setup.

    Requires:
    - Interactive (Auth0) login (not API key auth) — needed for vault rule modifications
    - A pre-existing vault and secret in the tenant
    - The app does not need to exist beforehand
    """

    def setUp(self) -> None:
        self.tenant = "thttest04"
        # Random application name so repeated or parallel runs do not collide.
        # uuid4 is used rather than a hash of the clock, which is predictable
        # and identical for two runs started within the same timer tick.
        self.app_name = "test" + uuid.uuid4().hex[:8]
        self.vault_name = "pyvespa-testvault"
        self.secret_name = "my-api-key"

        schema = Schema(
            name="doc",
            document=Document(
                fields=[
                    Field(name="text", type="string", indexing=["index", "summary"]),
                ]
            ),
        )
        services_config = ServicesConfiguration(
            application_name=self.app_name,
            services_config=services(
                container(id=f"{self.app_name}_container", version="1.0")(
                    secrets(
                        vt(
                            tag="apiKey",
                            vault=self.vault_name,
                            name=self.secret_name,
                        ),
                    ),
                    search(),
                ),
                content(id=f"{self.app_name}_content", version="1.0")(
                    redundancy("1"),
                    documents(document(type_="doc", mode="index")),
                    nodes(node(distribution_key="0", hostalias="node1")),
                ),
            ),
        )
        self.app_package = ApplicationPackage(
            name=self.app_name,
            schema=[schema],
            services_config=services_config,
        )
        self.vespa_cloud = VespaCloud(
            tenant=self.tenant,
            application=self.app_name,
            application_package=self.app_package,
        )

    def test_deploy_with_secrets(self):
        """Deploy an app with vault secrets — verifies vault access rules are auto-configured."""
        # Verify services.xml contains the secrets/vault reference.
        services_xml = self.app_package.services_to_text
        self.assertIn("<secrets>", services_xml)
        self.assertIn(self.vault_name, services_xml)

        # Verify vault names are parsed correctly.
        vault_names = VespaCloud._parse_vault_names_from_services_xml(services_xml)
        self.assertEqual(vault_names, {self.vault_name})

        # Deploy — _ensure_vault_access_for_dev runs automatically.
        app = self.vespa_cloud.deploy()
        self.assertIsNotNone(app)

    def tearDown(self) -> None:
        self.vespa_cloud.delete()


# ---------------------------------------------------------------------------
# tests/unit/test_deployment.py
# ---------------------------------------------------------------------------
VAULT_PATH = "/tenant-secret/v1/tenant/test_tenant/vault/my-vault"

SINGLE_VAULT_SERVICES_XML = """
<services>
    <container id="default" version="1.0">
        <secrets>
            <apiKey vault="my-vault" name="my-secret"/>
        </secrets>
    </container>
</services>
"""

TWO_VAULTS_SERVICES_XML = """
<services>
    <container id="default" version="1.0">
        <secrets>
            <keyA vault="vault-a" name="secret-a"/>
            <keyB vault="vault-b" name="secret-b"/>
        </secrets>
    </container>
</services>
"""


def _dev_rule(rule_id: int, application: str) -> dict:
    """Build a vault access rule granting ``application`` the dev context."""
    return {
        "id": rule_id,
        "application": application,
        "contexts": [VespaCloud.secret_store_dev_alias],
    }


class TestVaultAccessRules(unittest.TestCase):
    """Unit tests for the vault access rule helpers on ``VespaCloud``."""

    def setUp(self):
        self.tenant = "test_tenant"
        self.application = "test_app"
        self.application_package = MagicMock()
        # Patch per test and undo on cleanup; assigning the mock directly on
        # the class would leak into every test that runs afterwards.
        token_patch = patch.object(
            VespaCloud, "_try_get_access_token", return_value="fake_access_token"
        )
        token_patch.start()
        self.addCleanup(token_patch.stop)
        self.vespa_cloud = VespaCloud(
            tenant=self.tenant,
            application=self.application,
            application_package=self.application_package,
        )

    # --- XML parsing tests ---

    def test_parse_vault_names_no_secrets(self):
        xml = '<services><container id="default" version="1.0"/></services>'
        result = VespaCloud._parse_vault_names_from_services_xml(xml)
        self.assertEqual(result, set())

    def test_parse_vault_names_single_vault(self):
        result = VespaCloud._parse_vault_names_from_services_xml(
            SINGLE_VAULT_SERVICES_XML
        )
        self.assertEqual(result, {"my-vault"})

    def test_parse_vault_names_multiple_vaults(self):
        result = VespaCloud._parse_vault_names_from_services_xml(
            TWO_VAULTS_SERVICES_XML
        )
        self.assertEqual(result, {"vault-a", "vault-b"})

    def test_parse_vault_names_multiple_secrets_blocks(self):
        xml = """
        <services>
            <container id="c1" version="1.0">
                <secrets>
                    <key1 vault="vault-1" name="secret-1"/>
                </secrets>
            </container>
            <container id="c2" version="1.0">
                <secrets>
                    <key2 vault="vault-2" name="secret-2"/>
                </secrets>
            </container>
        </services>
        """
        result = VespaCloud._parse_vault_names_from_services_xml(xml)
        self.assertEqual(result, {"vault-1", "vault-2"})

    def test_parse_vault_names_malformed_xml(self):
        # Unclosed elements: the parser must give up quietly with an empty set.
        xml = "<services><container><secrets>"
        result = VespaCloud._parse_vault_names_from_services_xml(xml)
        self.assertEqual(result, set())

    # --- Vault rule checking tests ---

    @patch("vespa.deployment.VespaCloud._request")
    def test_ensure_vault_access_rule_already_has_access(self, mock_request):
        mock_request.return_value = {"rules": [_dev_rule(0, "test_app")]}
        self.vespa_cloud._ensure_vault_access_rule("my-vault")
        # Should only GET, no PUT (no CSRF fetch either)
        mock_request.assert_called_once_with("GET", VAULT_PATH)

    @patch("vespa.deployment.VespaCloud._request")
    def test_ensure_vault_access_rule_needs_access(self, mock_request):
        expected_rules = [_dev_rule(0, "other_app"), _dev_rule(1, "test_app")]
        mock_request.side_effect = [
            # GET response: no rules for this app
            {"rules": [_dev_rule(0, "other_app")]},
            # GET CSRF token
            {"token": "fake-csrf-token"},
            # PUT response: confirms both rules
            {"rules": expected_rules},
        ]
        self.vespa_cloud._ensure_vault_access_rule("my-vault")
        self.assertEqual(mock_request.call_count, 3)
        # Verify CSRF fetch
        self.assertEqual(mock_request.call_args_list[1][0], ("GET", "/csrf/v1"))
        # Verify PUT call
        put_call = mock_request.call_args_list[2]
        self.assertEqual(put_call[0][0], "PUT")
        self.assertEqual(put_call[0][2]["rules"], expected_rules)
        self.assertEqual(put_call[0][3]["vespa-csrf-token"], "fake-csrf-token")

    @patch("vespa.deployment.VespaCloud._request")
    def test_ensure_vault_access_rule_empty_rules(self, mock_request):
        expected_rules = [_dev_rule(0, "test_app")]
        mock_request.side_effect = [
            {"rules": []},
            # GET CSRF token
            {"token": "fake-csrf-token"},
            # PUT response confirms the new rule
            {"rules": expected_rules},
        ]
        self.vespa_cloud._ensure_vault_access_rule("my-vault")
        self.assertEqual(mock_request.call_count, 3)
        self.assertEqual(
            mock_request.call_args_list[2][0][2]["rules"], expected_rules
        )

    @patch("vespa.deployment.VespaCloud._request")
    def test_ensure_vault_access_rule_put_response_missing_rule(self, mock_request):
        mock_request.side_effect = [
            {"rules": []},
            # GET CSRF token
            {"token": "fake-csrf-token"},
            # PUT response does not confirm the rule
            {"rules": []},
        ]
        with self.assertRaises(RuntimeError):
            self.vespa_cloud._ensure_vault_access_rule("my-vault")

    @patch("vespa.deployment.VespaCloud._request")
    def test_ensure_vault_access_rule_put_response_not_dict(self, mock_request):
        mock_request.side_effect = [
            {"rules": []},
            # GET CSRF token
            {"token": "fake-csrf-token"},
            "unexpected string response",
        ]
        with self.assertRaises(RuntimeError):
            self.vespa_cloud._ensure_vault_access_rule("my-vault")

    # --- Error handling tests ---

    @patch("vespa.deployment.logging")
    @patch("vespa.deployment.VespaCloud._request")
    def test_ensure_vault_access_for_dev_api_failure_warns(
        self, mock_request, mock_logging
    ):
        self.application_package.services_to_text = SINGLE_VAULT_SERVICES_XML
        mock_request.side_effect = RuntimeError("API error")
        # Should not raise — just warn via logging
        self.vespa_cloud._ensure_vault_access_for_dev()
        mock_logging.warning.assert_called_once()
        warning_msg = mock_logging.warning.call_args[0][0]
        self.assertIn("Failed to set vault access rule", warning_msg)

    @patch("vespa.deployment.VespaCloud._request")
    def test_ensure_vault_access_rule_api_key_raises(self, mock_request):
        """When auth is api_key and the rule is missing, ValueError is raised."""
        mock_request.return_value = {"rules": []}
        self.vespa_cloud.control_plane_auth_method = "api_key"
        with self.assertRaises(ValueError) as ctx:
            self.vespa_cloud._ensure_vault_access_rule("my-vault")
        self.assertIn(
            "API key authentication does not have permission", str(ctx.exception)
        )
        self.assertIn("key_location", str(ctx.exception))
        # Should only GET, no PUT attempted
        mock_request.assert_called_once()

    @patch("vespa.deployment.logging")
    @patch("vespa.deployment.VespaCloud._request")
    def test_ensure_vault_access_for_dev_api_key_raises(
        self, mock_request, mock_logging
    ):
        """ValueError from api_key auth check propagates (not caught as warning)."""
        self.application_package.services_to_text = SINGLE_VAULT_SERVICES_XML
        mock_request.return_value = {"rules": []}
        self.vespa_cloud.control_plane_auth_method = "api_key"
        with self.assertRaises(ValueError) as ctx:
            self.vespa_cloud._ensure_vault_access_for_dev()
        self.assertIn(
            "API key authentication does not have permission", str(ctx.exception)
        )
        mock_logging.warning.assert_not_called()

    # --- Integration: _ensure_vault_access_for_dev ---

    @patch("vespa.deployment.VespaCloud._ensure_vault_access_rule")
    def test_ensure_vault_access_for_dev_calls_per_vault(self, mock_ensure_rule):
        self.application_package.services_to_text = TWO_VAULTS_SERVICES_XML
        self.vespa_cloud._ensure_vault_access_for_dev()
        called_vaults = sorted(call[0][0] for call in mock_ensure_rule.call_args_list)
        self.assertEqual(called_vaults, ["vault-a", "vault-b"])

    @patch("vespa.deployment.VespaCloud._ensure_vault_access_rule")
    def test_ensure_vault_access_for_dev_no_secrets(self, mock_ensure_rule):
        self.application_package.services_to_text = (
            '<services><container id="default" version="1.0"/></services>'
        )
        self.vespa_cloud._ensure_vault_access_for_dev()
        mock_ensure_rule.assert_not_called()

    # --- _get_services_xml_content tests ---

    def test_get_services_xml_content_from_package(self):
        self.application_package.services_to_text = "<services/>"
        result = self.vespa_cloud._get_services_xml_content()
        self.assertEqual(result, "<services/>")

    def test_get_services_xml_content_from_disk(self):
        self.vespa_cloud.application_package = None
        with TemporaryDirectory() as tmp:
            services_path = os.path.join(tmp, "services.xml")
            with open(services_path, "w", encoding="utf-8") as f:
                f.write("<services/>")
            self.vespa_cloud.application_root = tmp
            result = self.vespa_cloud._get_services_xml_content()
        self.assertEqual(result, "<services/>")

    def test_get_services_xml_content_none(self):
        self.vespa_cloud.application_package = None
        self.vespa_cloud.application_root = None
        result = self.vespa_cloud._get_services_xml_content()
        self.assertIsNone(result)


if __name__ == "__main__":
    unittest.main()


# ---------------------------------------------------------------------------
# Companion one-line changes that make the `secrets` tag available:
#   vespa/configuration/services.py   adds "secrets" to the tag-name list,
#                                     between "secret-store" and "server".
#   vespa/configuration/services.pyi  adds the matching stub:
#                                     def secrets(*c, **kwargs) -> VT: ...
# ---------------------------------------------------------------------------
# Vault-access support for ``VespaCloud`` (vespa/deployment.py).
# Everything defined below is a member of the ``VespaCloud`` class.
import logging
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Tuple, Union, IO, Optional, List, Dict, Literal, Set

# Context name used in vault access rules to grant access to dev deployments.
secret_store_dev_alias = "SANDBOX"


def _get_services_xml_content(
    self, application_root: Optional[str] = None
) -> Optional[str]:
    """Get services.xml content from disk or from the application package.

    Lookup order:

    1. ``<application_root>/services.xml`` when ``application_root`` is passed
       explicitly (the ``deploy_from_disk()`` case). The explicit path wins
       because that directory is what is about to be deployed.
    2. ``self.application_package.services_to_text``.
    3. ``<self.application_root>/services.xml``.

    Args:
        application_root: Optional path override for deploy_from_disk() case.

    Returns:
        The services.xml content as a string, or None if not available.
    """

    def read_from(root) -> Optional[str]:
        # Best-effort read: a missing or unreadable file means "nothing to do".
        if not root:
            return None
        services_path = Path(root) / "services.xml"
        if not services_path.is_file():
            return None
        try:
            return services_path.read_text(encoding="utf-8")
        except (OSError, UnicodeError) as exc:
            logging.warning("Could not read '%s': %s", services_path, exc)
            return None

    content = read_from(application_root)
    if content is not None:
        return content

    if self.application_package is not None:
        try:
            return self.application_package.services_to_text
        except AttributeError:
            # Package object without a services.xml renderer; try the disk.
            pass

    return read_from(self.application_root)


@staticmethod
def _parse_vault_names_from_services_xml(services_xml: str) -> Set[str]:
    """Parse vault names referenced in <secrets> elements of services.xml.

    Every direct child of a ``<secrets>`` element that carries a non-empty
    ``vault`` attribute contributes one name. ``<secrets>`` elements are
    matched by local name, so a default XML namespace on the document does
    not hide them.

    Args:
        services_xml: The services.xml content as a string.

    Returns:
        A set of unique vault names found in the XML. Empty when the input
        is not text or cannot be parsed.
    """
    if not isinstance(services_xml, (str, bytes)):
        # e.g. a stubbed application package; nothing sensible to parse.
        return set()
    try:
        root = ET.fromstring(services_xml)
    except ET.ParseError:
        logging.warning(
            "Failed to parse services.xml for vault references. "
            "Skipping automatic vault access rule setup."
        )
        return set()

    vault_names: Set[str] = set()
    for element in root.iter():
        tag = element.tag
        # Namespaced tags look like "{uri}secrets"; compare the local part only.
        if not isinstance(tag, str) or tag.rpartition("}")[2] != "secrets":
            continue
        for child in element:
            vault = child.get("vault")
            if vault:
                vault_names.add(vault)
    return vault_names


def _get_csrf_token(self) -> Optional[str]:
    """Fetch a CSRF token from the Vespa Cloud API.

    This is best-effort: any failure is logged and reported as ``None`` so
    the caller can still attempt the request without the token.

    Returns:
        The CSRF token string, or None if the token could not be fetched.
    """
    try:
        response = self._request("GET", "/csrf/v1")
    except Exception as exc:  # deliberate best-effort boundary
        logging.info(
            "Could not fetch CSRF token (%s); continuing without one.", exc
        )
        return None
    if not isinstance(response, dict):
        return None
    return response.get("token")


def _ensure_vault_access_rule(self, vault_name: str) -> None:
    """Ensure the current application has dev (secret_store_dev_alias) access to the given vault.

    Checks existing access rules and adds a new one if needed. The PUT sends
    the vault document returned by the GET with the new rule appended, as a
    dict body (``_request`` accepts ``dict`` bodies).

    Args:
        vault_name: The name of the vault to ensure access for.

    Raises:
        ValueError: If a rule must be added but the control plane is
            authenticated with an API key, which cannot modify access rules.
        RuntimeError: If the GET or PUT response is not a JSON object, or the
            PUT response does not contain the expected access rule.
    """
    path = f"/tenant-secret/v1/tenant/{self.tenant}/vault/{vault_name}"
    response = self._request("GET", path)
    if not isinstance(response, dict):
        # RuntimeError (not AttributeError) so the dev-deploy caller downgrades
        # this to a warning instead of aborting the deployment.
        raise RuntimeError(
            f"Unexpected response when reading access rules for vault '{vault_name}': "
            f"expected JSON object, got {type(response).__name__}"
        )

    def grants_dev_access(rule) -> bool:
        # `contexts` may be absent or null in the payload.
        return (
            isinstance(rule, dict)
            and rule.get("application") == self.application
            and self.secret_store_dev_alias in (rule.get("contexts") or [])
        )

    # Check if app already has access for dev zone
    existing_rules = list(response.get("rules") or [])
    logging.info(
        "Existing access rules for vault '%s': %s", vault_name, existing_rules
    )
    if any(grants_dev_access(rule) for rule in existing_rules):
        return  # Already has access

    # Need to add a new rule - check auth method
    if self.control_plane_auth_method == "api_key":
        raise ValueError(
            f"Vault '{vault_name}' does not have an access rule for application "
            f"'{self.application}' in the '{self.secret_store_dev_alias}' context.\n"
            f"API key authentication does not have permission to modify vault access rules.\n"
            f"To fix this, remove the 'key_location' or 'key_content' parameter from "
            f"VespaCloud() and deploy again to use interactive login, which has the "
            f"required permissions."
        )

    # Pick an id above every id already in use. Using len(existing_rules)
    # alone would reuse an id whenever the existing ids are not contiguous
    # (e.g. [0, 2] after a rule was removed). For contiguous ids the result
    # is identical to len(existing_rules).
    # NOTE(review): assumes rule ids must be unique per vault — confirm with the API.
    used_ids = [
        rule["id"]
        for rule in existing_rules
        if isinstance(rule, dict) and isinstance(rule.get("id"), int)
    ]
    next_id = max(used_ids + [len(existing_rules) - 1]) + 1

    new_rule = {
        "application": self.application,
        "contexts": [self.secret_store_dev_alias],
        "id": next_id,
    }

    csrf_token = self._get_csrf_token()
    headers = {"vespa-csrf-token": csrf_token} if csrf_token else {}
    put_body = {**response, "rules": existing_rules + [new_rule]}
    logging.info(
        "Adding vault access rule for application '%s' to vault '%s': %s",
        self.application,
        vault_name,
        new_rule,
    )
    put_response = self._request(
        "PUT",
        path,
        put_body,
        headers,
    )

    # Verify the rule was applied
    if not isinstance(put_response, dict):
        raise RuntimeError(
            f"Unexpected response when setting vault access rule for '{vault_name}': "
            f"expected JSON object, got {type(put_response).__name__}"
        )
    if not any(grants_dev_access(rule) for rule in put_response.get("rules") or []):
        raise RuntimeError(
            f"Vault access rule for application '{self.application}' on vault "
            f"'{vault_name}' was not confirmed in the API response."
        )


def _ensure_vault_access_for_dev(
    self, application_root: Optional[str] = None
) -> None:
    """Ensure vault access rules are set for all vaults referenced in services.xml.

    Called by ``deploy()`` and ``deploy_from_disk()`` when the target
    environment is ``"dev"``. API and transport failures are logged as
    warnings so the deployment can continue. The ``ValueError`` raised for
    API-key authentication is intentionally NOT caught: it carries the
    instructions the user needs to fix the setup.

    Args:
        application_root: Optional path override for deploy_from_disk() case.

    Raises:
        ValueError: Propagated from ``_ensure_vault_access_rule`` when a rule
            is missing and API-key authentication is in use.
    """
    services_xml = self._get_services_xml_content(application_root)
    if services_xml is None:
        return

    vault_names = self._parse_vault_names_from_services_xml(services_xml)
    if not vault_names:
        return

    # Sorted for a deterministic request and log order.
    for vault_name in sorted(vault_names):
        try:
            self._ensure_vault_access_rule(vault_name)
        except (RuntimeError, ConnectionError, OSError, KeyError, HTTPError) as e:
            logging.warning(
                "Failed to set vault access rule for '%s': %s. "
                "You may need to configure vault access manually in the Vespa Cloud console.",
                vault_name,
                e,
            )