From 9b9b63cc128d95b7446f8e19e1e4ccec5a3da629 Mon Sep 17 00:00:00 2001 From: JT Smith Date: Sat, 18 Mar 2006 23:25:31 +0000 Subject: [PATCH] fixed several workflow/spectre related bugs --- etc/spectre.conf.original | 14 +- lib/Spectre/Admin.pm | 11 +- lib/Spectre/Workflow.pm | 208 +++++++++++++++-------------- lib/WebGUI/Operation/VersionTag.pm | 4 +- lib/WebGUI/Workflow/Instance.pm | 6 +- lib/WebGUI/Workflow/Spectre.pm | 2 +- sbin/spectre.pl | 4 +- 7 files changed, 134 insertions(+), 115 deletions(-) diff --git a/etc/spectre.conf.original b/etc/spectre.conf.original index 2af93f819..67822be2d 100644 --- a/etc/spectre.conf.original +++ b/etc/spectre.conf.original @@ -16,18 +16,20 @@ "maxWorkers" : 3, -# How many seconds should Spectre wait between spawning jobs. This +# How many seconds should Spectre wait between spawning jobs This # can help avoid creating a denial of service attack on overworked # or underpowered servers. -"timeBetweenJobs" : 5, +"timeBetweenRunningWorkflows" : 5, # The number of seconds that Spectre should wait after an activity -# has been suspended before it should start it back up again. An -# activity may be suspended if it's waiting on user input, or -# if it returns an error. +# has been suspended before it should start it back up again. This +# gives other workflows a chance to run if one particular workflow +# has a lot of activites in it, and provides an opportunity to +# bypass workflows that are waiting for input from an external +# source. -"delayAfterSuspension" : 500 +"delayAfterSuspension" : 60 } diff --git a/lib/Spectre/Admin.pm b/lib/Spectre/Admin.pm index aaa2038b5..0195fdcf3 100644 --- a/lib/Spectre/Admin.pm +++ b/lib/Spectre/Admin.pm @@ -110,14 +110,19 @@ sub new { name => 'Spectre', ); POE::Session->create( - object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop"} ], - args=>[["shutdown"]] + object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop","test"=>"test"} ], + args=>[["shutdown","test"]] ); $self->{_cron} = Spectre::Cron->new($config, $debug); $self->{_workflow} = Spectre::Workflow->new($config, $debug); POE::Kernel->run(); } - +sub test { + my $arg = $_[ARG1]; +use JSON; + print objToJson($arg); +} + 1; diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index bd4c46723..f6693ff8a 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -38,7 +38,7 @@ sub _start { foreach my $config (keys %{$configs}) { $kernel->yield("loadWorkflows", $configs->{$config}); } - $kernel->yield("checkJobs"); + $kernel->yield("checkInstances"); } #------------------------------------------------------------------- @@ -59,53 +59,60 @@ sub _stop { #------------------------------------------------------------------- -=head2 addJob ( config, job ) +=head2 addInstance ( params ) -Adds a workflow job to the workflow processing queue. +Adds a workflow instance to the workflow processing queue. -=head3 config +=head3 params -The config file name for the site that this job belongs to. +A hash reference containing important information about the workflow instance to add to the queue. -=head3 job +=head4 sitename -A hash reference containing a row of data from the WorkflowInstance table. +The host and domain of the site this instance belongs to. + +=head3 instanceId + +The unqiue id for this workflow instance. + +=head3 priority + +The priority (1,2, or 3) that this instance should be run at. =cut -sub addJob { - my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1]; - $self->debug("Adding workflow instance ".$job->{instanceId}." from ".$config->getFilename." to job queue at priority ".$job->{priority}."."); - my $sitename = $config->get("sitename"); - $self->{_jobs}{$job->{instanceId}} = { - sitename=>$sitename->[0], - instanceId=>$job->{instanceId}, +sub addInstance { + my ($self, $params) = @_[OBJECT, ARG0]; + $self->debug("Adding workflow instance ".$params->{instanceId}." from ".$params->{sitename}." to queue at priority ".$params->{priority}."."); + $self->{_instances}{$params->{instanceId}} = { + sitename=>$params->{sitename}, + instanceId=>$params->{instanceId}, status=>"waiting", - priority=>$job->{priority} + priority=>$params->{priority} }; - push(@{$self->{"_priority".$job->{priority}}}, $self->{_jobs}{$job->{instanceId}}); + push(@{$self->{"_priority".$params->{priority}}}, $params->{instanceId}); } #------------------------------------------------------------------- -=head2 checkJobs ( ) +=head2 checkInstances ( ) -Checks to see if there are any open job slots available, and if there are assigns a new job to be run to fill it. +Checks to see if there are any open instance slots available, and if there are assigns a new instance to be run to fill it. =cut -sub checkJobs { +sub checkInstances { my ($kernel, $self) = @_[KERNEL, OBJECT]; - $self->debug("Checking to see if we can run anymore jobs right now."); - if ($self->countRunningJobs < $self->config->get("maxWorkers")) { - my $job = $self->getNextJob; - if (defined $job) { - $job->{status} = "running"; - push(@{$self->{_runningJobs}}, $job); - $kernel->yield("runWorker",$job); + $self->debug("Checking to see if we can run anymore instances right now."); + if ($self->countRunningInstances < $self->config->get("maxWorkers")) { + my $instance = $self->getNextInstance; + if (defined $instance) { + $instance->{status} = "running"; + push(@{$self->{_runningInstances}}, $instance->{instanceId}); + $kernel->yield("runWorker",$instance); } } - $kernel->delay_set("checkJobs",$self->config->get("timeBetweenJobs")); + $kernel->delay_set("checkInstances",$self->config->get("timeBetweenRunningWorkflows")); } #------------------------------------------------------------------- @@ -123,18 +130,18 @@ sub config { #------------------------------------------------------------------- -=head2 countRunningJobs ( ) +=head2 countRunningInstances ( ) -Returns an integer representing the number of running jobs. +Returns an integer representing the number of running instances. =cut -sub countRunningJobs { +sub countRunningInstances { my $self = shift; - my $runningJobs = $self->{_runningJobs} || []; - my $jobCount = scalar(@{$runningJobs}); - $self->debug("There are $jobCount running jobs."); - return $jobCount; + my $runningInstances = $self->{_runningInstances} || []; + my $instanceCount = scalar(@{$runningInstances}); + $self->debug("There are $instanceCount running instances."); + return $instanceCount; } #------------------------------------------------------------------- @@ -157,43 +164,46 @@ sub debug { #------------------------------------------------------------------- -=head2 deleteJob ( instanceId ) +=head2 deleteInstance ( instanceId ) -Removes a workflow job from the processing queue. +Removes a workflow instance from the processing queue. =cut -sub deleteJob { - my ($self, $instanceId,$kernel) = @_[OBJECT, ARG0, KERNEL]; - $kernel->yield("suspendJob",$instanceId); - $self->debug("Deleting workflow instance $instanceId from job queue."); - my $priority = $self->{_jobs}{$instanceId}{priority}; - delete $self->{_jobs}{$instanceId}; - for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) { - if ($self->{"_priority".$priority}[$i]{instanceId} eq $instanceId) { - splice(@{$self->{"_priority".$priority}}, $i, 1); +sub deleteInstance { + my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION]; + $kernel->call($session, "suspendInstance",$instanceId); + $self->debug("Deleting workflow instance $instanceId from instance queue."); + if ($self->{_instances}{$instanceId}) { + my $priority = $self->{_instances}{$instanceId}{priority}; + delete $self->{_instances}{$instanceId}; + for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) { + if ($self->{"_priority".$priority}[$i] eq $instanceId) { + splice(@{$self->{"_priority".$priority}}, $i, 1); + } } } } #------------------------------------------------------------------- -=head2 getNextJob ( ) +=head2 getNextInstance ( ) =cut -sub getNextJob { +sub getNextInstance { my $self = shift; - $self->debug("Looking for a workflow instance to execute."); + $self->debug("Looking for a workflow instance to run."); foreach my $priority (1..3) { - foreach my $job (@{$self->{"_priority".$priority}}) { - if (time() > $job->{statusDelay} & $job->{status}) { - delete $job->{statusDelay}; - $job->{status} eq "waiting"; + foreach my $instanceId (@{$self->{"_priority".$priority}}) { + if (time() > $self->{_instances}{$instanceId}{statusDelay} && $self->{_instances}{$instanceId}{status} eq "delay") { + $self->debug("Returning $instanceId to available pool."); + delete $self->{_instances}{$instanceId}{statusDelay}; + $self->{_instances}{$instanceId}{status} = "waiting"; } - if ($job->{status} eq "waiting") { - $self->debug("Looks like ".$job->{instanceId}." would be a good workflow instance to run."); - return $job; + if ($self->{_instances}{$instanceId}{status} eq "waiting") { + $self->debug("Looks like ".$instanceId." would be a good workflow instance to run."); + return $self->{_instances}{$instanceId}; } } } @@ -211,9 +221,9 @@ sub loadWorkflows { my ($kernel, $self, $config) = @_[KERNEL, OBJECT, ARG0]; $self->debug("Loading workflows for ".$config->getFilename."."); my $session = WebGUI::Session->open($config->getWebguiRoot, $config->getFilename); - my $result = $session->db->read("select * from WorkflowInstance"); - while (my $data = $result->hashRef) { - $kernel->yield("addJob", $config, $data); + my $result = $session->db->read("select instanceId,priority from WorkflowInstance"); + while (my ($id, $priority) = $result->array) { + $kernel->yield("addInstance", {sitename=>$config->get("sitename")->[0], instanceId=>$id, priority=>$priority}); } $session->close; } @@ -240,9 +250,9 @@ sub new { my $debug = shift; my $self = {_debug=>$debug, _config=>$config}; bless $self, $class; - my @publicEvents = qw(addJob deleteJob); + my @publicEvents = qw(addInstance deleteInstance); POE::Session->create( - object_states => [ $self => [qw(_start _stop checkJobs deleteJob suspendJob loadWorkflows runWorker workerResponse), @publicEvents] ], + object_states => [ $self => [qw(_start _stop addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ], args=>[\@publicEvents] ); } @@ -256,36 +266,38 @@ Calls a worker to execute a workflow activity. =cut sub runWorker { - my ($kernel, $self, $job, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; - $self->debug("Preparing to run workflow instance ".$job->{instanceId}."."); + my ($kernel, $self, $instance, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; + $self->debug("Preparing to run workflow instance ".$instance->{instanceId}."."); POE::Component::Client::UserAgent->new; - my $url = "http://".$job->{sitename}.'/'; - my $request = POST $url, [op=>"runWorkflow", instanceId=>$job->{instanceId}]; - my $cookie = $self->{_cookies}{$job->{sitename}}; + my $url = "http://".$instance->{sitename}.'/'; + my $request = POST $url, [op=>"runWorkflow", instanceId=>$instance->{instanceId}]; + my $cookie = $self->{_cookies}{$instance->{sitename}}; $request->header("Cookie","wgSession=".$cookie) if (defined $cookie); $request->header("User-Agent","Spectre"); - $request->header("X-JobId",$job->{instanceId}); - $self->debug("Posting workflow instance ".$job->{instanceId}." to $url."); + $request->header("X-instanceId",$instance->{instanceId}); + $self->debug("Posting workflow instance ".$instance->{instanceId}." to $url."); $kernel->post( useragent => 'request', { request => $request, response => $session->postback('workerResponse') }); - $self->debug("Workflow instance ".$job->{instanceId}." posted."); + $self->debug("Workflow instance ".$instance->{instanceId}." posted."); } #------------------------------------------------------------------- -=head2 suspendJob ( ) +=head2 suspendInstance ( ) -This method puts a running job back into the available jobs pool thusly freeing up a slot in the running jobs pool. This is done when a job has executed a workflow activity, but the entire workflow has not yet completed. +This method puts a running instance back into the available instances pool thusly freeing up a slot in the running instances pool. This is done when a instance has executed a workflow activity, but the entire workflow has not yet completed. =cut -sub suspendJob { +sub suspendInstance { my ($self, $instanceId) = @_[OBJECT, ARG0]; - $self->debug("Suspending workflow instance ".$instanceId."."); - $self->{_jobs}{$instanceId}{status} = "delay"; - $self->{_jobs}{$instanceId}{statusDelay} = $self->config->get("delayAfterSuspension") + time(); - for (my $i=0; $i < scalar(@{$self->{_runningJobs}}); $i++) { - if ($self->{_runningJobs}[$i]{instanceId} eq $instanceId) { - splice(@{$self->{_runningJobs}}, $i, 1); + $self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("delayAfterSuspension")." seconds."); + if ($self->{_instances}{$instanceId}) { + $self->{_instances}{$instanceId}{status} = "delay"; + $self->{_instances}{$instanceId}{statusDelay} = $self->config->get("delayAfterSuspension") + time(); + for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) { + if ($self->{_runningInstances}[$i] eq $instanceId) { + splice(@{$self->{_runningInstances}}, $i, 1); + } } } } @@ -300,44 +312,44 @@ This method is called when the response from the runWorker() method is received. sub workerResponse { my ($self, $kernel) = @_[OBJECT, KERNEL]; - $self->debug("Retrieving response from workflow instance job."); + $self->debug("Retrieving response from workflow instance."); my ($request, $response, $entry) = @{$_[ARG1]}; - my $jobId = $request->header("X-JobId"); # got to figure out how to get this from the request, cuz the response may die - $self->debug("Response retrieved is for $jobId."); + my $instanceId = $request->header("X-instanceId"); # got to figure out how to get this from the request, cuz the response may die + $self->debug("Response retrieved is for $instanceId."); if ($response->is_success) { - $self->debug("Response for $jobId retrieved successfully."); + $self->debug("Response for $instanceId retrieved successfully."); if ($response->header("Cookie") ne "") { - $self->debug("Storing cookie for $jobId for later use."); + $self->debug("Storing cookie for $instanceId for later use."); my $cookie = $response->header("Set-Cookie"); $cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22})/$1/; - $self->{_cookies}{$self->{_jobs}{$jobId}{sitename}} = $cookie; + $self->{_cookies}{$self->{_instances}{$instanceId}{sitename}} = $cookie; } my $state = $response->content; if ($state eq "waiting") { - $self->debug("Was told to wait on $jobId because we're still waiting on some external event."); - $kernel->yield("suspendJob",$jobId); + $self->debug("Was told to wait on $instanceId because we're still waiting on some external event."); + $kernel->yield("suspendInstance",$instanceId); } elsif ($state eq "complete") { - $self->debug("Workflow instance $jobId ran one of it's activities successfully."); - $kernel->yield("suspendJob",$jobId); + $self->debug("Workflow instance $instanceId ran one of it's activities successfully."); + $kernel->yield("suspendInstance",$instanceId); } elsif ($state eq "disabled") { - $self->debug("Workflow instance $jobId is disabled."); - $kernel->yield("deleteJob",$jobId); + $self->debug("Workflow instance $instanceId is disabled."); + $kernel->yield("suspendInstance",$instanceId); } elsif ($state eq "done") { - $self->debug("Workflow instance $jobId is now complete."); - $kernel->yield("deleteJob",$jobId); + $self->debug("Workflow instance $instanceId is now complete."); + $kernel->yield("deleteInstance",$instanceId); } elsif ($state eq "error") { - $self->debug("Got an error for $jobId."); - $kernel->yield("suspendJob",$jobId); + $self->debug("Got an error for $instanceId."); + $kernel->yield("suspendInstance",$instanceId); } else { - $self->debug("Something bad happened on the return of $jobId."); - $kernel->yield("suspendJob",$jobId); + $self->debug("Something bad happened on the return of $instanceId."); + $kernel->yield("suspendInstance",$instanceId); # something bad happened } } elsif ($response->is_redirect) { - $self->debug("Response for $jobId was redirected."); + $self->debug("Response for $instanceId was redirected."); } elsif ($response->is_error) { - $self->debug("Response for $jobId had a communications error."); - $kernel->yield("suspendJob",$jobId) + $self->debug("Response for $instanceId had a communications error."); + $kernel->yield("suspendInstance",$instanceId) # we should probably log something } } diff --git a/lib/WebGUI/Operation/VersionTag.pm b/lib/WebGUI/Operation/VersionTag.pm index d4717e21f..68b09e099 100644 --- a/lib/WebGUI/Operation/VersionTag.pm +++ b/lib/WebGUI/Operation/VersionTag.pm @@ -207,8 +207,8 @@ sub www_commitVersionTagConfirm { my $tag = WebGUI::VersionTag->new($session, $tagId); if (defined $tag && $session->user->isInGroup($tag->get("groupToUse"))) { $tag->set({comments=>$session->form->process("comments", "textarea")}); - $tag->commit; - #$tag->requestCommit; + # $tag->commit; + $tag->requestCommit; my $i18n = WebGUI::International->new($session, "VersionTag"); my $ac = WebGUI::AdminConsole->new($session,"versions"); return $ac->render( diff --git a/lib/WebGUI/Workflow/Instance.pm b/lib/WebGUI/Workflow/Instance.pm index 1a7fe080d..04f76a562 100644 --- a/lib/WebGUI/Workflow/Instance.pm +++ b/lib/WebGUI/Workflow/Instance.pm @@ -84,7 +84,7 @@ sub delete { my $self = shift; $self->session->db->write("delete from WorkflowInstanceScratch where instanceId=?",[$self->getId]); $self->session->db->deleteRow("WorkflowInstance","instanceId",$self->getId); - WebGUI::Workflow::Spectre->new($self->session)->notify("workflow/deleteJob",$self->getId); + WebGUI::Workflow::Spectre->new($self->session)->notify("workflow/deleteInstance",$self->getId); undef $self; } @@ -306,8 +306,8 @@ sub set { $self->{_data}{lastUpdate} = time(); $self->session->db->setRow("WorkflowInstance","instanceId",$self->{_data}); my $spectre = WebGUI::Workflow::Spectre->new($self->session); - $spectre->notify("workflow/deleteJob",$self->getId); - $spectre->notify("workflow/addJob",$self->session->config->getFilename, $self->{_data}); + $spectre->notify("workflow/deleteInstance",$self->getId); + $spectre->notify("workflow/addInstance", {sitename=>$self->session->config->get("sitename")->[0], instanceId=>$self->getId, priority=>$self->{_data}{priority}}); } #------------------------------------------------------------------- diff --git a/lib/WebGUI/Workflow/Spectre.pm b/lib/WebGUI/Workflow/Spectre.pm index 35e49633d..9c24abdcc 100644 --- a/lib/WebGUI/Workflow/Spectre.pm +++ b/lib/WebGUI/Workflow/Spectre.pm @@ -81,7 +81,7 @@ sub notify { timeout=>10 ); if ($remote) { - my $result = $remote->post('admin/shutdown', @params); + my $result = $remote->post(@params); unless (defined $result) { $self->session->errorHandler->warn("Couldn't send command to Spectre because ".$POE::Component::IKC::ClientLite::error); } diff --git a/sbin/spectre.pl b/sbin/spectre.pl index 758517188..0a54fd16c 100644 --- a/sbin/spectre.pl +++ b/sbin/spectre.pl @@ -35,8 +35,8 @@ if ($help || !($shutdown||$daemon||$run)) { print <