diff --git a/docs/gotcha.txt b/docs/gotcha.txt index 9c9b742d2..7b93db81a 100644 --- a/docs/gotcha.txt +++ b/docs/gotcha.txt @@ -34,7 +34,7 @@ save you many hours of grief. MIME::Tools POE POE::Component::IKC::Server - + POE::Component::Client::UserAgent 6.8.4 diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index 7cb7b54ed..83d5a6c2f 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -16,6 +16,7 @@ package Spectre::Workflow; use strict; use POE; +use POE::Component::Client::UserAgent; #------------------------------------------------------------------- @@ -74,11 +75,14 @@ A hash reference containing a row of data from the WorkflowInstance table. sub addJob { my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1]; - $self->{"_priority".$job->{priority}}{$job->{instanceId}} = { + # job list + $self->{_jobs}{$job->{instanceId}} = { instanceId=>$job->{instanceId}, config=>$config, - status=>"waiting" + status=>"waiting", + priority=>$job->{priority} }; + push(@{$self->{"_priority".$job->{priority}}}, $self->{_jobs}{$job->{instanceId}}); } #------------------------------------------------------------------- @@ -93,8 +97,11 @@ sub checkJobs { my ($kernel, $self) = @_[KERNEL, OBJECT]; if ($self->countRunningJobs < 5) { my $job = $self->getNextJob; - $job->{status} = "running"; - $kernel->yield("runJob",$job); + if (defined $job) { + $job->{status} = "running"; + push(@{$self->{_runningJobs}}, $job); + $kernel->yield("runWorker",$job); + } } } @@ -121,12 +128,15 @@ Removes a workflow job from the processing queue. sub deleteJob { my ($self, $instanceId) = @_[OBJECT, ARG0]; - delete $self->{_priority1}{$instanceId}; - delete $self->{_priority2}{$instanceId}; - delete $self->{_priority3}{$instanceId}; + my $priority = $self->{_jobs}{$instanceId}{priority}; + delete $self->{_jobs}{$instanceId}; + for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) { + if ($self->{"_priority".$priority}[$i]{instanceId} eq $instanceId) { + splice(@{$self->{"_priority".$priority}}, $i, 1); + } + } } - #------------------------------------------------------------------- =head2 getNextJob ( ) @@ -136,9 +146,9 @@ sub deleteJob { sub getNextJob { my $self = shift; foreach my $priority (1..3) { - foreach my $instanceId (keys %{$self->{"_priority".$priority}}) { - if ($self->{"_priority".$priority}{$instanceId}{status} eq "waiting") { - return $self->{"_priority".$priority}{$instanceId}; + foreach my $job (@{$self->{"_priority".$priority}}) { + if ($job->{status} eq "waiting") { + return $job; } } } @@ -180,11 +190,83 @@ sub new { bless $self, $class; my @publicEvents = qw(addJob deleteJob); POE::Session->create( - object_states => [ $self => [qw(_start _stop checkJobs loadWorkflows runJob), @publicEvents] ], + object_states => [ $self => [qw(_start _stop checkJobs loadWorkflows runWorker), @publicEvents] ], args=>[\@publicEvents] ); } +#------------------------------------------------------------------- + +=head2 runWorker ( ) + +Calls a worker to execute a workflow activity. + +=cut + +sub runWorker { + my ($kernel, $self, $job, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; + POE::Component::Client::UserAgent->new; + my $url = $job->{sitename}.'/'.$job->{gateway}; + $url =~ s/\/\//\//g; + $url = "http://".$url."?op=spectre;instanceId=".$job->{instanceId}; + $kernel->post( useragent => 'request', { request => HTTP::Request->new(GET => $url), response => $session->postback('workerResponse') }); +} + +#------------------------------------------------------------------- + +=head2 suspendJob ( jobId ) { + +This method puts a running job back into the available jobs pool thusly freeing up a slot in the running jobs pool. This is done when a job has executed a workflow activity, but the entire workflow has not yet completed. + +=head3 jobId + +The job being suspended. + +=cut + +sub suspendJob { + my $self = shift; + my $instanceId = shift; + $self->{_jobs}{$instanceId}{status} = "waiting"; + for (my $i=0; $i < scalar(@{$self->{_runningJobs}}); $i++) { + if ($self->{_runningJobs}[$i]{instanceId} eq $instanceId) { + splice(@{$self->{_runningJobs}}, $i, 1); + } + } +} + +#------------------------------------------------------------------- + +=head2 workerResponse ( ) + +This method is called when the response from the runWorker() method is received. + +=cut + +sub workerResponse { + my $self = $_[OBJECT]; + my ($request, $response, $entry) = @{$_[ARG1]}; + my $jobId = ""; # got to figure out how to get this from the request, cuz the response may die + if ($response->is_success) { + my $state = ""; # get the response + if ($state eq "continue") { + $self->suspendJob($jobId); + } elsif ($state eq "done") { + $self->deleteJob($jobId); + } else { + $self->suspendJob($jobId); + # something bad happened + } + } elsif ($response->is_redirect) { + # nothing to do, cuz we're following the redirect to see what happens + } elsif ($response->is_error) { + $self->suspendJob($jobId) + # we should probably log something + } +} + + + 1; diff --git a/sbin/spectre.pl b/sbin/spectre.pl index e4ffa8b80..118aac8f8 100644 --- a/sbin/spectre.pl +++ b/sbin/spectre.pl @@ -1,5 +1,5 @@ #------------------------------------------------------------------- -# WebGUI is Copyright 2001-2005 Plain Black Corporation. +# WebGUI is Copyright 2001-2006 Plain Black Corporation. #------------------------------------------------------------------- # Please read the legal notices (docs/legal.txt) and the license # (docs/license.txt) that came with this distribution before using @@ -11,14 +11,8 @@ use strict; use warnings; use lib '../lib'; -use DateTime; -use DateTime::Cron::Simple; -use Getopt::Long; -use POE qw(Session); +use Spectre::Admin; use POE::Component::IKC::ClientLite; -use POE::Component::JobQueue; -use WebGUI::Session; -use WebGUI::Workflow; $|=1; # disable output buffering my $help; @@ -64,165 +58,5 @@ if ($shutdown) { fork and exit; - -create_ikc_server( - port => 32133, - name => 'Spectre', - ); - -POE::Session->create( - inline_states => { - _start => \&initializeScheduler, - _stop => \&shutdown, - "shutdown" => \&shutdown, - loadSchedule => \&loadSchedule, - checkSchedule => \&checkSchedule, - checkEvent => \&checkEvent, - } - ); - -POE::Session->create( - inline_states => { - _start => \&initializeJobQueue, - _stop => \&shutdown, - } - ); - -POE::Component::JobQueue->spawn ( - Alias => 'queuer', - WorkerLimit => 10, - Worker => \&spawnWorker, - Passive => { - Prioritizer => \&prioritizeJobs, - }, - ); - -POE::Kernel->run(); -exit 0; - - -#------------------------------------------------------------------- -sub checkEvent { - my ($kernel, $schedule, $workflowId, $time) = @_[KERNEL, ARG0, ARG1, ARG2]; - my $cron = DateTime::Cron::Simple->new($schedule); - if ($cron->validate_time(DateTime->from_epoch(epoch=>$time))) { - print "Supposed to run task ".$workflowId." now!!\n"; - } -} - -#------------------------------------------------------------------- -sub checkSchedule { - my ($kernel, $heap) = @_[KERNEL, HEAP]; - my $now = time(); - foreach my $config (keys %{$heap->{workflowSchedules}}) { - foreach my $event (@{$heap->{workflowSchedules}{$config}}) { - $kernel->yield("checkEvent",$event->{schedule},$event->{workflowId},$now); - } - } - $kernel->delay_set("checkSchedule",60); -} - -#------------------------------------------------------------------- -sub initializeJobQueue { - print "Starting WebGUI Spectre Job Queue..."; - my $kernel = $_[KERNEL]; - my $serviceName = "queue"; - $kernel->alias_set($serviceName); - $kernel->call( IKC => publish => $serviceName, ["shutdown"] ); - print "OK\n"; - foreach my $config (keys %{WebGUI::Config::readAllConfigs("..")}) { - $kernel->yield("loadJobs", $config); - } -} - -#------------------------------------------------------------------- -sub initializeScheduler { - print "Starting WebGUI Spectre Scheduler..."; - my ( $kernel, $heap) = @_[ KERNEL, HEAP ]; - my $serviceName = "scheduler"; - $kernel->alias_set($serviceName); - $kernel->call( IKC => publish => $serviceName, ["shutdown", "loadSchedule"] ); - foreach my $config (keys %{WebGUI::Config::readAllConfigs("..")}) { - $kernel->yield("loadSchedule", $config); - } - print "OK\n"; - $kernel->yield("checkSchedule"); -} - -#------------------------------------------------------------------- -sub loadJobs { - my ($heap, $config) = @_[HEAP, ARG0]; - sessionOpen($config); -} - -#------------------------------------------------------------------- -sub loadSchedule { - my ($heap, $config) = @_[HEAP, ARG0]; - sessionOpen($config); - $heap->{workflowSchedules}{$config} = WebGUI::Workflow::getSchedules(); - sessionClose(); -} - -#------------------------------------------------------------------- -sub performJob { - -} - -#------------------------------------------------------------------- -sub prioritizeJobs { - return 1; # FIFO queue, but let's add priorities at some point -} - -#------------------------------------------------------------------- -sub sessionOpen { - WebGUI::Session::open("..",shift); - WebGUI::Session::refreshUserInfo("pbuser_________spectre"); -} - -#------------------------------------------------------------------- -sub sessionClose { - WebGUI::Session::end(); - WebGUI::Session::close(); -} - -#------------------------------------------------------------------- -sub shutdown { - my $kernel = $_[KERNEL]; - print "Stopping WebGUI Spectre..."; - if ($session{var}{userId}) { - sessionClose(); - } - print "OK\n"; - $kernel->stop; -} - -#------------------------------------------------------------------- -sub spawnWorker { - my ($postback, @jobParams) = @_; - POE::Session->create ( - inline_states => { - _start => \&startWorker, - _stop => \&stopWorker, - performJob => \&performJob - }, - args => [ - $postback, - @jobParams, - ], - ); -} - -#------------------------------------------------------------------- -sub startWorker { - -} - -#------------------------------------------------------------------- -sub stopWorker { - -} - - - - +Spectre::Admin->new("/data/WebGUI"); diff --git a/sbin/testEnvironment.pl b/sbin/testEnvironment.pl index 39a8f31e3..8815dd792 100644 --- a/sbin/testEnvironment.pl +++ b/sbin/testEnvironment.pl @@ -80,6 +80,7 @@ checkModule("JSON",0.991); checkModule("Finance::Quote",1.08); checkModule("POE",0.3202); checkModule("POE::Component::IKC::Server",0.18); +checkModule("POE::Component::Client::UserAgent", 0.06); checkModule("Data::Structure::Util",0.11); checkModule("Apache2::Request",2.06);