Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions prowler/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `cloudstorage_bucket_soft_delete_enabled` check for GCP provider [(#9028)](https://github.com/prowler-cloud/prowler/pull/9028)
- `cloudstorage_bucket_logging_enabled` check for GCP provider [(#9091)](https://github.com/prowler-cloud/prowler/pull/9091)
- `cloudstorage_bucket_sufficient_retention_period` check for GCP provider [(#9149)](https://github.com/prowler-cloud/prowler/pull/9149)
- `cloudstorage_bucket_uses_vpc_service_controls` check for GCP provider [(#9256)](https://github.com/prowler-cloud/prowler/pull/9256)
- C5 compliance framework for Azure provider [(#9081)](https://github.com/prowler-cloud/prowler/pull/9081)
- C5 compliance framework for the GCP provider [(#9097)](https://github.com/prowler-cloud/prowler/pull/9097)
- `organization_repository_creation_limited` check for GitHub provider [(#8844)](https://github.com/prowler-cloud/prowler/pull/8844)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.gcp.services.accesscontextmanager.accesscontextmanager_service import (
AccessContextManager,
)

accesscontextmanager_client = AccessContextManager(Provider.get_global_provider())
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from pydantic.v1 import BaseModel

import prowler.providers.gcp.config as config
from prowler.lib.logger import logger
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.gcp.lib.service.service import GCPService
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_client import (
cloudresourcemanager_client,
)


class AccessContextManager(GCPService):
def __init__(self, provider: GcpProvider):
super().__init__("accesscontextmanager", provider, api_version="v1")
self.service_perimeters = []
self._get_service_perimeters()

def _get_service_perimeters(self):
for org in cloudresourcemanager_client.organizations:
try:
access_policies = []
try:
request = self.client.accessPolicies().list(
parent=f"organizations/{org.id}"
)
response = request.execute(
num_retries=config.DEFAULT_RETRY_ATTEMPTS
)
access_policies = response.get("accessPolicies", [])
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue

for policy in access_policies:
try:
request = (
self.client.accessPolicies()
.servicePerimeters()
.list(parent=policy["name"])
)
while request is not None:
response = request.execute(
num_retries=config.DEFAULT_RETRY_ATTEMPTS
)

for perimeter in response.get("servicePerimeters", []):
status = perimeter.get("status", {})
spec = perimeter.get("spec", {})

perimeter_config = status if status else spec

resources = perimeter_config.get("resources", [])
restricted_services = perimeter_config.get(
"restrictedServices", []
)

self.service_perimeters.append(
ServicePerimeter(
name=perimeter["name"],
title=perimeter.get("title", ""),
perimeter_type=perimeter.get(
"perimeterType", ""
),
resources=resources,
restricted_services=restricted_services,
policy_name=policy["name"],
)
)

request = (
self.client.accessPolicies()
.servicePerimeters()
.list_next(
previous_request=request, previous_response=response
)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class ServicePerimeter(BaseModel):
name: str
title: str
perimeter_type: str
resources: list[str]
restricted_services: list[str]
policy_name: str
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"Provider": "gcp",
"CheckID": "cloudstorage_bucket_uses_vpc_service_controls",
"CheckTitle": "Ensure That VPC Service Controls Are Used for Cloud Storage Buckets",
"CheckType": [],
"ServiceName": "cloudstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "storage.googleapis.com/Bucket",
"Description": "**Google Cloud Storage buckets** are evaluated to ensure they are protected by **VPC Service Controls**. VPC Service Controls establish security boundaries by restricting access to Cloud Storage resources to specific networks and trusted clients, preventing unauthorized data access and exfiltration.",
"Risk": "Buckets without VPC Service Controls protection may be vulnerable to unauthorized data access and exfiltration, even with proper IAM policies in place. VPC Service Controls provide an additional layer of network-level security that restricts API access based on the context of the request.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://www.trendmicro.com/cloudoneconformity/knowledge-base/gcp/CloudStorage/use-vpc-service-controls.html",
"https://cloud.google.com/vpc-service-controls/docs/create-service-perimeters"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1) Open Google Cloud Console → Security → VPC Service Controls\n2) Create a new service perimeter or select an existing one\n3) Add the relevant GCP projects to the perimeter's protected resources\n4) Add 'storage.googleapis.com' to the list of restricted services\n5) Configure appropriate ingress and egress rules\n6) Save the perimeter configuration",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable VPC Service Controls for all Cloud Storage buckets by adding their projects to a service perimeter with storage.googleapis.com as a restricted service. This prevents data exfiltration and ensures API calls are only allowed from authorized networks.",
"Url": "https://hub.prowler.com/check/cloudstorage_bucket_uses_vpc_service_controls"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.accesscontextmanager.accesscontextmanager_client import (
accesscontextmanager_client,
)
from prowler.providers.gcp.services.cloudstorage.cloudstorage_client import (
cloudstorage_client,
)


class cloudstorage_bucket_uses_vpc_service_controls(Check):
"""
Ensure Cloud Storage buckets are protected by VPC Service Controls.

Reports PASS if a bucket's project is in a VPC Service Controls perimeter
with storage.googleapis.com as a restricted service, otherwise FAIL.
"""

def execute(self) -> list[Check_Report_GCP]:
findings = []

protected_projects = {}
for perimeter in accesscontextmanager_client.service_perimeters:
if any(
service == "storage.googleapis.com"
for service in perimeter.restricted_services
):
for resource in perimeter.resources:
protected_projects[resource] = perimeter.title

for bucket in cloudstorage_client.buckets:
report = Check_Report_GCP(metadata=self.metadata(), resource=bucket)
report.status = "FAIL"
report.status_extended = (
f"Bucket {bucket.name} is not protected by VPC Service Controls."
)
project_resource_id = f"projects/{bucket.project_id}"

if project_resource_id in protected_projects:
report.status = "PASS"
report.status_extended = f"Bucket {bucket.name} is protected by VPC Service Controls perimeter {protected_projects[project_resource_id]}."

findings.append(report)

return findings
72 changes: 72 additions & 0 deletions tests/providers/gcp/gcp_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def mock_api_client(GCPService, service, api_version, _):
mock_api_policies_calls(client)
mock_api_sink_calls(client)
mock_api_services_calls(client)
mock_api_access_policies_calls(client)

return client

Expand Down Expand Up @@ -1076,3 +1077,74 @@ def mock_api_services_calls(client: MagicMock):
]
}
client.services().list_next.return_value = None


def mock_api_access_policies_calls(client: MagicMock):
# Mock access policies list based on parent organization
def mock_list_access_policies(parent):
return_value = MagicMock()
# Only return policies for the first organization (123456789)
if parent == "organizations/123456789":
return_value.execute.return_value = {
"accessPolicies": [
{
"name": "accessPolicies/123456",
"title": "Test Access Policy 1",
},
{
"name": "accessPolicies/789012",
"title": "Test Access Policy 2",
},
]
}
elif parent == "organizations/987654321":
# No policies for the second organization
return_value.execute.return_value = {"accessPolicies": []}
else:
return_value.execute.return_value = {"accessPolicies": []}
return return_value

client.accessPolicies().list = mock_list_access_policies

# Mock service perimeters list based on parent access policy
def mock_list_service_perimeters(parent):
return_value = MagicMock()
if parent == "accessPolicies/123456":
return_value.execute.return_value = {
"servicePerimeters": [
{
"name": "accessPolicies/123456/servicePerimeters/perimeter1",
"title": "Test Perimeter 1",
"perimeterType": "PERIMETER_TYPE_REGULAR",
"status": {
"resources": [
f"projects/{GCP_PROJECT_ID}",
],
"restrictedServices": [
"storage.googleapis.com",
"bigquery.googleapis.com",
],
},
},
{
"name": "accessPolicies/123456/servicePerimeters/perimeter2",
"title": "Test Perimeter 2",
"perimeterType": "PERIMETER_TYPE_BRIDGE",
"spec": {
"resources": [],
"restrictedServices": [
"compute.googleapis.com",
],
},
},
]
}
elif parent == "accessPolicies/789012":
# No perimeters for the second policy
return_value.execute.return_value = {"servicePerimeters": []}
else:
return_value.execute.return_value = {"servicePerimeters": []}
return return_value

client.accessPolicies().servicePerimeters().list = mock_list_service_perimeters
client.accessPolicies().servicePerimeters().list_next.return_value = None
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from unittest.mock import MagicMock, patch

from tests.providers.gcp.gcp_fixtures import (
GCP_PROJECT_ID,
mock_api_client,
mock_is_api_active,
set_mocked_gcp_provider,
)


class TestAccessContextManagerService:
def test_service(self):
# Mock cloudresourcemanager_client before importing accesscontextmanager
mock_crm_client = MagicMock()
mock_crm_client.organizations = [
MagicMock(id="123456789", name="Organization 1"),
]

with (
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__is_api_active__",
new=mock_is_api_active,
),
patch(
"prowler.providers.gcp.lib.service.service.GCPService.__generate_client__",
new=mock_api_client,
),
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
patch(
"prowler.providers.gcp.services.accesscontextmanager.accesscontextmanager_service.cloudresourcemanager_client",
new=mock_crm_client,
),
):
from prowler.providers.gcp.services.accesscontextmanager.accesscontextmanager_service import (
AccessContextManager,
)

accesscontextmanager_client = AccessContextManager(
set_mocked_gcp_provider(project_ids=[GCP_PROJECT_ID])
)
assert accesscontextmanager_client.service == "accesscontextmanager"
assert accesscontextmanager_client.project_ids == [GCP_PROJECT_ID]

# Should have 2 service perimeters from the first access policy
assert len(accesscontextmanager_client.service_perimeters) == 2

# First service perimeter
assert (
accesscontextmanager_client.service_perimeters[0].name
== "accessPolicies/123456/servicePerimeters/perimeter1"
)
assert (
accesscontextmanager_client.service_perimeters[0].title
== "Test Perimeter 1"
)
assert (
accesscontextmanager_client.service_perimeters[0].perimeter_type
== "PERIMETER_TYPE_REGULAR"
)
assert accesscontextmanager_client.service_perimeters[0].resources == [
f"projects/{GCP_PROJECT_ID}"
]
assert accesscontextmanager_client.service_perimeters[
0
].restricted_services == [
"storage.googleapis.com",
"bigquery.googleapis.com",
]
assert (
accesscontextmanager_client.service_perimeters[0].policy_name
== "accessPolicies/123456"
)

# Second service perimeter
assert (
accesscontextmanager_client.service_perimeters[1].name
== "accessPolicies/123456/servicePerimeters/perimeter2"
)
assert (
accesscontextmanager_client.service_perimeters[1].title
== "Test Perimeter 2"
)
assert (
accesscontextmanager_client.service_perimeters[1].perimeter_type
== "PERIMETER_TYPE_BRIDGE"
)
assert accesscontextmanager_client.service_perimeters[1].resources == []
assert accesscontextmanager_client.service_perimeters[
1
].restricted_services == [
"compute.googleapis.com",
]
assert (
accesscontextmanager_client.service_perimeters[1].policy_name
== "accessPolicies/123456"
)
Loading