Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import Result, { QueryResult, ResultObserver } from './result'
import ConnectionProvider from './connection-provider'
import Connection from './connection'
import Transaction from './transaction'
import ManagedTransaction from './transaction-managed'
import TransactionPromise from './transaction-promise'
import Session, { TransactionConfig } from './session'
import Driver, * as driver from './driver'
Expand Down Expand Up @@ -137,6 +138,7 @@ const forExport = {
Stats,
Result,
Transaction,
ManagedTransaction,
TransactionPromise,
Session,
Driver,
Expand Down Expand Up @@ -196,6 +198,7 @@ export {
ConnectionProvider,
Connection,
Transaction,
ManagedTransaction,
TransactionPromise,
Session,
Driver,
Expand Down
46 changes: 29 additions & 17 deletions packages/core/src/internal/transaction-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const DEFAULT_RETRY_DELAY_MULTIPLIER = 2.0
const DEFAULT_RETRY_DELAY_JITTER_FACTOR = 0.2

type TransactionCreator = () => TransactionPromise
type TransactionWork<T> = (tx: Transaction) => T | Promise<T>
type TransactionWork<T, Tx = Transaction> = (tx: Tx) => T | Promise<T>
type Resolve<T> = (value: T | PromiseLike<T>) => void
type Reject = (value: any) => void
type Timeout = ReturnType<typeof setTimeout>
Expand Down Expand Up @@ -68,16 +68,18 @@ export class TransactionExecutor {
this._verifyAfterConstruction()
}

execute<T>(
execute<T, Tx = Transaction>(
transactionCreator: TransactionCreator,
transactionWork: TransactionWork<T>
transactionWork: TransactionWork<T, Tx>,
transactionWrapper?: (tx: Transaction) => Tx
): Promise<T> {
return new Promise<T>((resolve, reject) => {
this._executeTransactionInsidePromise(
transactionCreator,
transactionWork,
resolve,
reject
reject,
transactionWrapper
)
}).catch(error => {
const retryStartTimeMs = Date.now()
Expand All @@ -87,7 +89,8 @@ export class TransactionExecutor {
transactionWork,
error,
retryStartTimeMs,
retryDelayMs
retryDelayMs,
transactionWrapper
)
})
}
Expand All @@ -98,12 +101,13 @@ export class TransactionExecutor {
this._inFlightTimeoutIds = []
}

_retryTransactionPromise<T>(
_retryTransactionPromise<T, Tx = Transaction>(
transactionCreator: TransactionCreator,
transactionWork: TransactionWork<T>,
transactionWork: TransactionWork<T, Tx>,
error: Error,
retryStartTime: number,
retryDelayMs: number
retryDelayMs: number,
transactionWrapper?: (tx: Transaction) => Tx
): Promise<T> {
const elapsedTimeMs = Date.now() - retryStartTime

Expand All @@ -122,7 +126,8 @@ export class TransactionExecutor {
transactionCreator,
transactionWork,
resolve,
reject
reject,
transactionWrapper
)
}, nextRetryTime)
// add newly created timeoutId to the list of all in-flight timeouts
Expand All @@ -134,16 +139,18 @@ export class TransactionExecutor {
transactionWork,
error,
retryStartTime,
nextRetryDelayMs
nextRetryDelayMs,
transactionWrapper
)
})
}

async _executeTransactionInsidePromise<T>(
async _executeTransactionInsidePromise<T, Tx = Transaction>(
transactionCreator: TransactionCreator,
transactionWork: TransactionWork<T>,
transactionWork: TransactionWork<T, Tx>,
resolve: Resolve<T>,
reject: Reject
reject: Reject,
transactionWrapper?: (tx: Transaction) => Tx,
): Promise<void> {
let tx: Transaction
try {
Expand All @@ -154,7 +161,12 @@ export class TransactionExecutor {
return
}

const resultPromise = this._safeExecuteTransactionWork(tx, transactionWork)
// The conversion from `tx` as `unknown` then to `Tx` is necessary
// because it is not possible to be sure that `Tx` is a subtype of `Transaction`
// in using static type checking.
const wrap = transactionWrapper || ((tx: Transaction) => tx as unknown as Tx)
const wrappedTx = wrap(tx)
const resultPromise = this._safeExecuteTransactionWork(wrappedTx, transactionWork)

resultPromise
.then(result =>
Expand All @@ -163,9 +175,9 @@ export class TransactionExecutor {
.catch(error => this._handleTransactionWorkFailure(error, tx, reject))
}

_safeExecuteTransactionWork<T>(
tx: Transaction,
transactionWork: TransactionWork<T>
_safeExecuteTransactionWork<T, Tx = Transaction>(
tx: Tx,
transactionWork: TransactionWork<T, Tx>
): Promise<T> {
try {
const result = transactionWork(tx)
Expand Down
72 changes: 72 additions & 0 deletions packages/core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import { Query, SessionMode } from './types'
import Connection from './connection'
import { NumberOrInteger } from './graph-types'
import TransactionPromise from './transaction-promise'
import ManagedTransaction from './transaction-managed'

type ConnectionConsumer = (connection: Connection | void) => any | undefined
type TransactionWork<T> = (tx: Transaction) => Promise<T> | T
type ManagedTransactionWork<T> = (tx: ManagedTransaction) => Promise<T> | T

interface TransactionConfig {
timeout?: NumberOrInteger
Expand Down Expand Up @@ -336,6 +338,8 @@ class Session {
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* `maxTransactionRetryTime` property in milliseconds.
*
* @deprecated This method will be removed in version 6.0. Please, use {@link Session#executeRead} instead.
*
* @param {function(tx: Transaction): Promise} transactionWork - Callback that executes operations against
* a given {@link Transaction}.
* @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
Expand All @@ -358,6 +362,8 @@ class Session {
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* `maxTransactionRetryTime` property in milliseconds.
*
* @deprecated This method will be removed in version 6.0. Please, use {@link Session#executeWrite} instead.
*
* @param {function(tx: Transaction): Promise} transactionWork - Callback that executes operations against
* a given {@link Transaction}.
* @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
Expand All @@ -383,6 +389,72 @@ class Session {
)
}

/**
* Execute given unit of work in a {@link READ} transaction.
*
* Transaction will automatically be committed unless the given function throws or returns a rejected promise.
* Some failures of the given function or the commit itself will be retried with exponential backoff with initial
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* `maxTransactionRetryTime` property in milliseconds.
*
* @param {function(tx: ManagedTransaction): Promise} transactionWork - Callback that executes operations against
* a given {@link Transaction}.
* @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
* @return {Promise} Resolved promise as returned by the given function or rejected promise when given
* function or commit fails.
*/
executeRead<T>(
transactionWork: ManagedTransactionWork<T>,
transactionConfig?: TransactionConfig
): Promise<T> {
const config = new TxConfig(transactionConfig)
return this._executeInTransaction(ACCESS_MODE_READ, config, transactionWork)
}

/**
* Execute given unit of work in a {@link WRITE} transaction.
*
* Transaction will automatically be committed unless the given function throws or returns a rejected promise.
* Some failures of the given function or the commit itself will be retried with exponential backoff with initial
* delay of 1 second and maximum retry time of 30 seconds. Maximum retry time is configurable via driver config's
* `maxTransactionRetryTime` property in milliseconds.
*
* @param {function(tx: ManagedTransaction): Promise} transactionWork - Callback that executes operations against
* a given {@link Transaction}.
* @param {TransactionConfig} [transactionConfig] - Configuration for all transactions started to execute the unit of work.
* @return {Promise} Resolved promise as returned by the given function or rejected promise when given
* function or commit fails.
*/
executeWrite<T>(
transactionWork: ManagedTransactionWork<T>,
transactionConfig?: TransactionConfig
): Promise<T> {
const config = new TxConfig(transactionConfig)
return this._executeInTransaction(ACCESS_MODE_WRITE, config, transactionWork)
}

/**
* @private
* @param {SessionMode} accessMode
* @param {TxConfig} transactionConfig
* @param {ManagedTransactionWork} transactionWork
* @returns {Promise}
*/
private _executeInTransaction<T>(
accessMode: SessionMode,
transactionConfig: TxConfig,
transactionWork: ManagedTransactionWork<T>
): Promise<T> {
return this._transactionExecutor.execute(
() => this._beginTransaction(accessMode, transactionConfig),
transactionWork,
tx => new ManagedTransaction({
isOpen: tx.isOpen.bind(tx),
run: tx.run.bind(tx),
})
)
}

/**
* Sets the resolved database name in the session context.
* @private
Expand Down
72 changes: 72 additions & 0 deletions packages/core/src/transaction-managed.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import Result from './result'
import { Query } from './types'

interface Run {
(query: Query, parameters?: any): Result
}

interface IsOpen {
(): boolean
}

/**
* Represents a transaction that is managed by the transaction executor.
*
* @public
*/
class ManagedTransaction {
private _run: Run
private _isOpen: IsOpen

constructor({ run, isOpen }: { run: Run, isOpen: IsOpen }) {
/**
* @private
*/
this._run = run
/**
* @private
*/
this._isOpen = isOpen
}

/**
* Run Cypher query
* Could be called with a query object i.e.: `{text: "MATCH ...", parameters: {param: 1}}`
* or with the query and parameters as separate arguments.
* @param {mixed} query - Cypher query to execute
* @param {Object} parameters - Map with parameters to use in query
* @return {Result} New Result
*/
run(query: Query, parameters?: any): Result {
return this._run(query, parameters)
}

/**
* Check if this transaction is active, which means commit and rollback did not happen.
* @return {boolean} `true` when not committed and not rolled back, `false` otherwise.
*/
isOpen(): boolean {
return this._isOpen()
}
}

export default ManagedTransaction
2 changes: 2 additions & 0 deletions packages/core/src/transaction-promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class TransactionPromise extends Transaction implements Promise<Transaction>{

/**
* @access private
* @returns {void}
*/
private _onBeginError(error: Error): void {
this._beginError = error;
Expand All @@ -180,6 +181,7 @@ class TransactionPromise extends Transaction implements Promise<Transaction>{

/**
* @access private
* @returns {void}
*/
private _onBeginMetadata(metadata: any): void {
this._beginMetadata = metadata || {};
Expand Down
Loading