Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,8 @@ erl_crash.dump
faktory_worker-*.tar

# ignore elixir language server files
/.elixir_ls
/.elixir_ls

.idea/
.DS_Store
.vscode
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use Mix.Config
import Config

if Mix.env() == :test do
config :logger, backends: []
Expand Down
1 change: 0 additions & 1 deletion lib/faktory_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ defmodule FaktoryWorker do
[
{FaktoryWorker.QueueManager, opts},
{FaktoryWorker.Pool, opts},
{FaktoryWorker.PushPipeline, opts},
{FaktoryWorker.JobSupervisor, opts},
{FaktoryWorker.WorkerSupervisor, opts}
]
Expand Down
13 changes: 4 additions & 9 deletions lib/faktory_worker/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ defmodule FaktoryWorker.Batch do
all jobs in a batch have completed, Faktory will queue a callback job. This
allows building complex job workflows with dependencies.

Jobs pushed as part of a batch _must_ be pushed synchronously. This can be
done using the `skip_pipeline: true` option when calling `perform_async/2`. If
a job isn't pushed synchronously, you may encounter a race condition where the
batch is committed before all jobs have been pushed.

## Creating a batch

A batch is created using `new!/1` and must provide a description and declare
Expand All @@ -25,9 +20,9 @@ defmodule FaktoryWorker.Batch do
alias FaktoryWorker.Batch

{:ok, bid} = Batch.new!(on_success: {MyApp.EmailReportJob, [], []})
MyApp.Job.perform_async([1, 2], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([3, 4], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([5, 6], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.Job.perform_async([1, 2], custom: %{"bid" => bid})
MyApp.Job.perform_async([3, 4], custom: %{"bid" => bid})
MyApp.Job.perform_async([5, 6], custom: %{"bid" => bid})
Batch.commit(bid)
```

Expand All @@ -45,7 +40,7 @@ defmodule FaktoryWorker.Batch do
def perform(arg1, arg2, bid) do
Batch.open(bid)

MyApp.OtherJob.perform_async([1, 2], custom: %{"bid" => bid}, skip_pipeline: true)
MyApp.OtherJob.perform_async([1, 2], custom: %{"bid" => bid})

Batch.commit(bid)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/faktory_worker/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ defmodule FaktoryWorker.Connection do

defp put_worker_args(args, opts) do
process_wid = Keyword.get(opts, :process_wid)
sys_pid = System.get_pid()
sys_pid = System.pid()
{:ok, hostname} = :inet.gethostname()

worker_args = %{
Expand Down
54 changes: 9 additions & 45 deletions lib/faktory_worker/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ defmodule FaktoryWorker.Job do

## Synchronous job pushing

By default, jobs are pushed asynchronously to the Faktory server. To ensure a
job has been successfully submitted before continuing, jobs can be pushed
synchronously instead. To do this, pass the `:skip_pipeline` option with the
value of `true` to `perform_async/2`.

Synchronous pushing is required in certain situations to guarantee ordering,
such as when using the Faktory Enterprise batching feature.
Previous version used Broadway to send jobs and `:skip_pipeline` parameter was used to do it synchronously.
`:skip_pipeline` is not supported anymore.
Since Batch operations is a feature of Faktory Enterprise this library now sends any single job synchronously
and makes HTTP call to faktory server (see `FaktoryWorker.Batch`).

## Worker Configuration

Expand All @@ -88,7 +85,7 @@ defmodule FaktoryWorker.Job do
means only values that implement the `Jason.Encoder` protocol are valid when calling the `perform_async/2` function.
"""

alias FaktoryWorker.{ConnectionManager, Random, Pool, Sandbox, Telemetry}
alias FaktoryWorker.{ConnectionManager, Random, Pool, Telemetry}

# Look at supporting the following optional fields when pushing a job
# priority
Expand Down Expand Up @@ -128,37 +125,9 @@ defmodule FaktoryWorker.Job do

@doc false
def perform_async(payload, opts) do
case Keyword.get(opts, :skip_pipeline, false) do
false ->
opts
|> push_pipeline_name()
|> perform_async(payload, opts)

true ->
opts
|> faktory_name()
|> push(payload)
end
end

@doc false
def perform_async(_, {:error, _} = error, _), do: error

def perform_async(pipeline_name, payload, opts) do
if Sandbox.active?() do
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to keep the sandbox, it is what we use to simulate the server during testing.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeremyowensboggs makes sense, I overdid it with simplifications

Sandbox.enqueue_job(
String.to_existing_atom("Elixir." <> payload.jobtype),
payload.args,
opts
)
else
message = %Broadway.Message{
acknowledger: {FaktoryWorker.PushPipeline.Acknowledger, :push_message, []},
data: {pipeline_name, payload}
}

Broadway.push_messages(pipeline_name, [message])
end
opts
|> faktory_name()
|> push(payload)
end

@doc false
Expand All @@ -170,6 +139,7 @@ defmodule FaktoryWorker.Job do
end

@doc false
def push(_, invalid_payload = {:error, _}), do: invalid_payload
def push(faktory_name, job) do
faktory_name
|> Pool.format_pool_name()
Expand Down Expand Up @@ -215,12 +185,6 @@ defmodule FaktoryWorker.Job do
"The field '#{Atom.to_string(field)}' has an invalid value '#{inspect(value)}'"
end

defp push_pipeline_name(opts) do
opts
|> faktory_name()
|> FaktoryWorker.PushPipeline.format_pipeline_name()
end

defp faktory_name(opts) do
Keyword.get(opts, :faktory_name, FaktoryWorker)
end
Expand Down
45 changes: 0 additions & 45 deletions lib/faktory_worker/push_pipeline.ex

This file was deleted.

21 changes: 0 additions & 21 deletions lib/faktory_worker/push_pipeline/acknowledger.ex

This file was deleted.

21 changes: 0 additions & 21 deletions lib/faktory_worker/push_pipeline/consumer.ex

This file was deleted.

19 changes: 0 additions & 19 deletions lib/faktory_worker/push_pipeline/producer.ex

This file was deleted.

1 change: 0 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ defmodule FaktoryWorker.MixProject do

defp deps do
[
{:broadway, "~> 1.0.0"},
{:certifi, "~> 2.5"},
{:excoveralls, "~> 0.10", only: :test},
{:jason, "~> 1.1"},
Expand Down
2 changes: 1 addition & 1 deletion test/faktory_worker/connection_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ defmodule FaktoryWorker.ConnectionManagerTest do
{{:ok, result}, _} = ConnectionManager.send_command(state, {:push, payload})

assert result == "halt reason"
end) =~ "[warn] [123456] Halt: halt reason"
end) |> String.match?(~r/\[warn.*(?<!ing)\]*\[123456\]*[Halt: halt reason]/)
end

test "should unset the connection when there is a socket failure" do
Expand Down
6 changes: 2 additions & 4 deletions test/faktory_worker/job/batch_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ defmodule FaktoryWorker.BatchIntegrationTest do

job_opts = [
faktory_name: faktory_name,
custom: %{"bid" => bid},
skip_pipeline: true
custom: %{"bid" => bid}
]

DefaultWorker.perform_async(["1"], job_opts)
Expand Down Expand Up @@ -72,8 +71,7 @@ defmodule FaktoryWorker.BatchIntegrationTest do

job_opts = [
faktory_name: faktory_name,
custom: %{"bid" => bid},
skip_pipeline: true
custom: %{"bid" => bid}
]

DefaultWorker.perform_async(["1"], job_opts)
Expand Down
5 changes: 3 additions & 2 deletions test/faktory_worker/job/job_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule FaktoryWorker.JobIntegrationTest do

setup :flush_faktory!

describe "perform_async/3" do
describe "perform_async/2" do
test "should send a new job to faktory" do
faktory_name = :"Test_#{Random.string()}"

Expand All @@ -21,7 +21,8 @@ defmodule FaktoryWorker.JobIntegrationTest do

job = Job.build_payload(DefaultWorker, %{hey: "there!"}, opts)

Job.perform_async(job, opts)
{:ok, job_sent} = Job.perform_async(job, opts)
assert job_sent == job

assert_queue_size("default", 1)

Expand Down
11 changes: 0 additions & 11 deletions test/faktory_worker/job/job_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,4 @@ defmodule FaktoryWorker.Job.JobTest do
end
end

describe "perform_async/3" do
test "should not send a bad payload" do
data = %{hey: "there!"}
opts = [queue: 123]
{:error, _} = payload = Job.build_payload(Test.Worker, data, opts)

{:error, error} = Job.perform_async(TestPipeline, payload, [])

assert error == "The field 'queue' has an invalid value '123'"
end
end
end

This file was deleted.

Loading