diff --git a/samples/snippets/README.md b/samples/snippets/README.md new file mode 100644 index 00000000..0763128a --- /dev/null +++ b/samples/snippets/README.md @@ -0,0 +1 @@ +The snippets have been migrated to GoogleCloudPlatform/python-docs-samples in PR: https://github.com/GoogleCloudPlatform/python-docs-samples/pull/8493 \ No newline at end of file diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py deleted file mode 100644 index de104dbc..00000000 --- a/samples/snippets/noxfile.py +++ /dev/null @@ -1,292 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import glob -import os -from pathlib import Path -import sys -from typing import Callable, Dict, Optional - -import nox - -# WARNING - WARNING - WARNING - WARNING - WARNING -# WARNING - WARNING - WARNING - WARNING - WARNING -# DO NOT EDIT THIS FILE EVER! -# WARNING - WARNING - WARNING - WARNING - WARNING -# WARNING - WARNING - WARNING - WARNING - WARNING - -BLACK_VERSION = "black==22.3.0" -ISORT_VERSION = "isort==5.10.1" - -# Copy `noxfile_config.py` to your directory and modify it instead. - -# `TEST_CONFIG` dict is a configuration hook that allows users to -# modify the test configurations. The values here should be in sync -# with `noxfile_config.py`. Users will copy `noxfile_config.py` into -# their directory and modify it. - -TEST_CONFIG = { - # You can opt out from the test for specific Python versions. - "ignored_versions": [], - # Old samples are opted out of enforcing Python type hints - # All new samples should feature them - "enforce_type_hints": False, - # An envvar key for determining the project id to use. Change it - # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a - # build specific Cloud project. You can also use your own string - # to use your own Cloud project. - "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", - # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', - # If you need to use a specific version of pip, - # change pip_version_override to the string representation - # of the version number, for example, "20.2.4" - "pip_version_override": None, - # A dictionary you want to inject into your test. Don't put any - # secrets here. These values will override predefined values. - "envs": {}, -} - - -try: - # Ensure we can import noxfile_config in the project's directory. - sys.path.append(".") - from noxfile_config import TEST_CONFIG_OVERRIDE -except ImportError as e: - print("No user noxfile_config found: detail: {}".format(e)) - TEST_CONFIG_OVERRIDE = {} - -# Update the TEST_CONFIG with the user supplied values. -TEST_CONFIG.update(TEST_CONFIG_OVERRIDE) - - -def get_pytest_env_vars() -> Dict[str, str]: - """Returns a dict for pytest invocation.""" - ret = {} - - # Override the GCLOUD_PROJECT and the alias. - env_key = TEST_CONFIG["gcloud_project_env"] - # This should error out if not set. - ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key] - - # Apply user supplied envs. - ret.update(TEST_CONFIG["envs"]) - return ret - - -# DO NOT EDIT - automatically generated. -# All versions used to test samples. -ALL_VERSIONS = ["3.7", "3.8", "3.9", "3.10", "3.11"] - -# Any default versions that should be ignored. -IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"] - -TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) - -INSTALL_LIBRARY_FROM_SOURCE = os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False) in ( - "True", - "true", -) - -# Error if a python version is missing -nox.options.error_on_missing_interpreters = True - -# -# Style Checks -# - - -# Linting with flake8. -# -# We ignore the following rules: -# E203: whitespace before ‘:’ -# E266: too many leading ‘#’ for block comment -# E501: line too long -# I202: Additional newline in a section of imports -# -# We also need to specify the rules which are ignored by default: -# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] -FLAKE8_COMMON_ARGS = [ - "--show-source", - "--builtin=gettext", - "--max-complexity=20", - "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", - "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", - "--max-line-length=88", -] - - -@nox.session -def lint(session: nox.sessions.Session) -> None: - if not TEST_CONFIG["enforce_type_hints"]: - session.install("flake8") - else: - session.install("flake8", "flake8-annotations") - - args = FLAKE8_COMMON_ARGS + [ - ".", - ] - session.run("flake8", *args) - - -# -# Black -# - - -@nox.session -def blacken(session: nox.sessions.Session) -> None: - """Run black. Format code to uniform standard.""" - session.install(BLACK_VERSION) - python_files = [path for path in os.listdir(".") if path.endswith(".py")] - - session.run("black", *python_files) - - -# -# format = isort + black -# - - -@nox.session -def format(session: nox.sessions.Session) -> None: - """ - Run isort to sort imports. Then run black - to format code to uniform standard. - """ - session.install(BLACK_VERSION, ISORT_VERSION) - python_files = [path for path in os.listdir(".") if path.endswith(".py")] - - # Use the --fss option to sort imports using strict alphabetical order. - # See https://pycqa.github.io/isort/docs/configuration/options.html#force-sort-within-sections - session.run("isort", "--fss", *python_files) - session.run("black", *python_files) - - -# -# Sample Tests -# - - -PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] - - -def _session_tests( - session: nox.sessions.Session, post_install: Callable = None -) -> None: - # check for presence of tests - test_list = glob.glob("**/*_test.py", recursive=True) + glob.glob( - "**/test_*.py", recursive=True - ) - test_list.extend(glob.glob("**/tests", recursive=True)) - - if len(test_list) == 0: - print("No tests found, skipping directory.") - return - - if TEST_CONFIG["pip_version_override"]: - pip_version = TEST_CONFIG["pip_version_override"] - session.install(f"pip=={pip_version}") - """Runs py.test for a particular project.""" - concurrent_args = [] - if os.path.exists("requirements.txt"): - if os.path.exists("constraints.txt"): - session.install("-r", "requirements.txt", "-c", "constraints.txt") - else: - session.install("-r", "requirements.txt") - with open("requirements.txt") as rfile: - packages = rfile.read() - - if os.path.exists("requirements-test.txt"): - if os.path.exists("constraints-test.txt"): - session.install("-r", "requirements-test.txt", "-c", "constraints-test.txt") - else: - session.install("-r", "requirements-test.txt") - with open("requirements-test.txt") as rtfile: - packages += rtfile.read() - - if INSTALL_LIBRARY_FROM_SOURCE: - session.install("-e", _get_repo_root()) - - if post_install: - post_install(session) - - if "pytest-parallel" in packages: - concurrent_args.extend(["--workers", "auto", "--tests-per-worker", "auto"]) - elif "pytest-xdist" in packages: - concurrent_args.extend(["-n", "auto"]) - - session.run( - "pytest", - *(PYTEST_COMMON_ARGS + session.posargs + concurrent_args), - # Pytest will return 5 when no tests are collected. This can happen - # on travis where slow and flaky tests are excluded. - # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html - success_codes=[0, 5], - env=get_pytest_env_vars(), - ) - - -@nox.session(python=ALL_VERSIONS) -def py(session: nox.sessions.Session) -> None: - """Runs py.test for a sample using the specified version of Python.""" - if session.python in TESTED_VERSIONS: - _session_tests(session) - else: - session.skip( - "SKIPPED: {} tests are disabled for this sample.".format(session.python) - ) - - -# -# Readmegen -# - - -def _get_repo_root() -> Optional[str]: - """Returns the root folder of the project.""" - # Get root of this repository. Assume we don't have directories nested deeper than 10 items. - p = Path(os.getcwd()) - for i in range(10): - if p is None: - break - if Path(p / ".git").exists(): - return str(p) - # .git is not available in repos cloned via Cloud Build - # setup.py is always in the library's root, so use that instead - # https://github.com/googleapis/synthtool/issues/792 - if Path(p / "setup.py").exists(): - return str(p) - p = p.parent - raise Exception("Unable to detect repository root.") - - -GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) - - -@nox.session -@nox.parametrize("path", GENERATED_READMES) -def readmegen(session: nox.sessions.Session, path: str) -> None: - """(Re-)generates the readme for a sample.""" - session.install("jinja2", "pyyaml") - dir_ = os.path.dirname(path) - - if os.path.exists(os.path.join(dir_, "requirements.txt")): - session.install("-r", os.path.join(dir_, "requirements.txt")) - - in_file = os.path.join(dir_, "README.rst.in") - session.run( - "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file - ) diff --git a/samples/snippets/noxfile_config.py b/samples/snippets/noxfile_config.py deleted file mode 100644 index daf5c43a..00000000 --- a/samples/snippets/noxfile_config.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Default TEST_CONFIG_OVERRIDE for python repos. - -# You can copy this file into your directory, then it will be inported from -# the noxfile.py. - -# The source of truth: -# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py - -TEST_CONFIG_OVERRIDE = { - # You can opt out from the test for specific Python versions. - "ignored_versions": ["2.7"], - # An envvar key for determining the project id to use. Change it - # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a - # build specific Cloud project. You can also use your own string - # to use your own Cloud project. - # 'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT', - "gcloud_project_env": "BUILD_SPECIFIC_GCLOUD_PROJECT", - # A dictionary you want to inject into your test. Don't put any - # secrets here. These values will override predefined values. - "envs": { - "GCLOUD_ORGANIZATION": "1081635000895", - "GCLOUD_PROJECT": "project-a-id", - "GCLOUD_PUBSUB_TOPIC": "projects/project-a-id/topics/notifications-sample-topic", - "GCLOUD_PUBSUB_SUBSCRIPTION": "notification-sample-subscription", - }, -} diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt deleted file mode 100644 index 51d20956..00000000 --- a/samples/snippets/requirements-test.txt +++ /dev/null @@ -1,2 +0,0 @@ -pytest==7.2.1 -google-cloud-bigquery==3.4.2 diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt deleted file mode 100644 index 64e05236..00000000 --- a/samples/snippets/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -google-cloud-pubsub==2.14.0 -google-cloud-securitycenter==1.18.2 \ No newline at end of file diff --git a/samples/snippets/snippets_bigquery_export.py b/samples/snippets/snippets_bigquery_export.py deleted file mode 100644 index 591d9af9..00000000 --- a/samples/snippets/snippets_bigquery_export.py +++ /dev/null @@ -1,190 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Snippets on exporting findings from Security Command Center to BigQuery.""" - - -# [START securitycenter_create_bigquery_export] - - -def create_bigquery_export( - parent: str, export_filter: str, bigquery_dataset_id: str, bigquery_export_id: str -): - - from google.cloud import securitycenter - - """ - Create export configuration to export findings from a project to a BigQuery dataset. - Optionally specify filter to export certain findings only. - - Args: - parent: Use any one of the following resource paths: - - organizations/{organization_id} - - folders/{folder_id} - - projects/{project_id} - export_filter: Expression that defines the filter to apply across create/update events of findings. - bigquery_dataset_id: The BigQuery dataset to write findings' updates to. - bigquery_export_id: Unique identifier provided by the client. - - example id: f"default-{str(uuid.uuid4()).split('-')[0]}" - For more info, see: - https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to - """ - client = securitycenter.SecurityCenterClient() - - # Create the BigQuery export configuration. - bigquery_export = securitycenter.BigQueryExport() - bigquery_export.description = "Export low and medium findings if the compute resource has an IAM anomalous grant" - bigquery_export.filter = export_filter - bigquery_export.dataset = f"{parent}/datasets/{bigquery_dataset_id}" - - request = securitycenter.CreateBigQueryExportRequest() - request.parent = parent - request.big_query_export = bigquery_export - request.big_query_export_id = bigquery_export_id - - # Create the export request. - response = client.create_big_query_export(request) - - print(f"BigQuery export request created successfully: {response.name}\n") - - -# [END securitycenter_create_bigquery_export] - - -# [START securitycenter_get_bigquery_export] -def get_bigquery_export(parent: str, bigquery_export_id: str): - from google.cloud import securitycenter - - """ - Retrieve an existing BigQuery export. - Args: - parent: Use any one of the following resource paths: - - organizations/{organization_id} - - folders/{folder_id} - - projects/{project_id} - bigquery_export_id: Unique identifier that is used to identify the export. - """ - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.GetBigQueryExportRequest() - request.name = f"{parent}/bigQueryExports/{bigquery_export_id}" - - response = client.get_big_query_export(request) - print(f"Retrieved the BigQuery export: {response.name}") - - -# [END securitycenter_get_bigquery_export] - - -# [START securitycenter_list_bigquery_export] -def list_bigquery_exports(parent: str): - from google.cloud import securitycenter - - """ - List BigQuery exports in the given parent. - Args: - parent: The parent which owns the collection of BigQuery exports. - Use any one of the following resource paths: - - organizations/{organization_id} - - folders/{folder_id} - - projects/{project_id} - """ - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.ListBigQueryExportsRequest() - request.parent = parent - - response = client.list_big_query_exports(request) - - print("Listing BigQuery exports:") - for bigquery_export in response: - print(bigquery_export.name) - - -# [END securitycenter_list_bigquery_export] - - -# [START securitycenter_update_bigquery_export] -def update_bigquery_export(parent: str, export_filter: str, bigquery_export_id: str): - """ - Updates an existing BigQuery export. - Args: - parent: Use any one of the following resource paths: - - organizations/{organization_id} - - folders/{folder_id} - - projects/{project_id} - export_filter: Expression that defines the filter to apply across create/update events of findings. - bigquery_export_id: Unique identifier provided by the client. - For more info, see: - https://cloud.google.com/security-command-center/docs/how-to-analyze-findings-in-big-query#export_findings_from_to - """ - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - - # Set the new values for export configuration. - bigquery_export = securitycenter.BigQueryExport() - bigquery_export.name = f"{parent}/bigQueryExports/{bigquery_export_id}" - bigquery_export.filter = export_filter - - # Field mask to only update the export filter. - # Set the update mask to specify which properties should be updated. - # If empty, all mutable fields will be updated. - # For more info on constructing field mask path, see the proto or: - # https://googleapis.dev/python/protobuf/latest/google/protobuf/field_mask_pb2.html - field_mask = field_mask_pb2.FieldMask(paths=["filter"]) - - request = securitycenter.UpdateBigQueryExportRequest() - request.big_query_export = bigquery_export - request.update_mask = field_mask - - response = client.update_big_query_export(request) - - if response.filter != export_filter: - print("Failed to update BigQueryExport!") - return - print("BigQueryExport updated successfully!") - - -# [END securitycenter_update_bigquery_export] - - -# [START securitycenter_delete_bigquery_export] -def delete_bigquery_export(parent: str, bigquery_export_id: str): - """ - Delete an existing BigQuery export. - Args: - parent: Use any one of the following resource paths: - - organizations/{organization_id} - - folders/{folder_id} - - projects/{project_id} - bigquery_export_id: Unique identifier that is used to identify the export. - """ - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.DeleteBigQueryExportRequest() - request.name = f"{parent}/bigQueryExports/{bigquery_export_id}" - - client.delete_big_query_export(request) - print(f"BigQuery export request deleted successfully: {bigquery_export_id}") - - -# [END securitycenter_delete_bigquery_export] diff --git a/samples/snippets/snippets_bigquery_export_test.py b/samples/snippets/snippets_bigquery_export_test.py deleted file mode 100644 index 197bd6f6..00000000 --- a/samples/snippets/snippets_bigquery_export_test.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# TODO(developer): Replace these variables before running the sample. -import os -import re -import uuid - -from _pytest.capture import CaptureFixture -import pytest - -import snippets_bigquery_export - -PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] -GOOGLE_APPLICATION_CREDENTIALS = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] -BIGQUERY_DATASET_ID = f"sampledataset{str(uuid.uuid4()).split('-')[0]}" - - -@pytest.fixture(scope="module") -def bigquery_export_id(): - bigquery_export_id = f"default-{str(uuid.uuid4()).split('-')[0]}" - - create_bigquery_dataset(BIGQUERY_DATASET_ID) - export_filter = 'severity="LOW" OR severity="MEDIUM"' - snippets_bigquery_export.create_bigquery_export( - f"projects/{PROJECT_ID}", export_filter, BIGQUERY_DATASET_ID, bigquery_export_id - ) - - yield bigquery_export_id - - snippets_bigquery_export.delete_bigquery_export( - f"projects/{PROJECT_ID}", bigquery_export_id - ) - delete_bigquery_dataset(BIGQUERY_DATASET_ID) - - -def create_bigquery_dataset(dataset_id: str): - from google.cloud import bigquery - - bigquery_client = bigquery.Client() - - dataset_id_full = "{}.{}".format(PROJECT_ID, dataset_id) - dataset = bigquery.Dataset(dataset_id_full) - - dataset = bigquery_client.create_dataset(dataset) - print("Dataset {} created.".format(dataset.dataset_id)) - - -def delete_bigquery_dataset(dataset_id: str): - from google.cloud import bigquery - - bigquery_client = bigquery.Client() - bigquery_client.delete_dataset(dataset_id) - print("Dataset {} deleted.".format(dataset_id)) - - -def test_get_bigquery_export(capsys: CaptureFixture, bigquery_export_id: str): - snippets_bigquery_export.get_bigquery_export( - f"projects/{PROJECT_ID}", bigquery_export_id - ) - out, _ = capsys.readouterr() - assert re.search( - "Retrieved the BigQuery export", - out, - ) - assert re.search(f"bigQueryExports/{bigquery_export_id}", out) - - -def test_list_bigquery_exports(capsys: CaptureFixture, bigquery_export_id: str): - snippets_bigquery_export.list_bigquery_exports(f"projects/{PROJECT_ID}") - out, _ = capsys.readouterr() - assert re.search("Listing BigQuery exports:", out) - assert re.search(bigquery_export_id, out) - - -def test_update_bigquery_exports(capsys: CaptureFixture, bigquery_export_id: str): - export_filter = 'severity="MEDIUM"' - snippets_bigquery_export.update_bigquery_export( - f"projects/{PROJECT_ID}", export_filter, bigquery_export_id - ) - out, _ = capsys.readouterr() - assert re.search("BigQueryExport updated successfully!", out) diff --git a/samples/snippets/snippets_findings.py b/samples/snippets/snippets_findings.py deleted file mode 100644 index 06cddc41..00000000 --- a/samples/snippets/snippets_findings.py +++ /dev/null @@ -1,586 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Examples of working with source and findings in Cloud Security Command Center.""" - - -def create_source(organization_id): - """Create a new findings source.""" - # [START securitycenter_create_source] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - # organization_id is the numeric ID of the organization. e.g.: - # organization_id = "111122222444" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - created = client.create_source( - request={ - "parent": org_name, - "source": { - "display_name": "Customized Display Name", - "description": "A new custom source that does X", - }, - } - ) - print("Created Source: {}".format(created.name)) - # [END securitycenter_create_source] - - -def get_source(source_name): - """Gets an existing source.""" - # [START securitycenter_get_source] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - source = client.get_source(request={"name": source_name}) - - print("Source: {}".format(source)) - # [END securitycenter_get_source] - return source - - -def update_source(source_name): - """Updates a source's display name.""" - # [START securitycenter_update_source] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - - # Field mask to only update the display name. - field_mask = field_mask_pb2.FieldMask(paths=["display_name"]) - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - updated = client.update_source( - request={ - "source": {"name": source_name, "display_name": "Updated Display Name"}, - "update_mask": field_mask, - } - ) - print("Updated Source: {}".format(updated)) - # [END securitycenter_update_source] - return updated - - -def add_user_to_source(source_name): - """Gives a user findingsEditor permission to the source.""" - user_email = "csccclienttest@gmail.com" - # [START securitycenter_set_source_iam] - from google.cloud import securitycenter - from google.iam.v1 import policy_pb2 - - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - # Get the old policy so we can do an incremental update. - old_policy = client.get_iam_policy(request={"resource": source_name}) - print("Old Policy: {}".format(old_policy)) - - # Setup a new IAM binding. - binding = policy_pb2.Binding() - binding.role = "roles/securitycenter.findingsEditor" - # user_email is an e-mail address known to Cloud IAM (e.g. a gmail address). - # user_mail = user@somedomain.com - binding.members.append("user:{}".format(user_email)) - - # Setting the e-tag avoids over-write existing policy - updated = client.set_iam_policy( - request={ - "resource": source_name, - "policy": {"etag": old_policy.etag, "bindings": [binding]}, - } - ) - - print("Updated Policy: {}".format(updated)) - - # [END securitycenter_set_source_iam] - return binding, updated - - -def list_source(organization_id): - """Lists finding sources.""" - i = -1 - # [START securitycenter_list_sources] - from google.cloud import securitycenter - - # Create a new client. - client = securitycenter.SecurityCenterClient() - # organization_id is the numeric ID of the organization. e.g.: - # organization_id = "111122222444" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - # Call the API and print out each existing source. - for i, source in enumerate(client.list_sources(request={"parent": org_name})): - print(i, source) - # [END securitycenter_list_sources] - return i - - -def create_finding(source_name, finding_id): - """Creates a new finding.""" - # [START securitycenter_create_finding] - import datetime - - from google.cloud import securitycenter - from google.cloud.securitycenter_v1 import Finding - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # Use the current time as the finding "event time". - event_time = datetime.datetime.now(tz=datetime.timezone.utc) - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # The resource this finding applies to. The CSCC UI can link - # the findings for a resource to the corresponding Asset of a resource - # if there are matches. - resource_name = "//cloudresourcemanager.googleapis.com/organizations/11232" - - finding = Finding( - state=Finding.State.ACTIVE, - resource_name=resource_name, - category="MEDIUM_RISK_ONE", - event_time=event_time, - ) - - # Call The API. - created_finding = client.create_finding( - request={"parent": source_name, "finding_id": finding_id, "finding": finding} - ) - print(created_finding) - # [END securitycenter_create_finding] - return created_finding - - -def create_finding_with_source_properties(source_name): - """Demonstrate creating a new finding with source properties.""" - # [START securitycenter_create_finding_with_source_properties] - import datetime - - from google.cloud import securitycenter - from google.cloud.securitycenter_v1 import Finding - from google.protobuf.struct_pb2 import Value - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # Controlled by caller. - finding_id = "samplefindingid2" - - # The resource this finding applies to. The CSCC UI can link - # the findings for a resource to the corresponding Asset of a resource - # if there are matches. - resource_name = "//cloudresourcemanager.googleapis.com/organizations/11232" - - # Define source properties values as protobuf "Value" objects. - str_value = Value() - str_value.string_value = "string_example" - num_value = Value() - num_value.number_value = 1234 - - # Use the current time as the finding "event time". - event_time = datetime.datetime.now(tz=datetime.timezone.utc) - - finding = Finding( - state=Finding.State.ACTIVE, - resource_name=resource_name, - category="MEDIUM_RISK_ONE", - source_properties={"s_value": "string_example", "n_value": 1234}, - event_time=event_time, - ) - - created_finding = client.create_finding( - request={"parent": source_name, "finding_id": finding_id, "finding": finding} - ) - print(created_finding) - # [END securitycenter_create_finding_with_source_properties] - - -def update_finding(source_name): - # [START securitycenter_update_finding_source_properties] - import datetime - - from google.cloud import securitycenter - from google.cloud.securitycenter_v1 import Finding - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - # Only update the specific source property and event_time. event_time - # is required for updates. - field_mask = field_mask_pb2.FieldMask( - paths=["source_properties.s_value", "event_time"] - ) - - # Set the update time to Now. This must be some time greater then the - # event_time on the original finding. - event_time = datetime.datetime.now(tz=datetime.timezone.utc) - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - finding_name = "{}/findings/samplefindingid2".format(source_name) - finding = Finding( - name=finding_name, - source_properties={"s_value": "new_string"}, - event_time=event_time, - ) - updated_finding = client.update_finding( - request={"finding": finding, "update_mask": field_mask} - ) - - print( - "New Source properties: {}, Event Time {}".format( - updated_finding.source_properties, updated_finding.event_time - ) - ) - # [END securitycenter_update_finding_source_properties] - - -def update_finding_state(source_name): - """Demonstrate updating only a finding state.""" - # [START securitycenter_update_finding_state] - import datetime - - from google.cloud import securitycenter - from google.cloud.securitycenter_v1 import Finding - - # Create a client. - client = securitycenter.SecurityCenterClient() - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - finding_name = "{}/findings/samplefindingid2".format(source_name) - - # Call the API to change the finding state to inactive as of now. - new_finding = client.set_finding_state( - request={ - "name": finding_name, - "state": Finding.State.INACTIVE, - "start_time": datetime.datetime.now(tz=datetime.timezone.utc), - } - ) - print(f"New state: {new_finding.state}") - # [END securitycenter_update_finding_state] - - -def trouble_shoot(source_name): - """Demonstrate calling test_iam_permissions to determine if the - service account has the correct permisions.""" - # [START securitycenter_test_iam] - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # Check for permssions to call create_finding or update_finding. - permission_response = client.test_iam_permissions( - request={ - "resource": source_name, - "permissions": ["securitycenter.findings.update"], - } - ) - - print( - "Permision to create or update findings? {}".format( - len(permission_response.permissions) > 0 - ) - ) - # [END securitycenter_test_iam] - assert len(permission_response.permissions) > 0 - # [START securitycenter_test_iam] - # Check for permissions necessary to call set_finding_state. - permission_response = client.test_iam_permissions( - request={ - "resource": source_name, - "permissions": ["securitycenter.findings.setState"], - } - ) - print( - "Permision to update state? {}".format(len(permission_response.permissions) > 0) - ) - # [END securitycenter_test_iam] - return permission_response - assert len(permission_response.permissions) > 0 - - -def list_all_findings(organization_id): - # [START securitycenter_list_all_findings] - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. e.g.: - # organization_id = "111122222444" - org_name = "organizations/{org_id}".format(org_id=organization_id) - # The "sources/-" suffix lists findings across all sources. You - # also use a specific source_name instead. - all_sources = "{org_name}/sources/-".format(org_name=org_name) - finding_result_iterator = client.list_findings(request={"parent": all_sources}) - for i, finding_result in enumerate(finding_result_iterator): - print( - "{}: name: {} resource: {}".format( - i, finding_result.finding.name, finding_result.finding.resource_name - ) - ) - # [END securitycenter_list_all_findings] - return i - - -def list_filtered_findings(source_name): - # [START securitycenter_list_filtered_findings] - from google.cloud import securitycenter - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - # You an also use a wild-card "-" for all sources: - # source_name = "organizations/111122222444/sources/-" - finding_result_iterator = client.list_findings( - request={"parent": source_name, "filter": 'category="MEDIUM_RISK_ONE"'} - ) - # Iterate an print all finding names and the resource they are - # in reference to. - for i, finding_result in enumerate(finding_result_iterator): - print( - "{}: name: {} resource: {}".format( - i, finding_result.finding.name, finding_result.finding.resource_name - ) - ) - # [END securitycenter_list_filtered_findings] - return i - - -def list_findings_at_time(source_name): - # [START securitycenter_list_findings_at_time] - from datetime import datetime, timedelta - - from google.cloud import securitycenter - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - # You an also use a wild-card "-" for all sources: - # source_name = "organizations/111122222444/sources/-" - five_days_ago = str(datetime.now() - timedelta(days=5)) - # [END securitycenter_list_findings_at_time] - i = -1 - # [START securitycenter_list_findings_at_time] - - finding_result_iterator = client.list_findings( - request={"parent": source_name, "filter": five_days_ago} - ) - for i, finding_result in enumerate(finding_result_iterator): - print( - "{}: name: {} resource: {}".format( - i, finding_result.finding.name, finding_result.finding.resource_name - ) - ) - # [END securitycenter_list_findings_at_time] - return i - - -def get_iam_policy(source_name): - """Gives a user findingsEditor permission to the source.""" - # [START securitycenter_get_source_iam] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - # Get the old policy so we can do an incremental update. - policy = client.get_iam_policy(request={"resource": source_name}) - print("Policy: {}".format(policy)) - # [END securitycenter_get_source_iam] - - -def group_all_findings(organization_id): - """Demonstrates grouping all findings across an organization.""" - i = 0 - # [START securitycenter_group_all_findings] - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. e.g.: - # organization_id = "111122222444" - org_name = "organizations/{org_id}".format(org_id=organization_id) - # The "sources/-" suffix lists findings across all sources. You - # also use a specific source_name instead. - all_sources = "{org_name}/sources/-".format(org_name=org_name) - group_result_iterator = client.group_findings( - request={"parent": all_sources, "group_by": "category"} - ) - for i, group_result in enumerate(group_result_iterator): - print((i + 1), group_result) - # [END securitycenter_group_all_findings] - return i - - -def group_filtered_findings(source_name): - """Demonstrates grouping all findings across an organization.""" - i = 0 - # [START securitycenter_group_filtered_findings] - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - group_result_iterator = client.group_findings( - request={ - "parent": source_name, - "group_by": "category", - "filter": 'state="ACTIVE"', - } - ) - for i, group_result in enumerate(group_result_iterator): - print((i + 1), group_result) - # [END securitycenter_group_filtered_findings] - return i - - -def group_findings_at_time(source_name): - """Demonstrates grouping all findings across an organization as of - a specific time.""" - i = -1 - # [START securitycenter_group_findings_at_time] - from datetime import datetime, timedelta - - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # Group findings as of yesterday. - read_time = datetime.utcnow() - timedelta(days=1) - - group_result_iterator = client.group_findings( - request={"parent": source_name, "group_by": "category", "read_time": read_time} - ) - for i, group_result in enumerate(group_result_iterator): - print((i + 1), group_result) - # [END securitycenter_group_findings_at_time] - return i - - -def group_findings_and_changes(source_name): - """Demonstrates grouping all findings across an organization and - associated changes.""" - i = 0 - # [START securitycenter_group_findings_with_changes] - from datetime import timedelta - - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # List assets and their state change the last 30 days - compare_delta = timedelta(days=30) - - group_result_iterator = client.group_findings( - request={ - "parent": source_name, - "group_by": "state_change", - "compare_duration": compare_delta, - } - ) - for i, group_result in enumerate(group_result_iterator): - print((i + 1), group_result) - # [END securitycenter_group_findings_with_changes]] - return i diff --git a/samples/snippets/snippets_findings_test.py b/samples/snippets/snippets_findings_test.py deleted file mode 100644 index 8cd6353f..00000000 --- a/samples/snippets/snippets_findings_test.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from itertools import chain -import os - -import pytest - -import snippets_findings - - -@pytest.fixture(scope="module") -def organization_id(): - """Get Organization ID from the environment variable""" - return os.environ["GCLOUD_ORGANIZATION"] - - -@pytest.fixture(scope="module") -def source_name(organization_id): - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - org_name = "organizations/{org_id}".format(org_id=organization_id) - - source = client.create_source( - request={ - "parent": org_name, - "source": { - "display_name": "Unit test source", - "description": "A new custom source that does X", - }, - } - ) - return source.name - - -def test_create_source(organization_id): - snippets_findings.create_source(organization_id) - - -def test_get_source(source_name): - source = snippets_findings.get_source(source_name) - assert source.name == source_name - - -def test_update_source(source_name): - updated = snippets_findings.update_source(source_name) - assert updated.display_name == "Updated Display Name" - - -def test_add_user_to_source(source_name): - binding, updated = snippets_findings.add_user_to_source(source_name) - assert any( - member == "user:csccclienttest@gmail.com" - for member in chain.from_iterable( - binding.members for binding in updated.bindings - ) - ) - - -def test_list_source(organization_id): - count = snippets_findings.list_source(organization_id) - assert count >= 0 - - -def test_create_finding(source_name): - created_finding = snippets_findings.create_finding(source_name, "samplefindingid") - assert len(created_finding.name) > 0 - - -def test_create_finding_with_source_properties(source_name): - snippets_findings.create_finding_with_source_properties(source_name) - - -def test_update_finding(source_name): - snippets_findings.update_finding(source_name) - - -def test_update_finding_state(source_name): - snippets_findings.update_finding_state(source_name) - - -def test_trouble_shoot(source_name): - snippets_findings.trouble_shoot(source_name) - - -def test_list_all_findings(organization_id): - count = snippets_findings.list_all_findings(organization_id) - assert count > 0 - - -def test_list_filtered_findings(source_name): - count = snippets_findings.list_filtered_findings(source_name) - assert count > 0 - - -def list_findings_at_time(source_name): - count = snippets_findings.list_findings_at_time(source_name) - assert count == -1 - - -def test_get_iam_policy(source_name): - snippets_findings.get_iam_policy(source_name) - - -def test_group_all_findings(organization_id): - count = snippets_findings.group_all_findings(organization_id) - assert count > 0 - - -def test_group_filtered_findings(source_name): - count = snippets_findings.group_filtered_findings(source_name) - assert count == 0 - - -def test_group_findings_at_time(source_name): - count = snippets_findings.group_findings_at_time(source_name) - assert count == -1 - - -def test_group_findings_and_changes(source_name): - count = snippets_findings.group_findings_and_changes(source_name) - assert count == 0 diff --git a/samples/snippets/snippets_list_assets.py b/samples/snippets/snippets_list_assets.py deleted file mode 100644 index 42511b94..00000000 --- a/samples/snippets/snippets_list_assets.py +++ /dev/null @@ -1,208 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" Examples of listing assets in Cloud Security Command Center.""" - - -def list_all_assets(organization_id): - """Demonstrate listing and printing all assets.""" - i = 0 - # [START securitycenter_list_all_assets] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - # Call the API and print results. - asset_iterator = client.list_assets(request={"parent": org_name}) - for i, asset_result in enumerate(asset_iterator): - print(i, asset_result) - # [END securitycenter_list_all_assets] - return i - - -def list_assets_with_filters(organization_id): - """Demonstrate listing assets with a filter.""" - i = 0 - # [START securitycenter_list_assets_with_filter] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - project_filter = ( - "security_center_properties.resource_type=" - + '"google.cloud.resourcemanager.Project"' - ) - # Call the API and print results. - asset_iterator = client.list_assets( - request={"parent": org_name, "filter": project_filter} - ) - for i, asset_result in enumerate(asset_iterator): - print(i, asset_result) - # [END securitycenter_list_assets_with_filter] - return i - - -def list_assets_with_filters_and_read_time(organization_id): - """Demonstrate listing assets with a filter.""" - i = 0 - # [START securitycenter_list_assets_at_time] - from datetime import datetime, timedelta, timezone - - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - project_filter = ( - "security_center_properties.resource_type=" - + '"google.cloud.resourcemanager.Project"' - ) - - # Lists assets as of yesterday. - read_time = datetime.now(tz=timezone.utc) - timedelta(days=1) - - # Call the API and print results. - asset_iterator = client.list_assets( - request={"parent": org_name, "filter": project_filter, "read_time": read_time} - ) - for i, asset_result in enumerate(asset_iterator): - print(i, asset_result) - # [END securitycenter_list_assets_at_time] - return i - - -def list_point_in_time_changes(organization_id): - """Demonstrate listing assets along with their state changes.""" - i = 0 - # [START securitycenter_list_assets_and_changes] - from datetime import timedelta - - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - project_filter = ( - "security_center_properties.resource_type=" - + '"google.cloud.resourcemanager.Project"' - ) - - # List assets and their state change the last 30 days - compare_delta = timedelta(days=30) - - # Call the API and print results. - asset_iterator = client.list_assets( - request={ - "parent": org_name, - "filter": project_filter, - "compare_duration": compare_delta, - } - ) - for i, asset in enumerate(asset_iterator): - print(i, asset) - - # [END securitycenter_list_assets_and_changes] - return i - - -def group_assets(organization_id): - """Demonstrates grouping all assets by type.""" - i = 0 - # [START securitycenter_group_all_assets] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - group_by_type = "security_center_properties.resource_type" - - result_iterator = client.group_assets( - request={"parent": org_name, "group_by": group_by_type} - ) - for i, result in enumerate(result_iterator): - print((i + 1), result) - # [END securitycenter_group_all_assets] - return i - - -def group_filtered_assets(organization_id): - """Demonstrates grouping assets by type with a filter.""" - i = 0 - # [START securitycenter_group_all_assets_with_filter] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - group_by_type = "security_center_properties.resource_type" - only_projects = ( - "security_center_properties.resource_type=" - + '"google.cloud.resourcemanager.Project"' - ) - result_iterator = client.group_assets( - request={"parent": org_name, "group_by": group_by_type, "filter": only_projects} - ) - for i, result in enumerate(result_iterator): - print((i + 1), result) - # [END securitycenter_group_all_assets_with_filter] - # only one asset type is a project - return i - - -def group_assets_by_changes(organization_id): - """Demonstrates grouping assets by their changes over a period of time.""" - i = 0 - # [START securitycenter_group_all_assets_by_change] - from datetime import timedelta - - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - duration = timedelta(days=5) - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - result_iterator = client.group_assets( - request={ - "parent": org_name, - "group_by": "state_change", - "compare_duration": duration, - } - ) - for i, result in enumerate(result_iterator): - print((i + 1), result) - # [END securitycenter_group_all_assets_by_change] - return i diff --git a/samples/snippets/snippets_list_assets_test.py b/samples/snippets/snippets_list_assets_test.py deleted file mode 100644 index 44514238..00000000 --- a/samples/snippets/snippets_list_assets_test.py +++ /dev/null @@ -1,64 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for snippets.""" - -import os - -import pytest - -import snippets_list_assets - - -@pytest.fixture(scope="module") -def organization_id(): - """Get Organization ID from the environment variable""" - return os.environ["GCLOUD_ORGANIZATION"] - - -def test_list_all_assets(organization_id): - """Demonstrate listing and printing all assets.""" - count = snippets_list_assets.list_all_assets(organization_id) - assert count > 0 - - -def list_assets_with_filters(organization_id): - count = snippets_list_assets.list_all_assets(organization_id) - assert count > 0 - - -def test_list_assets_with_filters_and_read_time(organization_id): - count = snippets_list_assets.list_assets_with_filters_and_read_time(organization_id) - assert count > 0 - - -def test_list_point_in_time_changes(organization_id): - count = snippets_list_assets.list_point_in_time_changes(organization_id) - assert count > 0 - - -def test_group_assets(organization_id): - count = snippets_list_assets.group_assets(organization_id) - assert count >= 8 # 8 different asset types. - - -def test_group_filtered_assets(organization_id): - count = snippets_list_assets.group_filtered_assets(organization_id) - assert count == 0 - - -def test_group_assets_by_changes(organization_id): - count = snippets_list_assets.group_assets_by_changes(organization_id) - assert count >= 0 # only one asset type is a project diff --git a/samples/snippets/snippets_mute_config.py b/samples/snippets/snippets_mute_config.py deleted file mode 100644 index 97de1312..00000000 --- a/samples/snippets/snippets_mute_config.py +++ /dev/null @@ -1,264 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# [START securitycenter_create_mute_config] - - -def create_mute_rule(parent_path: str, mute_config_id: str) -> None: - """ - Creates a mute configuration under a given scope that will mute - all new findings that match a given filter. - Existing findings will NOT BE muted. - Args: - parent_path: use any one of the following options: - - organizations/{organization_id} - - folders/{folder_id} - - projects/{project_id} - mute_config_id: Set a unique id; max of 63 chars. - """ - - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - mute_config = securitycenter.MuteConfig() - mute_config.description = "Mute low-medium IAM grants excluding 'compute' " - # Set mute rule(s). - # To construct mute rules and for supported properties, see: - # https://cloud.google.com/security-command-center/docs/how-to-mute-findings#create_mute_rules - mute_config.filter = ( - 'severity="LOW" OR severity="MEDIUM" AND ' - 'category="Persistence: IAM Anomalous Grant" AND ' - '-resource.type:"compute"' - ) - - request = securitycenter.CreateMuteConfigRequest() - request.parent = parent_path - request.mute_config_id = mute_config_id - request.mute_config = mute_config - - mute_config = client.create_mute_config(request=request) - print(f"Mute rule created successfully: {mute_config.name}") - - -# [END securitycenter_create_mute_config] - - -# [START securitycenter_delete_mute_config] -def delete_mute_rule(mute_config_name: str) -> None: - """ - Deletes a mute configuration given its resource name. - Note: Previously muted findings are not affected when a mute config is deleted. - Args: - mute_config_name: Specify the name of the mute config to delete. - Use any one of the following formats: - - organizations/{organization}/muteConfigs/{config_id} - - folders/{folder}/muteConfigs/{config_id} or - - projects/{project}/muteConfigs/{config_id} - """ - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.DeleteMuteConfigRequest() - request.name = mute_config_name - - client.delete_mute_config(request) - print(f"Mute rule deleted successfully: {mute_config_name}") - - -# [END securitycenter_delete_mute_config] - - -# [START securitycenter_get_mute_config] -def get_mute_rule(mute_config_name: str) -> None: - """ - Retrieves a mute configuration given its resource name. - Args: - mute_config_name: Name of the mute config to retrieve. - Use any one of the following formats: - - organizations/{organization}/muteConfigs/{config_id} - - folders/{folder}/muteConfigs/{config_id} - - projects/{project}/muteConfigs/{config_id} - """ - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.GetMuteConfigRequest() - request.name = mute_config_name - - mute_config = client.get_mute_config(request) - print(f"Retrieved the mute rule: {mute_config.name}") - - -# [END securitycenter_get_mute_config] - - -# [START securitycenter_list_mute_configs] -def list_mute_rules(parent: str) -> None: - """ - Listing mute configs at organization level will return all the configs - at the org, folder and project levels. - Similarly, listing configs at folder level will list all the configs - at the folder and project levels. - Args: - parent: Use any one of the following resource paths to list mute configurations: - - organizations/{organization_id} - - folders/{folder_id} - - projects/{project_id} - """ - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.ListMuteConfigsRequest() - request.parent = parent - - # List all Mute Configs present in the resource. - for mute_config in client.list_mute_configs(request): - print(mute_config.name) - - -# [END securitycenter_list_mute_configs] - - -# [START securitycenter_update_mute_config] -def update_mute_rule(mute_config_name: str) -> None: - """ - Updates an existing mute configuration. - The following can be updated in a mute config: description, and filter/ mute rule. - Args: - mute_config_name: Specify the name of the mute config to delete. - Use any one of the following formats: - - organizations/{organization}/muteConfigs/{config_id} - - folders/{folder}/muteConfigs/{config_id} - - projects/{project}/muteConfigs/{config_id} - """ - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - - update_mute_config = securitycenter.MuteConfig() - update_mute_config.name = mute_config_name - update_mute_config.description = "Updated mute config description" - - field_mask = field_mask_pb2.FieldMask(paths=["description"]) - - request = securitycenter.UpdateMuteConfigRequest() - request.mute_config = update_mute_config - # Set the update mask to specify which properties of the Mute Config should be updated. - # If empty, all mutable fields will be updated. - # Make sure that the mask fields match the properties changed in 'update_mute_config'. - # For more info on constructing update mask path, see the proto or: - # https://cloud.google.com/security-command-center/docs/reference/rest/v1/folders.muteConfigs/patch?hl=en#query-parameters - request.update_mask = field_mask - - mute_config = client.update_mute_config(request) - print(f"Updated mute rule : {mute_config}") - - -# [END securitycenter_update_mute_config] - - -# [START securitycenter_set_mute] -def set_mute_finding(finding_path: str) -> None: - """ - Mute an individual finding. - If a finding is already muted, muting it again has no effect. - Various mute states are: MUTE_UNSPECIFIED/MUTE/UNMUTE. - Args: - finding_path: The relative resource name of the finding. See: - https://cloud.google.com/apis/design/resource_names#relative_resource_name - Use any one of the following formats: - - organizations/{organization_id}/sources/{source_id}/finding/{finding_id}, - - folders/{folder_id}/sources/{source_id}/finding/{finding_id}, - - projects/{project_id}/sources/{source_id}/finding/{finding_id}. - """ - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.SetMuteRequest() - request.name = finding_path - request.mute = securitycenter.Finding.Mute.MUTED - - finding = client.set_mute(request) - print(f"Mute value for the finding: {finding.mute.name}") - - -# [END securitycenter_set_mute] - - -# [START securitycenter_set_unmute] -def set_unmute_finding(finding_path: str) -> None: - """ - Unmute an individual finding. - Unmuting a finding that isn't muted has no effect. - Various mute states are: MUTE_UNSPECIFIED/MUTE/UNMUTE. - Args: - finding_path: The relative resource name of the finding. See: - https://cloud.google.com/apis/design/resource_names#relative_resource_name - Use any one of the following formats: - - organizations/{organization_id}/sources/{source_id}/finding/{finding_id}, - - folders/{folder_id}/sources/{source_id}/finding/{finding_id}, - - projects/{project_id}/sources/{source_id}/finding/{finding_id}. - """ - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.SetMuteRequest() - request.name = finding_path - request.mute = securitycenter.Finding.Mute.UNMUTED - - finding = client.set_mute(request) - print(f"Mute value for the finding: {finding.mute.name}") - - -# [END securitycenter_set_unmute] - - -# [START securitycenter_bulk_mute] -def bulk_mute_findings(parent_path: str, mute_rule: str) -> None: - """ - Kicks off a long-running operation (LRO) to bulk mute findings for a parent based on a filter. - The parent can be either an organization, folder, or project. The findings - matched by the filter will be muted after the LRO is done. - Args: - parent_path: use any one of the following options: - - organizations/{organization} - - folders/{folder} - - projects/{project} - mute_rule: Expression that identifies findings that should be updated. - """ - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - request = securitycenter.BulkMuteFindingsRequest() - request.parent = parent_path - # To create mute rules, see: - # https://cloud.google.com/security-command-center/docs/how-to-mute-findings#create_mute_rules - request.filter = mute_rule - - response = client.bulk_mute_findings(request) - print(f"Bulk mute findings completed successfully! : {response}") - - -# [END securitycenter_bulk_mute] diff --git a/samples/snippets/snippets_mute_config_test.py b/samples/snippets/snippets_mute_config_test.py deleted file mode 100644 index 5c531d10..00000000 --- a/samples/snippets/snippets_mute_config_test.py +++ /dev/null @@ -1,129 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import re -import uuid - -from _pytest.capture import CaptureFixture -from google.cloud import securitycenter -from google.cloud.securitycenter_v1.services.security_center.pagers import ( - ListFindingsPager, -) -import pytest - -import snippets_mute_config - -# TODO(developer): Replace these variables before running the sample. -PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"] -ORGANIZATION_ID = os.environ["GCLOUD_ORGANIZATION"] -GOOGLE_APPLICATION_CREDENTIALS = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] - - -@pytest.fixture -def mute_rule(): - mute_rule_create = f"random-mute-create-{uuid.uuid4()}" - mute_rule_update = f"random-mute-update-{uuid.uuid4()}" - snippets_mute_config.create_mute_rule(f"projects/{PROJECT_ID}", mute_rule_create) - snippets_mute_config.create_mute_rule(f"projects/{PROJECT_ID}", mute_rule_update) - - yield {"create": mute_rule_create, "update": mute_rule_update} - - snippets_mute_config.delete_mute_rule( - f"projects/{PROJECT_ID}/muteConfigs/{mute_rule_create}" - ) - snippets_mute_config.delete_mute_rule( - f"projects/{PROJECT_ID}/muteConfigs/{mute_rule_update}" - ) - - -@pytest.fixture -def finding(capsys: CaptureFixture): - import snippets_findings - from snippets_findings import create_finding - - snippets_findings.create_source(ORGANIZATION_ID) - out, _ = capsys.readouterr() - # source_path is of the format: organizations/{ORGANIZATION_ID}/sources/{source_name} - source_path = out.split(":")[1].strip() - source_name = source_path.split("/")[3] - finding1_path = create_finding(source_path, "1testingscc").name - finding2_path = create_finding(source_path, "2testingscc").name - - yield { - "source": source_name, - "finding1": finding1_path, - "finding2": finding2_path, - } - - -def list_all_findings(source_name) -> ListFindingsPager: - client = securitycenter.SecurityCenterClient() - return client.list_findings(request={"parent": source_name}) - - -def test_get_mute_rule(capsys: CaptureFixture, mute_rule): - snippets_mute_config.get_mute_rule( - f"projects/{PROJECT_ID}/muteConfigs/{mute_rule.get('create')}" - ) - out, _ = capsys.readouterr() - assert re.search("Retrieved the mute rule: ", out) - assert re.search(mute_rule.get("create"), out) - - -def test_list_mute_rules(capsys: CaptureFixture, mute_rule): - snippets_mute_config.list_mute_rules(f"projects/{PROJECT_ID}") - out, _ = capsys.readouterr() - assert re.search(mute_rule.get("create"), out) - assert re.search(mute_rule.get("update"), out) - - -def test_update_mute_rule(capsys: CaptureFixture, mute_rule): - snippets_mute_config.update_mute_rule( - f"projects/{PROJECT_ID}/muteConfigs/{mute_rule.get('update')}" - ) - snippets_mute_config.get_mute_rule( - f"projects/{PROJECT_ID}/muteConfigs/{mute_rule.get('update')}" - ) - out, _ = capsys.readouterr() - assert re.search("Updated mute config description", out) - - -def test_set_mute_finding(capsys: CaptureFixture, finding): - finding_path = finding.get("finding1") - snippets_mute_config.set_mute_finding(finding_path) - out, _ = capsys.readouterr() - assert re.search("Mute value for the finding: MUTED", out) - - -def test_set_unmute_finding(capsys: CaptureFixture, finding): - finding_path = finding.get("finding1") - snippets_mute_config.set_unmute_finding(finding_path) - out, _ = capsys.readouterr() - assert re.search("Mute value for the finding: UNMUTED", out) - - -def test_bulk_mute_findings(capsys: CaptureFixture, finding): - # Mute findings that belong to this project. - snippets_mute_config.bulk_mute_findings( - f"projects/{PROJECT_ID}", f'resource.project_display_name="{PROJECT_ID}"' - ) - - # Get all findings in the source to check if they are muted. - response = list_all_findings( - f"projects/{PROJECT_ID}/sources/{finding.get('source')}" - ) - for i, finding in enumerate(response): - assert finding.finding.mute == securitycenter.Finding.Mute.MUTED diff --git a/samples/snippets/snippets_notification_configs.py b/samples/snippets/snippets_notification_configs.py deleted file mode 100644 index 6cc82dd8..00000000 --- a/samples/snippets/snippets_notification_configs.py +++ /dev/null @@ -1,170 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Demos for working with notification configs.""" - - -# [START securitycenter_create_notification_config] -def create_notification_config(parent_id, notification_config_id, pubsub_topic): - """ - Args: - parent_id: must be in one of the following formats: - "organizations/{organization_id}" - "projects/{project_id}" - "folders/{folder_id}" - notification_config_id: "your-config-id" - pubsub_topic: "projects/{your-project-id}/topics/{your-topic-ic}" - - Ensure this ServiceAccount has the "pubsub.topics.setIamPolicy" permission on the new topic. - """ - from google.cloud import securitycenter as securitycenter - - client = securitycenter.SecurityCenterClient() - - created_notification_config = client.create_notification_config( - request={ - "parent": parent_id, - "config_id": notification_config_id, - "notification_config": { - "description": "Notification for active findings", - "pubsub_topic": pubsub_topic, - "streaming_config": {"filter": 'state = "ACTIVE"'}, - }, - } - ) - - print(created_notification_config) - # [END securitycenter_create_notification_config] - return created_notification_config - - -# [START securitycenter_delete_notification_config] -def delete_notification_config(parent_id, notification_config_id): - """ - Args: - parent_id: must be in one of the following formats: - "organizations/{organization_id}" - "projects/{project_id}" - "folders/{folder_id}" - notification_config_id: "your-config-id" - """ - from google.cloud import securitycenter as securitycenter - - client = securitycenter.SecurityCenterClient() - - notification_config_name = ( - f"{parent_id}/notificationConfigs/{notification_config_id}" - ) - - client.delete_notification_config(request={"name": notification_config_name}) - print(f"Deleted notification config: {notification_config_name}") - # [END securitycenter_delete_notification_config] - return True - - -# [START securitycenter_get_notification_config] -def get_notification_config(parent_id, notification_config_id): - """ - Args: - parent_id: must be in one of the following formats: - "organizations/{organization_id}" - "projects/{project_id}" - "folders/{folder_id}" - notification_config_id: "your-config-id" - """ - from google.cloud import securitycenter as securitycenter - - client = securitycenter.SecurityCenterClient() - - notification_config_name = ( - f"{parent_id}/notificationConfigs/{notification_config_id}" - ) - - notification_config = client.get_notification_config( - request={"name": notification_config_name} - ) - print(f"Got notification config: {notification_config}") - # [END securitycenter_get_notification_config] - return notification_config - - -# [START securitycenter_list_notification_configs] -def list_notification_configs(parent_id): - """ - Args: - parent_id: must be in one of the following formats: - "organizations/{organization_id}" - "projects/{project_id}" - "folders/{folder_id}" - """ - from google.cloud import securitycenter as securitycenter - - client = securitycenter.SecurityCenterClient() - - notification_configs_iterator = client.list_notification_configs( - request={"parent": parent_id} - ) - for i, config in enumerate(notification_configs_iterator): - print(f"{i}: notification_config: {config}") - # [END securitycenter_list_notification_configs]] - return notification_configs_iterator - - -# [START securitycenter_update_notification_config] -def update_notification_config(parent_id, notification_config_id, pubsub_topic): - """ - Args: - parent_id: must be in one of the following formats: - "organizations/{organization_id}" - "projects/{project_id}" - "folders/{folder_id}" - notification_config_id: "config-id-to-update" - pubsub_topic: "projects/{new-project}/topics/{new-topic}" - - If updating a pubsub_topic, ensure this ServiceAccount has the - "pubsub.topics.setIamPolicy" permission on the new topic. - """ - from google.cloud import securitycenter as securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - - notification_config_name = ( - f"{parent_id}/notificationConfigs/{notification_config_id}" - ) - - updated_description = "New updated description" - updated_filter = 'state = "INACTIVE"' - - # Only description and pubsub_topic can be updated. - field_mask = field_mask_pb2.FieldMask( - paths=["description", "pubsub_topic", "streaming_config.filter"] - ) - - updated_notification_config = client.update_notification_config( - request={ - "notification_config": { - "name": notification_config_name, - "description": updated_description, - "pubsub_topic": pubsub_topic, - "streaming_config": {"filter": updated_filter}, - }, - "update_mask": field_mask, - } - ) - - print(updated_notification_config) - # [END securitycenter_update_notification_config] - return updated_notification_config diff --git a/samples/snippets/snippets_notification_receiver.py b/samples/snippets/snippets_notification_receiver.py deleted file mode 100644 index 9c4368a0..00000000 --- a/samples/snippets/snippets_notification_receiver.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Demo for receiving notifications.""" - - -def receive_notifications(project_id, subscription_name): - # [START securitycenter_receive_notifications] - # Requires https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-python - import concurrent - - from google.cloud import pubsub_v1 - from google.cloud.securitycenter_v1 import NotificationMessage - - # TODO: project_id = "your-project-id" - # TODO: subscription_name = "your-subscription-name" - - def callback(message): - - # Print the data received for debugging purpose if needed - print(f"Received message: {message.data}") - - notification_msg = NotificationMessage.from_json(message.data) - - print( - "Notification config name: {}".format( - notification_msg.notification_config_name - ) - ) - print("Finding: {}".format(notification_msg.finding)) - - # Ack the message to prevent it from being pulled again - message.ack() - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_name) - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - - print("Listening for messages on {}...\n".format(subscription_path)) - try: - streaming_pull_future.result(timeout=1) # Block for 1 second - except concurrent.futures.TimeoutError: - streaming_pull_future.cancel() - # [END securitycenter_receive_notifications] - return True diff --git a/samples/snippets/snippets_notification_test.py b/samples/snippets/snippets_notification_test.py deleted file mode 100644 index 2cc6d262..00000000 --- a/samples/snippets/snippets_notification_test.py +++ /dev/null @@ -1,152 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for snippets.""" - -import os -import uuid - -from google.cloud import securitycenter as securitycenter -import pytest - -import snippets_notification_configs -import snippets_notification_receiver - -ORG_ID = os.environ["GCLOUD_ORGANIZATION"] -PROJECT_ID = os.environ["GCLOUD_PROJECT"] -PUBSUB_TOPIC = os.environ["GCLOUD_PUBSUB_TOPIC"] -PUBSUB_SUBSCRIPTION = os.environ["GCLOUD_PUBSUB_SUBSCRIPTION"] - -CREATE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) -DELETE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) -GET_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) -UPDATE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) - - -def cleanup_notification_config(notification_config_id): - client = securitycenter.SecurityCenterClient() - - notification_config_name = ( - "organizations/{org_id}/notificationConfigs/{config_id}".format( - org_id=ORG_ID, config_id=notification_config_id - ) - ) - client.delete_notification_config(request={"name": notification_config_name}) - - -@pytest.fixture -def new_notification_config_for_update(): - client = securitycenter.SecurityCenterClient() - - org_name = "organizations/{org_id}".format(org_id=ORG_ID) - - created_notification_config = client.create_notification_config( - request={ - "parent": org_name, - "config_id": UPDATE_CONFIG_ID, - "notification_config": { - "description": "Notification for active findings", - "pubsub_topic": PUBSUB_TOPIC, - "streaming_config": {"filter": ""}, - }, - } - ) - yield created_notification_config - cleanup_notification_config(UPDATE_CONFIG_ID) - - -@pytest.fixture -def new_notification_config_for_get(): - client = securitycenter.SecurityCenterClient() - - org_name = "organizations/{org_id}".format(org_id=ORG_ID) - - created_notification_config = client.create_notification_config( - request={ - "parent": org_name, - "config_id": GET_CONFIG_ID, - "notification_config": { - "description": "Notification for active findings", - "pubsub_topic": PUBSUB_TOPIC, - "streaming_config": {"filter": ""}, - }, - } - ) - yield created_notification_config - cleanup_notification_config(GET_CONFIG_ID) - - -@pytest.fixture -def deleted_notification_config(): - client = securitycenter.SecurityCenterClient() - - org_name = "organizations/{org_id}".format(org_id=ORG_ID) - - created_notification_config = client.create_notification_config( - request={ - "parent": org_name, - "config_id": DELETE_CONFIG_ID, - "notification_config": { - "description": "Notification for active findings", - "pubsub_topic": PUBSUB_TOPIC, - "streaming_config": {"filter": ""}, - }, - } - ) - return created_notification_config - - -def test_create_notification_config(): - created_notification_config = ( - snippets_notification_configs.create_notification_config( - f"organizations/{ORG_ID}", CREATE_CONFIG_ID, PUBSUB_TOPIC - ) - ) - assert created_notification_config is not None - - cleanup_notification_config(CREATE_CONFIG_ID) - - -def test_delete_notification_config(deleted_notification_config): - assert snippets_notification_configs.delete_notification_config( - f"organizations/{ORG_ID}", DELETE_CONFIG_ID - ) - - -def test_get_notification_config(new_notification_config_for_get): - retrieved_config = snippets_notification_configs.get_notification_config( - f"organizations/{ORG_ID}", GET_CONFIG_ID - ) - assert retrieved_config is not None - - -def test_list_notification_configs(): - iterator = snippets_notification_configs.list_notification_configs( - f"organizations/{ORG_ID}" - ) - assert iterator is not None - - -def test_update_notification_config(new_notification_config_for_update): - updated_config = snippets_notification_configs.update_notification_config( - f"organizations/{ORG_ID}", UPDATE_CONFIG_ID, PUBSUB_TOPIC - ) - assert updated_config is not None - - -def test_receive_notifications(): - assert snippets_notification_receiver.receive_notifications( - PROJECT_ID, PUBSUB_SUBSCRIPTION - ) diff --git a/samples/snippets/snippets_orgs.py b/samples/snippets/snippets_orgs.py deleted file mode 100644 index 1164b639..00000000 --- a/samples/snippets/snippets_orgs.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Examples for working with organization settings. """ - - -def get_settings(organization_id): - """Example showing how to retreive current organization settings.""" - # [START securitycenter_get_org_settings] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - # organization_id is numeric ID for the organization. e.g. - # organization_id = "111112223333" - - org_settings_name = client.organization_settings_path(organization_id) - - org_settings = client.get_organization_settings(request={"name": org_settings_name}) - print(org_settings) - # [END securitycenter_get_org_settings] - - -def update_asset_discovery_org_settings(organization_id): - """Example showing how to update the asset discovery configuration - for an organization.""" - # [START securitycenter_enable_asset_discovery] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - # Create the client - client = securitycenter.SecurityCenterClient() - # organization_id is numeric ID for the organization. e.g. - # organization_id = "111112223333" - org_settings_name = "organizations/{org_id}/organizationSettings".format( - org_id=organization_id - ) - # Only update the enable_asset_discovery_value (leave others untouched). - field_mask = field_mask_pb2.FieldMask(paths=["enable_asset_discovery"]) - # Call the service. - updated = client.update_organization_settings( - request={ - "organization_settings": { - "name": org_settings_name, - "enable_asset_discovery": True, - }, - "update_mask": field_mask, - } - ) - print("Asset Discovery Enabled? {}".format(updated.enable_asset_discovery)) - # [END securitycenter_enable_asset_discovery] - return updated diff --git a/samples/snippets/snippets_orgs_test.py b/samples/snippets/snippets_orgs_test.py deleted file mode 100644 index 4f2a7c7f..00000000 --- a/samples/snippets/snippets_orgs_test.py +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Examples for working with organization settings. """ -import os - -import pytest - -import snippets_orgs - - -@pytest.fixture(scope="module") -def organization_id(): - """Get Organization ID from the environment variable""" - return os.environ["GCLOUD_ORGANIZATION"] - - -def test_get_settings(organization_id): - snippets_orgs.get_settings(organization_id) - - -def test_update_asset_discovery_org_settings(organization_id): - updated = snippets_orgs.update_asset_discovery_org_settings(organization_id) - assert updated.enable_asset_discovery diff --git a/samples/snippets/snippets_security_marks.py b/samples/snippets/snippets_security_marks.py deleted file mode 100644 index 457cc433..00000000 --- a/samples/snippets/snippets_security_marks.py +++ /dev/null @@ -1,200 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Demos for working with security marks.""" - - -def add_to_asset(asset_name): - """Add new security marks to an asset.""" - # [START securitycenter_add_security_marks] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # asset_name is the resource path for an asset that exists in CSCC. - # Its format is "organization/{organization_id}/assets/{asset_id} - # e.g.: - # asset_name = organizations/123123342/assets/12312321 - marks_name = "{}/securityMarks".format(asset_name) - - # Notice the suffix after "marks." in the field mask matches the keys - # in marks. - field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) - marks = {"key_a": "value_a", "key_b": "value_b"} - - updated_marks = client.update_security_marks( - request={ - "security_marks": {"name": marks_name, "marks": marks}, - "update_mask": field_mask, - } - ) - print(updated_marks) - # [END securitycenter_add_security_marks] - return updated_marks, marks - - -def clear_from_asset(asset_name): - """Removes security marks from an asset.""" - # Make sure they are there first - add_to_asset(asset_name) - # [START securitycenter_delete_security_marks] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # asset_name is the resource path for an asset that exists in CSCC. - # Its format is "organization/{organization_id}/assets/{asset_id} - # e.g.: - # asset_name = organizations/123123342/assets/12312321 - marks_name = "{}/securityMarks".format(asset_name) - - field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) - - updated_marks = client.update_security_marks( - request={ - "security_marks": { - "name": marks_name - # Note, no marks specified, so the specified values in - # the fields masks will be deleted. - }, - "update_mask": field_mask, - } - ) - print(updated_marks) - # [END securitycenter_delete_security_marks] - return updated_marks - - -def delete_and_update_marks(asset_name): - """Updates and deletes security marks from an asset in the same call.""" - # Make sure they are there first - add_to_asset(asset_name) - # [START securitycenter_add_delete_security_marks] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - # asset_name is the resource path for an asset that exists in CSCC. - # Its format is "organization/{organization_id}/assets/{asset_id} - # e.g.: - # asset_name = organizations/123123342/assets/12312321 - marks_name = "{}/securityMarks".format(asset_name) - - field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) - marks = {"key_a": "new_value_for_a"} - - updated_marks = client.update_security_marks( - request={ - "security_marks": {"name": marks_name, "marks": marks}, - "update_mask": field_mask, - } - ) - print(updated_marks) - # [END securitycenter_add_delete_security_marks] - return updated_marks - - -def add_to_finding(finding_name): - """Adds security marks to a finding.""" - # [START securitycenter_add_finding_security_marks] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - # finding_name is the resource path for a finding that exists in CSCC. - # Its format is - # "organizations/{org_id}/sources/{source_id}/findings/{finding_id}" - # e.g.: - # finding_name = "organizations/1112/sources/1234/findings/findingid" - finding_marks_name = "{}/securityMarks".format(finding_name) - - # Notice the suffix after "marks." in the field mask matches the keys - # in marks. - field_mask = field_mask_pb2.FieldMask( - paths=["marks.finding_key_a", "marks.finding_key_b"] - ) - marks = {"finding_key_a": "value_a", "finding_key_b": "value_b"} - - updated_marks = client.update_security_marks( - request={ - "security_marks": {"name": finding_marks_name, "marks": marks}, - "update_mask": field_mask, - } - ) - # [END securitycenter_add_finding_security_marks] - return updated_marks, marks - - -def list_assets_with_query_marks(organization_id, asset_name): - """Lists assets with a filter on security marks.""" - add_to_asset(asset_name) - i = -1 - # [START securitycenter_list_assets_with_security_marks] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id=1234567777 - org_name = "organizations/{org_id}".format(org_id=organization_id) - - marks_filter = 'security_marks.marks.key_a = "value_a"' - # Call the API and print results. - asset_iterator = client.list_assets( - request={"parent": org_name, "filter": marks_filter} - ) - - # Call the API and print results. - asset_iterator = client.list_assets( - request={"parent": org_name, "filter": marks_filter} - ) - for i, asset_result in enumerate(asset_iterator): - print(i, asset_result) - # [END securitycenter_list_assets_with_security_marks] - return i - - -def list_findings_with_query_marks(source_name, finding_name): - """Lists findings with a filter on security marks.""" - # ensure marks are set on finding. - add_to_finding(finding_name) - i = -1 - # [START securitycenter_list_findings_with_security_marks] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - marks_filter = 'NOT security_marks.marks.finding_key_a="value_a"' - - # Call the API and print results. - finding_iterator = client.list_findings( - request={"parent": source_name, "filter": marks_filter} - ) - for i, finding_result in enumerate(finding_iterator): - print(i, finding_result) - # [END securitycenter_list_findings_with_security_marks] - # one finding should have been updated with keys, and one should be - # untouched. - return i diff --git a/samples/snippets/snippets_security_marks_test.py b/samples/snippets/snippets_security_marks_test.py deleted file mode 100644 index 01666f59..00000000 --- a/samples/snippets/snippets_security_marks_test.py +++ /dev/null @@ -1,148 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Demos for working with security marks.""" -import os -import random - -import pytest - -import snippets_security_marks - - -@pytest.fixture(scope="module") -def organization_id(): - """Gets Organization ID from the environment variable""" - return os.environ["GCLOUD_ORGANIZATION"] - - -@pytest.fixture(scope="module") -def asset_name(organization_id): - """Returns a random asset name from existing assets.""" - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - # organization_id is the numeric ID of the organization. - # organization_id=1234567777 - org_name = "organizations/{org_id}".format(org_id=organization_id) - assets = list(client.list_assets(request={"parent": org_name})) - # Select a random asset to avoid collision between integration tests. - asset = (random.sample(assets, 1)[0]).asset.name - - # Set fresh marks. - update = client.update_security_marks( - request={ - "security_marks": { - "name": "{}/securityMarks".format(asset), - "marks": {"other": "other_val"}, - } - } - ) - assert update.marks == {"other": "other_val"} - return asset - - -@pytest.fixture(scope="module") -def source_name(organization_id): - """Creates a new source in the organization.""" - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - org_name = "organizations/{org_id}".format(org_id=organization_id) - source = client.create_source( - request={ - "parent": org_name, - "source": { - "display_name": "Security marks Unit test source", - "description": "A new custom source that does X", - }, - } - ) - return source.name - - -@pytest.fixture(scope="module") -def finding_name(source_name): - """Creates a new finding and returns it name.""" - from google.cloud import securitycenter - from google.cloud.securitycenter_v1 import Finding - from google.protobuf.timestamp_pb2 import Timestamp - - client = securitycenter.SecurityCenterClient() - - now_proto = Timestamp() - now_proto.GetCurrentTime() - - finding = client.create_finding( - request={ - "parent": source_name, - "finding_id": "scfinding", - "finding": { - "state": Finding.State.ACTIVE, - "category": "C1", - "event_time": now_proto, - "resource_name": "//cloudresourcemanager.googleapis.com/organizations/1234", - }, - } - ) - client.create_finding( - request={ - "parent": source_name, - "finding_id": "untouched", - "finding": { - "state": Finding.State.ACTIVE, - "category": "MEDIUM_RISK_ONE", - "event_time": now_proto, - "resource_name": "//cloudresourcemanager.googleapis.com/organizations/1234", - }, - } - ) - - return finding.name - - -def test_add_to_asset(asset_name): - updated_marks, marks = snippets_security_marks.add_to_asset(asset_name) - assert updated_marks.marks.keys() >= marks.keys() - - -def test_clear_from_asset(asset_name): - updated_marks = snippets_security_marks.clear_from_asset(asset_name) - assert "other" in updated_marks.marks - assert len(updated_marks.marks) == 1 - - -def test_delete_and_update_marks(asset_name): - updated_marks = snippets_security_marks.delete_and_update_marks(asset_name) - assert updated_marks.marks == {"key_a": "new_value_for_a", "other": "other_val"} - - -def test_add_to_finding(finding_name): - updated_marks, marks = snippets_security_marks.add_to_finding(finding_name) - assert updated_marks.marks == marks - - -def test_list_assets_with_query_marks(organization_id, asset_name): - count = snippets_security_marks.list_assets_with_query_marks( - organization_id, asset_name - ) - assert count >= 0 - - -def test_list_findings_with_query_marks(source_name, finding_name): - count = snippets_security_marks.list_findings_with_query_marks( - source_name, finding_name - ) - assert count == 0