diff --git a/docs/changelog/7.x.x.txt b/docs/changelog/7.x.x.txt index 1cd140277..b885f2214 100644 --- a/docs/changelog/7.x.x.txt +++ b/docs/changelog/7.x.x.txt @@ -20,6 +20,7 @@ - rfe: added user defined fields to collaboration system rss (Dept of State) - Simplified Spectre's initial data load, and reduced memory footprint in the process. + - fix: Spectre needs to be restarted every day - fix: lack of testing for valid object creation - fix: No mention of intermediate upgrade step in gotcha's - fix: A newly released version of Html::Template fixes a bug with global diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index f27184882..edceafe9b 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -18,6 +18,7 @@ use strict; use HTTP::Request::Common; use HTTP::Cookies; use POE qw(Component::Client::HTTP); +use POE::Queue::Array; #------------------------------------------------------------------- @@ -75,11 +76,10 @@ The priority (1,2, or 3) that this instance should be run at. =cut sub addInstance { - my ($self, $params) = @_[OBJECT, ARG0]; - $self->debug("Adding workflow instance ".$params->{instanceId}." from ".$params->{sitename}." to queue at priority ".$params->{priority}."."); - $params->{status} = "waiting"; - $self->{_instances}{$params->{instanceId}} = $params; - push(@{$self->{"_priority".$params->{priority}}}, $params->{instanceId}); + my ($self, $instance) = @_[OBJECT, ARG0]; + my $priority = ($instance->{priority} -1) * 10; + $self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$priority."."); + $self->getWaitingQueue->enqueue($priority, $instance); } #------------------------------------------------------------------- @@ -94,24 +94,9 @@ sub checkInstances { my ($kernel, $self) = @_[KERNEL, OBJECT]; $self->debug("Checking to see if we can run anymore instances right now."); if ($self->countRunningInstances < $self->config->get("maxWorkers")) { - $self->debug("Total workflows waiting to run: ".scalar(keys %{$self->{_instances}})); - $self->debug("Priority 1 count: ".scalar(@{$self->{_priority1}})); - $self->debug("Priority 2 count: ".scalar(@{$self->{_priority2}})); - $self->debug("Priority 3 count: ".scalar(@{$self->{_priority3}})); + $self->debug("Total workflows waiting to run: ".$self->getWaitingQueue->get_item_count); my $instance = $self->getNextInstance; if (defined $instance) { - # mark it running so that it doesn't run twice at once - $instance->{status} = "running"; - push(@{$self->{_runningInstances}}, $instance->{instanceId}); - # put it at the end of the queue so that others get a chance - my $priority = $self->{_instances}{$instance->{instanceId}}{priority}; - for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) { - if ($self->{"_priority".$priority}[$i] eq $instance->{instanceId}) { - splice(@{$self->{"_priority".$priority}}, $i, 1); - } - } - push(@{$self->{"_priority".$priority}}, $instance->{instanceId}); - # run it already $kernel->yield("runWorker",$instance); } } @@ -141,8 +126,7 @@ Returns an integer representing the number of running instances. sub countRunningInstances { my $self = shift; - my $runningInstances = $self->{_runningInstances}; - my $instanceCount = scalar(@{$runningInstances}); + my $instanceCount = $self->getRunningQueue->get_item_count; $self->debug("There are $instanceCount running instances."); return $instanceCount; } @@ -179,21 +163,14 @@ Removes a workflow instance from the processing queue. sub deleteInstance { my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION]; $self->debug("Deleting workflow instance $instanceId from queue."); + $self->getWaitingQueue->remove_items( + sub { + my $instance = shift; + return 1 if ($instance->{instanceId} eq $instanceId); + return 0; + } + ); $self->removeInstanceFromRunningQueue($instanceId); - if ($self->{_instances}{$instanceId}) { - my $priority = $self->{_instances}{$instanceId}{priority}; - unless ($priority) { - $priority = 2; - $self->error("Workflow instance $instanceId has no priority set. This is likely the cause of a bug somewhere in the system. Temporarily setting the priority to 2 to avoid a fatal error."); - } - delete $self->{_errorCount}{$instanceId}; - delete $self->{_instances}{$instanceId}; - for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) { - if ($self->{"_priority".$priority}[$i] eq $instanceId) { - splice(@{$self->{"_priority".$priority}}, $i, 1); - } - } - } } #------------------------------------------------------------------- @@ -219,7 +196,7 @@ sub error { #------------------------------------------------------------------- -=head3 getLogger ( ) +=head2 getLogger ( ) Returns a reference to the logger. @@ -230,22 +207,51 @@ sub getLogger { return $self->{_logger}; } +#------------------------------------------------------------------- + +=head2 getRunningQueue ( ) + +Returns a reference to the queue of workflow instances that are running now. + +=cut + +sub getRunningQueue { + my $self = shift; + return $self->{_runningQueue}; +} + +#------------------------------------------------------------------- + +=head2 getWaitingQueue ( ) + +Returns a reference to the queue of workflow instances waiting to run. + +=cut + +sub getWaitingQueue { + my $self = shift; + return $self->{_waitingQueue}; +} + + #------------------------------------------------------------------- =head2 getNextInstance ( ) +Returns the next available instance. + =cut sub getNextInstance { my $self = shift; $self->debug("Looking for a workflow instance to run."); - foreach my $priority (1..3) { - foreach my $instanceId (@{$self->{"_priority".$priority}}) { - if ($self->{_instances}{$instanceId}{status} eq "waiting") { - $self->debug("Looks like ".$instanceId." would be a good workflow instance to run."); - return $self->{_instances}{$instanceId}; - } - } + my $waiting = $self->getWaitingQueue; + if ($waiting->get_item_count > 0) { + my ($priority, $id, $instance) = $waiting->dequeue_next; + $instance->{workingPriority} = $priority; + $self->getRunningQueue->enqueue($priority, $instance); + $self->debug("Looks like ".$instance->{instanceId}." at priority $priority would be a good workflow instance to run."); + return $instance; } $self->debug("Didn't see any workflow instances to run."); return undef; @@ -276,7 +282,7 @@ sub new { my $config = shift; my $logger = shift; my $debug = shift; - my $self = {_runningInstances=>[], _priority1=>[], _priority2=>[], _priority3=>[], _debug=>$debug, _config=>$config, _logger=>$logger}; + my $self = {_debug=>$debug, _config=>$config, _logger=>$logger}; bless $self, $class; my @publicEvents = qw(addInstance deleteInstance); POE::Session->create( @@ -289,25 +295,31 @@ sub new { Alias => 'workflow-ua', CookieJar => $cookies ); + $self->{_runningQueue} = POE::Queue::Array->new; + $self->{_waitingQueue} = POE::Queue::Array->new; } + #------------------------------------------------------------------- =head2 removeInstanceFromRunningQueue ( ) -Removes a workflow instance from the queue that tracks what's running. +Removes a workflow instance from the queue that tracks what's running and returns a reference to it. =cut sub removeInstanceFromRunningQueue { my $self = shift; my $instanceId = shift; - return undef unless defined $instanceId; - for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) { - if ($self->{_runningInstances}[$i] eq $instanceId) { - splice(@{$self->{_runningInstances}}, $i, 1); + my @items = $self->getRunningQueue->remove_items( + sub { + my $payload = shift; + return 1 if ($payload->{instanceId} eq $instanceId); + return 0; } - } + ); + my $instance = $items[0][2]; + return $instance; } #------------------------------------------------------------------- @@ -319,11 +331,9 @@ Returns a workflow instance back to runnable queue. =cut sub returnInstanceToRunnableState { - my ($self, $instanceId) = @_[OBJECT, ARG0]; - $self->debug("Returning ".$instanceId." to runnable state."); - if ($self->{_instances}{$instanceId}) { - $self->{_instances}{$instanceId}{status} = "waiting"; - } + my ($self, $instance) = @_[OBJECT, ARG0]; + $self->debug("Returning ".$instance->{instanceId}." to runnable state."); + $self->getWaitingQueue->enqueue($instance->{workingPriority}+1, $instance); } #------------------------------------------------------------------- @@ -357,14 +367,9 @@ Suspends a workflow instance for a number of seconds defined in the config file, =cut sub suspendInstance { - my ($self, $instanceId, $kernel) = @_[OBJECT, ARG0, KERNEL]; - if ($self->{_errorCount}{$instanceId} >= 5) { - $self->error("Workflow instance $instanceId has failed to execute ".$self->{_errorCount}{$instanceId}." times in a row and will no longer attempt to execute."); - $kernel->yield("deleteInstance",$instanceId); - } else { - $self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("suspensionDelay")." seconds."); - $kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instanceId); - } + my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL]; + $self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds."); + $kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance); } #------------------------------------------------------------------- @@ -382,47 +387,41 @@ sub workerResponse { my $response = $responsePacket->[0]; my $instanceId = $request->header("X-instanceId"); # got to figure out how to get this from the request, cuz the response may die $self->debug("Response retrieved is for $instanceId."); - $self->removeInstanceFromRunningQueue($instanceId); + my $instance = $self->removeInstanceFromRunningQueue($instanceId); if ($response->is_success) { $self->debug("Response for $instanceId retrieved successfully."); if ($response->header("Set-Cookie") ne "") { $self->debug("Storing cookie for $instanceId for later use."); my $cookie = $response->header("Set-Cookie"); - my $pattern = $self->{_instances}{$instanceId}{cookieName}."=([a-zA-Z0-9\_\-]{22}).*"; + my $pattern = $instance->{cookieName}."=([a-zA-Z0-9\_\-]{22}).*"; $cookie =~ s/$pattern/$1/; - $self->{_cookies}{$self->{_instances}{$instanceId}{sitename}} = $cookie; + $self->{_cookies}{$instance->{sitename}} = $cookie; } my $state = $response->content; if ($state eq "waiting") { - delete $self->{_errorCount}{$instanceId}; $self->debug("Was told to wait on $instanceId because we're still waiting on some external event."); - $kernel->yield("suspendInstance",$instanceId); + $kernel->yield("suspendInstance",$instance); } elsif ($state eq "complete") { - delete $self->{_errorCount}{$instanceId}; $self->debug("Workflow instance $instanceId ran one of it's activities successfully."); - $kernel->yield("returnInstanceToRunnableState",$instanceId); + $kernel->yield("returnInstanceToRunnableState",$instance); } elsif ($state eq "disabled") { - delete $self->{_errorCount}{$instanceId}; $self->debug("Workflow instance $instanceId is disabled."); - $kernel->yield("suspendInstance",$instanceId); + $kernel->yield("suspendInstance",$instance); } elsif ($state eq "done") { $self->debug("Workflow instance $instanceId is now complete."); $kernel->yield("deleteInstance",$instanceId); } elsif ($state eq "error") { - $self->{_errorCount}{$instanceId}++; $self->debug("Got an error response for $instanceId."); - $kernel->yield("suspendInstance",$instanceId); + $kernel->yield("suspendInstance",$instance); } else { - $self->{_errorCount}{$instanceId}++; $self->error("Something bad happened on the return of $instanceId. ".$response->error_as_HTML); - $kernel->yield("suspendInstance",$instanceId); + $kernel->yield("suspendInstance",$instance); } } elsif ($response->is_redirect) { $self->error("Response for $instanceId was redirected. This should never happen if configured properly!!!"); } elsif ($response->is_error) { - $self->{_errorCount}{$instanceId}++; $self->error("Response for $instanceId had a communications error. ".$response->error_as_HTML); - $kernel->yield("suspendInstance",$instanceId) + $kernel->yield("suspendInstance",$instance) } }