From 123eca0d3499b89e82acafbffe6b2fc862973355 Mon Sep 17 00:00:00 2001
From: Danny Hermes
Date: Thu, 18 Feb 2016 16:05:41 -0800
Subject: [PATCH] Adding Bigtable Table.read_row().

---
 gcloud/bigtable/table.py      | 114 +++++++++++++++++++
 gcloud/bigtable/test_table.py | 201 ++++++++++++++++++++++++++++++++++
 2 files changed, 315 insertions(+)

diff --git a/gcloud/bigtable/table.py b/gcloud/bigtable/table.py
index 16417bbf54ec..efeb60becace 100644
--- a/gcloud/bigtable/table.py
+++ b/gcloud/bigtable/table.py
@@ -15,6 +15,8 @@
 """User friendly container for Google Cloud Bigtable Table."""
 
+from gcloud._helpers import _to_bytes
+from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
 from gcloud.bigtable._generated import (
     bigtable_table_service_messages_pb2 as messages_pb2)
 from gcloud.bigtable._generated import (
     bigtable_service_messages_pb2 as data_messages_pb2)
@@ -22,6 +24,7 @@
 from gcloud.bigtable.column_family import _gc_rule_from_pb
 from gcloud.bigtable.column_family import ColumnFamily
 from gcloud.bigtable.row import Row
+from gcloud.bigtable.row_data import PartialRowData
 
 
 class Table(object):
@@ -218,6 +221,40 @@ def list_column_families(self):
             result[column_family_id] = column_family
         return result
 
+    def read_row(self, row_key, filter_=None):
+        """Read a single row from this table.
+
+        :type row_key: bytes
+        :param row_key: The key of the row to read.
+
+        :type filter_: :class:`.row.RowFilter`
+        :param filter_: (Optional) The filter to apply to the contents of the
+                        row. If unset, returns the entire row.
+
+        :rtype: :class:`.PartialRowData`, :data:`NoneType <types.NoneType>`
+        :returns: The contents of the row if any chunks were returned in
+                  the response, otherwise :data:`None`.
+        :raises: :class:`ValueError <exceptions.ValueError>` if a commit row
+                 chunk is never encountered.
+        """
+        request_pb = _create_row_request(self.name, row_key=row_key,
+                                         filter_=filter_)
+        client = self._cluster._client
+        response_iterator = client._data_stub.ReadRows(request_pb,
+                                                       client.timeout_seconds)
+        # We expect an iterator of `data_messages_pb2.ReadRowsResponse`
+        result = PartialRowData(row_key)
+        for read_rows_response in response_iterator:
+            result.update_from_read_rows(read_rows_response)
+
+        # Make sure the result actually contains data.
+        if not result._chunks_encountered:
+            return None
+        # Make sure the result was committed by the back-end.
+        if not result.committed:
+            raise ValueError('The row remains partial / is not committed.')
+        return result
+
     def sample_row_keys(self):
         """Read a sample of row keys in the table.
 
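A minimal usage sketch for the new method (not part of the diff; the
``cluster`` wiring, table name, row key, and choice of filter below are
illustrative assumptions, not defined by this patch):

    from gcloud.bigtable.row import RowSampleFilter

    table = cluster.table('my-table')  # assumes an already-configured cluster
    row = table.read_row(b'row-key', filter_=RowSampleFilter(0.5))
    if row is None:
        print('No chunks were returned for row-key')
    else:
        print(row.cells)  # PartialRowData: family -> qualifier -> cells

Note the tri-state outcome: a populated ``PartialRowData``, ``None`` for an
empty response, or ``ValueError`` when chunks arrive but no ``commit_row``
chunk is ever seen.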
@@ -255,3 +292,80 @@ def sample_row_keys(self):
         response_iterator = client._data_stub.SampleRowKeys(
             request_pb, client.timeout_seconds)
         return response_iterator
+
+
+def _create_row_request(table_name, row_key=None, start_key=None,
+                        end_key=None, filter_=None,
+                        allow_row_interleaving=None, limit=None):
+    """Creates a request to read rows in a table.
+
+    :type table_name: str
+    :param table_name: The name of the table to read from.
+
+    :type row_key: bytes
+    :param row_key: (Optional) The key of a specific row to read from.
+
+    :type start_key: bytes
+    :param start_key: (Optional) The beginning of a range of row keys to
+                      read from. The range will include ``start_key``. If
+                      left empty, will be interpreted as the empty string.
+
+    :type end_key: bytes
+    :param end_key: (Optional) The end of a range of row keys to read from.
+                    The range will not include ``end_key``. If left empty,
+                    will be interpreted as an infinite string.
+
+    :type filter_: :class:`.row.RowFilter`, :class:`.row.RowFilterChain`,
+                   :class:`.row.RowFilterUnion` or
+                   :class:`.row.ConditionalRowFilter`
+    :param filter_: (Optional) The filter to apply to the contents of the
+                    specified row(s). If unset, reads the entire table.
+
+    :type allow_row_interleaving: bool
+    :param allow_row_interleaving: (Optional) By default, rows are read
+                                   sequentially, producing results which
+                                   are guaranteed to arrive in increasing
+                                   row order. Setting
+                                   ``allow_row_interleaving`` to
+                                   :data:`True` allows multiple rows to be
+                                   interleaved in the response stream,
+                                   which increases throughput but breaks
+                                   this guarantee, and may force the
+                                   client to use more memory to buffer
+                                   partially-received rows.
+
+    :type limit: int
+    :param limit: (Optional) The read will terminate after committing to N
+                  rows' worth of results. The default (zero) is to return
+                  all results. Note that if ``allow_row_interleaving`` is
+                  set to :data:`True`, partial results may be returned for
+                  more than N rows. However, only N ``commit_row`` chunks
+                  will be sent.
+
+    :rtype: :class:`data_messages_pb2.ReadRowsRequest`
+    :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs.
+    :raises: :class:`ValueError <exceptions.ValueError>` if both ``row_key``
+             and one of ``start_key`` or ``end_key`` are set
+    """
+    request_kwargs = {'table_name': table_name}
+    if (row_key is not None and
+            (start_key is not None or end_key is not None)):
+        raise ValueError('Row key and row range cannot be '
+                         'set simultaneously')
+    if row_key is not None:
+        request_kwargs['row_key'] = _to_bytes(row_key)
+    if start_key is not None or end_key is not None:
+        range_kwargs = {}
+        if start_key is not None:
+            range_kwargs['start_key'] = _to_bytes(start_key)
+        if end_key is not None:
+            range_kwargs['end_key'] = _to_bytes(end_key)
+        row_range = data_pb2.RowRange(**range_kwargs)
+        request_kwargs['row_range'] = row_range
+    if filter_ is not None:
+        request_kwargs['filter'] = filter_.to_pb()
+    if allow_row_interleaving is not None:
+        request_kwargs['allow_row_interleaving'] = allow_row_interleaving
+    if limit is not None:
+        request_kwargs['num_rows_limit'] = limit
+
+    return data_messages_pb2.ReadRowsRequest(**request_kwargs)
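Before moving to the tests, a short sketch of how the helper's keyword
arguments map onto the request protobuf (the table name is an illustrative
placeholder; the calls themselves are the ones defined above):

    from gcloud.bigtable.table import _create_row_request

    # Single-row read: the key is normalized to bytes via _to_bytes().
    single = _create_row_request('my-table', row_key=u'key')

    # Range read: the keys are folded into a nested data_pb2.RowRange.
    ranged = _create_row_request('my-table', start_key=b'a', end_key=b'z',
                                 limit=10)

    # The two forms are mutually exclusive.
    try:
        _create_row_request('my-table', row_key=b'key', end_key=b'z')
    except ValueError:
        pass  # 'Row key and row range cannot be set simultaneously'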
diff --git a/gcloud/bigtable/test_table.py b/gcloud/bigtable/test_table.py
index a84a67061359..1dfb438b9394 100644
--- a/gcloud/bigtable/test_table.py
+++ b/gcloud/bigtable/test_table.py
@@ -291,6 +291,85 @@ def test_delete(self):
             {},
         )])
 
+    def _read_row_helper(self, chunks):
+        from gcloud._testing import _Monkey
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+        from gcloud.bigtable._testing import _FakeStub
+        from gcloud.bigtable.row_data import PartialRowData
+        from gcloud.bigtable import table as MUT
+
+        project_id = 'project-id'
+        zone = 'zone'
+        cluster_id = 'cluster-id'
+        table_id = 'table-id'
+        timeout_seconds = 596
+        client = _Client(timeout_seconds=timeout_seconds)
+        cluster_name = ('projects/' + project_id + '/zones/' + zone +
+                        '/clusters/' + cluster_id)
+        cluster = _Cluster(cluster_name, client=client)
+        table = self._makeOne(table_id, cluster)
+
+        # Create request_pb
+        request_pb = object()  # Returned by our mock.
+        mock_created = []
+
+        def mock_create_row_request(table_name, row_key, filter_):
+            mock_created.append((table_name, row_key, filter_))
+            return request_pb
+
+        # Create response_iterator
+        row_key = b'row-key'
+        response_pb = messages_pb2.ReadRowsResponse(row_key=row_key,
+                                                    chunks=chunks)
+        response_iterator = [response_pb]
+
+        # Patch the stub used by the API method.
+        client._data_stub = stub = _FakeStub(response_iterator)
+
+        # Create expected_result.
+        if chunks:
+            expected_result = PartialRowData(row_key)
+            expected_result._committed = True
+            expected_result._chunks_encountered = True
+        else:
+            expected_result = None
+
+        # Perform the method and check the result.
+        filter_obj = object()
+        with _Monkey(MUT, _create_row_request=mock_create_row_request):
+            result = table.read_row(row_key, filter_=filter_obj)
+
+        self.assertEqual(result, expected_result)
+        self.assertEqual(stub.method_calls, [(
+            'ReadRows',
+            (request_pb, timeout_seconds),
+            {},
+        )])
+        self.assertEqual(mock_created, [(table.name, row_key, filter_obj)])
+
+    def test_read_row(self):
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        chunk = messages_pb2.ReadRowsResponse.Chunk(commit_row=True)
+        chunks = [chunk]
+        self._read_row_helper(chunks)
+
+    def test_read_empty_row(self):
+        chunks = []
+        self._read_row_helper(chunks)
+
+    def test_read_row_still_partial(self):
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        # There is never a "commit row".
+        chunk = messages_pb2.ReadRowsResponse.Chunk(reset_row=True)
+        chunks = [chunk]
+        with self.assertRaises(ValueError):
+            self._read_row_helper(chunks)
+
     def test_sample_row_keys(self):
         from gcloud.bigtable._generated import (
             bigtable_service_messages_pb2 as messages_pb2)
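The helper patches ``table._create_row_request`` with ``_Monkey`` and swaps
the gRPC data stub for ``_FakeStub``, which the patch imports but does not
define here. A minimal sketch of the shape ``_FakeStub`` must have for these
assertions to hold (an assumption; the real class in
``gcloud.bigtable._testing`` may differ in detail):

    class FakeStub(object):
        """Replays canned results in order and records every method call."""

        def __init__(self, *results):
            self._results = list(results)
            self.method_calls = []

        def __getattr__(self, name):
            # Attribute access (e.g. stub.ReadRows) returns a recorder that
            # logs (name, args, kwargs) and pops the next canned result.
            def method(*args, **kwargs):
                self.method_calls.append((name, args, kwargs))
                return self._results.pop(0)
            return method

With that shape, ``stub.method_calls`` ends up holding exactly the single
``('ReadRows', (request_pb, timeout_seconds), {})`` entry asserted above.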
@@ -331,6 +410,128 @@ def test_sample_row_keys(self):
         )])
 
 
+class Test__create_row_request(unittest2.TestCase):
+
+    def _callFUT(self, table_name, row_key=None, start_key=None,
+                 end_key=None, filter_=None, allow_row_interleaving=None,
+                 limit=None):
+        from gcloud.bigtable.table import _create_row_request
+        return _create_row_request(
+            table_name, row_key=row_key, start_key=start_key,
+            end_key=end_key, filter_=filter_,
+            allow_row_interleaving=allow_row_interleaving, limit=limit)
+
+    def test_table_name_only(self):
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        table_name = 'table_name'
+        result = self._callFUT(table_name)
+        expected_result = messages_pb2.ReadRowsRequest(table_name=table_name)
+        self.assertEqual(result, expected_result)
+
+    def test_row_key_row_range_conflict(self):
+        with self.assertRaises(ValueError):
+            self._callFUT(None, row_key=object(), end_key=object())
+
+    def test_row_key(self):
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        table_name = 'table_name'
+        row_key = b'row_key'
+        result = self._callFUT(table_name, row_key=row_key)
+        expected_result = messages_pb2.ReadRowsRequest(
+            table_name=table_name,
+            row_key=row_key,
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_row_range_start_key(self):
+        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        table_name = 'table_name'
+        start_key = b'start_key'
+        result = self._callFUT(table_name, start_key=start_key)
+        expected_result = messages_pb2.ReadRowsRequest(
+            table_name=table_name,
+            row_range=data_pb2.RowRange(start_key=start_key),
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_row_range_end_key(self):
+        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        table_name = 'table_name'
+        end_key = b'end_key'
+        result = self._callFUT(table_name, end_key=end_key)
+        expected_result = messages_pb2.ReadRowsRequest(
+            table_name=table_name,
+            row_range=data_pb2.RowRange(end_key=end_key),
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_row_range_both_keys(self):
+        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        table_name = 'table_name'
+        start_key = b'start_key'
+        end_key = b'end_key'
+        result = self._callFUT(table_name, start_key=start_key,
+                               end_key=end_key)
+        expected_result = messages_pb2.ReadRowsRequest(
+            table_name=table_name,
+            row_range=data_pb2.RowRange(start_key=start_key,
+                                        end_key=end_key),
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_with_filter(self):
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+        from gcloud.bigtable.row import RowSampleFilter
+
+        table_name = 'table_name'
+        row_filter = RowSampleFilter(0.33)
+        result = self._callFUT(table_name, filter_=row_filter)
+        expected_result = messages_pb2.ReadRowsRequest(
+            table_name=table_name,
+            filter=row_filter.to_pb(),
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_with_allow_row_interleaving(self):
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        table_name = 'table_name'
+        allow_row_interleaving = True
+        result = self._callFUT(table_name,
+                               allow_row_interleaving=allow_row_interleaving)
+        expected_result = messages_pb2.ReadRowsRequest(
+            table_name=table_name,
+            allow_row_interleaving=allow_row_interleaving,
+        )
+        self.assertEqual(result, expected_result)
+
+    def test_with_limit(self):
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+
+        table_name = 'table_name'
+        limit = 1337
+        result = self._callFUT(table_name, limit=limit)
+        expected_result = messages_pb2.ReadRowsRequest(
+            table_name=table_name,
+            num_rows_limit=limit,
+        )
+        self.assertEqual(result, expected_result)
+
+
 class _Client(object):
 
     data_stub = None
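These cases compare generated protobuf messages directly with
``assertEqual``, which works because protobuf messages implement
field-by-field value equality. A standalone illustration using the same
generated module (values are arbitrary):

    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)

    req_a = messages_pb2.ReadRowsRequest(table_name='t', num_rows_limit=1337)
    req_b = messages_pb2.ReadRowsRequest(table_name='t', num_rows_limit=1337)
    assert req_a == req_b    # same field values -> equal messages
    assert req_a != messages_pb2.ReadRowsRequest(table_name='t')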