diff --git a/etc/spectre.conf.original b/etc/spectre.conf.original index 67822be2d..072f13149 100644 --- a/etc/spectre.conf.original +++ b/etc/spectre.conf.original @@ -16,20 +16,17 @@ "maxWorkers" : 3, -# How many seconds should Spectre wait between spawning jobs This +# How many seconds should Spectre wait between spawning jobs. This # can help avoid creating a denial of service attack on overworked -# or underpowered servers. +# or underpowered servers. -"timeBetweenRunningWorkflows" : 5, +"timeBetweenRunningWorkflows" : 4, -# The number of seconds that Spectre should wait after an activity -# has been suspended before it should start it back up again. This -# gives other workflows a chance to run if one particular workflow -# has a lot of activites in it, and provides an opportunity to -# bypass workflows that are waiting for input from an external -# source. +# How long should Spectre delay processing a workflow instance when +# it gets suspended. It can get suspended if it's waiting for +# external input, or if it errors for any reason. -"delayAfterSuspension" : 60 +"suspensionDelay" : 60 } diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index f6693ff8a..9c7cf362a 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -107,8 +107,18 @@ sub checkInstances { if ($self->countRunningInstances < $self->config->get("maxWorkers")) { my $instance = $self->getNextInstance; if (defined $instance) { + # mark it running so that it doesn't run twice at once $instance->{status} = "running"; push(@{$self->{_runningInstances}}, $instance->{instanceId}); + # put it at the end of the queue so that others get a chance + my $priority = $self->{_instances}{$instance->{instanceId}}{priority}; + for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) { + if ($self->{"_priority".$priority}[$i] eq $instance->{instanceId}) { + splice(@{$self->{"_priority".$priority}}, $i, 1); + } + } + push(@{$self->{"_priority".$priority}}, $instance->{instanceId}); + # run it already $kernel->yield("runWorker",$instance); } } @@ -172,7 +182,7 @@ Removes a workflow instance from the processing queue. sub deleteInstance { my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION]; - $kernel->call($session, "suspendInstance",$instanceId); + $kernel->call($session, "returnInstanceToQueue",$instanceId); $self->debug("Deleting workflow instance $instanceId from instance queue."); if ($self->{_instances}{$instanceId}) { my $priority = $self->{_instances}{$instanceId}{priority}; @@ -196,11 +206,6 @@ sub getNextInstance { $self->debug("Looking for a workflow instance to run."); foreach my $priority (1..3) { foreach my $instanceId (@{$self->{"_priority".$priority}}) { - if (time() > $self->{_instances}{$instanceId}{statusDelay} && $self->{_instances}{$instanceId}{status} eq "delay") { - $self->debug("Returning $instanceId to available pool."); - delete $self->{_instances}{$instanceId}{statusDelay}; - $self->{_instances}{$instanceId}{status} = "waiting"; - } if ($self->{_instances}{$instanceId}{status} eq "waiting") { $self->debug("Looks like ".$instanceId." would be a good workflow instance to run."); return $self->{_instances}{$instanceId}; @@ -252,13 +257,34 @@ sub new { bless $self, $class; my @publicEvents = qw(addInstance deleteInstance); POE::Session->create( - object_states => [ $self => [qw(_start _stop addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ], + object_states => [ $self => [qw(_start _stop returnInstanceToQueue addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ], args=>[\@publicEvents] ); } #------------------------------------------------------------------- +=head2 returnInstanceToQueue ( ) + +Returns a workflow instance back to runnable queue. + +=cut + +sub returnInstanceToQueue { + my ($self, $instanceId) = @_[OBJECT, ARG0]; + $self->debug("Returning ".$instanceId." to runnable queue."); + if ($self->{_instances}{$instanceId}) { + $self->{_instances}{$instanceId}{status} = "waiting"; + for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) { + if ($self->{_runningInstances}[$i] eq $instanceId) { + splice(@{$self->{_runningInstances}}, $i, 1); + } + } + } +} + +#------------------------------------------------------------------- + =head2 runWorker ( ) Calls a worker to execute a workflow activity. @@ -284,22 +310,14 @@ sub runWorker { =head2 suspendInstance ( ) -This method puts a running instance back into the available instances pool thusly freeing up a slot in the running instances pool. This is done when a instance has executed a workflow activity, but the entire workflow has not yet completed. +Suspends a workflow instance for a number of seconds defined in the config file, and then returns it to the runnable queue. =cut sub suspendInstance { - my ($self, $instanceId) = @_[OBJECT, ARG0]; - $self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("delayAfterSuspension")." seconds."); - if ($self->{_instances}{$instanceId}) { - $self->{_instances}{$instanceId}{status} = "delay"; - $self->{_instances}{$instanceId}{statusDelay} = $self->config->get("delayAfterSuspension") + time(); - for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) { - if ($self->{_runningInstances}[$i] eq $instanceId) { - splice(@{$self->{_runningInstances}}, $i, 1); - } - } - } + my ($self, $instanceId, $kernel) = @_[OBJECT, ARG0, KERNEL]; + $self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("suspensionDelay")." seconds."); + $kernel->delay_set("returnInstanceToQueue",$self->config->get("suspensionDelay"), $instanceId); } #------------------------------------------------------------------- @@ -330,7 +348,7 @@ sub workerResponse { $kernel->yield("suspendInstance",$instanceId); } elsif ($state eq "complete") { $self->debug("Workflow instance $instanceId ran one of it's activities successfully."); - $kernel->yield("suspendInstance",$instanceId); + $kernel->yield("returnInstanceToQueue",$instanceId); } elsif ($state eq "disabled") { $self->debug("Workflow instance $instanceId is disabled."); $kernel->yield("suspendInstance",$instanceId); @@ -343,7 +361,6 @@ sub workerResponse { } else { $self->debug("Something bad happened on the return of $instanceId."); $kernel->yield("suspendInstance",$instanceId); - # something bad happened } } elsif ($response->is_redirect) { $self->debug("Response for $instanceId was redirected."); diff --git a/lib/WebGUI/Workflow/Instance.pm b/lib/WebGUI/Workflow/Instance.pm index 04f76a562..0b7cb5932 100644 --- a/lib/WebGUI/Workflow/Instance.pm +++ b/lib/WebGUI/Workflow/Instance.pm @@ -68,6 +68,7 @@ sub create { return undef if ($isSerial && $count); my $instanceId = $session->db->setRow("WorkflowInstance","instanceId",{instanceId=>"new", runningSince=>time()}, $id); my $self = $class->new($session, $instanceId); + $properties->{priority} ||= 2; $self->set($properties); return $self; } @@ -305,9 +306,11 @@ sub set { $self->{_data}{currentActivityId} = (exists $properties->{currentActivityId}) ? $properties->{currentActivityId} : $self->{_data}{currentActivityId}; $self->{_data}{lastUpdate} = time(); $self->session->db->setRow("WorkflowInstance","instanceId",$self->{_data}); - my $spectre = WebGUI::Workflow::Spectre->new($self->session); - $spectre->notify("workflow/deleteInstance",$self->getId); - $spectre->notify("workflow/addInstance", {sitename=>$self->session->config->get("sitename")->[0], instanceId=>$self->getId, priority=>$self->{_data}{priority}}); + if ($properties->{priority}) { + my $spectre = WebGUI::Workflow::Spectre->new($self->session); + $spectre->notify("workflow/deleteInstance",$self->getId); + $spectre->notify("workflow/addInstance", {sitename=>$self->session->config->get("sitename")->[0], instanceId=>$self->getId, priority=>$self->{_data}{priority}}); + } } #-------------------------------------------------------------------