Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions prowler/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

All notable changes to the **Prowler SDK** are documented in this file.

## [v5.15.0] (Prowler UNRELEASED)

### Added
- `cloudstorage_uses_vpc_service_controls` check for GCP provider [(#9256)](https://github.com/prowler-cloud/prowler/pull/9256)

---

## [v5.14.1] (Prowler UNRELEASED)

### Fixed
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.gcp.services.accesscontextmanager.accesscontextmanager_service import (
AccessContextManager,
)

accesscontextmanager_client = AccessContextManager(Provider.get_global_provider())
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from pydantic.v1 import BaseModel

import prowler.providers.gcp.config as config
from prowler.lib.logger import logger
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.gcp.lib.service.service import GCPService
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_client import (
cloudresourcemanager_client,
)


class AccessContextManager(GCPService):
def __init__(self, provider: GcpProvider):
super().__init__("accesscontextmanager", provider, api_version="v1")
self.service_perimeters = []
self._get_service_perimeters()

def _get_service_perimeters(self):
for org in cloudresourcemanager_client.organizations:
try:
access_policies = []
try:
request = self.client.accessPolicies().list(
parent=f"organizations/{org.id}"
)
while request is not None:
response = request.execute(
num_retries=config.DEFAULT_RETRY_ATTEMPTS
)
access_policies.extend(response.get("accessPolicies", []))

request = self.client.accessPolicies().list_next(
previous_request=request, previous_response=response
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
continue

for policy in access_policies:
try:
request = (
self.client.accessPolicies()
.servicePerimeters()
.list(parent=policy["name"])
)
while request is not None:
response = request.execute(
num_retries=config.DEFAULT_RETRY_ATTEMPTS
)

for perimeter in response.get("servicePerimeters", []):
status = perimeter.get("status", {})
spec = perimeter.get("spec", {})

perimeter_config = status if status else spec

resources = perimeter_config.get("resources", [])
restricted_services = perimeter_config.get(
"restrictedServices", []
)

self.service_perimeters.append(
ServicePerimeter(
name=perimeter["name"],
title=perimeter.get("title", ""),
perimeter_type=perimeter.get(
"perimeterType", ""
),
resources=resources,
restricted_services=restricted_services,
policy_name=policy["name"],
)
)

request = (
self.client.accessPolicies()
.servicePerimeters()
.list_next(
previous_request=request, previous_response=response
)
)
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)


class ServicePerimeter(BaseModel):
name: str
title: str
perimeter_type: str
resources: list[str]
restricted_services: list[str]
policy_name: str
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ def __init__(self, provider: GcpProvider):
def _get_iam_policy(self):
for project_id in self.project_ids:
try:
# Get project details to obtain project number
project_details = (
self.client.projects()
.get(projectId=project_id)
.execute(num_retries=DEFAULT_RETRY_ATTEMPTS)
)
project_number = project_details.get("projectNumber", "")

policy = (
self.client.projects()
.getIamPolicy(resource=project_id)
Expand All @@ -41,6 +49,7 @@ def _get_iam_policy(self):
self.cloud_resource_manager_projects.append(
Project(
id=project_id,
number=project_number,
audit_logging=audit_logging,
audit_configs=audit_configs,
)
Expand Down Expand Up @@ -96,6 +105,7 @@ class Binding(BaseModel):

class Project(BaseModel):
id: str
number: str = ""
audit_logging: bool
audit_configs: list[AuditConfig] = []

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"Provider": "gcp",
"CheckID": "cloudstorage_uses_vpc_service_controls",
"CheckTitle": "Cloud Storage services are protected by VPC Service Controls",
"CheckType": [],
"ServiceName": "cloudstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "cloudresourcemanager.googleapis.com/Project",
"Description": "**GCP Projects** are evaluated to ensure they have **VPC Service Controls** enabled for Cloud Storage. VPC Service Controls establish security boundaries by restricting access to Cloud Storage resources to specific networks and trusted clients, preventing unauthorized data access and exfiltration.",
"Risk": "Projects without VPC Service Controls protection for Cloud Storage may be vulnerable to unauthorized data access and exfiltration, even with proper IAM policies in place. VPC Service Controls provide an additional layer of network-level security that restricts API access based on the context of the request.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://www.trendmicro.com/cloudoneconformity/knowledge-base/gcp/CloudStorage/use-vpc-service-controls.html",
"https://cloud.google.com/vpc-service-controls/docs/create-service-perimeters"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1) Open Google Cloud Console → Security → VPC Service Controls\n2) Create a new service perimeter or select an existing one\n3) Add the relevant GCP projects to the perimeter's protected resources\n4) Add 'storage.googleapis.com' to the list of restricted services\n5) Configure appropriate ingress and egress rules\n6) Save the perimeter configuration",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable VPC Service Controls for all Cloud Storage buckets by adding their projects to a service perimeter with storage.googleapis.com as a restricted service. This prevents data exfiltration and ensures API calls are only allowed from authorized networks.",
"Url": "https://hub.prowler.com/check/cloudstorage_uses_vpc_service_controls"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.accesscontextmanager.accesscontextmanager_client import (
accesscontextmanager_client,
)
from prowler.providers.gcp.services.cloudresourcemanager.cloudresourcemanager_client import (
cloudresourcemanager_client,
)


class cloudstorage_uses_vpc_service_controls(Check):
"""
Ensure Cloud Storage is protected by VPC Service Controls at project level.

Reports PASS if a project is in a VPC Service Controls perimeter
with storage.googleapis.com as a restricted service, otherwise FAIL.
"""

def execute(self) -> list[Check_Report_GCP]:
findings = []

protected_projects = {}
for perimeter in accesscontextmanager_client.service_perimeters:
if any(
service == "storage.googleapis.com"
for service in perimeter.restricted_services
):
for resource in perimeter.resources:
protected_projects[resource] = perimeter.title

for project in cloudresourcemanager_client.cloud_resource_manager_projects:
report = Check_Report_GCP(
metadata=self.metadata(),
resource=cloudresourcemanager_client.projects[project.id],
project_id=project.id,
location=cloudresourcemanager_client.region,
resource_name=(
cloudresourcemanager_client.projects[project.id].name
if cloudresourcemanager_client.projects[project.id].name
else "GCP Project"
),
)
report.status = "FAIL"
report.status_extended = f"Project {project.id} does not have VPC Service Controls enabled for Cloud Storage."
# GCP stores resources by project number, not project ID
project_resource_id = f"projects/{project.number}"

if project_resource_id in protected_projects:
report.status = "PASS"
report.status_extended = f"Project {project.id} has VPC Service Controls enabled for Cloud Storage in perimeter {protected_projects[project_resource_id]}."

findings.append(report)

return findings
78 changes: 76 additions & 2 deletions tests/providers/gcp/gcp_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def mock_api_client(GCPService, service, api_version, _):
mock_api_policies_calls(client)
mock_api_sink_calls(client)
mock_api_services_calls(client)
mock_api_access_policies_calls(client)

return client

Expand Down Expand Up @@ -117,8 +118,9 @@ def mock_api_projects_calls(client: MagicMock):
"etag": "BwWWja0YfJA=",
"version": 3,
}
# Used by compute client
# Used by compute client and cloudresourcemanager
client.projects().get().execute.return_value = {
"projectNumber": "123456789012",
"commonInstanceMetadata": {
"items": [
{
Expand All @@ -134,7 +136,7 @@ def mock_api_projects_calls(client: MagicMock):
"value": "TRUE",
},
]
}
},
}
client.projects().list_next.return_value = None
# Used by dataproc client
Expand Down Expand Up @@ -1100,3 +1102,75 @@ def mock_api_services_calls(client: MagicMock):
]
}
client.services().list_next.return_value = None


def mock_api_access_policies_calls(client: MagicMock):
# Mock access policies list based on parent organization
def mock_list_access_policies(parent):
return_value = MagicMock()
# Only return policies for the first organization (123456789)
if parent == "organizations/123456789":
return_value.execute.return_value = {
"accessPolicies": [
{
"name": "accessPolicies/123456",
"title": "Test Access Policy 1",
},
{
"name": "accessPolicies/789012",
"title": "Test Access Policy 2",
},
]
}
elif parent == "organizations/987654321":
# No policies for the second organization
return_value.execute.return_value = {"accessPolicies": []}
else:
return_value.execute.return_value = {"accessPolicies": []}
return return_value

client.accessPolicies().list = mock_list_access_policies
client.accessPolicies().list_next.return_value = None

# Mock service perimeters list based on parent access policy
def mock_list_service_perimeters(parent):
return_value = MagicMock()
if parent == "accessPolicies/123456":
return_value.execute.return_value = {
"servicePerimeters": [
{
"name": "accessPolicies/123456/servicePerimeters/perimeter1",
"title": "Test Perimeter 1",
"perimeterType": "PERIMETER_TYPE_REGULAR",
"status": {
"resources": [
f"projects/{GCP_PROJECT_ID}",
],
"restrictedServices": [
"storage.googleapis.com",
"bigquery.googleapis.com",
],
},
},
{
"name": "accessPolicies/123456/servicePerimeters/perimeter2",
"title": "Test Perimeter 2",
"perimeterType": "PERIMETER_TYPE_BRIDGE",
"spec": {
"resources": [],
"restrictedServices": [
"compute.googleapis.com",
],
},
},
]
}
elif parent == "accessPolicies/789012":
# No perimeters for the second policy
return_value.execute.return_value = {"servicePerimeters": []}
else:
return_value.execute.return_value = {"servicePerimeters": []}
return return_value

client.accessPolicies().servicePerimeters().list = mock_list_service_perimeters
client.accessPolicies().servicePerimeters().list_next.return_value = None
Loading
Loading