Skip to content

Commit

Permalink
Fixed process tree.
Browse files Browse the repository at this point in the history
  • Loading branch information
cooldaemon committed Aug 4, 2008
1 parent 01b866f commit e076211
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 111 deletions.
57 changes: 17 additions & 40 deletions src/erljob.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
%% Here's a quick example illustrating how to use erljob:
%% ```
%% erljob:start(),
%% erljob:add_job(hi, fun ([]) -> io:fwrite("Hi!") end, [], 10000, 10),
%% erljob:add_job(hi, fun (_X) -> io:fwrite("Hi!"), ok end, ok, 1000, 2),
%% erljob:stop()
%% '''

Expand All @@ -26,11 +26,7 @@
-module(erljob).

-export([start/0, stop/0]).
-export([
add_jobs/2, add_job/5, delete_job/1,
restart_job/1, suspend_job/1,
dump/0, lookup/1
]).
-export([add_job/5, delete_job/1, restart_job/1, suspend_job/1]).

%% @equiv application:start(erljob, permanent)
start() ->
Expand All @@ -39,42 +35,23 @@ start() ->
%% @equiv application:stop(erljob)
stop() -> application:stop(erljob).

add_jobs(_Name, []) -> ok;
add_jobs(Name, [{Job, Arg, Interval, Count} | Jobs]) ->
add_job(Name, Job, Arg, Interval, Count),
add_jobs(Name, Jobs).
add_job(Name, Job, State, Interval, Count) ->
add_job(erljob_status:create(Name), Name, Job, State, Interval, Count).

add_job(Name, Job, Arg, Interval, Count) ->
{ok, Pid} = erljob_controller_sup:start_child(Job, Arg, Interval, Count),
erljob_status:add({Pid, Name, Job, Arg, Interval, Count}),
add_job(exist, _Name, _Job, _State, _Interval, _Count) -> exist;
add_job(ok, Name, Job, State, Interval, Count) ->
{ok, Pid} = erljob_controller_sup_sup:start_child(
{Name, {Job, State, Interval, Count, run}}
),
erljob_status:set(Name, sup, Pid),
ok.

delete_job(Name) ->
modify_job(Name, [
fun erljob_controller:finish/1,
fun erljob_status:delete/1
]).
delete_job(Name) -> modify_run_state(Name, finish).
restart_job(Name) -> modify_run_state(Name, run).
suspend_job(Name) -> modify_run_state(Name, suspend).

restart_job(Name) ->
modify_job(Name, [fun erljob_controller:restart/1]).

suspend_job(Name) ->
modify_job(Name, [fun erljob_controller:suspend/1]).

modify_job(Name, Actions) ->
modify_jobs(erljob_status:lookup(Name), Actions).

modify_jobs([], _Action) -> ok;
modify_jobs(
[{Pid, _Name, _Job, _Arg, _Interval, _Count, _State} | Jobs],
Actions
) ->
lists:foreach(fun (Action) -> Action(Pid) end, Actions),
modify_jobs(Jobs, Actions).

%% @equiv erljob_status:lookup()
dump() -> erljob_status:dump().

%% @equiv erljob_status:lookup(Name)
lookup(Name) -> erljob_status:lookup(Name).
modify_run_state(Name, RunState) ->
erljob_controller_status:set(
erljob_status:lookup(Name, status), run_status, RunState
).

79 changes: 79 additions & 0 deletions src/erljob_cleaner.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
%% @author Masahito Ikuta <[email protected]> [http://d.hatena.ne.jp/cooldaemon/]
%% @copyright Masahito Ikuta 2008
%% @doc This module stops erljob_controller_sup.

%% Copyright 2008 Masahito Ikuta
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(erljob_cleaner).
-behaviour(gen_server).

-export([start_link/0, stop/0]).
-export([exit_job/1]).
-export([
init/1,
handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3
]).

%% @equiv gen_server:start_link({local, ?MODULE}, ?MODULE, [], [])
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @equiv gen_server:call(?MODULE, stop)
stop() ->
gen_server:call(?MODULE, stop).

exit_job(Name) ->
gen_server:cast(?MODULE, {exit_job, Name}).

%% @spec init(_Args:[]) -> {ok, []}
init(_Args) ->
process_flag(trap_exit, true),
{ok, {}}.

%% @doc stop server.
%% @spec handle_call(stop, _From:from(), State:term()) ->
%% {stop, normal, stopped, State:term()}
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};

%% @spec handle_call(_Message:term(), _From:from(), State:term()) ->
%% {reply, ok, State:term()}.
handle_call(_Message, _From, State) ->
{reply, ok, State}.

handle_cast({exit_job, Name}, State) ->
erljob_status:delete(Name),
erljob_controller_sup_sup:stop_child(Name),
{noreply, State};

%% @spec handle_cast(_Message:term(), _State:term()) ->
%% {noreply, State:term()}
handle_cast(_Message, State) ->
{noreply, State}.

%% @spec handle_cast(_Info:term(), _State:term()) ->
%% {noreply, State:term()}
handle_info(_Info, State) ->
{noreply, State}.

%% @spec terminate(_Reason:term(), _State:term()) -> ok
terminate(_Reason, _State) ->
ok.

%% @spec code_change(_Reason:term(), _State:term()) -> ok
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

67 changes: 47 additions & 20 deletions src/erljob_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,60 @@

-export([start_link/1]).
-export([init/2]).
-export([restart/1, suspend/1, finish/1]).

start_link(Arg) ->
proc_lib:start_link(?MODULE, init, [self(), Arg]).
-define(SUSPEND_SLEEP_TIME, 1000).

init(Parent, Arg) ->
start_link({Name, _State}) ->
proc_lib:start_link(?MODULE, init, [self(), Name]).

init(Parent, Name) ->
StatusPid = erljob_status:ensure_lookup(Name, status),
proc_lib:init_ack(Parent, {ok, self()}),
loop(Arg, run).
loop({Name, StatusPid}).

loop({_SupId, StatusPid}=State) ->
loop(
State,
lists:map(fun (Name) ->
erljob_controller_status:lookup(StatusPid, Name)
end, [job, job_state, interval, count, run_state])
).

loop({_Job, _JobArg, _Interval, 0}, _State) -> ok;
loop({_Job, _JobArg, Interval, _State}=Arg, State) ->
loop({SupId, _StatusPid}, [_Job, _JobState, _Interval, 0, _RunState]) ->
finish(SupId);
loop({SupId, _StatusPid}, [_Job, _JobState, _Interval, _Count, finish]) ->
finish(SupId);
loop(State, [_Job, _JobState, _Interval, _Count, suspend]) ->
timer:sleep(?SUSPEND_SLEEP_TIME),
loop(State);
loop(State, [_Job, _JobState, Interval, _Count, _RunState]=Status) ->
receive
restart -> loop(Arg, start);
suspend -> loop(Arg, suspend);
finish -> ok
change_state -> loop(State)
after Interval ->
run_job(Arg, State)
run_job(State, Status)
end.

run_job({{Module, Function}=Job, JobArg, Interval, Count}, run) ->
loop({Job, Module:Function(JobArg), Interval, Count - 1}, run);
run_job({Job, JobArg, Interval, Count}, run) ->
loop({Job, Job(JobArg), Interval, Count - 1}, run);
run_job(Arg, State) ->
loop(Arg, State).
run_job(
{_SupId, StatusPid}=State, [Job, JobState, _Interval, Count, run]
) ->
erljob_controller_status:set(
StatusPid, job_state, exec_job(Job, JobState)
),
decrement_count(StatusPid, Count),
loop(State);
run_job(State, _Status) ->
loop(State).

exec_job({M, F}, Arg) -> which_arg(catch M:F(Arg), Arg);
exec_job(Job, Arg) -> which_arg(catch Job(Arg), Arg).
which_arg({'EXIT', _Reason}, Arg) -> Arg;
which_arg(Arg, _OldArg) -> Arg.

decrement_count(_StatusPid, infinity) -> ok;
decrement_count(StatusPid, Count) ->
erljob_controller_status:set(StatusPid, count, Count - 1).

restart(Pid) -> Pid ! restart.
suspend(Pid) -> Pid ! stop.
finish(Pid) -> Pid ! finish.
finish(SupId) ->
erljob_cleaner:exit_job(SupId),
ok.

123 changes: 123 additions & 0 deletions src/erljob_controller_status.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
%% @author Masahito Ikuta <[email protected]> [http://d.hatena.ne.jp/cooldaemon/]
%% @copyright Masahito Ikuta 2008
%% @doc This module has status for a job controller.

%% Copyright 2008 Masahito Ikuta
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(erljob_controller_status).
-behaviour(gen_server).

-export([start_link/1, stop/1]).
-export([lookup/2, set/3]).
-export([
init/1,
handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3
]).

%% @equiv gen_server:start_link(?MODULE, [Arg:term()], [])
start_link({Name, Arg}) ->
{ok, Pid} = gen_server:start_link(?MODULE, [Arg], []),
erljob_status:set(Name, status, Pid),
{ok, Pid}.

%% @equiv gen_server:call(Pid, stop)
stop(Pid) ->
gen_server:call(Pid, stop).

lookup(Pid, Name) ->
gen_server:call(Pid, {lookup, Name}).

set(Pid, Name, Value) ->
gen_server:cast(Pid, {set, {Name, Value}}).

%% @spec init([Arg:term()]) -> {ok, Arg:term()}
init([{Job, JobState, Interval, Count, RunState}]) ->
process_flag(trap_exit, true),
{ok, {Job, JobState, Interval, Count, RunState, null}}.

handle_call(
{lookup, Name},
_From,
{Job, JobState, Interval, Count, RunState, Pid}=State
) ->
Result = case Name of
job -> Job;
job_state -> JobState;
interval -> Interval;
count -> Count;
run_state -> RunState;
controller_pid -> Pid;
_Other -> unknown_name
end,
{reply, Result, State};

%% @doc stop server.
%% @spec handle_call(stop, _From:from(), State:term()) ->
%% {stop, normal, stopped, State:term()}
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};

%% @spec handle_call(_Message:term(), _From:from(), State:term()) ->
%% {reply, ok, State:term()}.
handle_call(_Message, _From, State) ->
{reply, ok, State}.

handle_cast(
{set, {Name, Value}},
{Job, JobState, Interval, Count, RunState, Pid}=State
) ->
NewState = case Name of
job ->
send_change_state(Pid),
{Value, JobState, Interval, Count, RunState, Pid};
job_state ->
{Job, Value, Interval, Count, RunState, Pid};
interval ->
send_change_state(Pid),
{Job, JobState, Value, Count, RunState, Pid};
count ->
{Job, JobState, Interval, Value, RunState, Pid};
run_state ->
send_change_state(Pid),
{Job, JobState, Interval, Count, Value, Pid};
controller_pid ->
{Job, JobState, Interval, Count, RunState, Pid};
_Other ->
State
end,
{noreply, NewState};

%% @spec handle_cast(_Message:term(), _State:term()) ->
%% {noreply, State:term()}
handle_cast(_Message, State) ->
{noreply, State}.

send_change_state(null) -> ok;
send_change_state(Pid) -> Pid ! change_state.

%% @spec handle_cast(_Info:term(), _State:term()) ->
%% {noreply, State:term()}
handle_info(_Info, State) ->
{noreply, State}.

%% @spec terminate(_Reason:term(), _State:term()) -> ok
terminate(_Reason, _State) ->
ok.

%% @spec code_change(_Reason:term(), _State:term()) -> ok
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

Loading

0 comments on commit e076211

Please sign in to comment.