Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 57 additions & 20 deletions adapters/access.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,56 @@ func (a *AccessAdapter) GetExecutionResultByID(_ context.Context, _ flowgo.Ident
return nil, nil
}

func (a *AccessAdapter) GetSystemTransaction(_ context.Context, _ flowgo.Identifier, _ flowgo.Identifier) (*flowgo.TransactionBody, error) {
return nil, nil
func (a *AccessAdapter) GetSystemTransaction(_ context.Context, txID flowgo.Identifier, blockID flowgo.Identifier) (*flowgo.TransactionBody, error) {
tx, err := a.emulator.GetSystemTransaction(txID, blockID)
if err != nil {
return nil, convertError(err, codes.NotFound)
}

return tx, nil
}

func (a *AccessAdapter) GetSystemTransactionResult(_ context.Context, _ flowgo.Identifier, _ flowgo.Identifier, _ entities.EventEncodingVersion) (*accessmodel.TransactionResult, error) {
return nil, nil
func (a *AccessAdapter) GetSystemTransactionResult(_ context.Context, txID flowgo.Identifier, blockID flowgo.Identifier, encodingVersion entities.EventEncodingVersion) (*accessmodel.TransactionResult, error) {
result, err := a.emulator.GetSystemTransactionResult(txID, blockID)
if err != nil {
return nil, convertError(err, codes.NotFound)
}

// Convert CCF events to JSON events, else return CCF encoded version
if encodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 {
result.Events, err = ConvertCCFEventsToJsonEvents(result.Events)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to convert events: %v", err))
}
}

return result, nil
}

func (a *AccessAdapter) GetScheduledTransaction(_ context.Context, scheduledTxID uint64) (*flowgo.TransactionBody, error) {
tx, err := a.emulator.GetScheduledTransaction(scheduledTxID)
if err != nil {
return nil, convertError(err, codes.NotFound)
}

return tx, nil
}

func (a *AccessAdapter) GetScheduledTransactionResult(_ context.Context, scheduledTxID uint64, encodingVersion entities.EventEncodingVersion) (*accessmodel.TransactionResult, error) {
result, err := a.emulator.GetScheduledTransactionResult(scheduledTxID)
if err != nil {
return nil, convertError(err, codes.NotFound)
}

// Convert CCF events to JSON events, else return CCF encoded version
if encodingVersion == entities.EventEncodingVersion_JSON_CDC_V0 {
result.Events, err = ConvertCCFEventsToJsonEvents(result.Events)
if err != nil {
return nil, status.Error(codes.Internal, fmt.Sprintf("failed to convert events: %v", err))
}
}

return result, nil
}

func (a *AccessAdapter) GetAccountBalanceAtLatestBlock(_ context.Context, address flowgo.Address) (uint64, error) {
Expand Down Expand Up @@ -654,7 +698,7 @@ func (a *AccessAdapter) subscribeBlocksFromStartBlockID(ctx context.Context, sta
if err != nil {
return subscription.NewFailedSubscription(err, "could not get block by ID")
}

emulatorBlockchain, ok := a.emulator.(*emulator.Blockchain)
if !ok {
return subscription.NewFailedSubscription(fmt.Errorf("emulator is not a Blockchain"), "invalid emulator type")
Expand All @@ -681,7 +725,7 @@ func (a *AccessAdapter) subscribeBlocksFromLatest(ctx context.Context, getData s
if err != nil {
return subscription.NewFailedSubscription(err, "could not get latest block")
}

emulatorBlockchain, ok := a.emulator.(*emulator.Blockchain)
if !ok {
return subscription.NewFailedSubscription(fmt.Errorf("emulator is not a Blockchain"), "invalid emulator type")
Expand Down Expand Up @@ -787,7 +831,7 @@ func (a *AccessAdapter) SubscribeTransactionStatuses(ctx context.Context, txID f
if err != nil {
return subscription.NewFailedSubscription(err, "failed to lookup latest block")
}

return a.createTransactionSubscription(ctx, txID, latestBlock.ID(), flowgo.ZeroID, requiredEventEncodingVersion)
}

Expand All @@ -800,7 +844,7 @@ func (a *AccessAdapter) SubscribeTransactionStatusesFromStartHeight(ctx context.
if err != nil {
return subscription.NewFailedSubscription(err, "failed to get start block")
}

return a.createTransactionSubscription(ctx, txID, block.ID(), flowgo.ZeroID, requiredEventEncodingVersion)
}

Expand All @@ -809,7 +853,7 @@ func (a *AccessAdapter) SubscribeTransactionStatusesFromLatest(ctx context.Conte
if err != nil {
return subscription.NewFailedSubscription(err, "failed to lookup latest block")
}

return a.createTransactionSubscription(ctx, txID, latestBlock.ID(), flowgo.ZeroID, requiredEventEncodingVersion)
}

Expand Down Expand Up @@ -857,7 +901,7 @@ func (a *AccessAdapter) getTransactionStatusResponse(
requiredEventEncodingVersion entities.EventEncodingVersion,
) subscription.GetDataByHeightFunc {
lastStatus := flowgo.TransactionStatusUnknown

return func(ctx context.Context, height uint64) (interface{}, error) {
// Check if block is ready
if err := a.validateHeight(height, flowgo.BlockStatusSealed); err != nil {
Expand All @@ -880,7 +924,7 @@ func (a *AccessAdapter) getTransactionStatusResponse(
TransactionID: txID,
}}, nil
}

// Otherwise, transaction is still pending/unknown
if lastStatus == flowgo.TransactionStatusUnknown {
return nil, nil // Don't send duplicate unknown status
Expand Down Expand Up @@ -928,14 +972,14 @@ func (a *AccessAdapter) generateTransactionStatusUpdates(
Status: status,
TransactionID: txResult.TransactionID,
}

// Add block info for finalized and later statuses
if status >= flowgo.TransactionStatusFinalized {
result.BlockID = txResult.BlockID
result.BlockHeight = txResult.BlockHeight
result.CollectionID = txResult.CollectionID
}

// Add execution details for executed and sealed statuses
if status >= flowgo.TransactionStatusExecuted {
result.Events = txResult.Events
Expand All @@ -961,10 +1005,3 @@ func ConvertCCFEventsToJsonEvents(events []flowgo.Event) ([]flowgo.Event, error)

return converted, nil
}

func (a *AccessAdapter) GetScheduledTransaction(_ context.Context, _ uint64) (*flowgo.TransactionBody, error) {
return nil, nil
}
func (a *AccessAdapter) GetScheduledTransactionResult(_ context.Context, _ uint64, _ entities.EventEncodingVersion) (*accessmodel.TransactionResult, error) {
return nil, nil
}
5 changes: 2 additions & 3 deletions adapters/streaming_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestStreamingTransactionStatuses_Integration(t *testing.T) {
SetProposalKey(serviceAddress, serviceKey.Index, serviceKey.SequenceNumber).
SetPayer(serviceAddress).
AddAuthorizer(serviceAddress)

tx, err := txBuilder.Build()
require.NoError(t, err)

Expand Down Expand Up @@ -240,7 +240,7 @@ func TestStreamingTransactionStatuses_Integration(t *testing.T) {
}

done:
assert.True(t, statusesReceived[flowgo.TransactionStatusSealed],
assert.True(t, statusesReceived[flowgo.TransactionStatusSealed],
"should receive sealed status. Got: %v", statusesReceived)
}

Expand Down Expand Up @@ -287,4 +287,3 @@ func TestStreamingMultipleBlocks_Integration(t *testing.T) {

assert.Equal(t, expectedBlocks, blocksReceived)
}

2 changes: 2 additions & 0 deletions convert/emu.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func ToStorableResult(
output fvm.ProcedureOutput,
blockID flowgo.Identifier,
blockHeight uint64,
collectionID flowgo.Identifier,
) (
types.StorableTransactionResult,
error,
Expand All @@ -48,5 +49,6 @@ func ToStorableResult(
ErrorMessage: errorMessage,
Logs: output.Logs,
Events: output.Events,
CollectionID: collectionID,
}, nil
}
Loading
Loading