better queue for workflow

This commit is contained in:
JT Smith 2006-03-19 16:51:21 +00:00
parent ee4bcaf415
commit 2467dc32dc
3 changed files with 51 additions and 34 deletions

View file

@ -16,20 +16,17 @@
"maxWorkers" : 3,
# How many seconds should Spectre wait between spawning jobs This
# How many seconds should Spectre wait between spawning jobs. This
# can help avoid creating a denial of service attack on overworked
# or underpowered servers.
# or underpowered servers.
"timeBetweenRunningWorkflows" : 5,
"timeBetweenRunningWorkflows" : 4,
# The number of seconds that Spectre should wait after an activity
# has been suspended before it should start it back up again. This
# gives other workflows a chance to run if one particular workflow
# has a lot of activites in it, and provides an opportunity to
# bypass workflows that are waiting for input from an external
# source.
# How long should Spectre delay processing a workflow instance when
# it gets suspended. It can get suspended if it's waiting for
# external input, or if it errors for any reason.
"delayAfterSuspension" : 60
"suspensionDelay" : 60
}

View file

@ -107,8 +107,18 @@ sub checkInstances {
if ($self->countRunningInstances < $self->config->get("maxWorkers")) {
my $instance = $self->getNextInstance;
if (defined $instance) {
# mark it running so that it doesn't run twice at once
$instance->{status} = "running";
push(@{$self->{_runningInstances}}, $instance->{instanceId});
# put it at the end of the queue so that others get a chance
my $priority = $self->{_instances}{$instance->{instanceId}}{priority};
for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) {
if ($self->{"_priority".$priority}[$i] eq $instance->{instanceId}) {
splice(@{$self->{"_priority".$priority}}, $i, 1);
}
}
push(@{$self->{"_priority".$priority}}, $instance->{instanceId});
# run it already
$kernel->yield("runWorker",$instance);
}
}
@ -172,7 +182,7 @@ Removes a workflow instance from the processing queue.
sub deleteInstance {
my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION];
$kernel->call($session, "suspendInstance",$instanceId);
$kernel->call($session, "returnInstanceToQueue",$instanceId);
$self->debug("Deleting workflow instance $instanceId from instance queue.");
if ($self->{_instances}{$instanceId}) {
my $priority = $self->{_instances}{$instanceId}{priority};
@ -196,11 +206,6 @@ sub getNextInstance {
$self->debug("Looking for a workflow instance to run.");
foreach my $priority (1..3) {
foreach my $instanceId (@{$self->{"_priority".$priority}}) {
if (time() > $self->{_instances}{$instanceId}{statusDelay} && $self->{_instances}{$instanceId}{status} eq "delay") {
$self->debug("Returning $instanceId to available pool.");
delete $self->{_instances}{$instanceId}{statusDelay};
$self->{_instances}{$instanceId}{status} = "waiting";
}
if ($self->{_instances}{$instanceId}{status} eq "waiting") {
$self->debug("Looks like ".$instanceId." would be a good workflow instance to run.");
return $self->{_instances}{$instanceId};
@ -252,13 +257,34 @@ sub new {
bless $self, $class;
my @publicEvents = qw(addInstance deleteInstance);
POE::Session->create(
object_states => [ $self => [qw(_start _stop addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ],
object_states => [ $self => [qw(_start _stop returnInstanceToQueue addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ],
args=>[\@publicEvents]
);
}
#-------------------------------------------------------------------
=head2 returnInstanceToQueue ( )
Returns a workflow instance back to runnable queue.
=cut
sub returnInstanceToQueue {
my ($self, $instanceId) = @_[OBJECT, ARG0];
$self->debug("Returning ".$instanceId." to runnable queue.");
if ($self->{_instances}{$instanceId}) {
$self->{_instances}{$instanceId}{status} = "waiting";
for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) {
if ($self->{_runningInstances}[$i] eq $instanceId) {
splice(@{$self->{_runningInstances}}, $i, 1);
}
}
}
}
#-------------------------------------------------------------------
=head2 runWorker ( )
Calls a worker to execute a workflow activity.
@ -284,22 +310,14 @@ sub runWorker {
=head2 suspendInstance ( )
This method puts a running instance back into the available instances pool thusly freeing up a slot in the running instances pool. This is done when a instance has executed a workflow activity, but the entire workflow has not yet completed.
Suspends a workflow instance for a number of seconds defined in the config file, and then returns it to the runnable queue.
=cut
sub suspendInstance {
my ($self, $instanceId) = @_[OBJECT, ARG0];
$self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("delayAfterSuspension")." seconds.");
if ($self->{_instances}{$instanceId}) {
$self->{_instances}{$instanceId}{status} = "delay";
$self->{_instances}{$instanceId}{statusDelay} = $self->config->get("delayAfterSuspension") + time();
for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) {
if ($self->{_runningInstances}[$i] eq $instanceId) {
splice(@{$self->{_runningInstances}}, $i, 1);
}
}
}
my ($self, $instanceId, $kernel) = @_[OBJECT, ARG0, KERNEL];
$self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("suspensionDelay")." seconds.");
$kernel->delay_set("returnInstanceToQueue",$self->config->get("suspensionDelay"), $instanceId);
}
#-------------------------------------------------------------------
@ -330,7 +348,7 @@ sub workerResponse {
$kernel->yield("suspendInstance",$instanceId);
} elsif ($state eq "complete") {
$self->debug("Workflow instance $instanceId ran one of it's activities successfully.");
$kernel->yield("suspendInstance",$instanceId);
$kernel->yield("returnInstanceToQueue",$instanceId);
} elsif ($state eq "disabled") {
$self->debug("Workflow instance $instanceId is disabled.");
$kernel->yield("suspendInstance",$instanceId);
@ -343,7 +361,6 @@ sub workerResponse {
} else {
$self->debug("Something bad happened on the return of $instanceId.");
$kernel->yield("suspendInstance",$instanceId);
# something bad happened
}
} elsif ($response->is_redirect) {
$self->debug("Response for $instanceId was redirected.");

View file

@ -68,6 +68,7 @@ sub create {
return undef if ($isSerial && $count);
my $instanceId = $session->db->setRow("WorkflowInstance","instanceId",{instanceId=>"new", runningSince=>time()}, $id);
my $self = $class->new($session, $instanceId);
$properties->{priority} ||= 2;
$self->set($properties);
return $self;
}
@ -305,9 +306,11 @@ sub set {
$self->{_data}{currentActivityId} = (exists $properties->{currentActivityId}) ? $properties->{currentActivityId} : $self->{_data}{currentActivityId};
$self->{_data}{lastUpdate} = time();
$self->session->db->setRow("WorkflowInstance","instanceId",$self->{_data});
my $spectre = WebGUI::Workflow::Spectre->new($self->session);
$spectre->notify("workflow/deleteInstance",$self->getId);
$spectre->notify("workflow/addInstance", {sitename=>$self->session->config->get("sitename")->[0], instanceId=>$self->getId, priority=>$self->{_data}{priority}});
if ($properties->{priority}) {
my $spectre = WebGUI::Workflow::Spectre->new($self->session);
$spectre->notify("workflow/deleteInstance",$self->getId);
$spectre->notify("workflow/addInstance", {sitename=>$self->session->config->get("sitename")->[0], instanceId=>$self->getId, priority=>$self->{_data}{priority}});
}
}
#-------------------------------------------------------------------