Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/configuration/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,3 @@ marten - Advanced Marten operations to 'heal' event store projection issues or r
[-l, --log-level Trace|Debug|Information|Warning|Error|Critical|None] Override the log level
[--config:<prop> <value>] Overwrite individual configuration items
```

209 changes: 209 additions & 0 deletions src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using JasperFx.Events;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;
Expand Down Expand Up @@ -35,6 +36,32 @@ public async Task from_no_current_activity_guid_centric()
document.CCount.ShouldBe(3);
}

[Fact]
public async Task from_no_current_activity_guid_centric_as_batch()
{
StoreOptions(opts =>
{
opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Async);
});

var streamId = Guid.NewGuid();
theSession.Events.StartStream<SimpleAggregate>(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(),
new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

await using var query = theStore.LightweightSession();
var batch = query.CreateBatchQuery();
var documentQuery = batch.Events.FetchLatest<SimpleAggregate>(streamId);
await batch.Execute();

var document = await documentQuery;

document.ACount.ShouldBe(1);
document.BCount.ShouldBe(2);
document.CCount.ShouldBe(3);
}


[Fact]
public async Task from_no_current_activity_string_centric()
Expand All @@ -59,6 +86,31 @@ public async Task from_no_current_activity_string_centric()
document.CCount.ShouldBe(3);
}

[Fact]
public async Task from_no_current_activity_string_centric_from_batch()
{
StoreOptions(opts =>
{
opts.Projections.Snapshot<SimpleAggregateAsString>(SnapshotLifecycle.Async);
opts.Events.StreamIdentity = StreamIdentity.AsString;
});

var streamId = Guid.NewGuid().ToString();
theSession.Events.StartStream<SimpleAggregateAsString>(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(),
new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

await using var query = theStore.LightweightSession();


var document = await query.Events.FetchLatest<SimpleAggregateAsString>(streamId);

document.ACount.ShouldBe(1);
document.BCount.ShouldBe(2);
document.CCount.ShouldBe(3);
}

[Fact]
public async Task from_after_fetch_for_writing_guid_centric_brand_new_1()
{
Expand All @@ -81,6 +133,34 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_new_1()
aggregate.CCount.ShouldBe(3);
}

[Fact]
public async Task from_after_fetch_for_writing_guid_centric_brand_new_1_from_batch()
{
StoreOptions(opts =>
{
opts.Events.UseIdentityMapForAggregates = true;
opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Async);
});

var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);
stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(),
new CEvent(), new CEvent());
await theSession.SaveChangesAsync();

var batch = theSession.CreateBatchQuery();

var aggregateQuery = batch.Events.FetchLatest<SimpleAggregate>(streamId);
await batch.Execute();

var aggregate = await aggregateQuery;

aggregate.ACount.ShouldBe(1);
aggregate.BCount.ShouldBe(2);
aggregate.CCount.ShouldBe(3);
}

[Fact]
public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimization()
{
Expand All @@ -103,6 +183,33 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimiz
aggregate.CCount.ShouldBe(3);
}

[Fact]
public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimization_from_batch()
{
StoreOptions(opts =>
{
//opts.Events.UseIdentityMapForAggregates = true;
opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Async);
});

var streamId = Guid.NewGuid();

var stream = await theSession.Events.FetchForWriting<SimpleAggregate>(streamId);
stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(),
new CEvent(), new CEvent());
await theSession.SaveChangesAsync();

var batch = theSession.CreateBatchQuery();

var aggregateQuery = batch.Events.FetchLatest<SimpleAggregate>(streamId);
await batch.Execute();
var aggregate = await aggregateQuery;

aggregate.ACount.ShouldBe(1);
aggregate.BCount.ShouldBe(2);
aggregate.CCount.ShouldBe(3);
}


[Fact]
public async Task from_after_fetch_for_writing_guid_centric_brand_existing()
Expand Down Expand Up @@ -132,6 +239,47 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_existing()
document.DCount.ShouldBe(2);
}

[Fact]
public async Task from_after_fetch_for_writing_guid_centric_brand_existing_from_batch()
{
StoreOptions(opts =>
{
opts.Projections.Snapshot<SimpleAggregate>(SnapshotLifecycle.Async);
});

var target1 = Target.Random();
var target2 = Target.Random();
theSession.Store(target1, target2);
await theSession.SaveChangesAsync();

var streamId = Guid.NewGuid();
theSession.Events.StartStream<SimpleAggregate>(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(),
new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

using var session = theStore.LightweightSession();
var stream = await session.Events.FetchForWriting<SimpleAggregate>(streamId);
stream.AppendMany(new DEvent(), new DEvent());
await session.SaveChangesAsync();

await using var query = theStore.LightweightSession();
var batch = query.CreateBatchQuery();
var targetQuery1 = batch.Load<Target>(target1.Id);
var documentQuery = batch.Events.FetchLatest<SimpleAggregate>(streamId);
var targetQuery2 = batch.Load<Target>(target2.Id);
await batch.Execute();

(await targetQuery1).ShouldNotBeNull();
var document = await documentQuery;
(await targetQuery2).ShouldNotBeNull();

document.ACount.ShouldBe(1);
document.BCount.ShouldBe(2);
document.CCount.ShouldBe(3);
document.DCount.ShouldBe(2);
}

[Fact]
public async Task from_after_fetch_for_writing_string_centric_brand_new()
{
Expand All @@ -154,6 +302,34 @@ public async Task from_after_fetch_for_writing_string_centric_brand_new()
aggregate.CCount.ShouldBe(3);
}

[Fact]
public async Task from_after_fetch_for_writing_string_centric_brand_new_with_batch()
{
StoreOptions(opts =>
{
opts.Events.StreamIdentity = StreamIdentity.AsString;
opts.Projections.Snapshot<SimpleAggregateAsString>(SnapshotLifecycle.Async);
});

var streamId = Guid.NewGuid().ToString();

var stream = await theSession.Events.FetchForWriting<SimpleAggregateAsString>(streamId);
stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(),
new CEvent(), new CEvent());
await theSession.SaveChangesAsync();

var batch = theSession.CreateBatchQuery();

var aggregateQuery = batch.Events.FetchLatest<SimpleAggregateAsString>(streamId);
await batch.Execute();

var aggregate = await aggregateQuery;

aggregate.ACount.ShouldBe(1);
aggregate.BCount.ShouldBe(2);
aggregate.CCount.ShouldBe(3);
}

[Fact]
public async Task from_after_fetch_for_writing_string_centric_existing()
{
Expand Down Expand Up @@ -183,6 +359,38 @@ public async Task from_after_fetch_for_writing_string_centric_existing()
document.DCount.ShouldBe(2);
}

[Fact]
public async Task from_after_fetch_for_writing_string_centric_existing_with_batch()
{
StoreOptions(opts =>
{
opts.Events.StreamIdentity = StreamIdentity.AsString;
opts.Projections.Snapshot<SimpleAggregateAsString>(SnapshotLifecycle.Async);
});

var streamId = Guid.NewGuid().ToString();
theSession.Events.StartStream<SimpleAggregate>(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(),
new CEvent(), new CEvent());

await theSession.SaveChangesAsync();

using var session = theStore.LightweightSession();
var stream = await session.Events.FetchForWriting<SimpleAggregateAsString>(streamId);
stream.AppendMany(new DEvent(), new DEvent());
await session.SaveChangesAsync();

await using var query = theStore.LightweightSession();
var batch = session.CreateBatchQuery();
var documentQuery = batch.Events.FetchLatest<SimpleAggregateAsString>(streamId);
await batch.Execute();
var document = await documentQuery;

document.ACount.ShouldBe(1);
document.BCount.ShouldBe(2);
document.CCount.ShouldBe(3);
document.DCount.ShouldBe(2);
}

[Fact]
public async Task fetch_latest_immutable_aggregate_running_inline()
{
Expand Down Expand Up @@ -238,6 +446,7 @@ public async Task fetch_latest_immutable_aggregate_running_inline_and_identity_m
var aggregate2 = await session2.Events.FetchLatest<TestAggregateImmutable>(streamId);
aggregate2.Name.ShouldBe("Random");
}

}

public record CreatedEvent(Guid Id, string Name, Dictionary<int, string> TestData);
Expand Down
Loading
Loading