diff --git a/dlp/jobs_test.py b/dlp/jobs_test.py
index 87c39d4c3cc..8f47fb4d428 100644
--- a/dlp/jobs_test.py
+++ b/dlp/jobs_test.py
@@ -26,7 +26,7 @@
 
 
 @pytest.fixture(scope='session')
-def create_test_job():
+def test_job_name():
     import google.cloud.dlp
 
     dlp = google.cloud.dlp.DlpServiceClient()
@@ -76,6 +76,5 @@ def test_list_dlp_jobs_with_job_type(capsys):
     assert 'Job: projects/' in out
 
 
-def test_delete_dlp_job(capsys):
-    test_job_name = create_test_job()
+def test_delete_dlp_job(test_job_name, capsys):
     jobs.delete_dlp_job(GCLOUD_PROJECT, test_job_name)
diff --git a/dlp/quickstart.py b/dlp/quickstart.py
index f905fbe1aaf..736d59ddd8f 100644
--- a/dlp/quickstart.py
+++ b/dlp/quickstart.py
@@ -59,7 +59,7 @@ def quickstart(project_id):
     }
 
     # Convert the project id into a full resource id.
-    parent = dlp_client.project_path(project)
+    parent = dlp_client.project_path(project_id)
 
     # Call the API.
     response = dlp_client.inspect_content(parent, inspect_config, item)
diff --git a/dlp/quickstart_test.py b/dlp/quickstart_test.py
index 924e7141c70..19c215fdbb0 100644
--- a/dlp/quickstart_test.py
+++ b/dlp/quickstart_test.py
@@ -29,7 +29,7 @@ def test_quickstart(capsys):
             google.cloud.dlp.DlpServiceClient,
             'project_path',
             return_value='projects/{}'.format(GCLOUD_PROJECT)):
-        quickstart.quickstart()
+        quickstart.quickstart(GCLOUD_PROJECT)
 
     out, _ = capsys.readouterr()
     assert 'FIRST_NAME' in out
diff --git a/dlp/requirements.txt b/dlp/requirements.txt
index 404764b7743..7e812cb15f0 100644
--- a/dlp/requirements.txt
+++ b/dlp/requirements.txt
@@ -1,5 +1,5 @@
-google-cloud-dlp==0.9.0
-google-cloud-storage==1.13.0
-google-cloud-pubsub==0.38.0
-google-cloud-datastore==1.7.1
-google-cloud-bigquery==1.7.0
+google-cloud-dlp==0.10.0
+google-cloud-storage==1.13.2
+google-cloud-pubsub==0.39.1
+google-cloud-datastore==1.7.3
+google-cloud-bigquery==1.8.1
diff --git a/dlp/risk.py b/dlp/risk.py
index 8512f054bce..273cfd1548d 100644
--- a/dlp/risk.py
+++ b/dlp/risk.py
@@ -49,8 +49,28 @@ def numerical_risk_analysis(project, table_project_id, dataset_id, table_id,
     # potentially long-running operations.
     import google.cloud.pubsub
 
-    # This sample also uses threading.Event() to wait for the job to finish.
-    import threading
+    def callback(message):
+        if (message.attributes['DlpJobName'] == operation.name):
+            # This is the message we're looking for, so acknowledge it.
+            message.ack()
+
+            # Now that the job is done, fetch the results and print them.
+            job = dlp.get_dlp_job(operation.name)
+            results = job.risk_details.numerical_stats_result
+            print('Value Range: [{}, {}]'.format(
+                results.min_value.integer_value,
+                results.max_value.integer_value))
+            prev_value = None
+            for percent, result in enumerate(results.quantile_values):
+                value = result.integer_value
+                if prev_value != value:
+                    print('Value at {}% quantile: {}'.format(
+                        percent, value))
+                prev_value = value
+            subscription.set_result(None)
+        else:
+            # This is not the message we're looking for.
+            message.drop()
 
     # Instantiate a client.
     dlp = google.cloud.dlp.DlpServiceClient()
@@ -84,56 +104,22 @@ def numerical_risk_analysis(project, table_project_id, dataset_id, table_id,
         'actions': actions
     }
 
-    # Call API to start risk analysis job
-    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
-
     # Create a Pub/Sub client and find the subscription. The subscription is
     # expected to already be listening to the topic.
     subscriber = google.cloud.pubsub.SubscriberClient()
     subscription_path = subscriber.subscription_path(
         project, subscription_id)
-    subscription = subscriber.subscribe(subscription_path)
+    subscription = subscriber.subscribe(subscription_path, callback)
 
-    # Set up a callback to acknowledge a message. This closes around an event
-    # so that it can signal that it is done and the main thread can continue.
-    job_done = threading.Event()
+    # Call API to start risk analysis job
+    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
 
-    def callback(message):
-        try:
-            if (message.attributes['DlpJobName'] == operation.name):
-                # This is the message we're looking for, so acknowledge it.
-                message.ack()
-
-                # Now that the job is done, fetch the results and print them.
-                job = dlp.get_dlp_job(operation.name)
-                results = job.risk_details.numerical_stats_result
-                print('Value Range: [{}, {}]'.format(
-                    results.min_value.integer_value,
-                    results.max_value.integer_value))
-                prev_value = None
-                for percent, result in enumerate(results.quantile_values):
-                    value = result.integer_value
-                    if prev_value != value:
-                        print('Value at {}% quantile: {}'.format(
-                            percent, value))
-                    prev_value = value
-                # Signal to the main thread that we can exit.
-                job_done.set()
-            else:
-                # This is not the message we're looking for.
-                message.drop()
-        except Exception as e:
-            # Because this is executing in a thread, an exception won't be
-            # noted unless we print it manually.
-            print(e)
-            raise
-
-    # Register the callback and wait on the event.
-    subscription.open(callback)
-    finished = job_done.wait(timeout=timeout)
-    if not finished:
+    try:
+        subscription.result(timeout=timeout)
+    except TimeoutError:
         print('No event received before the timeout. Please verify that the '
               'subscription provided is subscribed to the topic provided.')
+    subscription.close()
 # [END dlp_numerical_stats]
 
 
@@ -167,8 +153,32 @@ def categorical_risk_analysis(project, table_project_id, dataset_id, table_id,
     # potentially long-running operations.
     import google.cloud.pubsub
 
-    # This sample also uses threading.Event() to wait for the job to finish.
-    import threading
+    def callback(message):
+        if (message.attributes['DlpJobName'] == operation.name):
+            # This is the message we're looking for, so acknowledge it.
+            message.ack()
+
+            # Now that the job is done, fetch the results and print them.
+            job = dlp.get_dlp_job(operation.name)
+            histogram_buckets = (job.risk_details
+                                 .categorical_stats_result
+                                 .value_frequency_histogram_buckets)
+            # Print bucket stats
+            for i, bucket in enumerate(histogram_buckets):
+                print('Bucket {}:'.format(i))
+                print('   Most common value occurs {} time(s)'.format(
+                    bucket.value_frequency_upper_bound))
+                print('   Least common value occurs {} time(s)'.format(
+                    bucket.value_frequency_lower_bound))
+                print('   {} unique values total.'.format(
+                    bucket.bucket_size))
+                for value in bucket.bucket_values:
+                    print('   Value {} occurs {} time(s)'.format(
+                        value.value.integer_value, value.count))
+            subscription.set_result(None)
+        else:
+            # This is not the message we're looking for.
+            message.drop()
 
     # Instantiate a client.
     dlp = google.cloud.dlp.DlpServiceClient()
@@ -202,60 +212,22 @@ def categorical_risk_analysis(project, table_project_id, dataset_id, table_id,
         'actions': actions
     }
 
-    # Call API to start risk analysis job
-    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
-
     # Create a Pub/Sub client and find the subscription. The subscription is
     # expected to already be listening to the topic.
     subscriber = google.cloud.pubsub.SubscriberClient()
     subscription_path = subscriber.subscription_path(
         project, subscription_id)
-    subscription = subscriber.subscribe(subscription_path)
+    subscription = subscriber.subscribe(subscription_path, callback)
 
-    # Set up a callback to acknowledge a message. This closes around an event
-    # so that it can signal that it is done and the main thread can continue.
-    job_done = threading.Event()
+    # Call API to start risk analysis job
+    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
 
-    def callback(message):
-        try:
-            if (message.attributes['DlpJobName'] == operation.name):
-                # This is the message we're looking for, so acknowledge it.
-                message.ack()
-
-                # Now that the job is done, fetch the results and print them.
-                job = dlp.get_dlp_job(operation.name)
-                histogram_buckets = (job.risk_details
-                                     .categorical_stats_result
-                                     .value_frequency_histogram_buckets)
-                # Print bucket stats
-                for i, bucket in enumerate(histogram_buckets):
-                    print('Bucket {}:'.format(i))
-                    print('   Most common value occurs {} time(s)'.format(
-                        bucket.value_frequency_upper_bound))
-                    print('   Least common value occurs {} time(s)'.format(
-                        bucket.value_frequency_lower_bound))
-                    print('   {} unique values total.'.format(
-                        bucket.bucket_size))
-                    for value in bucket.bucket_values:
-                        print('   Value {} occurs {} time(s)'.format(
-                            value.value.integer_value, value.count))
-                # Signal to the main thread that we can exit.
-                job_done.set()
-            else:
-                # This is not the message we're looking for.
-                message.drop()
-        except Exception as e:
-            # Because this is executing in a thread, an exception won't be
-            # noted unless we print it manually.
-            print(e)
-            raise
-
-    # Register the callback and wait on the event.
-    subscription.open(callback)
-    finished = job_done.wait(timeout=timeout)
-    if not finished:
+    try:
+        subscription.result(timeout=timeout)
+    except TimeoutError:
         print('No event received before the timeout. Please verify that the '
               'subscription provided is subscribed to the topic provided.')
+    subscription.close()
 # [END dlp_categorical_stats]
 
 
@@ -288,8 +260,37 @@ def k_anonymity_analysis(project, table_project_id, dataset_id, table_id,
     # potentially long-running operations.
     import google.cloud.pubsub
 
-    # This sample also uses threading.Event() to wait for the job to finish.
-    import threading
+    # Create helper function for unpacking values
+    def get_values(obj):
+        return int(obj.integer_value)
+
+    def callback(message):
+        if (message.attributes['DlpJobName'] == operation.name):
+            # This is the message we're looking for, so acknowledge it.
+            message.ack()
+
+            # Now that the job is done, fetch the results and print them.
+            job = dlp.get_dlp_job(operation.name)
+            histogram_buckets = (job.risk_details
+                                 .k_anonymity_result
+                                 .equivalence_class_histogram_buckets)
+            # Print bucket stats
+            for i, bucket in enumerate(histogram_buckets):
+                print('Bucket {}:'.format(i))
+                if bucket.equivalence_class_size_lower_bound:
+                    print('   Bucket size range: [{}, {}]'.format(
+                        bucket.equivalence_class_size_lower_bound,
+                        bucket.equivalence_class_size_upper_bound))
+                for value_bucket in bucket.bucket_values:
+                    print('   Quasi-ID values: {}'.format(
+                        map(get_values, value_bucket.quasi_ids_values)
+                    ))
+                    print('   Class size: {}'.format(
+                        value_bucket.equivalence_class_size))
+            subscription.set_result(None)
+        else:
+            # This is not the message we're looking for.
+            message.drop()
 
     # Instantiate a client.
     dlp = google.cloud.dlp.DlpServiceClient()
@@ -326,65 +327,23 @@ def map_fields(field):
         'source_table': source_table,
         'actions': actions
     }
-    # Call API to start risk analysis job
-    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
 
     # Create a Pub/Sub client and find the subscription. The subscription is
     # expected to already be listening to the topic.
     subscriber = google.cloud.pubsub.SubscriberClient()
     subscription_path = subscriber.subscription_path(
         project, subscription_id)
-    subscription = subscriber.subscribe(subscription_path)
-
-    # Set up a callback to acknowledge a message. This closes around an event
-    # so that it can signal that it is done and the main thread can continue.
-    job_done = threading.Event()
+    subscription = subscriber.subscribe(subscription_path, callback)
 
-    # Create helper function for unpacking values
-    def get_values(obj):
-        return int(obj.integer_value)
+    # Call API to start risk analysis job
+    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
 
-    def callback(message):
-        try:
-            if (message.attributes['DlpJobName'] == operation.name):
-                # This is the message we're looking for, so acknowledge it.
-                message.ack()
-
-                # Now that the job is done, fetch the results and print them.
-                job = dlp.get_dlp_job(operation.name)
-                histogram_buckets = (job.risk_details
-                                     .k_anonymity_result
-                                     .equivalence_class_histogram_buckets)
-                # Print bucket stats
-                for i, bucket in enumerate(histogram_buckets):
-                    print('Bucket {}:'.format(i))
-                    if bucket.equivalence_class_size_lower_bound:
-                        print('   Bucket size range: [{}, {}]'.format(
-                            bucket.equivalence_class_size_lower_bound,
-                            bucket.equivalence_class_size_upper_bound))
-                    for value_bucket in bucket.bucket_values:
-                        print('   Quasi-ID values: {}'.format(
-                            map(get_values, value_bucket.quasi_ids_values)
-                        ))
-                        print('   Class size: {}'.format(
-                            value_bucket.equivalence_class_size))
-                # Signal to the main thread that we can exit.
-                job_done.set()
-            else:
-                # This is not the message we're looking for.
-                message.drop()
-        except Exception as e:
-            # Because this is executing in a thread, an exception won't be
-            # noted unless we print it manually.
-            print(e)
-            raise
-
-    # Register the callback and wait on the event.
-    subscription.open(callback)
-    finished = job_done.wait(timeout=timeout)
-    if not finished:
+    try:
+        subscription.result(timeout=timeout)
+    except TimeoutError:
         print('No event received before the timeout. Please verify that the '
               'subscription provided is subscribed to the topic provided.')
+    subscription.close()
 # [END dlp_k_anonymity]
 
 
@@ -419,8 +378,39 @@ def l_diversity_analysis(project, table_project_id, dataset_id, table_id,
     # potentially long-running operations.
    import google.cloud.pubsub
 
-    # This sample also uses threading.Event() to wait for the job to finish.
-    import threading
+    # Create helper function for unpacking values
+    def get_values(obj):
+        return int(obj.integer_value)
+
+    def callback(message):
+        if (message.attributes['DlpJobName'] == operation.name):
+            # This is the message we're looking for, so acknowledge it.
+            message.ack()
+
+            # Now that the job is done, fetch the results and print them.
+            job = dlp.get_dlp_job(operation.name)
+            histogram_buckets = (
+                job.risk_details
+                .l_diversity_result
+                .sensitive_value_frequency_histogram_buckets)
+            # Print bucket stats
+            for i, bucket in enumerate(histogram_buckets):
+                print('Bucket {}:'.format(i))
+                print('   Bucket size range: [{}, {}]'.format(
+                    bucket.sensitive_value_frequency_lower_bound,
+                    bucket.sensitive_value_frequency_upper_bound))
+                for value_bucket in bucket.bucket_values:
+                    print('   Quasi-ID values: {}'.format(
+                        map(get_values, value_bucket.quasi_ids_values)))
+                    print('   Class size: {}'.format(
+                        value_bucket.equivalence_class_size))
+                    for value in value_bucket.top_sensitive_values:
+                        print(('   Sensitive value {} occurs {} time(s)'
+                               .format(value.value, value.count)))
+            subscription.set_result(None)
+        else:
+            # This is not the message we're looking for.
+            message.drop()
 
     # Instantiate a client.
     dlp = google.cloud.dlp.DlpServiceClient()
@@ -461,67 +451,22 @@ def map_fields(field):
         'actions': actions
     }
 
-    # Call API to start risk analysis job
-    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
-
     # Create a Pub/Sub client and find the subscription. The subscription is
     # expected to already be listening to the topic.
     subscriber = google.cloud.pubsub.SubscriberClient()
     subscription_path = subscriber.subscription_path(
         project, subscription_id)
-    subscription = subscriber.subscribe(subscription_path)
-
-    # Set up a callback to acknowledge a message. This closes around an event
-    # so that it can signal that it is done and the main thread can continue.
-    job_done = threading.Event()
+    subscription = subscriber.subscribe(subscription_path, callback)
 
-    # Create helper function for unpacking values
-    def get_values(obj):
-        return int(obj.integer_value)
+    # Call API to start risk analysis job
+    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
 
-    def callback(message):
-        try:
-            if (message.attributes['DlpJobName'] == operation.name):
-                # This is the message we're looking for, so acknowledge it.
-                message.ack()
-
-                # Now that the job is done, fetch the results and print them.
-                job = dlp.get_dlp_job(operation.name)
-                histogram_buckets = (
-                    job.risk_details
-                    .l_diversity_result
-                    .sensitive_value_frequency_histogram_buckets)
-                # Print bucket stats
-                for i, bucket in enumerate(histogram_buckets):
-                    print('Bucket {}:'.format(i))
-                    print('   Bucket size range: [{}, {}]'.format(
-                        bucket.sensitive_value_frequency_lower_bound,
-                        bucket.sensitive_value_frequency_upper_bound))
-                    for value_bucket in bucket.bucket_values:
-                        print('   Quasi-ID values: {}'.format(
-                            map(get_values, value_bucket.quasi_ids_values)))
-                        print('   Class size: {}'.format(
-                            value_bucket.equivalence_class_size))
-                        for value in value_bucket.top_sensitive_values:
-                            print(('   Sensitive value {} occurs {} time(s)'
-                                   .format(value.value, value.count)))
-                # Signal to the main thread that we can exit.
-                job_done.set()
-            else:
-                # This is not the message we're looking for.
-                message.drop()
-        except Exception as e:
-            # Because this is executing in a thread, an exception won't be
-            # noted unless we print it manually.
-            print(e)
-            raise
-
-    # Register the callback and wait on the event.
-    subscription.open(callback)
-    finished = job_done.wait(timeout=timeout)
-    if not finished:
+    try:
+        subscription.result(timeout=timeout)
+    except TimeoutError:
         print('No event received before the timeout. Please verify that the '
               'subscription provided is subscribed to the topic provided.')
+    subscription.close()
 # [END dlp_l_diversity]
 
 
@@ -562,8 +507,35 @@ def k_map_estimate_analysis(project, table_project_id, dataset_id, table_id,
     # potentially long-running operations.
     import google.cloud.pubsub
 
-    # This sample also uses threading.Event() to wait for the job to finish.
-    import threading
+    # Create helper function for unpacking values
+    def get_values(obj):
+        return int(obj.integer_value)
+
+    def callback(message):
+        if (message.attributes['DlpJobName'] == operation.name):
+            # This is the message we're looking for, so acknowledge it.
+            message.ack()
+
+            # Now that the job is done, fetch the results and print them.
+            job = dlp.get_dlp_job(operation.name)
+            histogram_buckets = (job.risk_details
+                                 .k_map_estimation_result
+                                 .k_map_estimation_histogram)
+            # Print bucket stats
+            for i, bucket in enumerate(histogram_buckets):
+                print('Bucket {}:'.format(i))
+                print('   Anonymity range: [{}, {}]'.format(
+                    bucket.min_anonymity, bucket.max_anonymity))
+                print('   Size: {}'.format(bucket.bucket_size))
+                for value_bucket in bucket.bucket_values:
+                    print('   Values: {}'.format(
+                        map(get_values, value_bucket.quasi_ids_values)))
+                    print('   Estimated k-map anonymity: {}'.format(
+                        value_bucket.estimated_anonymity))
+            subscription.set_result(None)
+        else:
+            # This is not the message we're looking for.
+            message.drop()
 
     # Instantiate a client.
     dlp = google.cloud.dlp.DlpServiceClient()
@@ -607,63 +579,22 @@ def map_fields(quasi_id, info_type):
         'actions': actions
     }
 
-    # Call API to start risk analysis job
-    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
-
     # Create a Pub/Sub client and find the subscription. The subscription is
     # expected to already be listening to the topic.
     subscriber = google.cloud.pubsub.SubscriberClient()
     subscription_path = subscriber.subscription_path(
         project, subscription_id)
-    subscription = subscriber.subscribe(subscription_path)
+    subscription = subscriber.subscribe(subscription_path, callback)
 
-    # Set up a callback to acknowledge a message. This closes around an event
-    # so that it can signal that it is done and the main thread can continue.
-    job_done = threading.Event()
-
-    # Create helper function for unpacking values
-    def get_values(obj):
-        return int(obj.integer_value)
+    # Call API to start risk analysis job
+    operation = dlp.create_dlp_job(parent, risk_job=risk_job)
 
-    def callback(message):
-        try:
-            if (message.attributes['DlpJobName'] == operation.name):
-                # This is the message we're looking for, so acknowledge it.
-                message.ack()
-
-                # Now that the job is done, fetch the results and print them.
-                job = dlp.get_dlp_job(operation.name)
-                histogram_buckets = (job.risk_details
-                                     .k_map_estimation_result
-                                     .k_map_estimation_histogram)
-                # Print bucket stats
-                for i, bucket in enumerate(histogram_buckets):
-                    print('Bucket {}:'.format(i))
-                    print('   Anonymity range: [{}, {}]'.format(
-                        bucket.min_anonymity, bucket.max_anonymity))
-                    print('   Size: {}'.format(bucket.bucket_size))
-                    for value_bucket in bucket.bucket_values:
-                        print('   Values: {}'.format(
-                            map(get_values, value_bucket.quasi_ids_values)))
-                        print('   Estimated k-map anonymity: {}'.format(
-                            value_bucket.estimated_anonymity))
-                # Signal to the main thread that we can exit.
-                job_done.set()
-            else:
-                # This is not the message we're looking for.
-                message.drop()
-        except Exception as e:
-            # Because this is executing in a thread, an exception won't be
-            # noted unless we print it manually.
-            print(e)
-            raise
-
-    # Register the callback and wait on the event.
-    subscription.open(callback)
-    finished = job_done.wait(timeout=timeout)
-    if not finished:
+    try:
+        subscription.result(timeout=timeout)
+    except TimeoutError:
         print('No event received before the timeout. Please verify that the '
               'subscription provided is subscribed to the topic provided.')
+    subscription.close()
 # [END dlp_k_map]
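
For reference, the wait pattern these risk.py hunks migrate to (register the callback when subscribing, then block on the subscription future instead of a threading.Event) can be exercised on its own. The sketch below is illustrative only: it reuses the calls that appear in the new code above (subscriber.subscribe(path, callback), subscription.result(timeout=...), subscription.set_result(None), subscription.close()); the project ID, subscription ID, and timeout are hypothetical placeholder values, and exact future behavior can vary by google-cloud-pubsub version.

import google.cloud.pubsub

# Placeholder values for illustration only.
PROJECT = 'my-project'
SUBSCRIPTION_ID = 'my-subscription'
TIMEOUT = 300  # seconds


def callback(message):
    # Acknowledge the message and resolve the subscription future so the
    # main thread stops blocking in subscription.result() below.
    # `subscription` is bound before any message can arrive, because the
    # callback only runs after subscriber.subscribe() has returned.
    message.ack()
    subscription.set_result(None)


subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION_ID)

# Registering the callback at subscribe time replaces the removed
# subscription.open(callback) call.
subscription = subscriber.subscribe(subscription_path, callback)

try:
    # Blocks until the callback resolves the future or the timeout elapses,
    # taking the place of threading.Event().wait(timeout=timeout).
    subscription.result(timeout=TIMEOUT)
except TimeoutError:
    # Note: depending on the library version, the future may raise
    # concurrent.futures.TimeoutError rather than the builtin TimeoutError.
    print('No message received before the timeout.')
subscription.close()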