- fix: Spectre needs to be restarted every day

This commit is contained in:
JT Smith 2007-02-01 21:34:28 +00:00
parent a2a3741457
commit 0ee42f1060
2 changed files with 80 additions and 80 deletions

View file

@ -20,6 +20,7 @@
- rfe: added user defined fields to collaboration system rss (Dept of State)
- Simplified Spectre's initial data load, and reduced memory footprint in the
process.
- fix: Spectre needs to be restarted every day
- fix: lack of testing for valid object creation
- fix: No mention of intermediate upgrade step in gotcha's
- fix: A newly released version of Html::Template fixes a bug with global

View file

@ -18,6 +18,7 @@ use strict;
use HTTP::Request::Common;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use POE::Queue::Array;
#-------------------------------------------------------------------
@ -75,11 +76,10 @@ The priority (1,2, or 3) that this instance should be run at.
=cut
sub addInstance {
my ($self, $params) = @_[OBJECT, ARG0];
$self->debug("Adding workflow instance ".$params->{instanceId}." from ".$params->{sitename}." to queue at priority ".$params->{priority}.".");
$params->{status} = "waiting";
$self->{_instances}{$params->{instanceId}} = $params;
push(@{$self->{"_priority".$params->{priority}}}, $params->{instanceId});
my ($self, $instance) = @_[OBJECT, ARG0];
my $priority = ($instance->{priority} -1) * 10;
$self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$priority.".");
$self->getWaitingQueue->enqueue($priority, $instance);
}
#-------------------------------------------------------------------
@ -94,24 +94,9 @@ sub checkInstances {
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Checking to see if we can run anymore instances right now.");
if ($self->countRunningInstances < $self->config->get("maxWorkers")) {
$self->debug("Total workflows waiting to run: ".scalar(keys %{$self->{_instances}}));
$self->debug("Priority 1 count: ".scalar(@{$self->{_priority1}}));
$self->debug("Priority 2 count: ".scalar(@{$self->{_priority2}}));
$self->debug("Priority 3 count: ".scalar(@{$self->{_priority3}}));
$self->debug("Total workflows waiting to run: ".$self->getWaitingQueue->get_item_count);
my $instance = $self->getNextInstance;
if (defined $instance) {
# mark it running so that it doesn't run twice at once
$instance->{status} = "running";
push(@{$self->{_runningInstances}}, $instance->{instanceId});
# put it at the end of the queue so that others get a chance
my $priority = $self->{_instances}{$instance->{instanceId}}{priority};
for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) {
if ($self->{"_priority".$priority}[$i] eq $instance->{instanceId}) {
splice(@{$self->{"_priority".$priority}}, $i, 1);
}
}
push(@{$self->{"_priority".$priority}}, $instance->{instanceId});
# run it already
$kernel->yield("runWorker",$instance);
}
}
@ -141,8 +126,7 @@ Returns an integer representing the number of running instances.
sub countRunningInstances {
my $self = shift;
my $runningInstances = $self->{_runningInstances};
my $instanceCount = scalar(@{$runningInstances});
my $instanceCount = $self->getRunningQueue->get_item_count;
$self->debug("There are $instanceCount running instances.");
return $instanceCount;
}
@ -179,21 +163,14 @@ Removes a workflow instance from the processing queue.
sub deleteInstance {
my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION];
$self->debug("Deleting workflow instance $instanceId from queue.");
$self->getWaitingQueue->remove_items(
sub {
my $instance = shift;
return 1 if ($instance->{instanceId} eq $instanceId);
return 0;
}
);
$self->removeInstanceFromRunningQueue($instanceId);
if ($self->{_instances}{$instanceId}) {
my $priority = $self->{_instances}{$instanceId}{priority};
unless ($priority) {
$priority = 2;
$self->error("Workflow instance $instanceId has no priority set. This is likely the cause of a bug somewhere in the system. Temporarily setting the priority to 2 to avoid a fatal error.");
}
delete $self->{_errorCount}{$instanceId};
delete $self->{_instances}{$instanceId};
for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) {
if ($self->{"_priority".$priority}[$i] eq $instanceId) {
splice(@{$self->{"_priority".$priority}}, $i, 1);
}
}
}
}
#-------------------------------------------------------------------
@ -219,7 +196,7 @@ sub error {
#-------------------------------------------------------------------
=head3 getLogger ( )
=head2 getLogger ( )
Returns a reference to the logger.
@ -230,22 +207,51 @@ sub getLogger {
return $self->{_logger};
}
#-------------------------------------------------------------------
=head2 getRunningQueue ( )
Returns a reference to the queue of workflow instances that are running now.
=cut
sub getRunningQueue {
my $self = shift;
return $self->{_runningQueue};
}
#-------------------------------------------------------------------
=head2 getWaitingQueue ( )
Returns a reference to the queue of workflow instances waiting to run.
=cut
sub getWaitingQueue {
my $self = shift;
return $self->{_waitingQueue};
}
#-------------------------------------------------------------------
=head2 getNextInstance ( )
Returns the next available instance.
=cut
sub getNextInstance {
my $self = shift;
$self->debug("Looking for a workflow instance to run.");
foreach my $priority (1..3) {
foreach my $instanceId (@{$self->{"_priority".$priority}}) {
if ($self->{_instances}{$instanceId}{status} eq "waiting") {
$self->debug("Looks like ".$instanceId." would be a good workflow instance to run.");
return $self->{_instances}{$instanceId};
}
}
my $waiting = $self->getWaitingQueue;
if ($waiting->get_item_count > 0) {
my ($priority, $id, $instance) = $waiting->dequeue_next;
$instance->{workingPriority} = $priority;
$self->getRunningQueue->enqueue($priority, $instance);
$self->debug("Looks like ".$instance->{instanceId}." at priority $priority would be a good workflow instance to run.");
return $instance;
}
$self->debug("Didn't see any workflow instances to run.");
return undef;
@ -276,7 +282,7 @@ sub new {
my $config = shift;
my $logger = shift;
my $debug = shift;
my $self = {_runningInstances=>[], _priority1=>[], _priority2=>[], _priority3=>[], _debug=>$debug, _config=>$config, _logger=>$logger};
my $self = {_debug=>$debug, _config=>$config, _logger=>$logger};
bless $self, $class;
my @publicEvents = qw(addInstance deleteInstance);
POE::Session->create(
@ -289,25 +295,31 @@ sub new {
Alias => 'workflow-ua',
CookieJar => $cookies
);
$self->{_runningQueue} = POE::Queue::Array->new;
$self->{_waitingQueue} = POE::Queue::Array->new;
}
#-------------------------------------------------------------------
=head2 removeInstanceFromRunningQueue ( )
Removes a workflow instance from the queue that tracks what's running.
Removes a workflow instance from the queue that tracks what's running and returns a reference to it.
=cut
sub removeInstanceFromRunningQueue {
my $self = shift;
my $instanceId = shift;
return undef unless defined $instanceId;
for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) {
if ($self->{_runningInstances}[$i] eq $instanceId) {
splice(@{$self->{_runningInstances}}, $i, 1);
my @items = $self->getRunningQueue->remove_items(
sub {
my $payload = shift;
return 1 if ($payload->{instanceId} eq $instanceId);
return 0;
}
}
);
my $instance = $items[0][2];
return $instance;
}
#-------------------------------------------------------------------
@ -319,11 +331,9 @@ Returns a workflow instance back to runnable queue.
=cut
sub returnInstanceToRunnableState {
my ($self, $instanceId) = @_[OBJECT, ARG0];
$self->debug("Returning ".$instanceId." to runnable state.");
if ($self->{_instances}{$instanceId}) {
$self->{_instances}{$instanceId}{status} = "waiting";
}
my ($self, $instance) = @_[OBJECT, ARG0];
$self->debug("Returning ".$instance->{instanceId}." to runnable state.");
$self->getWaitingQueue->enqueue($instance->{workingPriority}+1, $instance);
}
#-------------------------------------------------------------------
@ -357,14 +367,9 @@ Suspends a workflow instance for a number of seconds defined in the config file,
=cut
sub suspendInstance {
my ($self, $instanceId, $kernel) = @_[OBJECT, ARG0, KERNEL];
if ($self->{_errorCount}{$instanceId} >= 5) {
$self->error("Workflow instance $instanceId has failed to execute ".$self->{_errorCount}{$instanceId}." times in a row and will no longer attempt to execute.");
$kernel->yield("deleteInstance",$instanceId);
} else {
$self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("suspensionDelay")." seconds.");
$kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instanceId);
}
my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL];
$self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds.");
$kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance);
}
#-------------------------------------------------------------------
@ -382,47 +387,41 @@ sub workerResponse {
my $response = $responsePacket->[0];
my $instanceId = $request->header("X-instanceId"); # got to figure out how to get this from the request, cuz the response may die
$self->debug("Response retrieved is for $instanceId.");
$self->removeInstanceFromRunningQueue($instanceId);
my $instance = $self->removeInstanceFromRunningQueue($instanceId);
if ($response->is_success) {
$self->debug("Response for $instanceId retrieved successfully.");
if ($response->header("Set-Cookie") ne "") {
$self->debug("Storing cookie for $instanceId for later use.");
my $cookie = $response->header("Set-Cookie");
my $pattern = $self->{_instances}{$instanceId}{cookieName}."=([a-zA-Z0-9\_\-]{22}).*";
my $pattern = $instance->{cookieName}."=([a-zA-Z0-9\_\-]{22}).*";
$cookie =~ s/$pattern/$1/;
$self->{_cookies}{$self->{_instances}{$instanceId}{sitename}} = $cookie;
$self->{_cookies}{$instance->{sitename}} = $cookie;
}
my $state = $response->content;
if ($state eq "waiting") {
delete $self->{_errorCount}{$instanceId};
$self->debug("Was told to wait on $instanceId because we're still waiting on some external event.");
$kernel->yield("suspendInstance",$instanceId);
$kernel->yield("suspendInstance",$instance);
} elsif ($state eq "complete") {
delete $self->{_errorCount}{$instanceId};
$self->debug("Workflow instance $instanceId ran one of it's activities successfully.");
$kernel->yield("returnInstanceToRunnableState",$instanceId);
$kernel->yield("returnInstanceToRunnableState",$instance);
} elsif ($state eq "disabled") {
delete $self->{_errorCount}{$instanceId};
$self->debug("Workflow instance $instanceId is disabled.");
$kernel->yield("suspendInstance",$instanceId);
$kernel->yield("suspendInstance",$instance);
} elsif ($state eq "done") {
$self->debug("Workflow instance $instanceId is now complete.");
$kernel->yield("deleteInstance",$instanceId);
} elsif ($state eq "error") {
$self->{_errorCount}{$instanceId}++;
$self->debug("Got an error response for $instanceId.");
$kernel->yield("suspendInstance",$instanceId);
$kernel->yield("suspendInstance",$instance);
} else {
$self->{_errorCount}{$instanceId}++;
$self->error("Something bad happened on the return of $instanceId. ".$response->error_as_HTML);
$kernel->yield("suspendInstance",$instanceId);
$kernel->yield("suspendInstance",$instance);
}
} elsif ($response->is_redirect) {
$self->error("Response for $instanceId was redirected. This should never happen if configured properly!!!");
} elsif ($response->is_error) {
$self->{_errorCount}{$instanceId}++;
$self->error("Response for $instanceId had a communications error. ".$response->error_as_HTML);
$kernel->yield("suspendInstance",$instanceId)
$kernel->yield("suspendInstance",$instance)
}
}