Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 32 additions & 30 deletions src/Illuminate/Bus/Batch.php
Original file line number Diff line number Diff line change
Expand Up @@ -242,31 +242,19 @@ public function recordSuccessfulJob(string $jobId)
$counts = $this->decrementPendingJobs($jobId);

if ($this->hasProgressCallbacks()) {
$batch = $this->fresh();

(new Collection($this->options['progress']))->each(function ($handler) use ($batch) {
$this->invokeHandlerCallback($handler, $batch);
});
$this->invokeCallbacks('progress');
}

if ($counts->pendingJobs === 0) {
$this->repository->markAsFinished($this->id);
}

if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
$batch = $this->fresh();

(new Collection($this->options['then']))->each(function ($handler) use ($batch) {
$this->invokeHandlerCallback($handler, $batch);
});
$this->invokeCallbacks('then');
}

if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
$batch = $this->fresh();

(new Collection($this->options['finally']))->each(function ($handler) use ($batch) {
$this->invokeHandlerCallback($handler, $batch);
});
$this->invokeCallbacks('finally');
}
}

Expand All @@ -281,6 +269,18 @@ public function decrementPendingJobs(string $jobId)
return $this->repository->decrementPendingJobs($this->id, $jobId);
}

/**
* Invoke the callbacks of the given type.
*/
protected function invokeCallbacks(string $type, ?Throwable $e = null): void
{
$batch = $this->fresh();

foreach ($this->options[$type] ?? [] as $handler) {
$this->invokeHandlerCallback($handler, $batch, $e);
}
}

/**
* Determine if the batch has finished executing.
*
Expand Down Expand Up @@ -346,28 +346,22 @@ public function recordFailedJob(string $jobId, $e)
$this->cancel();
}

if ($this->hasProgressCallbacks() && $this->allowsFailures()) {
$batch = $this->fresh();
if ($this->allowsFailures()) {
if ($this->hasProgressCallbacks()) {
$this->invokeCallbacks('progress', $e);
}

(new Collection($this->options['progress']))->each(function ($handler) use ($batch, $e) {
$this->invokeHandlerCallback($handler, $batch, $e);
});
if ($this->hasFailureCallbacks()) {
$this->invokeCallbacks('failure', $e);
}
}

if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
$batch = $this->fresh();

(new Collection($this->options['catch']))->each(function ($handler) use ($batch, $e) {
$this->invokeHandlerCallback($handler, $batch, $e);
});
$this->invokeCallbacks('catch', $e);
}

if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
$batch = $this->fresh();

(new Collection($this->options['finally']))->each(function ($handler) use ($batch, $e) {
$this->invokeHandlerCallback($handler, $batch, $e);
});
$this->invokeCallbacks('finally');
}
}

Expand All @@ -392,6 +386,14 @@ public function hasCatchCallbacks()
return isset($this->options['catch']) && ! empty($this->options['catch']);
}

/**
* Determine if the batch has "failure" callbacks.
*/
public function hasFailureCallbacks(): bool
{
return isset($this->options['failure']) && ! empty($this->options['failure']);
}

/**
* Determine if the batch has "finally" callbacks.
*
Expand Down
62 changes: 43 additions & 19 deletions src/Illuminate/Bus/PendingBatch.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,7 @@ protected function ensureJobIsBatchable(object|array $job): void
*/
public function before($callback)
{
$this->options['before'][] = $callback instanceof Closure
? new SerializableClosure($callback)
: $callback;
$this->registerCallback('before', $callback);

return $this;
}
Expand All @@ -144,9 +142,7 @@ public function beforeCallbacks()
*/
public function progress($callback)
{
$this->options['progress'][] = $callback instanceof Closure
? new SerializableClosure($callback)
: $callback;
$this->registerCallback('progress', $callback);

return $this;
}
Expand All @@ -169,9 +165,7 @@ public function progressCallbacks()
*/
public function then($callback)
{
$this->options['then'][] = $callback instanceof Closure
? new SerializableClosure($callback)
: $callback;
$this->registerCallback('then', $callback);

return $this;
}
Expand All @@ -194,9 +188,7 @@ public function thenCallbacks()
*/
public function catch($callback)
{
$this->options['catch'][] = $callback instanceof Closure
? new SerializableClosure($callback)
: $callback;
$this->registerCallback('catch', $callback);

return $this;
}
Expand All @@ -219,9 +211,7 @@ public function catchCallbacks()
*/
public function finally($callback)
{
$this->options['finally'][] = $callback instanceof Closure
? new SerializableClosure($callback)
: $callback;
$this->registerCallback('finally', $callback);

return $this;
}
Expand All @@ -237,14 +227,28 @@ public function finallyCallbacks()
}

/**
* Indicate that the batch should not be cancelled when a job within the batch fails.
* Indicate that the batch should not be canceled when a job within the batch fails.
*
* Optionally, add callbacks to be executed upon each job failure.
*
* @template TParam of Closure(\Illuminate\Bus\Batch, \Throwable|null): mixed)|(callable(\Illuminate\Bus\Batch, \Throwable|null): mixed)
*
* @param bool $allowFailures
* @param bool|TParam|array<array-key, TParam> $param
* @return $this
*/
public function allowFailures($allowFailures = true)
public function allowFailures(Closure|callable|array|bool $param = true)
{
$this->options['allowFailures'] = $allowFailures;
if (! is_bool($param)) {
$param = Arr::wrap($param);

foreach ($param as $callback) {
if (is_callable($callback)) {
$this->registerCallback('failure', $callback);
}
}
}

$this->options['allowFailures'] = ! ($param === false);

return $this;
}
Expand All @@ -259,6 +263,26 @@ public function allowsFailures()
return Arr::get($this->options, 'allowFailures', false) === true;
}

/**
* Get the "failure" callbacks that have been registered with the pending batch.
*
* @return array<array-key, Closure|callable>
*/
public function failureCallbacks(): array
{
return $this->options['failure'] ?? [];
}

/**
* Register a callback with proper serialization.
*/
private function registerCallback(string $type, Closure|callable $callback): void
{
$this->options[$type][] = $callback instanceof Closure
? new SerializableClosure($callback)
: $callback;
}

/**
* Set the name for the batch.
*
Expand Down
57 changes: 57 additions & 0 deletions tests/Bus/BusBatchTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,63 @@ public function test_failed_jobs_can_be_recorded_while_allowing_failures()
$this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage());
}

public function test_failure_callbacks_execute_correctly(): void
{
$queue = m::mock(Factory::class);

$repository = new DatabaseBatchRepository(new BatchFactory($queue), DB::connection(), 'job_batches');

$pendingBatch = (new PendingBatch(new Container, collect()))
->allowFailures([
static fn (Batch $batch, $e): true => $_SERVER['__failure1.invoked'] = true,
function (Batch $batch, $e) {
$_SERVER['__failure2.invoked'] = true;
},
function (Batch $batch, $e) {
$_SERVER['__failure3.batch'] = $batch;
$_SERVER['__failure3.exception'] = $e;
$_SERVER['__failure3.batch_id'] = $batch->id;
$_SERVER['__failure3.batch_class'] = get_class($batch);
$_SERVER['__failure3.exception_class'] = get_class($e);
$_SERVER['__failure3.exception_message'] = $e->getMessage();
$_SERVER['__failure3.param_count'] = func_num_args();
},
])
->onConnection('test-connection')
->onQueue('test-queue');

$batch = $repository->store($pendingBatch);

$job = new class
{
use Batchable;
};

$queue->shouldReceive('connection')->once()
->with('test-connection')
->andReturn($connection = m::mock(stdClass::class));

$connection->shouldReceive('bulk')->once();

$batch = $batch->add([$job]);

$_SERVER['__failure1.invoked'] = false;
$_SERVER['__failure2.invoked'] = false;
$_SERVER['__failure3.batch'] = null;
$_SERVER['__failure3.exception'] = null;

$batch->recordFailedJob('test-id', new RuntimeException('Comprehensive callback test.'));

$this->assertTrue($_SERVER['__failure1.invoked']);
$this->assertTrue($_SERVER['__failure2.invoked']);
$this->assertInstanceOf(Batch::class, $_SERVER['__failure3.batch']);
$this->assertSame('Comprehensive callback test.', $_SERVER['__failure3.exception']->getMessage());
$this->assertSame($batch->id, $_SERVER['__failure3.batch_id']);
$this->assertSame(Batch::class, $_SERVER['__failure3.batch_class']);
$this->assertSame(RuntimeException::class, $_SERVER['__failure3.exception_class']);
$this->assertEquals(2, $_SERVER['__failure3.param_count']);
}

public function test_batch_can_be_cancelled()
{
$queue = m::mock(Factory::class);
Expand Down
Loading