diff --git a/src/Illuminate/Bus/Batch.php b/src/Illuminate/Bus/Batch.php index 717d1c4ab11d..5546a57e54f0 100644 --- a/src/Illuminate/Bus/Batch.php +++ b/src/Illuminate/Bus/Batch.php @@ -242,11 +242,7 @@ public function recordSuccessfulJob(string $jobId) $counts = $this->decrementPendingJobs($jobId); if ($this->hasProgressCallbacks()) { - $batch = $this->fresh(); - - (new Collection($this->options['progress']))->each(function ($handler) use ($batch) { - $this->invokeHandlerCallback($handler, $batch); - }); + $this->invokeCallbacks('progress'); } if ($counts->pendingJobs === 0) { @@ -254,19 +250,11 @@ public function recordSuccessfulJob(string $jobId) } if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) { - $batch = $this->fresh(); - - (new Collection($this->options['then']))->each(function ($handler) use ($batch) { - $this->invokeHandlerCallback($handler, $batch); - }); + $this->invokeCallbacks('then'); } if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) { - $batch = $this->fresh(); - - (new Collection($this->options['finally']))->each(function ($handler) use ($batch) { - $this->invokeHandlerCallback($handler, $batch); - }); + $this->invokeCallbacks('finally'); } } @@ -281,6 +269,18 @@ public function decrementPendingJobs(string $jobId) return $this->repository->decrementPendingJobs($this->id, $jobId); } + /** + * Invoke the callbacks of the given type. + */ + protected function invokeCallbacks(string $type, ?Throwable $e = null): void + { + $batch = $this->fresh(); + + foreach ($this->options[$type] ?? [] as $handler) { + $this->invokeHandlerCallback($handler, $batch, $e); + } + } + /** * Determine if the batch has finished executing. * @@ -346,28 +346,22 @@ public function recordFailedJob(string $jobId, $e) $this->cancel(); } - if ($this->hasProgressCallbacks() && $this->allowsFailures()) { - $batch = $this->fresh(); + if ($this->allowsFailures()) { + if ($this->hasProgressCallbacks()) { + $this->invokeCallbacks('progress', $e); + } - (new Collection($this->options['progress']))->each(function ($handler) use ($batch, $e) { - $this->invokeHandlerCallback($handler, $batch, $e); - }); + if ($this->hasFailureCallbacks()) { + $this->invokeCallbacks('failure', $e); + } } if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) { - $batch = $this->fresh(); - - (new Collection($this->options['catch']))->each(function ($handler) use ($batch, $e) { - $this->invokeHandlerCallback($handler, $batch, $e); - }); + $this->invokeCallbacks('catch', $e); } if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) { - $batch = $this->fresh(); - - (new Collection($this->options['finally']))->each(function ($handler) use ($batch, $e) { - $this->invokeHandlerCallback($handler, $batch, $e); - }); + $this->invokeCallbacks('finally'); } } @@ -392,6 +386,14 @@ public function hasCatchCallbacks() return isset($this->options['catch']) && ! empty($this->options['catch']); } + /** + * Determine if the batch has "failure" callbacks. + */ + public function hasFailureCallbacks(): bool + { + return isset($this->options['failure']) && ! empty($this->options['failure']); + } + /** * Determine if the batch has "finally" callbacks. * diff --git a/src/Illuminate/Bus/PendingBatch.php b/src/Illuminate/Bus/PendingBatch.php index 9538074d7be4..67c7b2082915 100644 --- a/src/Illuminate/Bus/PendingBatch.php +++ b/src/Illuminate/Bus/PendingBatch.php @@ -119,9 +119,7 @@ protected function ensureJobIsBatchable(object|array $job): void */ public function before($callback) { - $this->options['before'][] = $callback instanceof Closure - ? new SerializableClosure($callback) - : $callback; + $this->registerCallback('before', $callback); return $this; } @@ -144,9 +142,7 @@ public function beforeCallbacks() */ public function progress($callback) { - $this->options['progress'][] = $callback instanceof Closure - ? new SerializableClosure($callback) - : $callback; + $this->registerCallback('progress', $callback); return $this; } @@ -169,9 +165,7 @@ public function progressCallbacks() */ public function then($callback) { - $this->options['then'][] = $callback instanceof Closure - ? new SerializableClosure($callback) - : $callback; + $this->registerCallback('then', $callback); return $this; } @@ -194,9 +188,7 @@ public function thenCallbacks() */ public function catch($callback) { - $this->options['catch'][] = $callback instanceof Closure - ? new SerializableClosure($callback) - : $callback; + $this->registerCallback('catch', $callback); return $this; } @@ -219,9 +211,7 @@ public function catchCallbacks() */ public function finally($callback) { - $this->options['finally'][] = $callback instanceof Closure - ? new SerializableClosure($callback) - : $callback; + $this->registerCallback('finally', $callback); return $this; } @@ -237,14 +227,28 @@ public function finallyCallbacks() } /** - * Indicate that the batch should not be cancelled when a job within the batch fails. + * Indicate that the batch should not be canceled when a job within the batch fails. + * + * Optionally, add callbacks to be executed upon each job failure. + * + * @template TParam of Closure(\Illuminate\Bus\Batch, \Throwable|null): mixed)|(callable(\Illuminate\Bus\Batch, \Throwable|null): mixed) * - * @param bool $allowFailures + * @param bool|TParam|array $param * @return $this */ - public function allowFailures($allowFailures = true) + public function allowFailures(Closure|callable|array|bool $param = true) { - $this->options['allowFailures'] = $allowFailures; + if (! is_bool($param)) { + $param = Arr::wrap($param); + + foreach ($param as $callback) { + if (is_callable($callback)) { + $this->registerCallback('failure', $callback); + } + } + } + + $this->options['allowFailures'] = ! ($param === false); return $this; } @@ -259,6 +263,26 @@ public function allowsFailures() return Arr::get($this->options, 'allowFailures', false) === true; } + /** + * Get the "failure" callbacks that have been registered with the pending batch. + * + * @return array + */ + public function failureCallbacks(): array + { + return $this->options['failure'] ?? []; + } + + /** + * Register a callback with proper serialization. + */ + private function registerCallback(string $type, Closure|callable $callback): void + { + $this->options[$type][] = $callback instanceof Closure + ? new SerializableClosure($callback) + : $callback; + } + /** * Set the name for the batch. * diff --git a/tests/Bus/BusBatchTest.php b/tests/Bus/BusBatchTest.php index 855ce91bb2b3..22545db0ddb2 100644 --- a/tests/Bus/BusBatchTest.php +++ b/tests/Bus/BusBatchTest.php @@ -297,6 +297,63 @@ public function test_failed_jobs_can_be_recorded_while_allowing_failures() $this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage()); } + public function test_failure_callbacks_execute_correctly(): void + { + $queue = m::mock(Factory::class); + + $repository = new DatabaseBatchRepository(new BatchFactory($queue), DB::connection(), 'job_batches'); + + $pendingBatch = (new PendingBatch(new Container, collect())) + ->allowFailures([ + static fn (Batch $batch, $e): true => $_SERVER['__failure1.invoked'] = true, + function (Batch $batch, $e) { + $_SERVER['__failure2.invoked'] = true; + }, + function (Batch $batch, $e) { + $_SERVER['__failure3.batch'] = $batch; + $_SERVER['__failure3.exception'] = $e; + $_SERVER['__failure3.batch_id'] = $batch->id; + $_SERVER['__failure3.batch_class'] = get_class($batch); + $_SERVER['__failure3.exception_class'] = get_class($e); + $_SERVER['__failure3.exception_message'] = $e->getMessage(); + $_SERVER['__failure3.param_count'] = func_num_args(); + }, + ]) + ->onConnection('test-connection') + ->onQueue('test-queue'); + + $batch = $repository->store($pendingBatch); + + $job = new class + { + use Batchable; + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once(); + + $batch = $batch->add([$job]); + + $_SERVER['__failure1.invoked'] = false; + $_SERVER['__failure2.invoked'] = false; + $_SERVER['__failure3.batch'] = null; + $_SERVER['__failure3.exception'] = null; + + $batch->recordFailedJob('test-id', new RuntimeException('Comprehensive callback test.')); + + $this->assertTrue($_SERVER['__failure1.invoked']); + $this->assertTrue($_SERVER['__failure2.invoked']); + $this->assertInstanceOf(Batch::class, $_SERVER['__failure3.batch']); + $this->assertSame('Comprehensive callback test.', $_SERVER['__failure3.exception']->getMessage()); + $this->assertSame($batch->id, $_SERVER['__failure3.batch_id']); + $this->assertSame(Batch::class, $_SERVER['__failure3.batch_class']); + $this->assertSame(RuntimeException::class, $_SERVER['__failure3.exception_class']); + $this->assertEquals(2, $_SERVER['__failure3.param_count']); + } + public function test_batch_can_be_cancelled() { $queue = m::mock(Factory::class); diff --git a/tests/Bus/BusPendingBatchTest.php b/tests/Bus/BusPendingBatchTest.php index baed6e2726b6..8400a15077a5 100644 --- a/tests/Bus/BusPendingBatchTest.php +++ b/tests/Bus/BusPendingBatchTest.php @@ -259,6 +259,144 @@ function () { ); $this->expectNotToPerformAssertions(); } + + public function test_allow_failures_with_boolean_true_enables_failure_tolerance(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $result = $batch->allowFailures(true); + + $this->assertSame($batch, $result); + $this->assertTrue($batch->options['allowFailures']); + $this->assertEmpty($batch->failureCallbacks()); + } + + public function test_allow_failures_with_boolean_false_disables_failure_tolerance(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $result = $batch->allowFailures(false); + + $this->assertSame($batch, $result); + $this->assertFalse($batch->options['allowFailures']); + $this->assertEmpty($batch->failureCallbacks()); + } + + public function test_allow_failures_with_single_closure_registers_callback(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $result = $batch->allowFailures(static fn (): true => true); + + $this->assertSame($batch, $result); + $this->assertTrue($batch->options['allowFailures']); + $this->assertCount(1, $batch->failureCallbacks()); + } + + public function test_allow_failures_with_single_callable_registers_callback(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $result = $batch->allowFailures('strlen'); + + $this->assertSame($batch, $result); + $this->assertTrue($batch->options['allowFailures']); + $this->assertCount(1, $batch->failureCallbacks()); + } + + public function test_allow_failures_with_array_of_callables_registers_multiple_callbacks(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $result = $batch->allowFailures([ + static fn (): true => true, + 'strlen', + [$batch, 'failureCallbacks'], + strlen(...), + ]); + + $this->assertSame($batch, $result); + $this->assertTrue($batch->options['allowFailures']); + $this->assertCount(4, $batch->failureCallbacks()); + } + + public function test_allow_failures_registers_only_valid_callbacks(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $result = $batch->allowFailures([ + // 3 valid + static fn (): true => true, + 'strlen', + [$batch, 'failureCallbacks'], + // 5 invalid + 'invalid_function_name', + 123, + null, + [], + new stdClass, + ]); + + $this->assertSame($batch, $result); + $this->assertTrue($batch->options['allowFailures']); + $this->assertCount(3, $batch->failureCallbacks()); + } + + public function test_allow_failures_with_empty_array_enables_tolerance_without_callbacks(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $result = $batch->allowFailures([]); + + $this->assertSame($batch, $result); + $this->assertTrue($batch->options['allowFailures']); + $this->assertEmpty($batch->failureCallbacks()); + } + + public function test_allow_failures_is_chainable(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $this->assertSame($batch, $batch->allowFailures(true)); + $this->assertSame($batch, $batch->allowFailures(false)); + $this->assertSame($batch, $batch->allowFailures(static fn (): true => true)); + $this->assertSame($batch, $batch->allowFailures('strlen')); + $this->assertSame($batch, $batch->allowFailures([static fn (): true => true, 'strlen'])); + $this->assertSame($batch, $batch->allowFailures([])); + } + + public function test_failure_callbacks_accessor_returns_registered_callbacks(): void + { + $batch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $this->assertEmpty($batch->failureCallbacks()); + + $batch->allowFailures( + static fn (): true => true + ); + + $this->assertCount(1, $batch->failureCallbacks()); + + $freshBatch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $freshBatch->allowFailures([ + 'strlen', + [$freshBatch, 'failureCallbacks'], + ]); + + $this->assertCount(2, $freshBatch->failureCallbacks()); + + $anotherBatch = new PendingBatch(new Container, new Collection([new BatchableJob])); + + $anotherBatch->allowFailures([ + static fn (): false => false, + 'trim', + 123, + 'invalid_function', + ]); + + $this->assertCount(2, $anotherBatch->failureCallbacks()); + } } class BatchableJob