Skip to content

Commit c198819

Browse files
authored
Don't wait for transaction to finish before applying synced transactions (#37)
1 parent d3f4b6d commit c198819

File tree

3 files changed

+111
-115
lines changed

3 files changed

+111
-115
lines changed

packages/optimistic/src/collection.ts

Lines changed: 83 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import type {
77
CollectionConfig,
88
InsertConfig,
99
OperationConfig,
10+
OptimisticChangeMessage,
1011
PendingMutation,
1112
StandardSchema,
1213
Transaction,
13-
TransactionState,
1414
} from "./types"
1515

1616
// Store collections in memory using Tanstack store
@@ -26,7 +26,7 @@ const loadingCollections = new Map<
2626

2727
interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
2828
committed: boolean
29-
operations: Array<ChangeMessage<T>>
29+
operations: Array<OptimisticChangeMessage<T>>
3030
}
3131

3232
/**
@@ -152,7 +152,7 @@ export class Collection<T extends object = Record<string, unknown>> {
152152
public transactionManager!: ReturnType<typeof getTransactionManager<T>>
153153
private transactionStore: TransactionStore
154154

155-
public optimisticOperations: Derived<Array<ChangeMessage<T>>>
155+
public optimisticOperations: Derived<Array<OptimisticChangeMessage<T>>>
156156
public derivedState: Derived<Map<string, T>>
157157
public derivedArray: Derived<Array<T>>
158158

@@ -200,17 +200,16 @@ export class Collection<T extends object = Record<string, unknown>> {
200200
this.optimisticOperations = new Derived({
201201
fn: ({ currDepVals: [transactions] }) => {
202202
const result = Array.from(transactions.values())
203-
.filter(
204-
(transaction) =>
205-
transaction.state !== `completed` &&
206-
transaction.state !== `failed`
207-
)
208-
.map((transaction) =>
209-
transaction.mutations.map((mutation) => {
210-
const message: ChangeMessage<T> = {
203+
.map((transaction) => {
204+
const isActive = ![`completed`, `failed`].includes(
205+
transaction.state
206+
)
207+
return transaction.mutations.map((mutation) => {
208+
const message: OptimisticChangeMessage<T> = {
211209
type: mutation.type,
212210
key: mutation.key,
213211
value: mutation.modified as T,
212+
isActive,
214213
}
215214
if (
216215
mutation.metadata !== undefined &&
@@ -220,7 +219,7 @@ export class Collection<T extends object = Record<string, unknown>> {
220219
}
221220
return message
222221
})
223-
)
222+
})
224223
.flat()
225224

226225
return result
@@ -233,32 +232,27 @@ export class Collection<T extends object = Record<string, unknown>> {
233232
this.derivedState = new Derived({
234233
fn: ({ currDepVals: [syncedData, operations] }) => {
235234
const combined = new Map<string, T>(syncedData)
235+
const optimisticKeys = new Set<string>()
236+
236237
// Apply the optimistic operations on top of the synced state.
237238
for (const operation of operations) {
238-
let existingValue
239-
switch (operation.type) {
240-
case `insert`:
241-
combined.set(operation.key, operation.value)
242-
break
243-
case `update`:
244-
existingValue = syncedData.get(operation.key)
245-
combined.set(operation.key, {
246-
...(existingValue || {}),
247-
...operation.value,
248-
})
249-
break
250-
case `delete`:
251-
combined.delete(operation.key)
252-
break
239+
optimisticKeys.add(operation.key)
240+
if (operation.isActive) {
241+
switch (operation.type) {
242+
case `insert`:
243+
combined.set(operation.key, operation.value)
244+
break
245+
case `update`:
246+
combined.set(operation.key, operation.value)
247+
break
248+
case `delete`:
249+
combined.delete(operation.key)
250+
break
251+
}
253252
}
254253
}
255254

256255
// Update object => key mappings
257-
const optimisticKeys = new Set<string>()
258-
for (const operation of operations) {
259-
optimisticKeys.add(operation.key)
260-
}
261-
262256
optimisticKeys.forEach((key) => {
263257
if (combined.has(key)) {
264258
this.objectKeyMap.set(combined.get(key)!, key)
@@ -338,86 +332,64 @@ export class Collection<T extends object = Record<string, unknown>> {
338332
* This method processes operations from pending transactions and applies them to the synced data
339333
*/
340334
commitPendingTransactions = () => {
341-
// Check if there's any transactions that aren't finished.
342-
// If not, proceed.
343-
// If so, subscribe to transactions and keep checking if can proceed.
344-
//
345-
// The plan is to have a finer-grained locking but just blocking applying
346-
// synced data until a persisting transaction is finished seems fine.
347-
// We also don't yet have support for transactions that don't immediately
348-
// persist so right now, blocking sync only delays their application for a
349-
// few hundred milliseconds. So not the worse thing in th world.
350-
// But something to fix in the future.
351-
// Create a Set with only the terminal states
352-
const terminalStates = new Set<TransactionState>([`completed`, `failed`])
353-
354-
// Function to check if a state is NOT a terminal state
355-
function isNotTerminalState({ state }: Transaction): boolean {
356-
return !terminalStates.has(state)
357-
}
358-
if (
359-
this.transactions.size === 0 ||
360-
!Array.from(this.transactions.values()).some(isNotTerminalState)
361-
) {
362-
const keys = new Set<string>()
363-
batch(() => {
364-
for (const transaction of this.pendingSyncedTransactions) {
365-
for (const operation of transaction.operations) {
366-
keys.add(operation.key)
367-
this.syncedMetadata.setState((prevData) => {
368-
switch (operation.type) {
369-
case `insert`:
370-
prevData.set(operation.key, operation.metadata)
371-
break
372-
case `update`:
373-
prevData.set(operation.key, {
374-
...prevData.get(operation.key)!,
375-
...operation.metadata,
376-
})
377-
break
378-
case `delete`:
379-
prevData.delete(operation.key)
380-
break
381-
}
382-
return prevData
383-
})
384-
this.syncedData.setState((prevData) => {
385-
switch (operation.type) {
386-
case `insert`:
387-
prevData.set(operation.key, operation.value)
388-
break
389-
case `update`:
390-
prevData.set(operation.key, {
391-
...prevData.get(operation.key)!,
392-
...operation.value,
393-
})
394-
break
395-
case `delete`:
396-
prevData.delete(operation.key)
397-
break
398-
}
399-
return prevData
400-
})
401-
}
335+
const keys = new Set<string>()
336+
batch(() => {
337+
for (const transaction of this.pendingSyncedTransactions) {
338+
for (const operation of transaction.operations) {
339+
keys.add(operation.key)
340+
this.syncedMetadata.setState((prevData) => {
341+
switch (operation.type) {
342+
case `insert`:
343+
prevData.set(operation.key, operation.metadata)
344+
break
345+
case `update`:
346+
prevData.set(operation.key, {
347+
...prevData.get(operation.key)!,
348+
...operation.metadata,
349+
})
350+
break
351+
case `delete`:
352+
prevData.delete(operation.key)
353+
break
354+
}
355+
return prevData
356+
})
357+
this.syncedData.setState((prevData) => {
358+
switch (operation.type) {
359+
case `insert`:
360+
prevData.set(operation.key, operation.value)
361+
break
362+
case `update`:
363+
prevData.set(operation.key, {
364+
...prevData.get(operation.key)!,
365+
...operation.value,
366+
})
367+
break
368+
case `delete`:
369+
prevData.delete(operation.key)
370+
break
371+
}
372+
return prevData
373+
})
402374
}
403-
})
375+
}
376+
})
404377

405-
keys.forEach((key) => {
406-
const curValue = this.state.get(key)
407-
if (curValue) {
408-
this.objectKeyMap.set(curValue, key)
409-
}
410-
})
378+
keys.forEach((key) => {
379+
const curValue = this.state.get(key)
380+
if (curValue) {
381+
this.objectKeyMap.set(curValue, key)
382+
}
383+
})
411384

412-
this.pendingSyncedTransactions = []
385+
this.pendingSyncedTransactions = []
413386

414-
// Call any registered one-time commit listeners
415-
if (!this.hasReceivedFirstCommit) {
416-
this.hasReceivedFirstCommit = true
417-
const callbacks = [...this.onFirstCommitCallbacks]
418-
this.onFirstCommitCallbacks = []
419-
callbacks.forEach((callback) => callback())
420-
}
387+
// Call any registered one-time commit listeners
388+
if (!this.hasReceivedFirstCommit) {
389+
this.hasReceivedFirstCommit = true
390+
const callbacks = [...this.onFirstCommitCallbacks]
391+
this.onFirstCommitCallbacks = []
392+
callbacks.forEach((callback) => callback())
421393
}
422394
}
423395

@@ -746,7 +718,9 @@ export class Collection<T extends object = Record<string, unknown>> {
746718
if (typeof item === `object` && (item as unknown) !== null) {
747719
const objectKey = this.objectKeyMap.get(item)
748720
if (objectKey === undefined) {
749-
throw new Error(`Object not found in collection`)
721+
throw new Error(
722+
`Object not found in collection: ${JSON.stringify(item)}`
723+
)
750724
}
751725
key = objectKey
752726
} else if (typeof item === `string`) {

packages/optimistic/src/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ export interface ChangeMessage<T extends object = Record<string, unknown>> {
8484
metadata?: Record<string, unknown>
8585
}
8686

87+
export interface OptimisticChangeMessage<
88+
T extends object = Record<string, unknown>,
89+
> extends ChangeMessage<T> {
90+
// Is this change message part of an active transaction. Only applies to optimistic changes.
91+
isActive?: boolean
92+
}
93+
8794
export interface MutationFn<T extends object = Record<string, unknown>> {
8895
persist: (params: {
8996
transaction: Transaction

packages/optimistic/tests/collection.test.ts

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ import "fake-indexeddb/auto"
33
import mitt from "mitt"
44
import { z } from "zod"
55
import { Collection, SchemaValidationError } from "../src/collection"
6-
import type { ChangeMessage, PendingMutation } from "../src/types"
6+
import type {
7+
ChangeMessage,
8+
OptimisticChangeMessage,
9+
PendingMutation,
10+
} from "../src/types"
711

812
describe(`Collection`, () => {
913
it(`should throw if there's no sync config`, () => {
@@ -185,10 +189,11 @@ describe(`Collection`, () => {
185189
})
186190

187191
// Check the optimistic operation is there
188-
const insertOperation: ChangeMessage = {
192+
const insertOperation: OptimisticChangeMessage = {
189193
key: insertedKey,
190194
value: { value: `bar` },
191195
type: `insert`,
196+
isActive: true,
192197
}
193198
expect(collection.optimisticOperations.state[0]).toEqual(insertOperation)
194199

@@ -222,7 +227,9 @@ describe(`Collection`, () => {
222227
// @ts-expect-error possibly undefined is ok in test
223228
Array.from(collection.transactions.values())[0].state
224229
).toMatchInlineSnapshot(`"completed"`)
225-
expect(collection.optimisticOperations.state).toEqual([])
230+
expect(
231+
collection.optimisticOperations.state.filter((o) => o.isActive)
232+
).toEqual([])
226233
expect(collection.state).toEqual(new Map([[insertedKey, { value: `bar` }]]))
227234

228235
// Test insert with provided key
@@ -307,7 +314,7 @@ describe(`Collection`, () => {
307314
expect(collection.state.has(keys[3])).toBe(false)
308315
})
309316

310-
it(`synced updates shouldn't be applied while there's an ongoing transaction`, async () => {
317+
it(`synced updates should be applied while there's an ongoing transaction`, async () => {
311318
const emitter = mitt()
312319

313320
// new collection w/ mock sync/mutation
@@ -336,8 +343,15 @@ describe(`Collection`, () => {
336343
// we're still in the middle of persisting a transaction.
337344
emitter.emit(`update`, [
338345
{ key: `the-key`, type: `insert`, changes: { bar: `value` } },
346+
// This update is ignored because the optimistic update overrides it.
347+
{ key: `foo`, type: `update`, changes: { bar: `value2` } },
339348
])
340-
expect(collection.state).toEqual(new Map([[`foo`, { value: `bar` }]]))
349+
expect(collection.state).toEqual(
350+
new Map([
351+
[`foo`, { value: `bar` }],
352+
[`the-key`, { bar: `value` }],
353+
])
354+
)
341355
// Remove it so we don't have to assert against it below
342356
emitter.emit(`update`, [{ key: `the-key`, type: `delete` }])
343357
return Promise.resolve()
@@ -369,10 +383,11 @@ describe(`Collection`, () => {
369383
})
370384

371385
// Check the optimistic operation is there
372-
const insertOperation: ChangeMessage = {
386+
const insertOperation: OptimisticChangeMessage = {
373387
key: `foo`,
374388
value: { value: `bar` },
375389
type: `insert`,
390+
isActive: true,
376391
}
377392
expect(collection.optimisticOperations.state[0]).toEqual(insertOperation)
378393

0 commit comments

Comments
 (0)