diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e1e3c80032a..5d973f3691a 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -79,6 +79,7 @@ /run/**/* @GoogleCloudPlatform/aap-dpes @GoogleCloudPlatform/python-samples-reviewers /run/django/**/* @glasnt @GoogleCloudPlatform/aap-dpes @GoogleCloudPlatform/python-samples-reviewers /secretmanager/**/* @GoogleCloudPlatform/aap-dpes @GoogleCloudPlatform/python-samples-reviewers +/securitycenter/**/* @GoogleCloudPlatform/dee-infra @GoogleCloudPlatform/python-samples-reviewers /storage/**/* @GoogleCloudPlatform/cloud-storage-dpes @GoogleCloudPlatform/python-samples-reviewers /storagetransfer/**/* @GoogleCloudPlatform/cloud-storage-dpes @GoogleCloudPlatform/python-samples-reviewers /texttospeech/**/* @GoogleCloudPlatform/dee-data-ai @GoogleCloudPlatform/python-samples-reviewers diff --git a/.github/blunderbuss.yml b/.github/blunderbuss.yml index 868de1d881d..60cfb68b3f4 100644 --- a/.github/blunderbuss.yml +++ b/.github/blunderbuss.yml @@ -117,6 +117,10 @@ assign_issues_by: - 'api: pubsublite' to: - anguillanneuf +- labels: + - 'api: securitycenter' + to: + - GoogleCloudPlatform/dee-infra - labels: - 'api: spanner' to: diff --git a/securitycenter/AUTHORING_GUIDE.md b/securitycenter/AUTHORING_GUIDE.md new file mode 100644 index 00000000000..55c97b32f4c --- /dev/null +++ b/securitycenter/AUTHORING_GUIDE.md @@ -0,0 +1 @@ +See https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/AUTHORING_GUIDE.md \ No newline at end of file diff --git a/securitycenter/CONTRIBUTING.md b/securitycenter/CONTRIBUTING.md new file mode 100644 index 00000000000..34c882b6f1a --- /dev/null +++ b/securitycenter/CONTRIBUTING.md @@ -0,0 +1 @@ +See https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/CONTRIBUTING.md \ No newline at end of file diff --git a/securitycenter/snippets/noxfile_config.py b/securitycenter/snippets/noxfile_config.py new file mode 100644 index 00000000000..daf5c43ae22 --- /dev/null +++ b/securitycenter/snippets/noxfile_config.py @@ -0,0 +1,40 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. + +# You can copy this file into your directory, then it will be inported from +# the noxfile.py. + +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7"], + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + # 'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT', + "gcloud_project_env": "BUILD_SPECIFIC_GCLOUD_PROJECT", + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + "envs": { + "GCLOUD_ORGANIZATION": "1081635000895", + "GCLOUD_PROJECT": "project-a-id", + "GCLOUD_PUBSUB_TOPIC": "projects/project-a-id/topics/notifications-sample-topic", + "GCLOUD_PUBSUB_SUBSCRIPTION": "notification-sample-subscription", + }, +} diff --git a/securitycenter/snippets/requirements-test.txt b/securitycenter/snippets/requirements-test.txt new file mode 100644 index 00000000000..e47a4859ff8 --- /dev/null +++ b/securitycenter/snippets/requirements-test.txt @@ -0,0 +1,2 @@ +pytest==7.2.0 +google-cloud-bigquery==3.3.5 diff --git a/securitycenter/snippets/requirements.txt b/securitycenter/snippets/requirements.txt new file mode 100644 index 00000000000..9a8e73c3bda --- /dev/null +++ b/securitycenter/snippets/requirements.txt @@ -0,0 +1,2 @@ +google-cloud-pubsub==2.13.10 +google-cloud-securitycenter==1.16.2 \ No newline at end of file diff --git a/securitycenter/snippets/snippets_bigquery_export.py b/securitycenter/snippets/snippets_bigquery_export.py new file mode 100644 index 00000000000..591d9af91c4 --- /dev/null +++ b/securitycenter/snippets/snippets_bigquery_export.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python +# +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Snippets on exporting findings from Security Command Center to BigQuery.""" + + +# [START securitycenter_create_bigquery_export] + + +def create_bigquery_export( + parent: str, export_filter: str, bigquery_dataset_id: str, bigquery_export_id: str +): + + from google.cloud import securitycenter + + """ + Create export configuration to export findings from a project to a BigQuery dataset. + Optionally specify filter to export certain findings only. + + Args: + parent: Use any one of the following resource paths: + - organizations/{organization_id} + - folders/{folder_id} + - projects/{project_id} + export_filter: Expression that defines the filter to apply across create/update events of findings. + bigquery_dataset_id: The BigQuery dataset to write findings' updates to. + bigquery_export_id: Unique identifier provided by the client. + - example id: f"default-{str(uuid.uuid4()).split('-')[0]}" + For more info, see: + https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to + """ + client = securitycenter.SecurityCenterClient() + + # Create the BigQuery export configuration. + bigquery_export = securitycenter.BigQueryExport() + bigquery_export.description = "Export low and medium findings if the compute resource has an IAM anomalous grant" + bigquery_export.filter = export_filter + bigquery_export.dataset = f"{parent}/datasets/{bigquery_dataset_id}" + + request = securitycenter.CreateBigQueryExportRequest() + request.parent = parent + request.big_query_export = bigquery_export + request.big_query_export_id = bigquery_export_id + + # Create the export request. + response = client.create_big_query_export(request) + + print(f"BigQuery export request created successfully: {response.name}\n") + + +# [END securitycenter_create_bigquery_export] + + +# [START securitycenter_get_bigquery_export] +def get_bigquery_export(parent: str, bigquery_export_id: str): + from google.cloud import securitycenter + + """ + Retrieve an existing BigQuery export. + Args: + parent: Use any one of the following resource paths: + - organizations/{organization_id} + - folders/{folder_id} + - projects/{project_id} + bigquery_export_id: Unique identifier that is used to identify the export. + """ + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.GetBigQueryExportRequest() + request.name = f"{parent}/bigQueryExports/{bigquery_export_id}" + + response = client.get_big_query_export(request) + print(f"Retrieved the BigQuery export: {response.name}") + + +# [END securitycenter_get_bigquery_export] + + +# [START securitycenter_list_bigquery_export] +def list_bigquery_exports(parent: str): + from google.cloud import securitycenter + + """ + List BigQuery exports in the given parent. + Args: + parent: The parent which owns the collection of BigQuery exports. + Use any one of the following resource paths: + - organizations/{organization_id} + - folders/{folder_id} + - projects/{project_id} + """ + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.ListBigQueryExportsRequest() + request.parent = parent + + response = client.list_big_query_exports(request) + + print("Listing BigQuery exports:") + for bigquery_export in response: + print(bigquery_export.name) + + +# [END securitycenter_list_bigquery_export] + + +# [START securitycenter_update_bigquery_export] +def update_bigquery_export(parent: str, export_filter: str, bigquery_export_id: str): + """ + Updates an existing BigQuery export. + Args: + parent: Use any one of the following resource paths: + - organizations/{organization_id} + - folders/{folder_id} + - projects/{project_id} + export_filter: Expression that defines the filter to apply across create/update events of findings. + bigquery_export_id: Unique identifier provided by the client. + For more info, see: + https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to + """ + from google.cloud import securitycenter + from google.protobuf import field_mask_pb2 + + client = securitycenter.SecurityCenterClient() + + # Set the new values for export configuration. + bigquery_export = securitycenter.BigQueryExport() + bigquery_export.name = f"{parent}/bigQueryExports/{bigquery_export_id}" + bigquery_export.filter = export_filter + + # Field mask to only update the export filter. + # Set the update mask to specify which properties should be updated. + # If empty, all mutable fields will be updated. + # For more info on constructing field mask path, see the proto or: + # https://googleapis.dev/python/protobuf/latest/google/protobuf/field_mask_pb2.html + field_mask = field_mask_pb2.FieldMask(paths=["filter"]) + + request = securitycenter.UpdateBigQueryExportRequest() + request.big_query_export = bigquery_export + request.update_mask = field_mask + + response = client.update_big_query_export(request) + + if response.filter != export_filter: + print("Failed to update BigQueryExport!") + return + print("BigQueryExport updated successfully!") + + +# [END securitycenter_update_bigquery_export] + + +# [START securitycenter_delete_bigquery_export] +def delete_bigquery_export(parent: str, bigquery_export_id: str): + """ + Delete an existing BigQuery export. + Args: + parent: Use any one of the following resource paths: + - organizations/{organization_id} + - folders/{folder_id} + - projects/{project_id} + bigquery_export_id: Unique identifier that is used to identify the export. + """ + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.DeleteBigQueryExportRequest() + request.name = f"{parent}/bigQueryExports/{bigquery_export_id}" + + client.delete_big_query_export(request) + print(f"BigQuery export request deleted successfully: {bigquery_export_id}") + + +# [END securitycenter_delete_bigquery_export] diff --git a/securitycenter/snippets/snippets_bigquery_export_test.py b/securitycenter/snippets/snippets_bigquery_export_test.py new file mode 100644 index 00000000000..197bd6f6b1b --- /dev/null +++ b/securitycenter/snippets/snippets_bigquery_export_test.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python +# +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# TODO(developer): Replace these variables before running the sample. +import os +import re +import uuid + +from _pytest.capture import CaptureFixture +import pytest + +import snippets_bigquery_export + +PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] +GOOGLE_APPLICATION_CREDENTIALS = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] +BIGQUERY_DATASET_ID = f"sampledataset{str(uuid.uuid4()).split('-')[0]}" + + +@pytest.fixture(scope="module") +def bigquery_export_id(): + bigquery_export_id = f"default-{str(uuid.uuid4()).split('-')[0]}" + + create_bigquery_dataset(BIGQUERY_DATASET_ID) + export_filter = 'severity="LOW" OR severity="MEDIUM"' + snippets_bigquery_export.create_bigquery_export( + f"projects/{PROJECT_ID}", export_filter, BIGQUERY_DATASET_ID, bigquery_export_id + ) + + yield bigquery_export_id + + snippets_bigquery_export.delete_bigquery_export( + f"projects/{PROJECT_ID}", bigquery_export_id + ) + delete_bigquery_dataset(BIGQUERY_DATASET_ID) + + +def create_bigquery_dataset(dataset_id: str): + from google.cloud import bigquery + + bigquery_client = bigquery.Client() + + dataset_id_full = "{}.{}".format(PROJECT_ID, dataset_id) + dataset = bigquery.Dataset(dataset_id_full) + + dataset = bigquery_client.create_dataset(dataset) + print("Dataset {} created.".format(dataset.dataset_id)) + + +def delete_bigquery_dataset(dataset_id: str): + from google.cloud import bigquery + + bigquery_client = bigquery.Client() + bigquery_client.delete_dataset(dataset_id) + print("Dataset {} deleted.".format(dataset_id)) + + +def test_get_bigquery_export(capsys: CaptureFixture, bigquery_export_id: str): + snippets_bigquery_export.get_bigquery_export( + f"projects/{PROJECT_ID}", bigquery_export_id + ) + out, _ = capsys.readouterr() + assert re.search( + "Retrieved the BigQuery export", + out, + ) + assert re.search(f"bigQueryExports/{bigquery_export_id}", out) + + +def test_list_bigquery_exports(capsys: CaptureFixture, bigquery_export_id: str): + snippets_bigquery_export.list_bigquery_exports(f"projects/{PROJECT_ID}") + out, _ = capsys.readouterr() + assert re.search("Listing BigQuery exports:", out) + assert re.search(bigquery_export_id, out) + + +def test_update_bigquery_exports(capsys: CaptureFixture, bigquery_export_id: str): + export_filter = 'severity="MEDIUM"' + snippets_bigquery_export.update_bigquery_export( + f"projects/{PROJECT_ID}", export_filter, bigquery_export_id + ) + out, _ = capsys.readouterr() + assert re.search("BigQueryExport updated successfully!", out) diff --git a/securitycenter/snippets/snippets_findings.py b/securitycenter/snippets/snippets_findings.py new file mode 100644 index 00000000000..06cddc412f0 --- /dev/null +++ b/securitycenter/snippets/snippets_findings.py @@ -0,0 +1,586 @@ +#!/usr/bin/env python +# +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Examples of working with source and findings in Cloud Security Command Center.""" + + +def create_source(organization_id): + """Create a new findings source.""" + # [START securitycenter_create_source] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + # organization_id is the numeric ID of the organization. e.g.: + # organization_id = "111122222444" + org_name = "organizations/{org_id}".format(org_id=organization_id) + + created = client.create_source( + request={ + "parent": org_name, + "source": { + "display_name": "Customized Display Name", + "description": "A new custom source that does X", + }, + } + ) + print("Created Source: {}".format(created.name)) + # [END securitycenter_create_source] + + +def get_source(source_name): + """Gets an existing source.""" + # [START securitycenter_get_source] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + source = client.get_source(request={"name": source_name}) + + print("Source: {}".format(source)) + # [END securitycenter_get_source] + return source + + +def update_source(source_name): + """Updates a source's display name.""" + # [START securitycenter_update_source] + from google.cloud import securitycenter + from google.protobuf import field_mask_pb2 + + client = securitycenter.SecurityCenterClient() + + # Field mask to only update the display name. + field_mask = field_mask_pb2.FieldMask(paths=["display_name"]) + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + updated = client.update_source( + request={ + "source": {"name": source_name, "display_name": "Updated Display Name"}, + "update_mask": field_mask, + } + ) + print("Updated Source: {}".format(updated)) + # [END securitycenter_update_source] + return updated + + +def add_user_to_source(source_name): + """Gives a user findingsEditor permission to the source.""" + user_email = "csccclienttest@gmail.com" + # [START securitycenter_set_source_iam] + from google.cloud import securitycenter + from google.iam.v1 import policy_pb2 + + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + # Get the old policy so we can do an incremental update. + old_policy = client.get_iam_policy(request={"resource": source_name}) + print("Old Policy: {}".format(old_policy)) + + # Setup a new IAM binding. + binding = policy_pb2.Binding() + binding.role = "roles/securitycenter.findingsEditor" + # user_email is an e-mail address known to Cloud IAM (e.g. a gmail address). + # user_mail = user@somedomain.com + binding.members.append("user:{}".format(user_email)) + + # Setting the e-tag avoids over-write existing policy + updated = client.set_iam_policy( + request={ + "resource": source_name, + "policy": {"etag": old_policy.etag, "bindings": [binding]}, + } + ) + + print("Updated Policy: {}".format(updated)) + + # [END securitycenter_set_source_iam] + return binding, updated + + +def list_source(organization_id): + """Lists finding sources.""" + i = -1 + # [START securitycenter_list_sources] + from google.cloud import securitycenter + + # Create a new client. + client = securitycenter.SecurityCenterClient() + # organization_id is the numeric ID of the organization. e.g.: + # organization_id = "111122222444" + org_name = "organizations/{org_id}".format(org_id=organization_id) + + # Call the API and print out each existing source. + for i, source in enumerate(client.list_sources(request={"parent": org_name})): + print(i, source) + # [END securitycenter_list_sources] + return i + + +def create_finding(source_name, finding_id): + """Creates a new finding.""" + # [START securitycenter_create_finding] + import datetime + + from google.cloud import securitycenter + from google.cloud.securitycenter_v1 import Finding + + # Create a new client. + client = securitycenter.SecurityCenterClient() + + # Use the current time as the finding "event time". + event_time = datetime.datetime.now(tz=datetime.timezone.utc) + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + + # The resource this finding applies to. The CSCC UI can link + # the findings for a resource to the corresponding Asset of a resource + # if there are matches. + resource_name = "//cloudresourcemanager.googleapis.com/organizations/11232" + + finding = Finding( + state=Finding.State.ACTIVE, + resource_name=resource_name, + category="MEDIUM_RISK_ONE", + event_time=event_time, + ) + + # Call The API. + created_finding = client.create_finding( + request={"parent": source_name, "finding_id": finding_id, "finding": finding} + ) + print(created_finding) + # [END securitycenter_create_finding] + return created_finding + + +def create_finding_with_source_properties(source_name): + """Demonstrate creating a new finding with source properties.""" + # [START securitycenter_create_finding_with_source_properties] + import datetime + + from google.cloud import securitycenter + from google.cloud.securitycenter_v1 import Finding + from google.protobuf.struct_pb2 import Value + + # Create a new client. + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + + # Controlled by caller. + finding_id = "samplefindingid2" + + # The resource this finding applies to. The CSCC UI can link + # the findings for a resource to the corresponding Asset of a resource + # if there are matches. + resource_name = "//cloudresourcemanager.googleapis.com/organizations/11232" + + # Define source properties values as protobuf "Value" objects. + str_value = Value() + str_value.string_value = "string_example" + num_value = Value() + num_value.number_value = 1234 + + # Use the current time as the finding "event time". + event_time = datetime.datetime.now(tz=datetime.timezone.utc) + + finding = Finding( + state=Finding.State.ACTIVE, + resource_name=resource_name, + category="MEDIUM_RISK_ONE", + source_properties={"s_value": "string_example", "n_value": 1234}, + event_time=event_time, + ) + + created_finding = client.create_finding( + request={"parent": source_name, "finding_id": finding_id, "finding": finding} + ) + print(created_finding) + # [END securitycenter_create_finding_with_source_properties] + + +def update_finding(source_name): + # [START securitycenter_update_finding_source_properties] + import datetime + + from google.cloud import securitycenter + from google.cloud.securitycenter_v1 import Finding + from google.protobuf import field_mask_pb2 + + client = securitycenter.SecurityCenterClient() + # Only update the specific source property and event_time. event_time + # is required for updates. + field_mask = field_mask_pb2.FieldMask( + paths=["source_properties.s_value", "event_time"] + ) + + # Set the update time to Now. This must be some time greater then the + # event_time on the original finding. + event_time = datetime.datetime.now(tz=datetime.timezone.utc) + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + finding_name = "{}/findings/samplefindingid2".format(source_name) + finding = Finding( + name=finding_name, + source_properties={"s_value": "new_string"}, + event_time=event_time, + ) + updated_finding = client.update_finding( + request={"finding": finding, "update_mask": field_mask} + ) + + print( + "New Source properties: {}, Event Time {}".format( + updated_finding.source_properties, updated_finding.event_time + ) + ) + # [END securitycenter_update_finding_source_properties] + + +def update_finding_state(source_name): + """Demonstrate updating only a finding state.""" + # [START securitycenter_update_finding_state] + import datetime + + from google.cloud import securitycenter + from google.cloud.securitycenter_v1 import Finding + + # Create a client. + client = securitycenter.SecurityCenterClient() + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + finding_name = "{}/findings/samplefindingid2".format(source_name) + + # Call the API to change the finding state to inactive as of now. + new_finding = client.set_finding_state( + request={ + "name": finding_name, + "state": Finding.State.INACTIVE, + "start_time": datetime.datetime.now(tz=datetime.timezone.utc), + } + ) + print(f"New state: {new_finding.state}") + # [END securitycenter_update_finding_state] + + +def trouble_shoot(source_name): + """Demonstrate calling test_iam_permissions to determine if the + service account has the correct permisions.""" + # [START securitycenter_test_iam] + from google.cloud import securitycenter + + # Create a client. + client = securitycenter.SecurityCenterClient() + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + + # Check for permssions to call create_finding or update_finding. + permission_response = client.test_iam_permissions( + request={ + "resource": source_name, + "permissions": ["securitycenter.findings.update"], + } + ) + + print( + "Permision to create or update findings? {}".format( + len(permission_response.permissions) > 0 + ) + ) + # [END securitycenter_test_iam] + assert len(permission_response.permissions) > 0 + # [START securitycenter_test_iam] + # Check for permissions necessary to call set_finding_state. + permission_response = client.test_iam_permissions( + request={ + "resource": source_name, + "permissions": ["securitycenter.findings.setState"], + } + ) + print( + "Permision to update state? {}".format(len(permission_response.permissions) > 0) + ) + # [END securitycenter_test_iam] + return permission_response + assert len(permission_response.permissions) > 0 + + +def list_all_findings(organization_id): + # [START securitycenter_list_all_findings] + from google.cloud import securitycenter + + # Create a client. + client = securitycenter.SecurityCenterClient() + + # organization_id is the numeric ID of the organization. e.g.: + # organization_id = "111122222444" + org_name = "organizations/{org_id}".format(org_id=organization_id) + # The "sources/-" suffix lists findings across all sources. You + # also use a specific source_name instead. + all_sources = "{org_name}/sources/-".format(org_name=org_name) + finding_result_iterator = client.list_findings(request={"parent": all_sources}) + for i, finding_result in enumerate(finding_result_iterator): + print( + "{}: name: {} resource: {}".format( + i, finding_result.finding.name, finding_result.finding.resource_name + ) + ) + # [END securitycenter_list_all_findings] + return i + + +def list_filtered_findings(source_name): + # [START securitycenter_list_filtered_findings] + from google.cloud import securitycenter + + # Create a new client. + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + # You an also use a wild-card "-" for all sources: + # source_name = "organizations/111122222444/sources/-" + finding_result_iterator = client.list_findings( + request={"parent": source_name, "filter": 'category="MEDIUM_RISK_ONE"'} + ) + # Iterate an print all finding names and the resource they are + # in reference to. + for i, finding_result in enumerate(finding_result_iterator): + print( + "{}: name: {} resource: {}".format( + i, finding_result.finding.name, finding_result.finding.resource_name + ) + ) + # [END securitycenter_list_filtered_findings] + return i + + +def list_findings_at_time(source_name): + # [START securitycenter_list_findings_at_time] + from datetime import datetime, timedelta + + from google.cloud import securitycenter + + # Create a new client. + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + # You an also use a wild-card "-" for all sources: + # source_name = "organizations/111122222444/sources/-" + five_days_ago = str(datetime.now() - timedelta(days=5)) + # [END securitycenter_list_findings_at_time] + i = -1 + # [START securitycenter_list_findings_at_time] + + finding_result_iterator = client.list_findings( + request={"parent": source_name, "filter": five_days_ago} + ) + for i, finding_result in enumerate(finding_result_iterator): + print( + "{}: name: {} resource: {}".format( + i, finding_result.finding.name, finding_result.finding.resource_name + ) + ) + # [END securitycenter_list_findings_at_time] + return i + + +def get_iam_policy(source_name): + """Gives a user findingsEditor permission to the source.""" + # [START securitycenter_get_source_iam] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + # Get the old policy so we can do an incremental update. + policy = client.get_iam_policy(request={"resource": source_name}) + print("Policy: {}".format(policy)) + # [END securitycenter_get_source_iam] + + +def group_all_findings(organization_id): + """Demonstrates grouping all findings across an organization.""" + i = 0 + # [START securitycenter_group_all_findings] + from google.cloud import securitycenter + + # Create a client. + client = securitycenter.SecurityCenterClient() + + # organization_id is the numeric ID of the organization. e.g.: + # organization_id = "111122222444" + org_name = "organizations/{org_id}".format(org_id=organization_id) + # The "sources/-" suffix lists findings across all sources. You + # also use a specific source_name instead. + all_sources = "{org_name}/sources/-".format(org_name=org_name) + group_result_iterator = client.group_findings( + request={"parent": all_sources, "group_by": "category"} + ) + for i, group_result in enumerate(group_result_iterator): + print((i + 1), group_result) + # [END securitycenter_group_all_findings] + return i + + +def group_filtered_findings(source_name): + """Demonstrates grouping all findings across an organization.""" + i = 0 + # [START securitycenter_group_filtered_findings] + from google.cloud import securitycenter + + # Create a client. + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + + group_result_iterator = client.group_findings( + request={ + "parent": source_name, + "group_by": "category", + "filter": 'state="ACTIVE"', + } + ) + for i, group_result in enumerate(group_result_iterator): + print((i + 1), group_result) + # [END securitycenter_group_filtered_findings] + return i + + +def group_findings_at_time(source_name): + """Demonstrates grouping all findings across an organization as of + a specific time.""" + i = -1 + # [START securitycenter_group_findings_at_time] + from datetime import datetime, timedelta + + from google.cloud import securitycenter + + # Create a client. + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + + # Group findings as of yesterday. + read_time = datetime.utcnow() - timedelta(days=1) + + group_result_iterator = client.group_findings( + request={"parent": source_name, "group_by": "category", "read_time": read_time} + ) + for i, group_result in enumerate(group_result_iterator): + print((i + 1), group_result) + # [END securitycenter_group_findings_at_time] + return i + + +def group_findings_and_changes(source_name): + """Demonstrates grouping all findings across an organization and + associated changes.""" + i = 0 + # [START securitycenter_group_findings_with_changes] + from datetime import timedelta + + from google.cloud import securitycenter + + # Create a client. + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + + # List assets and their state change the last 30 days + compare_delta = timedelta(days=30) + + group_result_iterator = client.group_findings( + request={ + "parent": source_name, + "group_by": "state_change", + "compare_duration": compare_delta, + } + ) + for i, group_result in enumerate(group_result_iterator): + print((i + 1), group_result) + # [END securitycenter_group_findings_with_changes]] + return i diff --git a/securitycenter/snippets/snippets_findings_test.py b/securitycenter/snippets/snippets_findings_test.py new file mode 100644 index 00000000000..8cd6353f6e0 --- /dev/null +++ b/securitycenter/snippets/snippets_findings_test.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from itertools import chain +import os + +import pytest + +import snippets_findings + + +@pytest.fixture(scope="module") +def organization_id(): + """Get Organization ID from the environment variable""" + return os.environ["GCLOUD_ORGANIZATION"] + + +@pytest.fixture(scope="module") +def source_name(organization_id): + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + org_name = "organizations/{org_id}".format(org_id=organization_id) + + source = client.create_source( + request={ + "parent": org_name, + "source": { + "display_name": "Unit test source", + "description": "A new custom source that does X", + }, + } + ) + return source.name + + +def test_create_source(organization_id): + snippets_findings.create_source(organization_id) + + +def test_get_source(source_name): + source = snippets_findings.get_source(source_name) + assert source.name == source_name + + +def test_update_source(source_name): + updated = snippets_findings.update_source(source_name) + assert updated.display_name == "Updated Display Name" + + +def test_add_user_to_source(source_name): + binding, updated = snippets_findings.add_user_to_source(source_name) + assert any( + member == "user:csccclienttest@gmail.com" + for member in chain.from_iterable( + binding.members for binding in updated.bindings + ) + ) + + +def test_list_source(organization_id): + count = snippets_findings.list_source(organization_id) + assert count >= 0 + + +def test_create_finding(source_name): + created_finding = snippets_findings.create_finding(source_name, "samplefindingid") + assert len(created_finding.name) > 0 + + +def test_create_finding_with_source_properties(source_name): + snippets_findings.create_finding_with_source_properties(source_name) + + +def test_update_finding(source_name): + snippets_findings.update_finding(source_name) + + +def test_update_finding_state(source_name): + snippets_findings.update_finding_state(source_name) + + +def test_trouble_shoot(source_name): + snippets_findings.trouble_shoot(source_name) + + +def test_list_all_findings(organization_id): + count = snippets_findings.list_all_findings(organization_id) + assert count > 0 + + +def test_list_filtered_findings(source_name): + count = snippets_findings.list_filtered_findings(source_name) + assert count > 0 + + +def list_findings_at_time(source_name): + count = snippets_findings.list_findings_at_time(source_name) + assert count == -1 + + +def test_get_iam_policy(source_name): + snippets_findings.get_iam_policy(source_name) + + +def test_group_all_findings(organization_id): + count = snippets_findings.group_all_findings(organization_id) + assert count > 0 + + +def test_group_filtered_findings(source_name): + count = snippets_findings.group_filtered_findings(source_name) + assert count == 0 + + +def test_group_findings_at_time(source_name): + count = snippets_findings.group_findings_at_time(source_name) + assert count == -1 + + +def test_group_findings_and_changes(source_name): + count = snippets_findings.group_findings_and_changes(source_name) + assert count == 0 diff --git a/securitycenter/snippets/snippets_list_assets.py b/securitycenter/snippets/snippets_list_assets.py new file mode 100644 index 00000000000..42511b94b3e --- /dev/null +++ b/securitycenter/snippets/snippets_list_assets.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python +# +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Examples of listing assets in Cloud Security Command Center.""" + + +def list_all_assets(organization_id): + """Demonstrate listing and printing all assets.""" + i = 0 + # [START securitycenter_list_all_assets] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + # organization_id is the numeric ID of the organization. + # organization_id = "1234567777" + org_name = "organizations/{org_id}".format(org_id=organization_id) + + # Call the API and print results. + asset_iterator = client.list_assets(request={"parent": org_name}) + for i, asset_result in enumerate(asset_iterator): + print(i, asset_result) + # [END securitycenter_list_all_assets] + return i + + +def list_assets_with_filters(organization_id): + """Demonstrate listing assets with a filter.""" + i = 0 + # [START securitycenter_list_assets_with_filter] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # organization_id is the numeric ID of the organization. + # organization_id = "1234567777" + org_name = "organizations/{org_id}".format(org_id=organization_id) + + project_filter = ( + "security_center_properties.resource_type=" + + '"google.cloud.resourcemanager.Project"' + ) + # Call the API and print results. + asset_iterator = client.list_assets( + request={"parent": org_name, "filter": project_filter} + ) + for i, asset_result in enumerate(asset_iterator): + print(i, asset_result) + # [END securitycenter_list_assets_with_filter] + return i + + +def list_assets_with_filters_and_read_time(organization_id): + """Demonstrate listing assets with a filter.""" + i = 0 + # [START securitycenter_list_assets_at_time] + from datetime import datetime, timedelta, timezone + + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # organization_id is the numeric ID of the organization. + # organization_id = "1234567777" + org_name = "organizations/{org_id}".format(org_id=organization_id) + + project_filter = ( + "security_center_properties.resource_type=" + + '"google.cloud.resourcemanager.Project"' + ) + + # Lists assets as of yesterday. + read_time = datetime.now(tz=timezone.utc) - timedelta(days=1) + + # Call the API and print results. + asset_iterator = client.list_assets( + request={"parent": org_name, "filter": project_filter, "read_time": read_time} + ) + for i, asset_result in enumerate(asset_iterator): + print(i, asset_result) + # [END securitycenter_list_assets_at_time] + return i + + +def list_point_in_time_changes(organization_id): + """Demonstrate listing assets along with their state changes.""" + i = 0 + # [START securitycenter_list_assets_and_changes] + from datetime import timedelta + + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # organization_id is the numeric ID of the organization. + # organization_id = "1234567777" + org_name = "organizations/{org_id}".format(org_id=organization_id) + project_filter = ( + "security_center_properties.resource_type=" + + '"google.cloud.resourcemanager.Project"' + ) + + # List assets and their state change the last 30 days + compare_delta = timedelta(days=30) + + # Call the API and print results. + asset_iterator = client.list_assets( + request={ + "parent": org_name, + "filter": project_filter, + "compare_duration": compare_delta, + } + ) + for i, asset in enumerate(asset_iterator): + print(i, asset) + + # [END securitycenter_list_assets_and_changes] + return i + + +def group_assets(organization_id): + """Demonstrates grouping all assets by type.""" + i = 0 + # [START securitycenter_group_all_assets] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # organization_id is the numeric ID of the organization. + # organization_id = "1234567777" + org_name = "organizations/{org_id}".format(org_id=organization_id) + + group_by_type = "security_center_properties.resource_type" + + result_iterator = client.group_assets( + request={"parent": org_name, "group_by": group_by_type} + ) + for i, result in enumerate(result_iterator): + print((i + 1), result) + # [END securitycenter_group_all_assets] + return i + + +def group_filtered_assets(organization_id): + """Demonstrates grouping assets by type with a filter.""" + i = 0 + # [START securitycenter_group_all_assets_with_filter] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # organization_id is the numeric ID of the organization. + # organization_id = "1234567777" + org_name = "organizations/{org_id}".format(org_id=organization_id) + + group_by_type = "security_center_properties.resource_type" + only_projects = ( + "security_center_properties.resource_type=" + + '"google.cloud.resourcemanager.Project"' + ) + result_iterator = client.group_assets( + request={"parent": org_name, "group_by": group_by_type, "filter": only_projects} + ) + for i, result in enumerate(result_iterator): + print((i + 1), result) + # [END securitycenter_group_all_assets_with_filter] + # only one asset type is a project + return i + + +def group_assets_by_changes(organization_id): + """Demonstrates grouping assets by their changes over a period of time.""" + i = 0 + # [START securitycenter_group_all_assets_by_change] + from datetime import timedelta + + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + duration = timedelta(days=5) + + # organization_id is the numeric ID of the organization. + # organization_id = "1234567777" + org_name = "organizations/{org_id}".format(org_id=organization_id) + result_iterator = client.group_assets( + request={ + "parent": org_name, + "group_by": "state_change", + "compare_duration": duration, + } + ) + for i, result in enumerate(result_iterator): + print((i + 1), result) + # [END securitycenter_group_all_assets_by_change] + return i diff --git a/securitycenter/snippets/snippets_list_assets_test.py b/securitycenter/snippets/snippets_list_assets_test.py new file mode 100644 index 00000000000..4451423840c --- /dev/null +++ b/securitycenter/snippets/snippets_list_assets_test.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for snippets.""" + +import os + +import pytest + +import snippets_list_assets + + +@pytest.fixture(scope="module") +def organization_id(): + """Get Organization ID from the environment variable""" + return os.environ["GCLOUD_ORGANIZATION"] + + +def test_list_all_assets(organization_id): + """Demonstrate listing and printing all assets.""" + count = snippets_list_assets.list_all_assets(organization_id) + assert count > 0 + + +def list_assets_with_filters(organization_id): + count = snippets_list_assets.list_all_assets(organization_id) + assert count > 0 + + +def test_list_assets_with_filters_and_read_time(organization_id): + count = snippets_list_assets.list_assets_with_filters_and_read_time(organization_id) + assert count > 0 + + +def test_list_point_in_time_changes(organization_id): + count = snippets_list_assets.list_point_in_time_changes(organization_id) + assert count > 0 + + +def test_group_assets(organization_id): + count = snippets_list_assets.group_assets(organization_id) + assert count >= 8 # 8 different asset types. + + +def test_group_filtered_assets(organization_id): + count = snippets_list_assets.group_filtered_assets(organization_id) + assert count == 0 + + +def test_group_assets_by_changes(organization_id): + count = snippets_list_assets.group_assets_by_changes(organization_id) + assert count >= 0 # only one asset type is a project diff --git a/securitycenter/snippets/snippets_mute_config.py b/securitycenter/snippets/snippets_mute_config.py new file mode 100644 index 00000000000..97de131216e --- /dev/null +++ b/securitycenter/snippets/snippets_mute_config.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python +# +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# [START securitycenter_create_mute_config] + + +def create_mute_rule(parent_path: str, mute_config_id: str) -> None: + """ + Creates a mute configuration under a given scope that will mute + all new findings that match a given filter. + Existing findings will NOT BE muted. + Args: + parent_path: use any one of the following options: + - organizations/{organization_id} + - folders/{folder_id} + - projects/{project_id} + mute_config_id: Set a unique id; max of 63 chars. + """ + + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + mute_config = securitycenter.MuteConfig() + mute_config.description = "Mute low-medium IAM grants excluding 'compute' " + # Set mute rule(s). + # To construct mute rules and for supported properties, see: + # https://cloud.google.com/security-command-center/docs/how-to-mute-findings#create_mute_rules + mute_config.filter = ( + 'severity="LOW" OR severity="MEDIUM" AND ' + 'category="Persistence: IAM Anomalous Grant" AND ' + '-resource.type:"compute"' + ) + + request = securitycenter.CreateMuteConfigRequest() + request.parent = parent_path + request.mute_config_id = mute_config_id + request.mute_config = mute_config + + mute_config = client.create_mute_config(request=request) + print(f"Mute rule created successfully: {mute_config.name}") + + +# [END securitycenter_create_mute_config] + + +# [START securitycenter_delete_mute_config] +def delete_mute_rule(mute_config_name: str) -> None: + """ + Deletes a mute configuration given its resource name. + Note: Previously muted findings are not affected when a mute config is deleted. + Args: + mute_config_name: Specify the name of the mute config to delete. + Use any one of the following formats: + - organizations/{organization}/muteConfigs/{config_id} + - folders/{folder}/muteConfigs/{config_id} or + - projects/{project}/muteConfigs/{config_id} + """ + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.DeleteMuteConfigRequest() + request.name = mute_config_name + + client.delete_mute_config(request) + print(f"Mute rule deleted successfully: {mute_config_name}") + + +# [END securitycenter_delete_mute_config] + + +# [START securitycenter_get_mute_config] +def get_mute_rule(mute_config_name: str) -> None: + """ + Retrieves a mute configuration given its resource name. + Args: + mute_config_name: Name of the mute config to retrieve. + Use any one of the following formats: + - organizations/{organization}/muteConfigs/{config_id} + - folders/{folder}/muteConfigs/{config_id} + - projects/{project}/muteConfigs/{config_id} + """ + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.GetMuteConfigRequest() + request.name = mute_config_name + + mute_config = client.get_mute_config(request) + print(f"Retrieved the mute rule: {mute_config.name}") + + +# [END securitycenter_get_mute_config] + + +# [START securitycenter_list_mute_configs] +def list_mute_rules(parent: str) -> None: + """ + Listing mute configs at organization level will return all the configs + at the org, folder and project levels. + Similarly, listing configs at folder level will list all the configs + at the folder and project levels. + Args: + parent: Use any one of the following resource paths to list mute configurations: + - organizations/{organization_id} + - folders/{folder_id} + - projects/{project_id} + """ + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.ListMuteConfigsRequest() + request.parent = parent + + # List all Mute Configs present in the resource. + for mute_config in client.list_mute_configs(request): + print(mute_config.name) + + +# [END securitycenter_list_mute_configs] + + +# [START securitycenter_update_mute_config] +def update_mute_rule(mute_config_name: str) -> None: + """ + Updates an existing mute configuration. + The following can be updated in a mute config: description, and filter/ mute rule. + Args: + mute_config_name: Specify the name of the mute config to delete. + Use any one of the following formats: + - organizations/{organization}/muteConfigs/{config_id} + - folders/{folder}/muteConfigs/{config_id} + - projects/{project}/muteConfigs/{config_id} + """ + from google.cloud import securitycenter + from google.protobuf import field_mask_pb2 + + client = securitycenter.SecurityCenterClient() + + update_mute_config = securitycenter.MuteConfig() + update_mute_config.name = mute_config_name + update_mute_config.description = "Updated mute config description" + + field_mask = field_mask_pb2.FieldMask(paths=["description"]) + + request = securitycenter.UpdateMuteConfigRequest() + request.mute_config = update_mute_config + # Set the update mask to specify which properties of the Mute Config should be updated. + # If empty, all mutable fields will be updated. + # Make sure that the mask fields match the properties changed in 'update_mute_config'. + # For more info on constructing update mask path, see the proto or: + # https://cloud.google.com/security-command-center/docs/reference/rest/v1/folders.muteConfigs/patch?hl=en#query-parameters + request.update_mask = field_mask + + mute_config = client.update_mute_config(request) + print(f"Updated mute rule : {mute_config}") + + +# [END securitycenter_update_mute_config] + + +# [START securitycenter_set_mute] +def set_mute_finding(finding_path: str) -> None: + """ + Mute an individual finding. + If a finding is already muted, muting it again has no effect. + Various mute states are: MUTE_UNSPECIFIED/MUTE/UNMUTE. + Args: + finding_path: The relative resource name of the finding. See: + https://cloud.google.com/apis/design/resource_names#relative_resource_name + Use any one of the following formats: + - organizations/{organization_id}/sources/{source_id}/finding/{finding_id}, + - folders/{folder_id}/sources/{source_id}/finding/{finding_id}, + - projects/{project_id}/sources/{source_id}/finding/{finding_id}. + """ + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.SetMuteRequest() + request.name = finding_path + request.mute = securitycenter.Finding.Mute.MUTED + + finding = client.set_mute(request) + print(f"Mute value for the finding: {finding.mute.name}") + + +# [END securitycenter_set_mute] + + +# [START securitycenter_set_unmute] +def set_unmute_finding(finding_path: str) -> None: + """ + Unmute an individual finding. + Unmuting a finding that isn't muted has no effect. + Various mute states are: MUTE_UNSPECIFIED/MUTE/UNMUTE. + Args: + finding_path: The relative resource name of the finding. See: + https://cloud.google.com/apis/design/resource_names#relative_resource_name + Use any one of the following formats: + - organizations/{organization_id}/sources/{source_id}/finding/{finding_id}, + - folders/{folder_id}/sources/{source_id}/finding/{finding_id}, + - projects/{project_id}/sources/{source_id}/finding/{finding_id}. + """ + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.SetMuteRequest() + request.name = finding_path + request.mute = securitycenter.Finding.Mute.UNMUTED + + finding = client.set_mute(request) + print(f"Mute value for the finding: {finding.mute.name}") + + +# [END securitycenter_set_unmute] + + +# [START securitycenter_bulk_mute] +def bulk_mute_findings(parent_path: str, mute_rule: str) -> None: + """ + Kicks off a long-running operation (LRO) to bulk mute findings for a parent based on a filter. + The parent can be either an organization, folder, or project. The findings + matched by the filter will be muted after the LRO is done. + Args: + parent_path: use any one of the following options: + - organizations/{organization} + - folders/{folder} + - projects/{project} + mute_rule: Expression that identifies findings that should be updated. + """ + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + request = securitycenter.BulkMuteFindingsRequest() + request.parent = parent_path + # To create mute rules, see: + # https://cloud.google.com/security-command-center/docs/how-to-mute-findings#create_mute_rules + request.filter = mute_rule + + response = client.bulk_mute_findings(request) + print(f"Bulk mute findings completed successfully! : {response}") + + +# [END securitycenter_bulk_mute] diff --git a/securitycenter/snippets/snippets_mute_config_test.py b/securitycenter/snippets/snippets_mute_config_test.py new file mode 100644 index 00000000000..5c531d10cda --- /dev/null +++ b/securitycenter/snippets/snippets_mute_config_test.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +# +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import re +import uuid + +from _pytest.capture import CaptureFixture +from google.cloud import securitycenter +from google.cloud.securitycenter_v1.services.security_center.pagers import ( + ListFindingsPager, +) +import pytest + +import snippets_mute_config + +# TODO(developer): Replace these variables before running the sample. +PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] +ORGANIZATION_ID = os.environ["GCLOUD_ORGANIZATION"] +GOOGLE_APPLICATION_CREDENTIALS = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] + + +@pytest.fixture +def mute_rule(): + mute_rule_create = f"random-mute-create-{uuid.uuid4()}" + mute_rule_update = f"random-mute-update-{uuid.uuid4()}" + snippets_mute_config.create_mute_rule(f"projects/{PROJECT_ID}", mute_rule_create) + snippets_mute_config.create_mute_rule(f"projects/{PROJECT_ID}", mute_rule_update) + + yield {"create": mute_rule_create, "update": mute_rule_update} + + snippets_mute_config.delete_mute_rule( + f"projects/{PROJECT_ID}/muteConfigs/{mute_rule_create}" + ) + snippets_mute_config.delete_mute_rule( + f"projects/{PROJECT_ID}/muteConfigs/{mute_rule_update}" + ) + + +@pytest.fixture +def finding(capsys: CaptureFixture): + import snippets_findings + from snippets_findings import create_finding + + snippets_findings.create_source(ORGANIZATION_ID) + out, _ = capsys.readouterr() + # source_path is of the format: organizations/{ORGANIZATION_ID}/sources/{source_name} + source_path = out.split(":")[1].strip() + source_name = source_path.split("/")[3] + finding1_path = create_finding(source_path, "1testingscc").name + finding2_path = create_finding(source_path, "2testingscc").name + + yield { + "source": source_name, + "finding1": finding1_path, + "finding2": finding2_path, + } + + +def list_all_findings(source_name) -> ListFindingsPager: + client = securitycenter.SecurityCenterClient() + return client.list_findings(request={"parent": source_name}) + + +def test_get_mute_rule(capsys: CaptureFixture, mute_rule): + snippets_mute_config.get_mute_rule( + f"projects/{PROJECT_ID}/muteConfigs/{mute_rule.get('create')}" + ) + out, _ = capsys.readouterr() + assert re.search("Retrieved the mute rule: ", out) + assert re.search(mute_rule.get("create"), out) + + +def test_list_mute_rules(capsys: CaptureFixture, mute_rule): + snippets_mute_config.list_mute_rules(f"projects/{PROJECT_ID}") + out, _ = capsys.readouterr() + assert re.search(mute_rule.get("create"), out) + assert re.search(mute_rule.get("update"), out) + + +def test_update_mute_rule(capsys: CaptureFixture, mute_rule): + snippets_mute_config.update_mute_rule( + f"projects/{PROJECT_ID}/muteConfigs/{mute_rule.get('update')}" + ) + snippets_mute_config.get_mute_rule( + f"projects/{PROJECT_ID}/muteConfigs/{mute_rule.get('update')}" + ) + out, _ = capsys.readouterr() + assert re.search("Updated mute config description", out) + + +def test_set_mute_finding(capsys: CaptureFixture, finding): + finding_path = finding.get("finding1") + snippets_mute_config.set_mute_finding(finding_path) + out, _ = capsys.readouterr() + assert re.search("Mute value for the finding: MUTED", out) + + +def test_set_unmute_finding(capsys: CaptureFixture, finding): + finding_path = finding.get("finding1") + snippets_mute_config.set_unmute_finding(finding_path) + out, _ = capsys.readouterr() + assert re.search("Mute value for the finding: UNMUTED", out) + + +def test_bulk_mute_findings(capsys: CaptureFixture, finding): + # Mute findings that belong to this project. + snippets_mute_config.bulk_mute_findings( + f"projects/{PROJECT_ID}", f'resource.project_display_name="{PROJECT_ID}"' + ) + + # Get all findings in the source to check if they are muted. + response = list_all_findings( + f"projects/{PROJECT_ID}/sources/{finding.get('source')}" + ) + for i, finding in enumerate(response): + assert finding.finding.mute == securitycenter.Finding.Mute.MUTED diff --git a/securitycenter/snippets/snippets_notification_configs.py b/securitycenter/snippets/snippets_notification_configs.py new file mode 100644 index 00000000000..6cc82dd8f3a --- /dev/null +++ b/securitycenter/snippets/snippets_notification_configs.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Demos for working with notification configs.""" + + +# [START securitycenter_create_notification_config] +def create_notification_config(parent_id, notification_config_id, pubsub_topic): + """ + Args: + parent_id: must be in one of the following formats: + "organizations/{organization_id}" + "projects/{project_id}" + "folders/{folder_id}" + notification_config_id: "your-config-id" + pubsub_topic: "projects/{your-project-id}/topics/{your-topic-ic}" + + Ensure this ServiceAccount has the "pubsub.topics.setIamPolicy" permission on the new topic. + """ + from google.cloud import securitycenter as securitycenter + + client = securitycenter.SecurityCenterClient() + + created_notification_config = client.create_notification_config( + request={ + "parent": parent_id, + "config_id": notification_config_id, + "notification_config": { + "description": "Notification for active findings", + "pubsub_topic": pubsub_topic, + "streaming_config": {"filter": 'state = "ACTIVE"'}, + }, + } + ) + + print(created_notification_config) + # [END securitycenter_create_notification_config] + return created_notification_config + + +# [START securitycenter_delete_notification_config] +def delete_notification_config(parent_id, notification_config_id): + """ + Args: + parent_id: must be in one of the following formats: + "organizations/{organization_id}" + "projects/{project_id}" + "folders/{folder_id}" + notification_config_id: "your-config-id" + """ + from google.cloud import securitycenter as securitycenter + + client = securitycenter.SecurityCenterClient() + + notification_config_name = ( + f"{parent_id}/notificationConfigs/{notification_config_id}" + ) + + client.delete_notification_config(request={"name": notification_config_name}) + print(f"Deleted notification config: {notification_config_name}") + # [END securitycenter_delete_notification_config] + return True + + +# [START securitycenter_get_notification_config] +def get_notification_config(parent_id, notification_config_id): + """ + Args: + parent_id: must be in one of the following formats: + "organizations/{organization_id}" + "projects/{project_id}" + "folders/{folder_id}" + notification_config_id: "your-config-id" + """ + from google.cloud import securitycenter as securitycenter + + client = securitycenter.SecurityCenterClient() + + notification_config_name = ( + f"{parent_id}/notificationConfigs/{notification_config_id}" + ) + + notification_config = client.get_notification_config( + request={"name": notification_config_name} + ) + print(f"Got notification config: {notification_config}") + # [END securitycenter_get_notification_config] + return notification_config + + +# [START securitycenter_list_notification_configs] +def list_notification_configs(parent_id): + """ + Args: + parent_id: must be in one of the following formats: + "organizations/{organization_id}" + "projects/{project_id}" + "folders/{folder_id}" + """ + from google.cloud import securitycenter as securitycenter + + client = securitycenter.SecurityCenterClient() + + notification_configs_iterator = client.list_notification_configs( + request={"parent": parent_id} + ) + for i, config in enumerate(notification_configs_iterator): + print(f"{i}: notification_config: {config}") + # [END securitycenter_list_notification_configs]] + return notification_configs_iterator + + +# [START securitycenter_update_notification_config] +def update_notification_config(parent_id, notification_config_id, pubsub_topic): + """ + Args: + parent_id: must be in one of the following formats: + "organizations/{organization_id}" + "projects/{project_id}" + "folders/{folder_id}" + notification_config_id: "config-id-to-update" + pubsub_topic: "projects/{new-project}/topics/{new-topic}" + + If updating a pubsub_topic, ensure this ServiceAccount has the + "pubsub.topics.setIamPolicy" permission on the new topic. + """ + from google.cloud import securitycenter as securitycenter + from google.protobuf import field_mask_pb2 + + client = securitycenter.SecurityCenterClient() + + notification_config_name = ( + f"{parent_id}/notificationConfigs/{notification_config_id}" + ) + + updated_description = "New updated description" + updated_filter = 'state = "INACTIVE"' + + # Only description and pubsub_topic can be updated. + field_mask = field_mask_pb2.FieldMask( + paths=["description", "pubsub_topic", "streaming_config.filter"] + ) + + updated_notification_config = client.update_notification_config( + request={ + "notification_config": { + "name": notification_config_name, + "description": updated_description, + "pubsub_topic": pubsub_topic, + "streaming_config": {"filter": updated_filter}, + }, + "update_mask": field_mask, + } + ) + + print(updated_notification_config) + # [END securitycenter_update_notification_config] + return updated_notification_config diff --git a/securitycenter/snippets/snippets_notification_receiver.py b/securitycenter/snippets/snippets_notification_receiver.py new file mode 100644 index 00000000000..9c4368a0472 --- /dev/null +++ b/securitycenter/snippets/snippets_notification_receiver.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Demo for receiving notifications.""" + + +def receive_notifications(project_id, subscription_name): + # [START securitycenter_receive_notifications] + # Requires https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-python + import concurrent + + from google.cloud import pubsub_v1 + from google.cloud.securitycenter_v1 import NotificationMessage + + # TODO: project_id = "your-project-id" + # TODO: subscription_name = "your-subscription-name" + + def callback(message): + + # Print the data received for debugging purpose if needed + print(f"Received message: {message.data}") + + notification_msg = NotificationMessage.from_json(message.data) + + print( + "Notification config name: {}".format( + notification_msg.notification_config_name + ) + ) + print("Finding: {}".format(notification_msg.finding)) + + # Ack the message to prevent it from being pulled again + message.ack() + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_name) + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + + print("Listening for messages on {}...\n".format(subscription_path)) + try: + streaming_pull_future.result(timeout=1) # Block for 1 second + except concurrent.futures.TimeoutError: + streaming_pull_future.cancel() + # [END securitycenter_receive_notifications] + return True diff --git a/securitycenter/snippets/snippets_notification_test.py b/securitycenter/snippets/snippets_notification_test.py new file mode 100644 index 00000000000..2cc6d262de3 --- /dev/null +++ b/securitycenter/snippets/snippets_notification_test.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for snippets.""" + +import os +import uuid + +from google.cloud import securitycenter as securitycenter +import pytest + +import snippets_notification_configs +import snippets_notification_receiver + +ORG_ID = os.environ["GCLOUD_ORGANIZATION"] +PROJECT_ID = os.environ["GCLOUD_PROJECT"] +PUBSUB_TOPIC = os.environ["GCLOUD_PUBSUB_TOPIC"] +PUBSUB_SUBSCRIPTION = os.environ["GCLOUD_PUBSUB_SUBSCRIPTION"] + +CREATE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) +DELETE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) +GET_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) +UPDATE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) + + +def cleanup_notification_config(notification_config_id): + client = securitycenter.SecurityCenterClient() + + notification_config_name = ( + "organizations/{org_id}/notificationConfigs/{config_id}".format( + org_id=ORG_ID, config_id=notification_config_id + ) + ) + client.delete_notification_config(request={"name": notification_config_name}) + + +@pytest.fixture +def new_notification_config_for_update(): + client = securitycenter.SecurityCenterClient() + + org_name = "organizations/{org_id}".format(org_id=ORG_ID) + + created_notification_config = client.create_notification_config( + request={ + "parent": org_name, + "config_id": UPDATE_CONFIG_ID, + "notification_config": { + "description": "Notification for active findings", + "pubsub_topic": PUBSUB_TOPIC, + "streaming_config": {"filter": ""}, + }, + } + ) + yield created_notification_config + cleanup_notification_config(UPDATE_CONFIG_ID) + + +@pytest.fixture +def new_notification_config_for_get(): + client = securitycenter.SecurityCenterClient() + + org_name = "organizations/{org_id}".format(org_id=ORG_ID) + + created_notification_config = client.create_notification_config( + request={ + "parent": org_name, + "config_id": GET_CONFIG_ID, + "notification_config": { + "description": "Notification for active findings", + "pubsub_topic": PUBSUB_TOPIC, + "streaming_config": {"filter": ""}, + }, + } + ) + yield created_notification_config + cleanup_notification_config(GET_CONFIG_ID) + + +@pytest.fixture +def deleted_notification_config(): + client = securitycenter.SecurityCenterClient() + + org_name = "organizations/{org_id}".format(org_id=ORG_ID) + + created_notification_config = client.create_notification_config( + request={ + "parent": org_name, + "config_id": DELETE_CONFIG_ID, + "notification_config": { + "description": "Notification for active findings", + "pubsub_topic": PUBSUB_TOPIC, + "streaming_config": {"filter": ""}, + }, + } + ) + return created_notification_config + + +def test_create_notification_config(): + created_notification_config = ( + snippets_notification_configs.create_notification_config( + f"organizations/{ORG_ID}", CREATE_CONFIG_ID, PUBSUB_TOPIC + ) + ) + assert created_notification_config is not None + + cleanup_notification_config(CREATE_CONFIG_ID) + + +def test_delete_notification_config(deleted_notification_config): + assert snippets_notification_configs.delete_notification_config( + f"organizations/{ORG_ID}", DELETE_CONFIG_ID + ) + + +def test_get_notification_config(new_notification_config_for_get): + retrieved_config = snippets_notification_configs.get_notification_config( + f"organizations/{ORG_ID}", GET_CONFIG_ID + ) + assert retrieved_config is not None + + +def test_list_notification_configs(): + iterator = snippets_notification_configs.list_notification_configs( + f"organizations/{ORG_ID}" + ) + assert iterator is not None + + +def test_update_notification_config(new_notification_config_for_update): + updated_config = snippets_notification_configs.update_notification_config( + f"organizations/{ORG_ID}", UPDATE_CONFIG_ID, PUBSUB_TOPIC + ) + assert updated_config is not None + + +def test_receive_notifications(): + assert snippets_notification_receiver.receive_notifications( + PROJECT_ID, PUBSUB_SUBSCRIPTION + ) diff --git a/securitycenter/snippets/snippets_orgs.py b/securitycenter/snippets/snippets_orgs.py new file mode 100644 index 00000000000..1164b63969c --- /dev/null +++ b/securitycenter/snippets/snippets_orgs.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python +# +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Examples for working with organization settings. """ + + +def get_settings(organization_id): + """Example showing how to retreive current organization settings.""" + # [START securitycenter_get_org_settings] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + # organization_id is numeric ID for the organization. e.g. + # organization_id = "111112223333" + + org_settings_name = client.organization_settings_path(organization_id) + + org_settings = client.get_organization_settings(request={"name": org_settings_name}) + print(org_settings) + # [END securitycenter_get_org_settings] + + +def update_asset_discovery_org_settings(organization_id): + """Example showing how to update the asset discovery configuration + for an organization.""" + # [START securitycenter_enable_asset_discovery] + from google.cloud import securitycenter + from google.protobuf import field_mask_pb2 + + # Create the client + client = securitycenter.SecurityCenterClient() + # organization_id is numeric ID for the organization. e.g. + # organization_id = "111112223333" + org_settings_name = "organizations/{org_id}/organizationSettings".format( + org_id=organization_id + ) + # Only update the enable_asset_discovery_value (leave others untouched). + field_mask = field_mask_pb2.FieldMask(paths=["enable_asset_discovery"]) + # Call the service. + updated = client.update_organization_settings( + request={ + "organization_settings": { + "name": org_settings_name, + "enable_asset_discovery": True, + }, + "update_mask": field_mask, + } + ) + print("Asset Discovery Enabled? {}".format(updated.enable_asset_discovery)) + # [END securitycenter_enable_asset_discovery] + return updated diff --git a/securitycenter/snippets/snippets_orgs_test.py b/securitycenter/snippets/snippets_orgs_test.py new file mode 100644 index 00000000000..4f2a7c7f786 --- /dev/null +++ b/securitycenter/snippets/snippets_orgs_test.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python +# +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Examples for working with organization settings. """ +import os + +import pytest + +import snippets_orgs + + +@pytest.fixture(scope="module") +def organization_id(): + """Get Organization ID from the environment variable""" + return os.environ["GCLOUD_ORGANIZATION"] + + +def test_get_settings(organization_id): + snippets_orgs.get_settings(organization_id) + + +def test_update_asset_discovery_org_settings(organization_id): + updated = snippets_orgs.update_asset_discovery_org_settings(organization_id) + assert updated.enable_asset_discovery diff --git a/securitycenter/snippets/snippets_security_marks.py b/securitycenter/snippets/snippets_security_marks.py new file mode 100644 index 00000000000..457cc43352c --- /dev/null +++ b/securitycenter/snippets/snippets_security_marks.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +# +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Demos for working with security marks.""" + + +def add_to_asset(asset_name): + """Add new security marks to an asset.""" + # [START securitycenter_add_security_marks] + from google.cloud import securitycenter + from google.protobuf import field_mask_pb2 + + # Create a new client. + client = securitycenter.SecurityCenterClient() + + # asset_name is the resource path for an asset that exists in CSCC. + # Its format is "organization/{organization_id}/assets/{asset_id} + # e.g.: + # asset_name = organizations/123123342/assets/12312321 + marks_name = "{}/securityMarks".format(asset_name) + + # Notice the suffix after "marks." in the field mask matches the keys + # in marks. + field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) + marks = {"key_a": "value_a", "key_b": "value_b"} + + updated_marks = client.update_security_marks( + request={ + "security_marks": {"name": marks_name, "marks": marks}, + "update_mask": field_mask, + } + ) + print(updated_marks) + # [END securitycenter_add_security_marks] + return updated_marks, marks + + +def clear_from_asset(asset_name): + """Removes security marks from an asset.""" + # Make sure they are there first + add_to_asset(asset_name) + # [START securitycenter_delete_security_marks] + from google.cloud import securitycenter + from google.protobuf import field_mask_pb2 + + # Create a new client. + client = securitycenter.SecurityCenterClient() + + # asset_name is the resource path for an asset that exists in CSCC. + # Its format is "organization/{organization_id}/assets/{asset_id} + # e.g.: + # asset_name = organizations/123123342/assets/12312321 + marks_name = "{}/securityMarks".format(asset_name) + + field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) + + updated_marks = client.update_security_marks( + request={ + "security_marks": { + "name": marks_name + # Note, no marks specified, so the specified values in + # the fields masks will be deleted. + }, + "update_mask": field_mask, + } + ) + print(updated_marks) + # [END securitycenter_delete_security_marks] + return updated_marks + + +def delete_and_update_marks(asset_name): + """Updates and deletes security marks from an asset in the same call.""" + # Make sure they are there first + add_to_asset(asset_name) + # [START securitycenter_add_delete_security_marks] + from google.cloud import securitycenter + from google.protobuf import field_mask_pb2 + + client = securitycenter.SecurityCenterClient() + # asset_name is the resource path for an asset that exists in CSCC. + # Its format is "organization/{organization_id}/assets/{asset_id} + # e.g.: + # asset_name = organizations/123123342/assets/12312321 + marks_name = "{}/securityMarks".format(asset_name) + + field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) + marks = {"key_a": "new_value_for_a"} + + updated_marks = client.update_security_marks( + request={ + "security_marks": {"name": marks_name, "marks": marks}, + "update_mask": field_mask, + } + ) + print(updated_marks) + # [END securitycenter_add_delete_security_marks] + return updated_marks + + +def add_to_finding(finding_name): + """Adds security marks to a finding.""" + # [START securitycenter_add_finding_security_marks] + from google.cloud import securitycenter + from google.protobuf import field_mask_pb2 + + client = securitycenter.SecurityCenterClient() + # finding_name is the resource path for a finding that exists in CSCC. + # Its format is + # "organizations/{org_id}/sources/{source_id}/findings/{finding_id}" + # e.g.: + # finding_name = "organizations/1112/sources/1234/findings/findingid" + finding_marks_name = "{}/securityMarks".format(finding_name) + + # Notice the suffix after "marks." in the field mask matches the keys + # in marks. + field_mask = field_mask_pb2.FieldMask( + paths=["marks.finding_key_a", "marks.finding_key_b"] + ) + marks = {"finding_key_a": "value_a", "finding_key_b": "value_b"} + + updated_marks = client.update_security_marks( + request={ + "security_marks": {"name": finding_marks_name, "marks": marks}, + "update_mask": field_mask, + } + ) + # [END securitycenter_add_finding_security_marks] + return updated_marks, marks + + +def list_assets_with_query_marks(organization_id, asset_name): + """Lists assets with a filter on security marks.""" + add_to_asset(asset_name) + i = -1 + # [START securitycenter_list_assets_with_security_marks] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # organization_id is the numeric ID of the organization. + # organization_id=1234567777 + org_name = "organizations/{org_id}".format(org_id=organization_id) + + marks_filter = 'security_marks.marks.key_a = "value_a"' + # Call the API and print results. + asset_iterator = client.list_assets( + request={"parent": org_name, "filter": marks_filter} + ) + + # Call the API and print results. + asset_iterator = client.list_assets( + request={"parent": org_name, "filter": marks_filter} + ) + for i, asset_result in enumerate(asset_iterator): + print(i, asset_result) + # [END securitycenter_list_assets_with_security_marks] + return i + + +def list_findings_with_query_marks(source_name, finding_name): + """Lists findings with a filter on security marks.""" + # ensure marks are set on finding. + add_to_finding(finding_name) + i = -1 + # [START securitycenter_list_findings_with_security_marks] + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + + # source_name is the resource path for a source that has been + # created previously (you can use list_sources to find a specific one). + # Its format is: + # source_name = "organizations/{organization_id}/sources/{source_id}" + # e.g.: + # source_name = "organizations/111122222444/sources/1234" + marks_filter = 'NOT security_marks.marks.finding_key_a="value_a"' + + # Call the API and print results. + finding_iterator = client.list_findings( + request={"parent": source_name, "filter": marks_filter} + ) + for i, finding_result in enumerate(finding_iterator): + print(i, finding_result) + # [END securitycenter_list_findings_with_security_marks] + # one finding should have been updated with keys, and one should be + # untouched. + return i diff --git a/securitycenter/snippets/snippets_security_marks_test.py b/securitycenter/snippets/snippets_security_marks_test.py new file mode 100644 index 00000000000..01666f59bb9 --- /dev/null +++ b/securitycenter/snippets/snippets_security_marks_test.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python +# +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Demos for working with security marks.""" +import os +import random + +import pytest + +import snippets_security_marks + + +@pytest.fixture(scope="module") +def organization_id(): + """Gets Organization ID from the environment variable""" + return os.environ["GCLOUD_ORGANIZATION"] + + +@pytest.fixture(scope="module") +def asset_name(organization_id): + """Returns a random asset name from existing assets.""" + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + # organization_id is the numeric ID of the organization. + # organization_id=1234567777 + org_name = "organizations/{org_id}".format(org_id=organization_id) + assets = list(client.list_assets(request={"parent": org_name})) + # Select a random asset to avoid collision between integration tests. + asset = (random.sample(assets, 1)[0]).asset.name + + # Set fresh marks. + update = client.update_security_marks( + request={ + "security_marks": { + "name": "{}/securityMarks".format(asset), + "marks": {"other": "other_val"}, + } + } + ) + assert update.marks == {"other": "other_val"} + return asset + + +@pytest.fixture(scope="module") +def source_name(organization_id): + """Creates a new source in the organization.""" + from google.cloud import securitycenter + + client = securitycenter.SecurityCenterClient() + org_name = "organizations/{org_id}".format(org_id=organization_id) + source = client.create_source( + request={ + "parent": org_name, + "source": { + "display_name": "Security marks Unit test source", + "description": "A new custom source that does X", + }, + } + ) + return source.name + + +@pytest.fixture(scope="module") +def finding_name(source_name): + """Creates a new finding and returns it name.""" + from google.cloud import securitycenter + from google.cloud.securitycenter_v1 import Finding + from google.protobuf.timestamp_pb2 import Timestamp + + client = securitycenter.SecurityCenterClient() + + now_proto = Timestamp() + now_proto.GetCurrentTime() + + finding = client.create_finding( + request={ + "parent": source_name, + "finding_id": "scfinding", + "finding": { + "state": Finding.State.ACTIVE, + "category": "C1", + "event_time": now_proto, + "resource_name": "//cloudresourcemanager.googleapis.com/organizations/1234", + }, + } + ) + client.create_finding( + request={ + "parent": source_name, + "finding_id": "untouched", + "finding": { + "state": Finding.State.ACTIVE, + "category": "MEDIUM_RISK_ONE", + "event_time": now_proto, + "resource_name": "//cloudresourcemanager.googleapis.com/organizations/1234", + }, + } + ) + + return finding.name + + +def test_add_to_asset(asset_name): + updated_marks, marks = snippets_security_marks.add_to_asset(asset_name) + assert updated_marks.marks.keys() >= marks.keys() + + +def test_clear_from_asset(asset_name): + updated_marks = snippets_security_marks.clear_from_asset(asset_name) + assert "other" in updated_marks.marks + assert len(updated_marks.marks) == 1 + + +def test_delete_and_update_marks(asset_name): + updated_marks = snippets_security_marks.delete_and_update_marks(asset_name) + assert updated_marks.marks == {"key_a": "new_value_for_a", "other": "other_val"} + + +def test_add_to_finding(finding_name): + updated_marks, marks = snippets_security_marks.add_to_finding(finding_name) + assert updated_marks.marks == marks + + +def test_list_assets_with_query_marks(organization_id, asset_name): + count = snippets_security_marks.list_assets_with_query_marks( + organization_id, asset_name + ) + assert count >= 0 + + +def test_list_findings_with_query_marks(source_name, finding_name): + count = snippets_security_marks.list_findings_with_query_marks( + source_name, finding_name + ) + assert count == 0