diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8b83339 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.beam +ebin/sidejob.app diff --git a/rebar b/rebar new file mode 100755 index 0000000..ce3af71 Binary files /dev/null and b/rebar differ diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..b781578 --- /dev/null +++ b/rebar.config @@ -0,0 +1,5 @@ +{erl_opts, [debug_info, warnings_as_errors]}. +{cover_enabled, true}. +{eunit_opts, [verbose]}. +{edoc_opts, [{preprocess, true}]}. +{deps, []}. diff --git a/src/sidejob.app.src b/src/sidejob.app.src new file mode 100644 index 0000000..8f14cf7 --- /dev/null +++ b/src/sidejob.app.src @@ -0,0 +1,13 @@ +{application, sidejob, + [ + {description, ""}, + {vsn, "0.0.1"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {registered, []}, + {mod, {sidejob_app, []}}, + {env, []} + ]}. diff --git a/src/sidejob.erl b/src/sidejob.erl new file mode 100644 index 0000000..0f67eee --- /dev/null +++ b/src/sidejob.erl @@ -0,0 +1,132 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(sidejob). +-export([new_resource/3, new_resource/4, call/2, call/3, cast/2]). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc +%% Create a new sidejob resource that uses the provided worker module, +%% enforces the requested usage limit, and is managed by the specified +%% number of worker processes. +%% +%% This call will generate and load a new module, via {@link sidejob_config}, +%% that provides information about the new resource. It will also start up the +%% supervision hierarchy that manages this resource: ensuring that the workers +%% and stats aggregation server for this resource remain running. +new_resource(Name, Mod, Limit, Workers) -> + ETS = sidejob_resource_sup:ets(Name), + StatsETS = sidejob_resource_sup:stats_ets(Name), + WorkerNames = sidejob_worker:workers(Name, Workers), + StatsName = sidejob_resource_stats:reg_name(Name), + WorkerLimit = Limit div Workers, + sidejob_config:load_config(Name, [{width, Workers}, + {limit, Limit}, + {worker_limit, WorkerLimit}, + {ets, ETS}, + {stats_ets, StatsETS}, + {workers, list_to_tuple(WorkerNames)}, + {stats, StatsName}]), + sidejob_sup:add_resource(Name, Mod). + +%% @doc +%% Same as {@link new_resource/4} except that the number of workers defaults +%% to the number of scheduler threads. +new_resource(Name, Mod, Limit) -> + Workers = erlang:system_info(schedulers), + new_resource(Name, Mod, Limit, Workers). + + +%% @doc +%% Same as {@link call/3} with a default timeout of 5 seconds. +call(Name, Msg) -> + call(Name, Msg, 5000). + +%% @doc +%% Perform a synchronous call to the specified resource, failing if the +%% resource has reached its usage limit. +call(Name, Msg, Timeout) -> + case available(Name) of + none -> + overload; + Worker -> + gen_server:call(Worker, Msg, Timeout) + end. + +%% @doc +%% Perform an asynchronous cast to the specified resource, failing if the +%% resource has reached its usage limit. +cast(Name, Msg) -> + case available(Name) of + none -> + overload; + Worker -> + gen_server:cast(Worker, Msg) + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% Find an available worker or return none if all workers at limit +available(Name) -> + ETS = Name:ets(), + Width = Name:width(), + Limit = Name:worker_limit(), + Scheduler = erlang:system_info(scheduler_id), + Worker = Scheduler rem Width, + case is_available(ETS, Limit, Worker) of + true -> + worker_reg_name(Name, Worker); + false -> + available(Name, ETS, Width, Limit, Worker+1, Worker) + end. + +available(Name, _ETS, _Width, _Limit, End, End) -> + ets:update_counter(Name:stats_ets(), rejected, 1), + none; +available(Name, ETS, Width, Limit, X, End) -> + Worker = X rem Width, + case is_available(ETS, Limit, Worker) of + false -> + available(Name, ETS, Width, Limit, Worker+1, End); + true -> + worker_reg_name(Name, Worker) + end. + +is_available(ETS, Limit, Worker) -> + case ets:lookup_element(ETS, {full, Worker}, 2) of + 1 -> + false; + 0 -> + Value = ets:update_counter(ETS, Worker, 1), + case Value >= Limit of + true -> + ets:insert(ETS, {{full, Worker}, 1}), + false; + false -> + true + end + end. + +worker_reg_name(Name, Id) -> + element(Id+1, Name:workers()). diff --git a/src/sidejob_app.erl b/src/sidejob_app.erl new file mode 100644 index 0000000..cecb3c9 --- /dev/null +++ b/src/sidejob_app.erl @@ -0,0 +1,48 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(sidejob_app). +-behaviour(application). + +%% Application callbacks +-export([start/2, start_phase/3, prep_stop/1, stop/1, config_change/3]). + +%%%=================================================================== +%%% Application callbacks +%%%=================================================================== + +start(_Type, _StartArgs) -> + case sidejob_sup:start_link() of + {ok, Pid} -> + {ok, Pid}; + Error -> + Error + end. + +stop(_State) -> + ok. + +start_phase(_Phase, _Type, _PhaseArgs) -> + ok. + +prep_stop(State) -> + State. + +config_change(_Changed, _New, _Removed) -> + ok. diff --git a/src/sidejob_config.erl b/src/sidejob_config.erl new file mode 100644 index 0000000..4775b49 --- /dev/null +++ b/src/sidejob_config.erl @@ -0,0 +1,66 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc +%% Utility that converts a given property list into a module that provides +%% constant time access to the various key/value pairs. +%% +%% Example: +%% load_config(test, [{limit, 1000}, +%% {num_workers, 4}, +%% {workers, [{test_1, test_2, test_3, test_4}]}]). +%% +%% creates the module `test' such that: +%% test:limit(). => 1000 +%% test:num_workers(). => 16 +%% test:workers(). => [{test_1, test_2, test_3, test_4}]}] +%% +-module(sidejob_config). +-export([load_config/2]). + +load_config(Resource, Config) -> + Module = make_module(Resource), + Exports = [make_export(Key) || {Key, _} <- Config], + Functions = [make_function(Key, Value) || {Key, Value} <- Config], + ExportAttr = make_export_attribute(Exports), + Abstract = [Module, ExportAttr | Functions], + Forms = erl_syntax:revert_forms(Abstract), + {ok, Resource, Bin} = compile:forms(Forms, [verbose, report_errors]), + code:purge(Resource), + {module, Resource} = code:load_binary(Resource, + atom_to_list(Resource) ++ ".erl", + Bin), + ok. + +make_module(Module) -> + erl_syntax:attribute(erl_syntax:atom(module), + [erl_syntax:atom(Module)]). + +make_export(Key) -> + erl_syntax:arity_qualifier(erl_syntax:atom(Key), + erl_syntax:integer(0)). + +make_export_attribute(Exports) -> + erl_syntax:attribute(erl_syntax:atom(export), + [erl_syntax:list(Exports)]). + +make_function(Key, Value) -> + Constant = erl_syntax:clause([], none, [erl_syntax:abstract(Value)]), + erl_syntax:function(erl_syntax:atom(Key), [Constant]). diff --git a/src/sidejob_resource_stats.erl b/src/sidejob_resource_stats.erl new file mode 100644 index 0000000..9f3e1fd --- /dev/null +++ b/src/sidejob_resource_stats.erl @@ -0,0 +1,189 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(sidejob_resource_stats). +-behaviour(gen_server). + +%% API +-export([reg_name/1, start_link/2, report/5, init_stats/1, stats/1, usage/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {worker_reports = dict:new(), + stats_ets = undefined, + usage = 0, + rejected = 0, + in = 0, + out = 0, + stats_60s = sidejob_stat:new(), + next_stats_60s = sidejob_stat:new(), + left_60s = 60, + stats_total = sidejob_stat:new()}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +reg_name(Name) when is_atom(Name) -> + reg_name(atom_to_binary(Name, latin1)); +reg_name(Name) -> + binary_to_atom(<>, latin1). + +start_link(RegName, StatsETS) -> + gen_server:start_link({local, RegName}, ?MODULE, [StatsETS], []). + +%% @doc +%% Used by {@link sidejob_worker} processes to report per-worker statistics +report(Name, Id, Usage, In, Out) -> + gen_server:cast(Name, {report, Id, Usage, In, Out}). + +%% @doc +%% Used by {@link sidejob_resource_sup} to initialize a newly created +%% stats ETS table to ensure the table is non-empty before bringing a +%% resource online +init_stats(StatsETS) -> + EmptyStats = compute(#state{}), + ets:insert(StatsETS, [{rejected, 0}, + {usage, 0}, + {stats, EmptyStats}]). + +%% @doc +%% Return the computed stats for the given sidejob resource +stats(Name) -> + StatsETS = Name:stats_ets(), + ets:lookup_element(StatsETS, stats, 2). + +%% @doc +%% Return the current usage for the given sidejob resource +usage(Name) -> + StatsETS = Name:stats_ets(), + ets:lookup_element(StatsETS, usage, 2). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([StatsETS]) -> + schedule_tick(), + {ok, #state{stats_ets=StatsETS}}. + +handle_call(get_stats, _From, State) -> + {reply, compute(State), State}; + +handle_call(usage, _From, State=#state{usage=Usage}) -> + {reply, Usage, State}; + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast({report, Id, UsageVal, InVal, OutVal}, + State=#state{worker_reports=Reports}) -> + Reports2 = dict:store(Id, {UsageVal, InVal, OutVal}, Reports), + State2 = State#state{worker_reports=Reports2}, + {noreply, State2}; + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(tick, State) -> + schedule_tick(), + State2 = tick(State), + {noreply, State2}; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +schedule_tick() -> + erlang:send_after(1000, self(), tick). + +%% Aggregate all reported worker stats into unified stat report for +%% this resource +tick(State=#state{stats_ets=StatsETS, + left_60s=Left60, + next_stats_60s=Next60, + stats_total=Total}) -> + {Usage, In, Out} = combine_reports(State), + + Rejected = ets:update_counter(StatsETS, rejected, 0), + ets:update_counter(StatsETS, rejected, {2,-Rejected,0,0}), + + NewNext60 = sidejob_stat:add(Rejected, In, Out, Next60), + NewTotal = sidejob_stat:add(Rejected, In, Out, Total), + State2 = State#state{usage=Usage, + rejected=Rejected, + in=In, + out=Out, + next_stats_60s=NewNext60, + stats_total=NewTotal}, + + State3 = case Left60 of + 0 -> + State2#state{left_60s=59, + stats_60s=NewNext60, + next_stats_60s=sidejob_stat:new()}; + _ -> + State2#state{left_60s=Left60-1} + end, + + ets:insert(StatsETS, [{usage, Usage}, + {stats, compute(State3)}]), + State3. + +%% Total all reported worker stats into a single sum for each metric +combine_reports(#state{worker_reports=Reports}) -> + dict:fold(fun(_, {Usage, In, Out}, {UsageAcc, InAcc, OutAcc}) -> + {UsageAcc + Usage, InAcc + In, OutAcc + Out} + end, {0,0,0}, Reports). + +compute(#state{usage=Usage, rejected=Rejected, in=In, out=Out, + stats_60s=Stats60s, stats_total=StatsTotal}) -> + {Usage60, Rejected60, InAvg60, InMax60, OutAvg60, OutMax60} = + sidejob_stat:compute(Stats60s), + + {UsageTot, RejectedTot, InAvgTot, InMaxTot, OutAvgTot, OutMaxTot} = + sidejob_stat:compute(StatsTotal), + + [{usage, Usage}, + {rejected, Rejected}, + {in_rate, In}, + {out_rate, Out}, + {usage_60s, Usage60}, + {rejected_60s, Rejected60}, + {avg_in_rate_60s, InAvg60}, + {max_in_rate_60s, InMax60}, + {avg_out_rate_60s, OutAvg60}, + {max_out_rate_60s, OutMax60}, + {usage_total, UsageTot}, + {rejected_total, RejectedTot}, + {avg_in_rate_total, InAvgTot}, + {max_in_rate_total, InMaxTot}, + {avg_out_rate_total, OutAvgTot}, + {max_out_rate_total, OutMaxTot}]. diff --git a/src/sidejob_resource_sup.erl b/src/sidejob_resource_sup.erl new file mode 100644 index 0000000..7709930 --- /dev/null +++ b/src/sidejob_resource_sup.erl @@ -0,0 +1,88 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc +%% The sidejob_resource_sup manages the entire supervision hierarchy for +%% a sidejob resource. Thus, there is one resource supervisor for each +%% registered sidejob resource. +%% +%% The resource supervisor is the owner of a resource's limit and stats +%% ETS tables, therefore ensuring the ETS tables survive crashes elsewhere +%% in the resource hierarchy. +%% +%% The resource supervisor has two children: a {@link sidejob_worker_sup} +%% that supervises the actual worker processes for a given resource, and +%% a {@link sidejob_resource_stats} server that aggregates statistics +%% reported by the worker processes. +-module(sidejob_resource_sup). +-behaviour(supervisor). + +%% API +-export([start_link/2, ets/1, stats_ets/1]). + +%% Supervisor callbacks +-export([init/1]). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +start_link(Name, Mod) -> + supervisor:start_link({local, Name}, ?MODULE, [Name, Mod]). + +ets(Name) -> + ETS = iolist_to_binary([atom_to_binary(Name, latin1), "_ets"]), + binary_to_atom(ETS, latin1). + +stats_ets(Name) -> + ETS = iolist_to_binary([atom_to_binary(Name, latin1), "_stats_ets"]), + binary_to_atom(ETS, latin1). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +init([Name, Mod]) -> + Width = Name:width(), + ETS = ets(Name), + StatsETS = stats_ets(Name), + StatsName = Name:stats(), + Tab = ets:new(ETS, [named_table, + public, + {read_concurrency,true}, + {write_concurrency,true}]), + Counters = [{X, 0} || X <- lists:seq(0,Width-1)], + Full = [{{full,X}, 0} || X <- lists:seq(0,Width-1)], + ets:insert(Tab, Counters ++ Full), + + StatsTab = ets:new(StatsETS, [named_table, + public, + {read_concurrency,true}, + {write_concurrency,true}]), + sidejob_resource_stats:init_stats(StatsTab), + + WorkerSup = {sidejob_worker_sup, + {sidejob_worker_sup, start_link, + [Name, Width, Tab, StatsName, Mod]}, + permanent, infinity, supervisor, [sidejob_worker_sup]}, + StatsServer = {StatsName, + {sidejob_resource_stats, start_link, [StatsName, StatsTab]}, + permanent, 5000, worker, [sidejob_resource_stats]}, + {ok, {{one_for_one, 10, 10}, [WorkerSup, StatsServer]}}. diff --git a/src/sidejob_stat.erl b/src/sidejob_stat.erl new file mode 100644 index 0000000..74ecfac --- /dev/null +++ b/src/sidejob_stat.erl @@ -0,0 +1,48 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(sidejob_stat). +-export([new/0, add/4, compute/1]). + +-record(stat, {rejected = 0, + in_sum = 0, + in_max = 0, + out_sum = 0, + out_max = 0, + samples = 0}). + +-define(ADD(Field, Value), Field = Stat#stat.Field + Value). +-define(MAX(Field, Value), Field = max(Stat#stat.Field, Value)). + +new() -> + #stat{}. + +add(Rejected, In, Out, Stat) -> + Stat#stat{?ADD(rejected, Rejected), + ?ADD(in_sum, In), + ?ADD(out_sum, Out), + ?ADD(samples, 1), + ?MAX(in_max, In), + ?MAX(out_max, Out)}. + +compute(#stat{rejected=Rejected, in_sum=InSum, in_max=InMax, + out_sum=OutSum, out_max=OutMax, samples=Samples}) -> + InAvg = InSum div max(1,Samples), + OutAvg = OutSum div max(1,Samples), + {InSum, Rejected, InAvg, InMax, OutAvg, OutMax}. diff --git a/src/sidejob_sup.erl b/src/sidejob_sup.erl new file mode 100644 index 0000000..61c94fe --- /dev/null +++ b/src/sidejob_sup.erl @@ -0,0 +1,58 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc +%% The top-level supervisor for the sidejob application. +%% +%% When a new resource is created via {@link sidejob:new_resource/4), +%% a new {@link sidejob_resource_sup} is added to this supervisor. +%% +%% The actual resource supervisor manages a given resource's process +%% hierarchy. This top-level supervisor simply ensures that all registered +%% resource supervisors remain up. + +-module(sidejob_sup). +-behaviour(supervisor). + +%% API +-export([start_link/0, add_resource/2]). + +%% Supervisor callbacks +-export([init/1]). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +add_resource(Name, Mod) -> + Child = {Name, + {sidejob_resource_sup, start_link, [Name, Mod]}, + permanent, infinity, supervisor, [sidejob_resource_sup]}, + supervisor:start_child(?MODULE, Child). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +init([]) -> + {ok, {{one_for_one, 10, 10}, []}}. diff --git a/src/sidejob_supervisor.erl b/src/sidejob_supervisor.erl new file mode 100644 index 0000000..780279e --- /dev/null +++ b/src/sidejob_supervisor.erl @@ -0,0 +1,161 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc +%% This module implements a sidejob_worker behavior that operates as a +%% parallel, capacity-limited supervisor of dynamic, transient children. + +-module(sidejob_supervisor). +-behaviour(gen_server). + +%% API +-export([start_child/4, spawn/2, spawn/4, which_children/1]). + +%% sidejob_worker callbacks +-export([current_usage/1, rate/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {name, + children=sets:new(), + spawned=0, + died=0}). + +-type resource() :: atom(). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec start_child(resource(), module(), atom(), term()) -> {ok, pid()} | + {error, overload} | + {error, term()}. +start_child(Name, Mod, Fun, Args) -> + case sidejob:call(Name, {start_child, Mod, Fun, Args}, infinity) of + overload -> + {error, overload}; + Other -> + Other + end. + +-spec spawn(resource(), function()) -> {ok, pid()} | {error, overload}. +spawn(Name, Fun) -> + case sidejob:call(Name, {spawn, Fun}, infinity) of + overload -> + {error, overload}; + Other -> + Other + end. + +-spec spawn(resource(), module(), atom(), term()) -> {ok, pid()} | + {error, overload}. +spawn(Name, Mod, Fun, Args) -> + ?MODULE:spawn(Name, {Mod, Fun, Args}). + +-spec which_children(resource()) -> [pid()]. +which_children(Name) -> + Workers = tuple_to_list(Name:workers()), + Children = [gen_server:call(Worker, get_children) || Worker <- Workers], + lists:flatten(Children). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([Name]) -> + process_flag(trap_exit, true), + {ok, #state{name=Name}}. + +handle_call(get_children, _From, State=#state{children=Children}) -> + {reply, sets:to_list(Children), State}; + +handle_call({start_child, Mod, Fun, Args}, _From, State) -> + Result = (catch apply(Mod, Fun, Args)), + {Reply, State2} = case Result of + {ok, Pid} when is_pid(Pid) -> + {Result, add_child(Pid, State)}; + {ok, Pid, _Info} when is_pid(Pid) -> + {Result, add_child(Pid, State)}; + ignore -> + {{ok, undefined}, State}; + {error, _} -> + {Result, State}; + Error -> + {{error, Error}, State} + end, + {reply, Reply, State2}; + +handle_call({spawn, Fun}, _From, State) -> + Pid = case Fun of + _ when is_function(Fun) -> + spawn_link(Fun); + {Mod, Fun, Args} -> + spawn_link(Mod, Fun, Args) + end, + State2 = add_child(Pid, State), + {reply, Pid, State2}; + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'EXIT', Pid, Reason}, State=#state{children=Children, + died=Died}) -> + case sets:is_element(Pid, Children) of + true -> + Children2 = sets:del_element(Pid, Children), + Died2 = Died + 1, + State2 = State#state{children=Children2, died=Died2}, + {noreply, State2}; + false -> + {stop, Reason, State} + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +current_usage(#state{children=Children}) -> + {message_queue_len, Pending} = process_info(self(), message_queue_len), + Current = sets:size(Children), + Pending + Current. + +rate(State=#state{spawned=Spawned, died=Died}) -> + State2 = State#state{spawned=0, + died=0}, + {Spawned, Died, State2}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +add_child(Pid, State=#state{children=Children, spawned=Spawned}) -> + Children2 = sets:add_element(Pid, Children), + Spawned2 = Spawned + 1, + State#state{children=Children2, spawned=Spawned2}. diff --git a/src/sidejob_worker.erl b/src/sidejob_worker.erl new file mode 100644 index 0000000..ad4f4e4 --- /dev/null +++ b/src/sidejob_worker.erl @@ -0,0 +1,237 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc +%% This module implements the sidejob_worker logic used by all worker +%% processes created to manage a sidejob resource. This code emulates +%% the gen_server API, wrapping a provided user-specified module which +%% implements the gen_server behavior. +%% +%% The primary purpose of this module is updating the usage information +%% published in a given resource's ETS table, such that capacity limiting +%% operates correctly. The sidejob_worker also cooperates with a given +%% {@link sidejob_resource_stats} server to maintain statistics about a +%% given resource. +%% +%% By default, a sidejob_worker calculates resource usage based on message +%% queue size. However, the user-specified module can also choose to +%% implement the `current_usage/1` and `rate/1` callbacks to change how +%% usage is calculated. An example is the {@link sidejob_supervisor} module +%% which reports usage as: queue size + num_children. + +-module(sidejob_worker). +-behaviour(gen_server). + +%% API +-export([start_link/6, reg_name/1, reg_name/2, workers/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {id :: non_neg_integer(), + ets :: term(), + width :: pos_integer(), + limit :: pos_integer(), + reporter :: term(), + mod :: module(), + modstate :: term(), + usage :: custom | default, + last_mq_len = 0 :: pos_integer(), + enqueue = 0 :: pos_integer(), + dequeue = 0 :: pos_integer()}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +reg_name(Id) -> + IdBin = list_to_binary(integer_to_list(Id)), + binary_to_atom(<<"sidejob_worker_", IdBin/binary>>, latin1). + +reg_name(Name, Id) when is_atom(Name) -> + reg_name(atom_to_binary(Name, latin1), Id); +reg_name(NameBin, Id) -> + WorkerName = iolist_to_binary([NameBin, "_", integer_to_list(Id)]), + binary_to_atom(WorkerName, latin1). + +workers(Name, Count) -> + NameBin = atom_to_binary(Name, latin1), + [reg_name(NameBin, Id) || Id <- lists:seq(1,Count)]. + +start_link(RegName, ResName, Id, ETS, StatsName, Mod) -> + gen_server:start_link({local, RegName}, ?MODULE, + [ResName, Id, ETS, StatsName, Mod], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([ResName, Id, ETS, StatsName, Mod]) -> + %% TODO: Add ability to pass args + case Mod:init([ResName]) of + {ok, ModState} -> + Exports = proplists:get_value(exports, Mod:module_info()), + Usage = case lists:member({current_usage, 1}, Exports) of + true -> + custom; + false -> + default + end, + schedule_tick(), + Width = ResName:width(), + Limit = ResName:limit(), + State = #state{id=Id, + ets=ETS, + mod=Mod, + modstate=ModState, + usage=Usage, + width=Width, + limit=Limit, + reporter=StatsName}, + {ok, State}; + Other -> + Other + end. + +handle_call(Request, From, State=#state{mod=Mod, + modstate=ModState}) -> + Result = Mod:handle_call(Request, From, ModState), + {Pos, ModState2} = case Result of + {reply,_Reply,NewState} -> + {3, NewState}; + {reply,_Reply,NewState,hibernate} -> + {3, NewState}; + {reply,_Reply,NewState,_Timeout} -> + {3, NewState}; + {noreply,NewState} -> + {2, NewState}; + {noreply,NewState,hibernate} -> + {2, NewState}; + {noreply,NewState,_Timeout} -> + {2, NewState}; + {stop,_Reason,_Reply,NewState} -> + {4, NewState}; + {stop,_Reason,NewState} -> + {3, NewState} + end, + State2 = State#state{modstate=ModState2}, + State3 = update_rate(update_usage(State2)), + Return = setelement(Pos, Result, State3), + Return. + +handle_cast(Request, State=#state{mod=Mod, + modstate=ModState}) -> + Result = Mod:handle_cast(Request, ModState), + {Pos, ModState2} = case Result of + {noreply,NewState} -> + {2, NewState}; + {noreply,NewState,hibernate} -> + {2, NewState}; + {noreply,NewState,_Timeout} -> + {2, NewState}; + {stop,_Reason,NewState} -> + {3, NewState} + end, + State2 = State#state{modstate=ModState2}, + State3 = update_rate(update_usage(State2)), + Return = setelement(Pos, Result, State3), + Return. + +handle_info('$sidejob_worker_tick', State) -> + State2 = tick(State), + schedule_tick(), + {noreply, State2}; + +handle_info(Info, State=#state{mod=Mod, + modstate=ModState}) -> + Result = Mod:handle_info(Info, ModState), + {Pos, ModState2} = case Result of + {noreply,NewState} -> + {2, NewState}; + {noreply,NewState,hibernate} -> + {2, NewState}; + {noreply,NewState,_Timeout} -> + {2, NewState}; + {stop,_Reason,NewState} -> + {3, NewState} + end, + State2 = State#state{modstate=ModState2}, + State3 = update_rate(update_usage(State2)), + Return = setelement(Pos, Result, State3), + Return. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +schedule_tick() -> + erlang:send_after(1000, self(), '$sidejob_worker_tick'). + +tick(State=#state{id=Id, reporter=Reporter}) -> + Usage = current_usage(State), + {In, Out, State2} = current_rate(State), + sidejob_resource_stats:report(Reporter, Id, Usage, In, Out), + State2. + +update_usage(State=#state{id=Id, ets=ETS, width=Width, limit=Limit}) -> + Usage = current_usage(State), + Full = case Usage >= (Limit div Width) of + true -> + 1; + false -> + 0 + end, + ets:insert(ETS, [{Id-1, Usage}, + {{full, Id-1}, Full}]), + State. + +current_usage(#state{usage=default}) -> + {message_queue_len, Len} = process_info(self(), message_queue_len), + Len; +current_usage(#state{usage=custom, mod=Mod, modstate=ModState}) -> + Mod:current_usage(ModState). + +update_rate(State=#state{usage=custom}) -> + %% Assume this is updated internally in the custom module + State; +update_rate(State=#state{usage=default, + last_mq_len=LastLen}) -> + {message_queue_len, Len} = process_info(self(), message_queue_len), + Enqueue = Len - LastLen + 1, + Dequeue = State#state.dequeue + 1, + State#state{enqueue=Enqueue, dequeue=Dequeue}. + +%% TODO: Probably should rename since it resets rate +current_rate(State=#state{usage=default, + enqueue=Enqueue, + dequeue=Dequeue}) -> + State2 = State#state{enqueue=0, dequeue=0}, + {Enqueue, Dequeue, State2}; +current_rate(State=#state{usage=custom, mod=Mod, modstate=ModState}) -> + {Enqueue, Dequeue, ModState2} = Mod:rate(ModState), + State2 = State#state{modstate=ModState2}, + {Enqueue, Dequeue, State2}. diff --git a/src/sidejob_worker_sup.erl b/src/sidejob_worker_sup.erl new file mode 100644 index 0000000..4f3eae7 --- /dev/null +++ b/src/sidejob_worker_sup.erl @@ -0,0 +1,51 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(sidejob_worker_sup). +-behaviour(supervisor). + +%% API +-export([start_link/5]). + +%% Supervisor callbacks +-export([init/1]). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +start_link(Name, NumWorkers, ETS, StatsName, Mod) -> + NameBin = atom_to_binary(Name, latin1), + RegName = binary_to_atom(<>, latin1), + supervisor:start_link({local, RegName}, ?MODULE, + [Name, NumWorkers, ETS, StatsName, Mod]). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +init([Name, NumWorkers, ETS, StatsName, Mod]) -> + Children = [begin + WorkerName = sidejob_worker:reg_name(Name, Id), + {WorkerName, + {sidejob_worker, start_link, + [WorkerName, Name, Id, ETS, StatsName, Mod]}, + permanent, 5000, worker, [sidejob_worker]} + end || Id <- lists:seq(1, NumWorkers)], + {ok, {{one_for_one, 10, 10}, Children}}.