124 changes: 71 additions & 53 deletions bigtable/google/cloud/bigtable/row_data.py
@@ -221,6 +221,69 @@ def __eq__(self, other):
def __ne__(self, other):
return not self == other

def __iter__(self):
return self._consume_next(True)

def _consume_next(self, yield_=False):
""" Helper for consume_next.

:type yield_: bool
:param yield_: if True, yields rows as they complete,
else finish iteration of the response_iterator
"""
while True:
response = six.next(self._response_iterator)
self._counter += 1

if self._last_scanned_row_key is None: # first response
if response.last_scanned_row_key:
raise InvalidReadRowsResponse()

self._last_scanned_row_key = response.last_scanned_row_key

row = self._row
cell = self._cell

for chunk in response.chunks:

self._validate_chunk(chunk)

if chunk.reset_row:
row = self._row = None
cell = self._cell = self._previous_cell = None
continue

if row is None:
row = self._row = PartialRowData(chunk.row_key)

if cell is None:
qualifier = None
if chunk.HasField('qualifier'):
qualifier = chunk.qualifier.value

cell = self._cell = PartialCellData(
chunk.row_key,
chunk.family_name.value,
qualifier,
chunk.timestamp_micros,
chunk.labels,
chunk.value)
self._copy_from_previous(cell)
else:
cell.append_value(chunk.value)

if chunk.commit_row:
self._save_current_row()
if yield_:
yield self._previous_row
row = cell = None

continue

if chunk.value_size == 0:
self._save_current_cell()
cell = None
break

@property
def state(self):
"""State machine state.
@@ -262,54 +325,10 @@ def consume_next(self):
Parse the response and its chunks into a new/existing row in
:attr:`_rows`. Rows are returned in order by row key.
"""
response = six.next(self._response_iterator)
self._counter += 1

if self._last_scanned_row_key is None: # first response
if response.last_scanned_row_key:
raise InvalidReadRowsResponse()

self._last_scanned_row_key = response.last_scanned_row_key

row = self._row
cell = self._cell

for chunk in response.chunks:

self._validate_chunk(chunk)

if chunk.reset_row:
row = self._row = None
cell = self._cell = self._previous_cell = None
continue

if row is None:
row = self._row = PartialRowData(chunk.row_key)

if cell is None:
qualifier = None
if chunk.HasField('qualifier'):
qualifier = chunk.qualifier.value

cell = self._cell = PartialCellData(
chunk.row_key,
chunk.family_name.value,
qualifier,
chunk.timestamp_micros,
chunk.labels,
chunk.value)
self._copy_from_previous(cell)
else:
cell.append_value(chunk.value)

if chunk.commit_row:
self._save_current_row()
row = cell = None
continue

if chunk.value_size == 0:
self._save_current_cell()
cell = None
try:
next(self._consume_next(False))
except StopIteration:
return False

def consume_all(self, max_loops=None):
"""Consume the streamed responses until there are no more.
@@ -324,13 +343,12 @@ def consume_all(self, max_loops=None):
"""
curr_loop = 0
if max_loops is None:
max_loops = float('inf')
while True:
if self.consume_next() is False: # guard against None
return
while curr_loop < max_loops:
curr_loop += 1
try:
self.consume_next()
except StopIteration:
break
self.consume_next()

@staticmethod
def _validate_chunk_status(chunk):
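Taken together, the new __iter__/_consume_next pair lets callers treat the object returned by Table.read_rows() as a lazy iterator instead of calling consume_all() and then reading the buffered rows mapping. A minimal sketch of both styles, assuming an existing project, instance, and table (all identifiers below are placeholders, not values from this PR):

    from google.cloud import bigtable

    # Placeholder identifiers; assumes the instance and table already exist.
    client = bigtable.Client(project='my-project')
    instance = client.instance('my-instance')
    table = instance.table('my-table')

    # Streaming style: rows are yielded one at a time as their commit chunks
    # arrive, without buffering the whole result set first.
    for row in table.read_rows():
        print(row.row_key, row.cells)

    # Buffered style: the pre-existing behavior still works; consume every
    # response, then read the accumulated mapping.
    rows_data = table.read_rows()
    rows_data.consume_all()
    for row_key, row in rows_data.rows.items():
        print(row_key, row.cells)

Note that in this change consume_next() signals an exhausted stream by returning False (it swallows the StopIteration from the underlying response iterator), which is why consume_all() now checks the return value rather than catching the exception.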
17 changes: 17 additions & 0 deletions bigtable/tests/system.py
@@ -405,6 +405,23 @@ def test_read_row(self):
}
self.assertEqual(partial_row_data.cells, expected_row_contents)

def test_read_rows_iter(self):
row = self._table.row(ROW_KEY)
row_alt = self._table.row(ROW_KEY_ALT)
self.rows_to_delete.extend([row, row_alt])

cell1, cell2, cell3, cell4 = self._write_to_row(row, row_alt,
row, row_alt)
row.commit()
row_alt.commit()
keys = [ROW_KEY, ROW_KEY_ALT]
rows_data = self._table.read_rows()
self.assertEqual(rows_data.rows, {})
for data, key in zip(rows_data, keys):
self.assertEqual(data.row_key, key)
self.assertEqual(data, self._table.read_row(key))
self.assertEqual(data.cells, self._table.read_row(key).cells)

def test_read_rows(self):
row = self._table.row(ROW_KEY)
row_alt = self._table.row(ROW_KEY_ALT)
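For context on what test_read_rows_iter exercises: the rows are written through the normal mutation path before being read back. A compressed sketch of that round trip, with placeholder family, qualifier, and value names (the real test drives this through its _write_to_row helper and module-level constants, and assumes a table handle like self._table whose column family already exists):

    ROW_KEY = b'row-key-1'           # placeholder row key
    COLUMN_FAMILY_ID = u'cf1'        # placeholder column family id
    COLUMN = b'greeting'             # placeholder column qualifier
    VALUE = b'hello world'           # placeholder cell value

    # Write a cell and commit the mutation.
    row = table.row(ROW_KEY)
    row.set_cell(COLUMN_FAMILY_ID, COLUMN, VALUE)
    row.commit()

    # The streamed read starts with an empty buffer and yields rows lazily,
    # which is what the new test asserts before iterating.
    rows_data = table.read_rows()
    assert rows_data.rows == {}
    for partial_row in rows_data:
        assert partial_row.row_key == ROW_KEY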
46 changes: 40 additions & 6 deletions bigtable/tests/unit/test_row_data.py
@@ -203,9 +203,12 @@ def __init__(self, *args, **kwargs):
self._consumed = []

def consume_next(self):
value = self._response_iterator.next()
self._consumed.append(value)
return value
try:
value = self._response_iterator.next()
self._consumed.append(value)
return value
except StopIteration:
return False

return FakePartialRowsData

@@ -522,6 +525,21 @@ def test_invalid_last_row_missing_commit(self):

# Non-error cases

def test_iter(self):
values = [mock.Mock()] * 3
chunks, results = self._load_json_test('two rows')

for value in values:
value.chunks = chunks
response_iterator = _MockCancellableIterator(*values)

partial_rows = self._make_one(response_iterator)
partial_rows._last_scanned_row_key = 'BEFORE'

for data, value in zip(partial_rows, results):
flattened = self._sort_flattend_cells(_flatten_cells(data))
self.assertEqual(flattened[0], value)

_marker = object()

def _match_results(self, testcase_name, expected_result=_marker):
@@ -641,12 +659,28 @@ def _flatten_cells(prd):
from google.cloud._helpers import _bytes_to_unicode
from google.cloud._helpers import _microseconds_from_datetime

for row_key, row in prd.rows.items():
for family_name, family in row.cells.items():
try:
# Flatten PartialRowsData
for row_key, row in prd.rows.items():
for family_name, family in row.cells.items():
for qualifier, column in family.items():
for cell in column:
yield {
u'rk': _bytes_to_unicode(row_key),
u'fm': family_name,
u'qual': _bytes_to_unicode(qualifier),
u'ts': _microseconds_from_datetime(cell.timestamp),
u'value': _bytes_to_unicode(cell.value),
u'label': u' '.join(cell.labels),
u'error': False,
}
except AttributeError:
# Flatten PartialRowData
for family_name, family in prd.cells.items():
for qualifier, column in family.items():
for cell in column:
yield {
u'rk': _bytes_to_unicode(row_key),
u'rk': _bytes_to_unicode(prd.row_key),
u'fm': family_name,
u'qual': _bytes_to_unicode(qualifier),
u'ts': _microseconds_from_datetime(cell.timestamp),

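The reworked _flatten_cells accepts either shape because iterating a PartialRowsData yields individual PartialRowData objects, which carry row_key and cells but no rows mapping; the AttributeError fallback handles that case. A short usage sketch, assuming the helper and a populated partial_rows object as in the tests above:

    # Whole result set: flatten every row accumulated in the ``rows`` mapping.
    all_cells = list(_flatten_cells(partial_rows))

    # Single rows from streaming: each item yielded by iteration lacks ``rows``,
    # so the helper falls through to the ``row_key``/``cells`` branch.
    streamed_cells = [cell
                      for row in partial_rows
                      for cell in _flatten_cells(row)]

    # (In practice you would use one mode or the other for a given
    # PartialRowsData, since the underlying stream is consumed only once.)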