Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 96 additions & 43 deletions emulator/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"maps"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -756,9 +755,12 @@ func configureNewLedger(
nil,
nil,
nil,
nil, // systemTransactions (ordered slice)
nil, // systemTransactionBodies
nil, // systemTransactionResults
genesisExecutionSnapshot,
output.Events,
nil,
nil, // scheduledTransactionIDs
)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -1102,6 +1104,35 @@ func (b *Blockchain) getTransactionResult(txID flowgo.Identifier) (*accessmodel.
return &result, nil
}

func (b *Blockchain) getSystemTransactionResult(blockID flowgo.Identifier, txID flowgo.Identifier) (*accessmodel.TransactionResult, error) {
storedResult, err := b.storage.SystemTransactionResultByID(context.Background(), blockID, txID)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return &accessmodel.TransactionResult{
Status: flowgo.TransactionStatusUnknown,
}, nil
}
return nil, err
}

statusCode := 0
if storedResult.ErrorCode > 0 {
statusCode = 1
}
result := accessmodel.TransactionResult{
Status: flowgo.TransactionStatusSealed,
StatusCode: uint(statusCode),
ErrorMessage: storedResult.ErrorMessage,
Events: storedResult.Events,
TransactionID: txID,
BlockHeight: storedResult.BlockHeight,
BlockID: storedResult.BlockID,
CollectionID: storedResult.CollectionID,
}

return &result, nil
}

// GetAccountByIndex returns the account for the given address.
func (b *Blockchain) GetAccountByIndex(index uint) (*flowgo.Account, error) {
generator := flowsdk.NewAddressGenerator(flowsdk.ChainID(b.vmCtx.Chain.ChainID()))
Expand Down Expand Up @@ -1425,24 +1456,28 @@ func (b *Blockchain) commitBlock() (*flowgo.Block, error) {
if len(collections) > 0 {
collectionID = collections[0].ID()
}

// Regular user transactions and results
transactions := b.pendingBlock.Transactions()
transactionResults, err := convertToSealedResults(b.pendingBlock.TransactionResults(), b.pendingBlock.ID(), b.pendingBlock.height, collectionID)
if err != nil {
return nil, err
}

// System transactions and results (stored separately, order preserved)
systemTransactionIDs := []flowgo.Identifier{} // Ordered list of system tx IDs
systemTransactionBodies := make(map[flowgo.Identifier]*flowgo.TransactionBody)
systemTransactionResults := make(map[flowgo.Identifier]*types.StorableTransactionResult)
scheduledTransactionIDs := make(map[uint64]flowgo.Identifier)

// execute scheduled transactions out-of-band before the system chunk
// (does not change collections or payload)
blockContext := fvm.NewContextFromParent(
b.vmCtx,
fvm.WithBlockHeader(block.ToHeader()),
)
systemTransactions := storage.SystemTransactions{
BlockID: b.pendingBlock.ID(),
Transactions: []flowgo.Identifier{},
}
if b.conf.ScheduledTransactionsEnabled {
tx, results, scheduledTxIDs, err := b.executeScheduledTransactions(blockContext)
tx, results, scheduledTxIDs, orderedIDs, err := b.executeScheduledTransactions(blockContext)
if err != nil {
return nil, err
}
Expand All @@ -1451,18 +1486,23 @@ func (b *Blockchain) commitBlock() (*flowgo.Block, error) {
if err != nil {
return nil, err
}
maps.Copy(transactionResults, convertedResults)
for id, result := range convertedResults {
systemTransactionResults[id] = result
}
for id, t := range tx {
transactions[id] = t
systemTransactions.Transactions = append(systemTransactions.Transactions, id)
systemTransactionBodies[id] = t
}

// Append scheduled tx IDs in execution order
systemTransactionIDs = append(systemTransactionIDs, orderedIDs...)

// Store scheduled transaction ID mappings
systemTransactions.ScheduledTransactionIDs = scheduledTxIDs
scheduledTransactionIDs = scheduledTxIDs
}

// lastly we execute the system chunk transaction
chunkBody, itr, err := b.executeSystemChunkTransaction()
// Calculate index for system chunk transaction which executes after all user and scheduled transactions
systemChunkIndex := uint32(len(transactions)) + uint32(len(systemTransactionIDs))
chunkBody, itr, err := b.executeSystemChunkTransaction(systemChunkIndex)
if err != nil {
return nil, err
}
Expand All @@ -1472,9 +1512,9 @@ func (b *Blockchain) commitBlock() (*flowgo.Block, error) {
if err != nil {
return nil, err
}
transactions[systemTxID] = chunkBody
transactionResults[systemTxID] = &systemTxStorableResult
systemTransactions.Transactions = append(systemTransactions.Transactions, systemTxID)
systemTransactionBodies[systemTxID] = chunkBody
systemTransactionResults[systemTxID] = &systemTxStorableResult
systemTransactionIDs = append(systemTransactionIDs, systemTxID)

executionSnapshot := b.pendingBlock.Finalize()
events := b.pendingBlock.Events()
Expand All @@ -1486,15 +1526,18 @@ func (b *Blockchain) commitBlock() (*flowgo.Block, error) {
collections,
transactions,
transactionResults,
systemTransactionIDs,
systemTransactionBodies,
systemTransactionResults,
executionSnapshot,
events,
&systemTransactions)
scheduledTransactionIDs)
if err != nil {
return nil, err
}

// Index scheduled transactions globally (scheduledTxID → blockID)
for scheduledTxID := range systemTransactions.ScheduledTransactionIDs {
for scheduledTxID := range scheduledTransactionIDs {
err = b.storage.IndexScheduledTransactionID(context.Background(), scheduledTxID, block.ID())
if err != nil {
return nil, fmt.Errorf("failed to index scheduled transaction %d: %w", scheduledTxID, err)
Expand Down Expand Up @@ -1866,9 +1909,9 @@ func (b *Blockchain) GetTransactionResultsByBlockID(blockID flowgo.Identifier) (
return nil, fmt.Errorf("failed to get system transactions %w", err)
}
for j, txID := range st.Transactions {
result, err := b.getTransactionResult(txID)
result, err := b.getSystemTransactionResult(blockID, txID)
if err != nil {
return nil, fmt.Errorf("failed to get transaction result [%d] %s: %w", j, txID, err)
return nil, fmt.Errorf("failed to get system transaction result [%d] %s: %w", j, txID, err)
}
results = append(results, result)
}
Expand Down Expand Up @@ -1952,8 +1995,8 @@ func (b *Blockchain) GetSystemTransactionResult(txID flowgo.Identifier, blockID
return nil, &types.TransactionNotFoundError{ID: txID}
}

// Retrieve the transaction result
return b.getTransactionResult(txID)
// Retrieve the system transaction result using composite key (blockID, txID)
return b.getSystemTransactionResult(blockID, txID)
}

// GetScheduledTransactionByBlockID returns a scheduled transaction by its scheduled transaction ID and block ID.
Expand Down Expand Up @@ -2036,8 +2079,8 @@ func (b *Blockchain) GetScheduledTransactionResultByBlockID(scheduledTxID uint64
return nil, &types.TransactionNotFoundError{ID: flowgo.ZeroID}
}

// Retrieve the transaction result
return b.getTransactionResult(txID)
// Retrieve the system transaction result using composite key
return b.getSystemTransactionResult(blockID, txID)
}

// GetScheduledTransactionResult returns the result of a scheduled transaction by its scheduled transaction ID.
Expand Down Expand Up @@ -2070,8 +2113,8 @@ func (b *Blockchain) GetScheduledTransactionResult(scheduledTxID uint64) (*acces
return nil, &types.TransactionNotFoundError{ID: flowgo.ZeroID}
}

// Retrieve the transaction result
return b.getTransactionResult(txID)
// Retrieve the system transaction result using composite key
return b.getSystemTransactionResult(blockID, txID)
}

func (b *Blockchain) GetLogs(identifier flowgo.Identifier) ([]string, error) {
Expand Down Expand Up @@ -2162,7 +2205,7 @@ func (b *Blockchain) systemChunkTransaction() (*flowgo.TransactionBody, error) {
return tx, nil
}

func (b *Blockchain) executeSystemChunkTransaction() (*flowgo.TransactionBody, *IndexedTransactionResult, error) {
func (b *Blockchain) executeSystemChunkTransaction(txIndex uint32) (*flowgo.TransactionBody, *IndexedTransactionResult, error) {
txn, err := b.systemChunkTransaction()
if err != nil {
return nil, nil, err
Expand All @@ -2178,7 +2221,7 @@ func (b *Blockchain) executeSystemChunkTransaction() (*flowgo.TransactionBody, *

executionSnapshot, output, err := b.vm.Run(
ctx,
fvm.Transaction(txn, uint32(len(b.pendingBlock.Transactions()))),
fvm.Transaction(txn, txIndex),
b.pendingBlock.ledgerState,
)
if err != nil {
Expand All @@ -2198,15 +2241,16 @@ func (b *Blockchain) executeSystemChunkTransaction() (*flowgo.TransactionBody, *

itr := &IndexedTransactionResult{
ProcedureOutput: output,
Index: 0,
Index: txIndex,
}
return txn, itr, nil
}

func (b *Blockchain) executeScheduledTransactions(blockContext fvm.Context) (map[flowgo.Identifier]*flowgo.TransactionBody, map[flowgo.Identifier]IndexedTransactionResult, map[uint64]flowgo.Identifier, error) {
func (b *Blockchain) executeScheduledTransactions(blockContext fvm.Context) (map[flowgo.Identifier]*flowgo.TransactionBody, map[flowgo.Identifier]IndexedTransactionResult, map[uint64]flowgo.Identifier, []flowgo.Identifier, error) {
systemTransactions := map[flowgo.Identifier]*flowgo.TransactionBody{}
systemTransactionResults := map[flowgo.Identifier]IndexedTransactionResult{}
scheduledTxIDMap := map[uint64]flowgo.Identifier{} // maps scheduled tx ID to transaction ID
orderedTxIDs := []flowgo.Identifier{} // execution order of system transactions

// disable checks for signatures and keys since we are executing a system transaction
ctx := fvm.NewContextFromParent(
Expand All @@ -2216,62 +2260,71 @@ func (b *Blockchain) executeScheduledTransactions(blockContext fvm.Context) (map
fvm.WithTransactionFeesEnabled(false),
)

// Base index for system transactions (equal to count of user transactions)
systemTxBaseIndex := uint32(len(b.pendingBlock.Transactions()))

// process scheduled transactions out-of-band (do not alter collections)
processTx, err := blueprints.ProcessCallbacksTransaction(b.GetChain())
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// Use the real transaction ID from the process transaction
processID := processTx.ID()
systemTransactions[processID] = processTx
orderedTxIDs = append(orderedTxIDs, processID) // First in order

processTxIndex := systemTxBaseIndex
executionSnapshot, output, err := b.vm.Run(
ctx,
fvm.Transaction(processTx, uint32(len(b.pendingBlock.Transactions()))),
fvm.Transaction(processTx, processTxIndex),
b.pendingBlock.ledgerState,
)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

systemTransactionResults[processID] = IndexedTransactionResult{
ProcedureOutput: output,
Index: 0,
Index: processTxIndex,
}

// record events and state changes
b.pendingBlock.events = append(b.pendingBlock.events, output.Events...)
if err := b.pendingBlock.ledgerState.Merge(executionSnapshot); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

executeTxs, err := blueprints.ExecuteCallbacksTransactions(b.GetChain(), output.Events)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

env := systemcontracts.SystemContractsForChain(b.GetChain().ChainID()).AsTemplateEnv()
scheduledIDs, err := parseScheduledIDs(env, output.Events)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

// execute scheduled transactions out-of-band
for idx, tx := range executeTxs {
id := tx.ID()
orderedTxIDs = append(orderedTxIDs, id) // Add in execution order

// Each execute callback gets its own index after the process transaction
executeTxIndex := systemTxBaseIndex + 1 + uint32(idx)
execSnapshot, execOutput, err := b.vm.Run(
ctx,
fvm.Transaction(tx, uint32(len(b.pendingBlock.Transactions()))),
fvm.Transaction(tx, executeTxIndex),
b.pendingBlock.ledgerState,
)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

systemTransactionResults[id] = IndexedTransactionResult{
ProcedureOutput: execOutput,
Index: 0,
Index: executeTxIndex,
}
systemTransactions[id] = tx

Expand All @@ -2288,7 +2341,7 @@ func (b *Blockchain) executeScheduledTransactions(blockContext fvm.Context) (map
// Print scheduled transaction result (labeled), including app-level scheduled tx id
schedResult, err := convert.VMTransactionResultToEmulator(tx.ID(), execOutput)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
appScheduledID := ""
if idx < len(scheduledIDs) {
Expand All @@ -2298,11 +2351,11 @@ func (b *Blockchain) executeScheduledTransactions(blockContext fvm.Context) (map

b.pendingBlock.events = append(b.pendingBlock.events, execOutput.Events...)
if err := b.pendingBlock.ledgerState.Merge(execSnapshot); err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
}

return systemTransactions, systemTransactionResults, scheduledTxIDMap, nil
return systemTransactions, systemTransactionResults, scheduledTxIDMap, orderedTxIDs, nil
}

func (b *Blockchain) GetRegisterValues(registerIDs flowgo.RegisterIDs, height uint64) (values []flowgo.RegisterValue, err error) {
Expand Down
Loading
Loading