Skip to content

Commit 2716898

Browse files
authored
feat!: disable PXE concurrency (#12637)
Debugging #12391 led me to discover that we cannot have concurrent simulations due to contracts now being allowed to read and write to PXE's stores at arbitrary moments. E.g. #12391 was failing CI due to multiple concurrent simulations deleting the same pending partial note from a capsule array. This PR disables that behavior by putting the problematic tasks in a serial queue. Multiple tests still call PXE expecting concurrency (typically via usage of `await Promise.all`), but I thought it made more sense to disable the behavior this way and issue a warning (to unblock #12391) and then worry about removing attempts to achieve concurrent behavior. I considered putting _all_ PXE functions in the serial queue, but refrained from doing so to avoid introducing a larger than strictly needed change. We may want to do this automatically via e.g. monkey-patching to avoid accidentally forgetting a case.
1 parent 93a6f4e commit 2716898

2 files changed

Lines changed: 174 additions & 117 deletions

File tree

docs/docs/migration_notes.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ Aztec is in full-speed development. Literally every version breaks compatibility
88

99
## TBD
1010

11+
### [PXE] Concurrent contract function simulation disabled
12+
13+
PXE is no longer be able to execute contract functions concurrently (e.g. by collecting calls to `simulateTx` and then using `await Promise.all`). They will instead be put in a job queue and executed sequentially in order of arrival.
14+
1115
### [aztec.js] Changes to `BatchCall` and `BaseContractInteraction`
1216

1317
The constructor arguments of `BatchCall` have been updated to improve usability. Previously, it accepted an array of `FunctionCall`, requiring users to manually set additional data such as `authwit` and `capsules`. Now, `BatchCall` takes an array of `BaseContractInteraction`, which encapsulates all necessary information.

yarn-project/pxe/src/pxe_service/pxe_service.ts

Lines changed: 170 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { L1_TO_L2_MSG_TREE_HEIGHT } from '@aztec/constants';
22
import { Fr, type Point } from '@aztec/foundation/fields';
33
import { type Logger, createLogger } from '@aztec/foundation/log';
4+
import { SerialQueue } from '@aztec/foundation/queue';
45
import { Timer } from '@aztec/foundation/timer';
56
import type { SiblingPath } from '@aztec/foundation/trees';
67
import { KeyStore } from '@aztec/key-store';
@@ -103,6 +104,7 @@ export class PXEService implements PXE {
103104
private proofCreator: PrivateKernelProver,
104105
private protocolContractsProvider: ProtocolContractsProvider,
105106
private log: Logger,
107+
private jobQueue: SerialQueue,
106108
) {}
107109

108110
/**
@@ -160,6 +162,8 @@ export class PXEService implements PXE {
160162
log,
161163
);
162164
const simulator = new AcirSimulator(pxeOracleInterface, simulationProvider);
165+
const jobQueue = new SerialQueue();
166+
163167
const pxeService = new PXEService(
164168
node,
165169
synchronizer,
@@ -177,13 +181,34 @@ export class PXEService implements PXE {
177181
proofCreator,
178182
protocolContractsProvider,
179183
log,
184+
jobQueue,
180185
);
186+
187+
pxeService.jobQueue.start();
188+
181189
await pxeService.#registerProtocolContracts();
182190
const info = await pxeService.getNodeInfo();
183191
log.info(`Started PXE connected to chain ${info.l1ChainId} version ${info.protocolVersion}`);
184192
return pxeService;
185193
}
186194

195+
/**
196+
* Enqueues a job for execution once no other jobs are running. Returns a promise that will resolve once the job is
197+
* complete.
198+
*
199+
* Useful for tasks that cannot run concurrently, such as contract function simulation.
200+
*/
201+
#putInJobQueue<T>(fn: () => Promise<T>): Promise<T> {
202+
// TODO(#12636): relax the conditions under which we forbid concurrency.
203+
if (this.jobQueue.length() != 0) {
204+
this.log.warn(
205+
`PXE is already processing ${this.jobQueue.length()} jobs, concurrent execution is not supported. Will run once those are complete.`,
206+
);
207+
}
208+
209+
return this.jobQueue.put(fn);
210+
}
211+
187212
isL1ToL2MessageSynced(l1ToL2Message: Fr): Promise<boolean> {
188213
return this.node.isL1ToL2MessageSynced(l1ToL2Message);
189214
}
@@ -364,35 +389,39 @@ export class PXEService implements PXE {
364389
);
365390
}
366391

367-
public async updateContract(contractAddress: AztecAddress, artifact: ContractArtifact): Promise<void> {
368-
const currentInstance = await this.contractDataProvider.getContractInstance(contractAddress);
369-
const contractClass = await getContractClassFromArtifact(artifact);
370-
await this.synchronizer.sync();
392+
public updateContract(contractAddress: AztecAddress, artifact: ContractArtifact): Promise<void> {
393+
// We disable concurrently updating contracts to avoid concurrently syncing with the node, or changing a contract's
394+
// class while we're simulating it.
395+
return this.#putInJobQueue(async () => {
396+
const currentInstance = await this.contractDataProvider.getContractInstance(contractAddress);
397+
const contractClass = await getContractClassFromArtifact(artifact);
398+
await this.synchronizer.sync();
371399

372-
const header = await this.syncDataProvider.getBlockHeader();
400+
const header = await this.syncDataProvider.getBlockHeader();
373401

374-
const currentClassId = await readCurrentClassId(
375-
contractAddress,
376-
currentInstance,
377-
this.node,
378-
header.globalVariables.blockNumber.toNumber(),
379-
);
380-
if (!contractClass.id.equals(currentClassId)) {
381-
throw new Error('Could not update contract to a class different from the current one.');
382-
}
402+
const currentClassId = await readCurrentClassId(
403+
contractAddress,
404+
currentInstance,
405+
this.node,
406+
header.globalVariables.blockNumber.toNumber(),
407+
);
408+
if (!contractClass.id.equals(currentClassId)) {
409+
throw new Error('Could not update contract to a class different from the current one.');
410+
}
383411

384-
await this.contractDataProvider.addContractArtifact(contractClass.id, artifact);
412+
await this.contractDataProvider.addContractArtifact(contractClass.id, artifact);
385413

386-
const publicFunctionSignatures = artifact.functions
387-
.filter(fn => fn.functionType === FunctionType.PUBLIC)
388-
.map(fn => decodeFunctionSignature(fn.name, fn.parameters));
389-
await this.node.registerContractFunctionSignatures(contractAddress, publicFunctionSignatures);
414+
const publicFunctionSignatures = artifact.functions
415+
.filter(fn => fn.functionType === FunctionType.PUBLIC)
416+
.map(fn => decodeFunctionSignature(fn.name, fn.parameters));
417+
await this.node.registerContractFunctionSignatures(contractAddress, publicFunctionSignatures);
390418

391-
// TODO(#10007): Node should get public contract class from the registration event, not from PXE registration
392-
await this.node.addContractClass({ ...contractClass, privateFunctions: [], unconstrainedFunctions: [] });
393-
currentInstance.currentContractClassId = contractClass.id;
394-
await this.contractDataProvider.addContractInstance(currentInstance);
395-
this.log.info(`Updated contract ${artifact.name} at ${contractAddress.toString()} to class ${contractClass.id}`);
419+
// TODO(#10007): Node should get public contract class from the registration event, not from PXE registration
420+
await this.node.addContractClass({ ...contractClass, privateFunctions: [], unconstrainedFunctions: [] });
421+
currentInstance.currentContractClassId = contractClass.id;
422+
await this.contractDataProvider.addContractInstance(currentInstance);
423+
this.log.info(`Updated contract ${artifact.name} at ${contractAddress.toString()} to class ${contractClass.id}`);
424+
});
396425
}
397426

398427
public getContracts(): Promise<AztecAddress[]> {
@@ -456,24 +485,33 @@ export class PXEService implements PXE {
456485
return await this.node.getCurrentBaseFees();
457486
}
458487

459-
public async proveTx(
488+
public proveTx(
460489
txRequest: TxExecutionRequest,
461490
privateExecutionResult: PrivateExecutionResult,
462491
): Promise<TxProvingResult> {
463-
try {
464-
const { publicInputs, clientIvcProof } = await this.#prove(txRequest, this.proofCreator, privateExecutionResult, {
465-
simulate: false,
466-
skipFeeEnforcement: false,
467-
profile: false,
468-
});
469-
return new TxProvingResult(privateExecutionResult, publicInputs, clientIvcProof!);
470-
} catch (err: any) {
471-
throw this.contextualizeError(err, inspect(txRequest), inspect(privateExecutionResult));
472-
}
492+
// We disable proving concurrently mostly out of caution, since it accesses some of our stores. Proving is so
493+
// computationally demanding that it'd be rare for someone to try to do it concurrently regardless.
494+
return this.#putInJobQueue(async () => {
495+
try {
496+
const { publicInputs, clientIvcProof } = await this.#prove(
497+
txRequest,
498+
this.proofCreator,
499+
privateExecutionResult,
500+
{
501+
simulate: false,
502+
skipFeeEnforcement: false,
503+
profile: false,
504+
},
505+
);
506+
return new TxProvingResult(privateExecutionResult, publicInputs, clientIvcProof!);
507+
} catch (err: any) {
508+
throw this.contextualizeError(err, inspect(txRequest), inspect(privateExecutionResult));
509+
}
510+
});
473511
}
474512

475513
// TODO(#7456) Prevent msgSender being defined here for the first call
476-
public async simulateTx(
514+
public simulateTx(
477515
txRequest: TxExecutionRequest,
478516
simulatePublic: boolean,
479517
msgSender: AztecAddress | undefined = undefined,
@@ -482,74 +520,84 @@ export class PXEService implements PXE {
482520
profile: boolean = false,
483521
scopes?: AztecAddress[],
484522
): Promise<TxSimulationResult> {
485-
try {
486-
const txInfo = {
487-
origin: txRequest.origin,
488-
functionSelector: txRequest.functionSelector,
489-
simulatePublic,
490-
msgSender,
491-
chainId: txRequest.txContext.chainId,
492-
version: txRequest.txContext.version,
493-
authWitnesses: txRequest.authWitnesses.map(w => w.requestHash),
494-
};
495-
this.log.info(
496-
`Simulating transaction execution request to ${txRequest.functionSelector} at ${txRequest.origin}`,
497-
txInfo,
498-
);
499-
const timer = new Timer();
500-
await this.synchronizer.sync();
501-
const privateExecutionResult = await this.#executePrivate(txRequest, msgSender, scopes);
502-
503-
const { publicInputs, profileResult } = await this.#prove(txRequest, this.proofCreator, privateExecutionResult, {
504-
simulate: !profile,
505-
skipFeeEnforcement,
506-
profile,
507-
});
508-
509-
const privateSimulationResult = new PrivateSimulationResult(privateExecutionResult, publicInputs);
510-
const simulatedTx = privateSimulationResult.toSimulatedTx();
511-
let publicOutput: PublicSimulationOutput | undefined;
512-
if (simulatePublic && publicInputs.forPublic) {
513-
publicOutput = await this.#simulatePublicCalls(simulatedTx, skipFeeEnforcement);
514-
}
523+
// We disable concurrent simulations since those might execute oracles which read and write to the PXE stores (e.g.
524+
// to the capsules), and we need to prevent concurrent runs from interfering with one another (e.g. attempting to
525+
// delete the same read value, or reading values that another simulation is currently modifying).
526+
return this.#putInJobQueue(async () => {
527+
try {
528+
const txInfo = {
529+
origin: txRequest.origin,
530+
functionSelector: txRequest.functionSelector,
531+
simulatePublic,
532+
msgSender,
533+
chainId: txRequest.txContext.chainId,
534+
version: txRequest.txContext.version,
535+
authWitnesses: txRequest.authWitnesses.map(w => w.requestHash),
536+
};
537+
this.log.info(
538+
`Simulating transaction execution request to ${txRequest.functionSelector} at ${txRequest.origin}`,
539+
txInfo,
540+
);
541+
const timer = new Timer();
542+
await this.synchronizer.sync();
543+
const privateExecutionResult = await this.#executePrivate(txRequest, msgSender, scopes);
544+
545+
const { publicInputs, profileResult } = await this.#prove(
546+
txRequest,
547+
this.proofCreator,
548+
privateExecutionResult,
549+
{
550+
simulate: !profile,
551+
skipFeeEnforcement,
552+
profile,
553+
},
554+
);
515555

516-
if (!skipTxValidation) {
517-
const validationResult = await this.node.isValidTx(simulatedTx, { isSimulation: true, skipFeeEnforcement });
518-
if (validationResult.result === 'invalid') {
519-
throw new Error('The simulated transaction is unable to be added to state and is invalid.');
556+
const privateSimulationResult = new PrivateSimulationResult(privateExecutionResult, publicInputs);
557+
const simulatedTx = privateSimulationResult.toSimulatedTx();
558+
let publicOutput: PublicSimulationOutput | undefined;
559+
if (simulatePublic && publicInputs.forPublic) {
560+
publicOutput = await this.#simulatePublicCalls(simulatedTx, skipFeeEnforcement);
520561
}
521-
}
522562

523-
const txHash = await simulatedTx.getTxHash();
524-
this.log.info(`Simulation completed for ${txHash.toString()} in ${timer.ms()}ms`, {
525-
txHash,
526-
...txInfo,
527-
...(profileResult ? { gateCounts: profileResult.gateCounts } : {}),
528-
...(publicOutput
529-
? {
530-
gasUsed: publicOutput.gasUsed,
531-
revertCode: publicOutput.txEffect.revertCode.getCode(),
532-
revertReason: publicOutput.revertReason,
533-
}
534-
: {}),
535-
});
563+
if (!skipTxValidation) {
564+
const validationResult = await this.node.isValidTx(simulatedTx, { isSimulation: true, skipFeeEnforcement });
565+
if (validationResult.result === 'invalid') {
566+
throw new Error('The simulated transaction is unable to be added to state and is invalid.');
567+
}
568+
}
536569

537-
return TxSimulationResult.fromPrivateSimulationResultAndPublicOutput(
538-
privateSimulationResult,
539-
publicOutput,
540-
profileResult,
541-
);
542-
} catch (err: any) {
543-
throw this.contextualizeError(
544-
err,
545-
inspect(txRequest),
546-
`simulatePublic=${simulatePublic}`,
547-
`msgSender=${msgSender?.toString() ?? 'undefined'}`,
548-
`skipTxValidation=${skipTxValidation}`,
549-
`profile=${profile}`,
550-
`scopes=${scopes?.map(s => s.toString()).join(', ') ?? 'undefined'}`,
551-
);
552-
}
570+
const txHash = await simulatedTx.getTxHash();
571+
this.log.info(`Simulation completed for ${txHash.toString()} in ${timer.ms()}ms`, {
572+
txHash,
573+
...txInfo,
574+
...(profileResult ? { gateCounts: profileResult.gateCounts } : {}),
575+
...(publicOutput
576+
? {
577+
gasUsed: publicOutput.gasUsed,
578+
revertCode: publicOutput.txEffect.revertCode.getCode(),
579+
revertReason: publicOutput.revertReason,
580+
}
581+
: {}),
582+
});
583+
584+
return TxSimulationResult.fromPrivateSimulationResultAndPublicOutput(
585+
privateSimulationResult,
586+
publicOutput,
587+
profileResult,
588+
);
589+
} catch (err: any) {
590+
throw this.contextualizeError(
591+
err,
592+
inspect(txRequest),
593+
`simulatePublic=${simulatePublic}`,
594+
`msgSender=${msgSender?.toString() ?? 'undefined'}`,
595+
`skipTxValidation=${skipTxValidation}`,
596+
`profile=${profile}`,
597+
`scopes=${scopes?.map(s => s.toString()).join(', ') ?? 'undefined'}`,
598+
);
599+
}
600+
});
553601
}
554602

555603
public async sendTx(tx: Tx): Promise<TxHash> {
@@ -565,29 +613,34 @@ export class PXEService implements PXE {
565613
return txHash;
566614
}
567615

568-
public async simulateUnconstrained(
616+
public simulateUnconstrained(
569617
functionName: string,
570618
args: any[],
571619
to: AztecAddress,
572620
_from?: AztecAddress,
573621
scopes?: AztecAddress[],
574622
): Promise<AbiDecoded> {
575-
try {
576-
await this.synchronizer.sync();
577-
// TODO - Should check if `from` has the permission to call the view function.
578-
const functionCall = await this.#getFunctionCall(functionName, args, to);
579-
const executionResult = await this.#simulateUnconstrained(functionCall, scopes);
580-
581-
// TODO - Return typed result based on the function artifact.
582-
return executionResult;
583-
} catch (err: any) {
584-
const stringifiedArgs = args.map(arg => arg.toString()).join(', ');
585-
throw this.contextualizeError(
586-
err,
587-
`simulateUnconstrained ${to}:${functionName}(${stringifiedArgs})`,
588-
`scopes=${scopes?.map(s => s.toString()).join(', ') ?? 'undefined'}`,
589-
);
590-
}
623+
// We disable concurrent simulations since those might execute oracles which read and write to the PXE stores (e.g.
624+
// to the capsules), and we need to prevent concurrent runs from interfering with one another (e.g. attempting to
625+
// delete the same read value, or reading values that another simulation is currently modifying).
626+
return this.#putInJobQueue(async () => {
627+
try {
628+
await this.synchronizer.sync();
629+
// TODO - Should check if `from` has the permission to call the view function.
630+
const functionCall = await this.#getFunctionCall(functionName, args, to);
631+
const executionResult = await this.#simulateUnconstrained(functionCall, scopes);
632+
633+
// TODO - Return typed result based on the function artifact.
634+
return executionResult;
635+
} catch (err: any) {
636+
const stringifiedArgs = args.map(arg => arg.toString()).join(', ');
637+
throw this.contextualizeError(
638+
err,
639+
`simulateUnconstrained ${to}:${functionName}(${stringifiedArgs})`,
640+
`scopes=${scopes?.map(s => s.toString()).join(', ') ?? 'undefined'}`,
641+
);
642+
}
643+
});
591644
}
592645

593646
public getTxReceipt(txHash: TxHash): Promise<TxReceipt> {

0 commit comments

Comments
 (0)