Skip to content

Commit 7f75f3a

Browse files
committed
Combine persist/awaitSync to just one top-level mutationFn
1 parent 725396d commit 7f75f3a

File tree

15 files changed

+192
-446
lines changed

15 files changed

+192
-446
lines changed

README.md

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -204,41 +204,33 @@ const todosConfig = {
204204
primaryKey: ['id'],
205205
}
206206
),
207-
mutationFn: {
208-
// Persist mutations to ElectricSQL
209-
persist: async (mutations, transaction) => {
210-
const response = await fetch(`http://localhost:3001/api/mutations`, {
211-
method: `POST`,
212-
headers: {
213-
"Content-Type": `application/json`,
214-
},
215-
body: JSON.stringify(transaction.mutations),
216-
})
217-
if (!response.ok) {
218-
// Throwing an error will rollback the optimistic state.
219-
throw new Error(`HTTP error! Status: ${response.status}`)
220-
}
221-
222-
const result = await response.json()
223-
224-
return {
225-
txid: result.txid,
226-
}
227-
},
228-
// Wait for a transaction to be synced
229-
awaitSync: async ({ config, persistResult: { txid: number } }) => {
230-
try {
231-
// Use the awaitTxid function from the ElectricSync configuration
232-
// This waits for the specific transaction to be synced to the server
233-
// The second parameter is an optional timeout in milliseconds
234-
await config.sync.awaitTxid(persistResult.txid, 10000)
235-
return true;
236-
} catch (error) {
237-
console.error('Error waiting for transaction to sync:', error);
238-
// Throwing an error will rollback the optimistic state.
239-
throw error;
240-
}
241-
},
207+
// Persist mutations to ElectricSQL
208+
mutationFn: async (mutations, transaction, config) => {
209+
const response = await fetch(`http://localhost:3001/api/mutations`, {
210+
method: `POST`,
211+
headers: {
212+
"Content-Type": `application/json`,
213+
},
214+
body: JSON.stringify(transaction.mutations),
215+
})
216+
if (!response.ok) {
217+
// Throwing an error will rollback the optimistic state.
218+
throw new Error(`HTTP error! Status: ${response.status}`)
219+
}
220+
221+
const result = await response.json()
222+
223+
try {
224+
// Use the awaitTxid function from the ElectricSync configuration
225+
// This waits for the specific transaction to be synced to the server
226+
// The second parameter is an optional timeout in milliseconds
227+
await config.sync.awaitTxid(persistResult.txid, 10000)
228+
return true;
229+
} catch (error) {
230+
console.error('Error waiting for transaction to sync:', error);
231+
// Throwing an error will rollback the optimistic state.
232+
throw error;
233+
}
242234
},
243235
};
244236

examples/react/todo/src/App.tsx

Lines changed: 32 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -30,34 +30,22 @@ export default function App() {
3030
{ primaryKey: [`id`] }
3131
),
3232
schema: updateTodoSchema,
33-
mutationFn: {
34-
persist: async ({ transaction }) => {
35-
const response = await fetch(`http://localhost:3001/api/mutations`, {
36-
method: `POST`,
37-
headers: {
38-
"Content-Type": `application/json`,
39-
},
40-
body: JSON.stringify(transaction.mutations),
41-
})
42-
if (!response.ok) {
43-
throw new Error(`HTTP error! Status: ${response.status}`)
44-
}
45-
46-
const result = await response.json()
47-
return {
48-
txid: result.txid,
49-
}
50-
},
51-
awaitSync: async ({
52-
persistResult,
53-
collection,
54-
}: {
55-
persistResult: { txid: number }
56-
collection: Collection
57-
}) => {
58-
// Start waiting for the txid
59-
await collection.config.sync.awaitTxid(persistResult.txid)
60-
},
33+
mutationFn: async ({ transaction, collection }) => {
34+
const response = await fetch(`http://localhost:3001/api/mutations`, {
35+
method: `POST`,
36+
headers: {
37+
"Content-Type": `application/json`,
38+
},
39+
body: JSON.stringify(transaction.mutations),
40+
})
41+
if (!response.ok) {
42+
throw new Error(`HTTP error! Status: ${response.status}`)
43+
}
44+
45+
const result = await response.json()
46+
47+
// Start waiting for the txid
48+
await collection.config.sync.awaitTxid(result.txid)
6149
},
6250
})
6351

@@ -83,34 +71,22 @@ export default function App() {
8371
{ primaryKey: [`id`] }
8472
),
8573
schema: updateConfigSchema,
86-
mutationFn: {
87-
persist: async ({ transaction }) => {
88-
const response = await fetch(`http://localhost:3001/api/mutations`, {
89-
method: `POST`,
90-
headers: {
91-
"Content-Type": `application/json`,
92-
},
93-
body: JSON.stringify(transaction.mutations),
94-
})
95-
if (!response.ok) {
96-
throw new Error(`HTTP error! Status: ${response.status}`)
97-
}
98-
99-
const result = await response.json()
100-
return {
101-
txid: result.txid,
102-
}
103-
},
104-
awaitSync: async ({
105-
persistResult,
106-
collection,
107-
}: {
108-
persistResult: { txid: number }
109-
collection: Collection
110-
}) => {
111-
// Start waiting for the txid
112-
await collection.config.sync.awaitTxid(persistResult.txid)
113-
},
74+
mutationFn: async ({ transaction, collection }) => {
75+
const response = await fetch(`http://localhost:3001/api/mutations`, {
76+
method: `POST`,
77+
headers: {
78+
"Content-Type": `application/json`,
79+
},
80+
body: JSON.stringify(transaction.mutations),
81+
})
82+
if (!response.ok) {
83+
throw new Error(`HTTP error! Status: ${response.status}`)
84+
}
85+
86+
const result = await response.json()
87+
88+
// Start waiting for the txid
89+
await collection.config.sync.awaitTxid(result.txid)
11490
},
11591
})
11692

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
"eslint-config-prettier": "^10.1.1",
2525
"eslint-plugin-prettier": "^5.2.3",
2626
"eslint-plugin-react": "^7.37.4",
27-
"fake-indexeddb": "^6.0.0",
2827
"husky": "^9.1.7",
2928
"jsdom": "^26.0.0",
3029
"lint-staged": "^15.5.0",

packages/optimistic/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
"version": "0.0.2",
55
"dependencies": {
66
"@standard-schema/spec": "^1.0.0",
7-
"@tanstack/store": "^0.7.0",
8-
"idb": "^8.0.2"
7+
"@tanstack/store": "^0.7.0"
98
},
109
"devDependencies": {
1110
"@vitest/coverage-istanbul": "^3.0.9"

packages/optimistic/src/TransactionManager.ts

Lines changed: 15 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ export class TransactionManager<T extends object = Record<string, unknown>> {
163163
updatedAt: new Date(),
164164
mutations,
165165
metadata: {},
166-
isSynced: createDeferred(),
167166
isPersisted: createDeferred(),
168167
} as Transaction
169168
}
@@ -180,7 +179,7 @@ export class TransactionManager<T extends object = Record<string, unknown>> {
180179
}
181180

182181
/**
183-
* Process a transaction through persist and awaitSync
182+
* Process a transaction through the mutation function
184183
*
185184
* @param transactionId - The ID of the transaction to process
186185
* @private
@@ -189,73 +188,32 @@ export class TransactionManager<T extends object = Record<string, unknown>> {
189188
const transaction = this.getTransaction(transactionId)
190189
if (!transaction) return
191190

192-
// If no mutationFn is provided, throw an error
191+
// Throw an error if no mutation function is provided
193192
if (!this.collection.config.mutationFn) {
194193
throw new Error(
195-
`Cannot process transaction without a mutationFn in the collection config`
194+
`No mutation function provided for transaction ${transactionId}`
196195
)
197196
}
198197

198+
// Set transaction state to persisting
199199
this.setTransactionState(transactionId, `persisting`)
200200

201-
this.collection.config.mutationFn
202-
.persist({
203-
transaction: this.createLiveTransactionReference(transactionId),
201+
// Create a live reference to the transaction that always returns the latest state
202+
const transactionRef = this.createLiveTransactionReference(transactionId)
203+
204+
// Call the mutation function
205+
this.collection.config
206+
.mutationFn({
207+
transaction: transactionRef,
204208
collection: this.collection,
205209
})
206-
.then((persistResult) => {
210+
.then(() => {
207211
const tx = this.getTransaction(transactionId)
208212
if (!tx) return
209213

214+
// Mark as persisted
210215
tx.isPersisted?.resolve(true)
211-
if (this.collection.config.mutationFn?.awaitSync) {
212-
this.setTransactionState(transactionId, `persisted_awaiting_sync`)
213-
214-
// Create a promise that rejects after 2 seconds
215-
const timeoutPromise = new Promise<never>((_, reject) => {
216-
setTimeout(() => {
217-
reject(new Error(`Sync operation timed out after 2 seconds`))
218-
}, this.collection.config.mutationFn?.awaitSyncTimeoutMs ?? 2000)
219-
})
220-
221-
// Race the awaitSync promise against the timeout
222-
Promise.race([
223-
this.collection.config.mutationFn.awaitSync({
224-
transaction: this.createLiveTransactionReference(transactionId),
225-
collection: this.collection,
226-
persistResult,
227-
}),
228-
timeoutPromise,
229-
])
230-
.then(() => {
231-
const updatedTx = this.getTransaction(transactionId)
232-
if (!updatedTx) return
233-
234-
updatedTx.isSynced?.resolve(true)
235-
this.setTransactionState(transactionId, `completed`)
236-
})
237-
// Catch awaitSync errors or timeout
238-
.catch((error) => {
239-
const updatedTx = this.getTransaction(transactionId)
240-
if (!updatedTx) return
241-
242-
// Update transaction with error information
243-
updatedTx.error = {
244-
message: error.message || `Error during sync`,
245-
error:
246-
error instanceof Error ? error : new Error(String(error)),
247-
}
248-
249-
// Reject the isSynced promise
250-
updatedTx.isSynced?.reject(error)
251-
252-
// Set transaction state to failed
253-
this.setTransaction(updatedTx)
254-
this.setTransactionState(transactionId, `failed`)
255-
})
256-
} else {
257-
this.setTransactionState(transactionId, `completed`)
258-
}
216+
this.setTransactionState(transactionId, `completed`)
259217
})
260218
.catch((error) => {
261219
const tx = this.getTransaction(transactionId)
@@ -267,9 +225,8 @@ export class TransactionManager<T extends object = Record<string, unknown>> {
267225
error: error instanceof Error ? error : new Error(String(error)),
268226
}
269227

270-
// Reject both promises
228+
// Reject the promise
271229
tx.isPersisted?.reject(tx.error.error)
272-
tx.isSynced?.reject(tx.error.error)
273230

274231
// Set transaction state to failed
275232
this.setTransactionState(transactionId, `failed`)

packages/optimistic/src/collection.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,14 @@ interface PendingSyncedTransaction<T extends object = Record<string, unknown>> {
4444
* id: `users-${params.userId}`,
4545
* sync: { ... },
4646
* // mutationFn is optional - provide it if you need mutation capabilities
47-
* mutationFn: { ... }
47+
* mutationFn: async (params: {
48+
* transaction: Transaction
49+
* collection: Collection<Record<string, unknown>>
50+
* }) => {
51+
* // Implement your mutation (and syncing) logic here
52+
* // Return a promise that resolves when the mutation and syncing is complete.
53+
* // When this function returns, the optimistic mutations are dropped.
54+
* }
4855
* });
4956
*
5057
* return null;

packages/optimistic/src/types.ts

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ export interface Transaction {
2929
updatedAt: Date
3030
mutations: Array<PendingMutation>
3131
metadata: Record<string, unknown>
32-
isSynced?: Deferred<boolean>
3332
isPersisted?: Deferred<boolean>
3433
error?: {
3534
transactionId?: string // For dependency failures
@@ -88,21 +87,10 @@ export interface OptimisticChangeMessage<
8887
isActive?: boolean
8988
}
9089

91-
export interface MutationFn<T extends object = Record<string, unknown>> {
92-
persist: (params: {
93-
transaction: Transaction
94-
collection: Collection<T>
95-
}) => Promise<any>
96-
97-
// Set timeout for awaiting sync (default is 2 seconds)
98-
awaitSyncTimeoutMs?: number
99-
awaitSync?: (params: {
100-
transaction: Transaction
101-
collection: Collection<T>
102-
103-
persistResult: any
104-
}) => Promise<void>
105-
}
90+
export type MutationFn<T extends object = Record<string, unknown>> = (params: {
91+
transaction: Transaction
92+
collection: Collection<T>
93+
}) => Promise<any>
10694

10795
/**
10896
* The Standard Schema interface.

0 commit comments

Comments
 (0)