Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 54 additions & 12 deletions gcloud/datastore/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Local(object):
"""Placeholder for non-threaded applications."""

from gcloud.datastore import _implicit_environ
from gcloud.datastore import helpers
from gcloud.datastore import datastore_v1_pb2 as datastore_pb


Expand Down Expand Up @@ -209,16 +210,8 @@ def put(self, entity):
if entity.key is None:
raise ValueError("Entity must have a key")

key_pb = entity.key.to_protobuf()
properties = dict(entity)
exclude = tuple(entity.exclude_from_indexes)

self.connection.save_entity(
self.dataset_id, key_pb, properties,
exclude_from_indexes=exclude, mutation=self.mutation)

if entity.key.is_partial:
self._auto_id_entities.append(entity)
_assign_entity_to_mutation(
self.mutation, entity, self._auto_id_entities)

def delete(self, key):
"""Remember a key to be deleted durring ``commit``.
Expand All @@ -232,8 +225,7 @@ def delete(self, key):
raise ValueError("Key must be complete")

key_pb = key.to_protobuf()
self.connection.delete_entities(
self.dataset_id, [key_pb], mutation=self.mutation)
helpers._add_keys_to_request(self.mutation.delete, [key_pb])

def begin(self):
"""No-op
Expand Down Expand Up @@ -279,3 +271,53 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.rollback()
finally:
_BATCHES.pop()


def _assign_entity_to_mutation(mutation_pb, entity, auto_id_entities):
"""Copy ``entity`` into appropriate slot of ``mutation_pb``.

If ``entity.key`` is incomplete, append ``entity`` to ``auto_id_entities``
for later fixup during ``commit``.

Helper method for ``Batch.put``.

:type mutation_pb: :class:`gcloud.datastore.datastore_v1_pb2.Mutation`
:param mutation_pb; the Mutation protobuf for the batch / transaction.

:type entity: :class:`gcloud.datastore.entity.Entity`
:param entity; the entity being updated within the batch / transaction.

:type auto_id_entities: list of :class:`gcloud.datastore.entity.Entity`
:param auto_id_entities: entiites with partial keys, to be fixed up
during commit.
"""
auto_id = entity.key.is_partial

key_pb = entity.key.to_protobuf()
key_pb = helpers._prepare_key_for_request(key_pb)

if auto_id:
insert = mutation_pb.insert_auto_id.add()
auto_id_entities.append(entity)
else:
# We use ``upsert`` for entities with completed keys, rather than
# ``insert`` or ``update``, in order not to create race conditions
# based on prior existence / removal of the entity.
insert = mutation_pb.upsert.add()

This comment was marked as spam.


insert.key.CopyFrom(key_pb)

for name, value in entity.items():
prop = insert.property.add()
# Set the name of the property.
prop.name = name

# Set the appropriate value.
helpers._set_protobuf_value(prop.value, value)

if name in entity.exclude_from_indexes:
if not isinstance(value, list):
prop.value.indexed = False

for sub_value in prop.value.list_value:
sub_value.indexed = False
130 changes: 85 additions & 45 deletions gcloud/datastore/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def test_add_auto_id_entity_w_partial_key(self):
batch = self._makeOne(dataset_id=_DATASET, connection=connection)
entity = _Entity()
key = entity.key = _Key(_Entity)
key._partial = True
key._id = None

batch.add_auto_id_entity(entity)

Expand Down Expand Up @@ -128,35 +128,54 @@ def test_put_entity_w_partial_key(self):
batch = self._makeOne(dataset_id=_DATASET, connection=connection)
entity = _Entity(_PROPERTIES)
key = entity.key = _Key(_DATASET)
key._partial = True
key._id = None

batch.put(entity)

self.assertEqual(
connection._saved,
[(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
insert_auto_ids = list(batch.mutation.insert_auto_id)
self.assertEqual(len(insert_auto_ids), 1)
self.assertEqual(insert_auto_ids[0].key, key._key)
upserts = list(batch.mutation.upsert)
self.assertEqual(len(upserts), 0)
deletes = list(batch.mutation.delete)
self.assertEqual(len(deletes), 0)
self.assertEqual(batch._auto_id_entities, [entity])

def test_put_entity_w_completed_key(self):
_DATASET = 'DATASET'
_PROPERTIES = {'foo': 'bar'}
_PROPERTIES = {'foo': 'bar', 'baz': 'qux', 'spam': [1, 2, 3]}
connection = _Connection()
batch = self._makeOne(dataset_id=_DATASET, connection=connection)
entity = _Entity(_PROPERTIES)
entity.exclude_from_indexes = ('baz', 'spam')
key = entity.key = _Key(_DATASET)

batch.put(entity)

self.assertEqual(
connection._saved,
[(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
insert_auto_ids = list(batch.mutation.insert_auto_id)
self.assertEqual(len(insert_auto_ids), 0)
upserts = list(batch.mutation.upsert)
self.assertEqual(len(upserts), 1)

upsert = upserts[0]
self.assertEqual(upsert.key, key._key)
props = dict([(prop.name, prop.value) for prop in upsert.property])
self.assertTrue(props['foo'].indexed)
self.assertFalse(props['baz'].indexed)
self.assertTrue(props['spam'].indexed)
self.assertFalse(props['spam'].list_value[0].indexed)
self.assertFalse(props['spam'].list_value[1].indexed)
self.assertFalse(props['spam'].list_value[2].indexed)

deletes = list(batch.mutation.delete)
self.assertEqual(len(deletes), 0)

def test_delete_w_partial_key(self):
_DATASET = 'DATASET'
connection = _Connection()
batch = self._makeOne(dataset_id=_DATASET, connection=connection)
key = _Key(_DATASET)
key._partial = True
key._id = None

self.assertRaises(ValueError, batch.delete, key)

Expand All @@ -168,9 +187,13 @@ def test_delete_w_completed_key(self):

batch.delete(key)

self.assertEqual(
connection._deleted,
[(_DATASET, [key._key], batch.mutation)])
insert_auto_ids = list(batch.mutation.insert_auto_id)
self.assertEqual(len(insert_auto_ids), 0)
upserts = list(batch.mutation.upsert)
self.assertEqual(len(upserts), 0)
deletes = list(batch.mutation.delete)
self.assertEqual(len(deletes), 1)
self.assertEqual(deletes[0], key._key)

def test_commit(self):
_DATASET = 'DATASET'
Expand All @@ -188,13 +211,13 @@ def test_commit_w_auto_id_entities(self):
batch = self._makeOne(dataset_id=_DATASET, connection=connection)
entity = _Entity({})
key = entity.key = _Key(_DATASET)
key._partial = True
key._id = None
batch._auto_id_entities.append(entity)

batch.commit()

self.assertEqual(connection._committed, [(_DATASET, batch.mutation)])
self.assertFalse(key._partial)
self.assertFalse(key.is_partial)
self.assertEqual(key._id, _NEW_ID)

def test_as_context_mgr_wo_error(self):
Expand All @@ -214,9 +237,13 @@ def test_as_context_mgr_wo_error(self):

self.assertEqual(list(_BATCHES), [])

self.assertEqual(
connection._saved,
[(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
insert_auto_ids = list(batch.mutation.insert_auto_id)
self.assertEqual(len(insert_auto_ids), 0)
upserts = list(batch.mutation.upsert)
self.assertEqual(len(upserts), 1)
self.assertEqual(upserts[0].key, key._key)
deletes = list(batch.mutation.delete)
self.assertEqual(len(deletes), 0)
self.assertEqual(connection._committed, [(_DATASET, batch.mutation)])

def test_as_context_mgr_nested(self):
Expand All @@ -225,9 +252,9 @@ def test_as_context_mgr_nested(self):
_PROPERTIES = {'foo': 'bar'}
connection = _Connection()
entity1 = _Entity(_PROPERTIES)
key = entity1.key = _Key(_DATASET)
key1 = entity1.key = _Key(_DATASET)
entity2 = _Entity(_PROPERTIES)
key = entity2.key = _Key(_DATASET)
key2 = entity2.key = _Key(_DATASET)

self.assertEqual(list(_BATCHES), [])

Expand All @@ -244,11 +271,22 @@ def test_as_context_mgr_nested(self):

self.assertEqual(list(_BATCHES), [])

self.assertEqual(
connection._saved,
[(_DATASET, key._key, _PROPERTIES, (), batch1.mutation),
(_DATASET, key._key, _PROPERTIES, (), batch2.mutation)]
)
insert_auto_ids = list(batch1.mutation.insert_auto_id)
self.assertEqual(len(insert_auto_ids), 0)
upserts = list(batch1.mutation.upsert)
self.assertEqual(len(upserts), 1)
self.assertEqual(upserts[0].key, key1._key)
deletes = list(batch1.mutation.delete)
self.assertEqual(len(deletes), 0)

insert_auto_ids = list(batch2.mutation.insert_auto_id)
self.assertEqual(len(insert_auto_ids), 0)
upserts = list(batch2.mutation.upsert)
self.assertEqual(len(upserts), 1)
self.assertEqual(upserts[0].key, key2._key)
deletes = list(batch2.mutation.delete)
self.assertEqual(len(deletes), 0)

self.assertEqual(connection._committed,
[(_DATASET, batch2.mutation),
(_DATASET, batch1.mutation)])
Expand All @@ -274,9 +312,13 @@ def test_as_context_mgr_w_error(self):

self.assertEqual(list(_BATCHES), [])

self.assertEqual(
connection._saved,
[(_DATASET, key._key, _PROPERTIES, (), batch.mutation)])
insert_auto_ids = list(batch.mutation.insert_auto_id)
self.assertEqual(len(insert_auto_ids), 0)
upserts = list(batch.mutation.upsert)
self.assertEqual(len(upserts), 1)
self.assertEqual(upserts[0].key, key._key)
deletes = list(batch.mutation.delete)
self.assertEqual(len(deletes), 0)
self.assertEqual(connection._committed, [])


Expand Down Expand Up @@ -305,17 +347,6 @@ class _Connection(object):
def __init__(self, *new_keys):
self._commit_result = _CommitResult(*new_keys)
self._committed = []
self._saved = []
self._deleted = []

def save_entity(self, dataset_id, key_pb, properties,
exclude_from_indexes=(), mutation=None):
self._saved.append((dataset_id, key_pb, properties,
tuple(exclude_from_indexes), mutation))
return self._save_result

def delete_entities(self, dataset_id, key_pbs, mutation=None):
self._deleted.append((dataset_id, key_pbs, mutation))

def commit(self, dataset_id, mutation):
self._committed.append((dataset_id, mutation))
Expand All @@ -329,23 +360,32 @@ class _Entity(dict):

class _Key(object):
_MARKER = object()
_kind = 'KIND'
_key = 'KEY'
_partial = False
_path = None
_id = None
_id = 1234
_stored = None

def __init__(self, dataset_id):
self.dataset_id = dataset_id

@property
def is_partial(self):
return self._partial
return self._id is None

def to_protobuf(self):
return self._key
from gcloud.datastore import datastore_v1_pb2
key = self._key = datastore_v1_pb2.Key()
# Don't assign it, because it will just get ripped out
# key.partition_id.dataset_id = self.dataset_id

element = key.path_element.add()
element.kind = self._kind
if self._id is not None:
element.id = self._id

return key

def completed_key(self, new_id):
assert self._partial
assert self.is_partial
self._id = new_id
self._partial = False