-
Notifications
You must be signed in to change notification settings - Fork 322
fix: validate job_config.source_format in load_table_from_dataframe #262
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -2174,7 +2174,15 @@ def load_table_from_dataframe( | |||||
| else: | ||||||
| job_config = job.LoadJobConfig() | ||||||
|
|
||||||
| job_config.source_format = job.SourceFormat.PARQUET | ||||||
| if job_config.source_format: | ||||||
| if job_config.source_format is not job.SourceFormat.PARQUET: | ||||||
| raise ValueError( | ||||||
| "Cannot pass `{}` as a ``source_format``, currently PARQUET is supported".format( | ||||||
|
||||||
| "Cannot pass `{}` as a ``source_format``, currently PARQUET is supported".format( | |
| "Got unexpected source_format: '{}'. Currently, only PARQUET is supported".format( |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7544,7 +7544,7 @@ def test_load_table_from_dataframe_w_client_location(self): | |||||
|
|
||||||
| @unittest.skipIf(pandas is None, "Requires `pandas`") | ||||||
| @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") | ||||||
| def test_load_table_from_dataframe_w_custom_job_config(self): | ||||||
| def test_load_table_from_dataframe_w_custom_job_config_without_source_format(self): | ||||||
| from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES | ||||||
| from google.cloud.bigquery import job | ||||||
| from google.cloud.bigquery.schema import SchemaField | ||||||
|
|
@@ -7553,7 +7553,7 @@ def test_load_table_from_dataframe_w_custom_job_config(self): | |||||
| records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] | ||||||
| dataframe = pandas.DataFrame(records) | ||||||
| job_config = job.LoadJobConfig( | ||||||
| write_disposition=job.WriteDisposition.WRITE_TRUNCATE | ||||||
| write_disposition=job.WriteDisposition.WRITE_TRUNCATE, | ||||||
| ) | ||||||
| original_config_copy = copy.deepcopy(job_config) | ||||||
|
|
||||||
|
|
@@ -7595,6 +7595,80 @@ def test_load_table_from_dataframe_w_custom_job_config(self): | |||||
| # the original config object should not have been modified | ||||||
| assert job_config.to_api_repr() == original_config_copy.to_api_repr() | ||||||
|
|
||||||
| @unittest.skipIf(pandas is None, "Requires `pandas`") | ||||||
| @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") | ||||||
| def test_load_table_from_dataframe_w_custom_job_config_w_source_format(self): | ||||||
| from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES | ||||||
| from google.cloud.bigquery import job | ||||||
| from google.cloud.bigquery.schema import SchemaField | ||||||
|
|
||||||
| client = self._make_client() | ||||||
| records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] | ||||||
| dataframe = pandas.DataFrame(records) | ||||||
| job_config = job.LoadJobConfig( | ||||||
| write_disposition=job.WriteDisposition.WRITE_TRUNCATE, | ||||||
| source_format=job.SourceFormat.PARQUET, | ||||||
| ) | ||||||
| original_config_copy = copy.deepcopy(job_config) | ||||||
|
|
||||||
| get_table_patch = mock.patch( | ||||||
| "google.cloud.bigquery.client.Client.get_table", | ||||||
| autospec=True, | ||||||
| return_value=mock.Mock( | ||||||
| schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")] | ||||||
| ), | ||||||
| ) | ||||||
| load_patch = mock.patch( | ||||||
| "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True | ||||||
| ) | ||||||
| with load_patch as load_table_from_file, get_table_patch as get_table: | ||||||
| client.load_table_from_dataframe( | ||||||
| dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION | ||||||
| ) | ||||||
|
|
||||||
| # no need to fetch and inspect table schema for WRITE_TRUNCATE jobs | ||||||
| assert not get_table.called | ||||||
|
|
||||||
| load_table_from_file.assert_called_once_with( | ||||||
| client, | ||||||
| mock.ANY, | ||||||
| self.TABLE_REF, | ||||||
| num_retries=_DEFAULT_NUM_RETRIES, | ||||||
| rewind=True, | ||||||
| job_id=mock.ANY, | ||||||
| job_id_prefix=None, | ||||||
| location=self.LOCATION, | ||||||
| project=None, | ||||||
| job_config=mock.ANY, | ||||||
| ) | ||||||
|
|
||||||
| sent_config = load_table_from_file.mock_calls[0][2]["job_config"] | ||||||
| assert sent_config.source_format == job.SourceFormat.PARQUET | ||||||
| assert sent_config.write_disposition == job.WriteDisposition.WRITE_TRUNCATE | ||||||
|
|
||||||
| # the original config object should not have been modified | ||||||
| assert job_config.to_api_repr() == original_config_copy.to_api_repr() | ||||||
|
|
||||||
| @unittest.skipIf(pandas is None, "Requires `pandas`") | ||||||
| @unittest.skipIf(pyarrow is None, "Requires `pyarrow`") | ||||||
| def test_load_table_from_dataframe_w_custom_job_config_w_wrong_source_format(self): | ||||||
| from google.cloud.bigquery import job | ||||||
|
|
||||||
| client = self._make_client() | ||||||
| records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] | ||||||
| dataframe = pandas.DataFrame(records) | ||||||
| job_config = job.LoadJobConfig( | ||||||
| write_disposition=job.WriteDisposition.WRITE_TRUNCATE, | ||||||
| source_format=job.SourceFormat.ORC, | ||||||
| ) | ||||||
|
|
||||||
| with pytest.raises(ValueError) as exc: | ||||||
| client.load_table_from_dataframe( | ||||||
| dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION | ||||||
| ) | ||||||
|
|
||||||
| assert "Cannot pass" in str(exc.value) | ||||||
|
||||||
| assert "Cannot pass" in str(exc.value) | |
| assert "Got unexpected source_format:" in str(exc.value) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.