- fix: spectre

- fix: Spectre tries to delete the same workflow instance twice
 - Fixed part of the Spectre memory leak. See gotcha.txt for details.
This commit is contained in:
JT Smith 2006-07-20 00:55:05 +00:00
parent ac953d2043
commit 35e0a63262
10 changed files with 93 additions and 50 deletions

View file

@ -150,7 +150,7 @@ sub new {
create_ikc_server(
ip => $config->get("ip"),
port => $config->get("port"),
name => 'Spectre',
name => 'Spectre'
);
POE::Session->create(
object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop", "ping"=>"ping"} ],

View file

@ -18,8 +18,8 @@ use strict;
use DateTime;
use DateTime::Cron::Simple;
use HTTP::Request::Common;
use POE;
use POE::Component::Client::UserAgent;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use WebGUI::Session;
use WebGUI::Workflow::Cron;
@ -370,6 +370,12 @@ sub new {
object_states => [ $self => [qw(_start _stop runJob runJobResponse addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ],
args=>[\@publicEvents]
);
my $cookies = HTTP::Cookies->new(file => '/tmp/cookies');
POE::Component::Client::HTTP->spawn(
Agent => 'Spectre',
Alias => 'cron-ua',
CookieJar => $cookies
);
}
#-------------------------------------------------------------------
@ -391,7 +397,6 @@ sub runJob {
$self->error("Job ".$id." has failed ".$self->{_errorCount}{$id}." times in a row and will no longer attempt to execute.");
$kernel->yield("deleteJob",$id);
} else {
POE::Component::Client::UserAgent->new;
my $url = "http://".$job->{sitename}.':'.$self->config->get("webguiPort").$job->{gateway};
my $request = POST $url, [op=>"runCronJob", taskId=>$job->{taskId}];
my $cookie = $self->{_cookies}{$job->{sitename}};
@ -399,8 +404,7 @@ sub runJob {
$request->header("User-Agent","Spectre");
$request->header("X-jobId",$id);
$self->debug("Posting job ".$id." to $url.");
$kernel->post( useragent => 'request', { request => $request, response => $session->postback('runJobResponse') });
$kernel->post( useragent => 'shutdown'); # we'll still get the response, we're just done sending the request
$kernel->post('cron-ua','request', 'runJobResponse', $request);
$self->debug("Cron job ".$id." posted.");
}
}
@ -414,9 +418,10 @@ This method is called when the response from the runJob() method is received.
=cut
sub runJobResponse {
my ($self, $kernel) = @_[OBJECT, KERNEL];
my ($self, $kernel, $requestPacket, $responsePacket) = @_[OBJECT, KERNEL, ARG0, ARG1];
$self->debug("Retrieving response from job.");
my ($request, $response, $entry) = @{$_[ARG1]};
my $request = $requestPacket->[0];
my $response = $responsePacket->[0];
my $id = $request->header("X-jobId"); # got to figure out how to get this from the request, cuz the response may die
$self->debug("Response retrieved is for job $id.");
if ($response->is_success) {

View file

@ -16,8 +16,8 @@ package Spectre::Workflow;
use strict;
use HTTP::Request::Common;
use POE;
use POE::Component::Client::UserAgent;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use WebGUI::Session;
#-------------------------------------------------------------------
@ -101,11 +101,14 @@ sub addInstance {
Checks to see if there are any open instance slots available, and if there are assigns a new instance to be run to fill it.
=cut
use POE::API::Peek;
sub checkInstances {
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Checking to see if we can run anymore instances right now.");
if ($self->countRunningInstances < $self->config->get("maxWorkers")) {
my $api = POE::API::Peek->new;
$self->debug("POE SESSIONS: ".$api->session_count);
$self->debug("Total workflows waiting to run: ".scalar(keys %{$self->{_instances}}));
$self->debug("Priority 1 count: ".scalar(@{$self->{_priority1}}));
$self->debug("Priority 2 count: ".scalar(@{$self->{_priority2}}));
@ -153,7 +156,7 @@ Returns an integer representing the number of running instances.
sub countRunningInstances {
my $self = shift;
my $runningInstances = $self->{_runningInstances} || [];
my $runningInstances = $self->{_runningInstances};
my $instanceCount = scalar(@{$runningInstances});
$self->debug("There are $instanceCount running instances.");
return $instanceCount;
@ -190,8 +193,8 @@ Removes a workflow instance from the processing queue.
sub deleteInstance {
my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION];
$kernel->call($session, "returnInstanceToQueue",$instanceId);
$self->debug("Deleting workflow instance $instanceId from instance queue.");
$self->debug("Deleting workflow instance $instanceId from queue.");
$self->removeInstanceFromRunningQueue($instanceId);
if ($self->{_instances}{$instanceId}) {
my $priority = $self->{_instances}{$instanceId}{priority};
unless ($priority) {
@ -306,33 +309,53 @@ sub new {
my $config = shift;
my $logger = shift;
my $debug = shift;
my $self = {_priority1=>[], _priority2=>[], _priority3=>[], _debug=>$debug, _config=>$config, _logger=>$logger};
my $self = {_runningInstances=>[], _priority1=>[], _priority2=>[], _priority3=>[], _debug=>$debug, _config=>$config, _logger=>$logger};
bless $self, $class;
my @publicEvents = qw(addInstance deleteInstance);
POE::Session->create(
object_states => [ $self => [qw(_start _stop returnInstanceToQueue addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ],
object_states => [ $self => [qw(_start _stop returnInstanceToRunnableState addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ],
args=>[\@publicEvents]
);
my $cookies = HTTP::Cookies->new(file => '/tmp/cookies');
POE::Component::Client::HTTP->spawn(
Agent => 'Spectre',
Alias => 'workflow-ua',
CookieJar => $cookies
);
}
#-------------------------------------------------------------------
=head2 returnInstanceToQueue ( )
=head2 removeInstanceFromRunningQueue ( )
Removes a workflow instance from the queue that tracks what's running.
=cut
sub removeInstanceFromRunningQueue {
my $self = shift;
my $instanceId = shift;
return undef unless defined $instanceId;
for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) {
if ($self->{_runningInstances}[$i] eq $instanceId) {
splice(@{$self->{_runningInstances}}, $i, 1);
}
}
}
#-------------------------------------------------------------------
=head2 returnInstanceToRunnableState ( )
Returns a workflow instance back to runnable queue.
=cut
sub returnInstanceToQueue {
sub returnInstanceToRunnableState {
my ($self, $instanceId) = @_[OBJECT, ARG0];
$self->debug("Returning ".$instanceId." to runnable queue.");
$self->debug("Returning ".$instanceId." to runnable state.");
if ($self->{_instances}{$instanceId}) {
$self->{_instances}{$instanceId}{status} = "waiting";
for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) {
if ($self->{_runningInstances}[$i] eq $instanceId) {
splice(@{$self->{_runningInstances}}, $i, 1);
}
}
}
}
@ -347,16 +370,14 @@ Calls a worker to execute a workflow activity.
sub runWorker {
my ($kernel, $self, $instance, $session) = @_[KERNEL, OBJECT, ARG0, SESSION];
$self->debug("Preparing to run workflow instance ".$instance->{instanceId}.".");
POE::Component::Client::UserAgent->new;
my $url = "http://".$instance->{sitename}.':'.$self->config->get("webguiPort").$instance->{gateway};
my $request = POST $url, [op=>"runWorkflow", instanceId=>$instance->{instanceId}];
my $cookie = $self->{_cookies}{$instance->{sitename}};
$request->header("Cookie","wgSession=".$cookie) if (defined $cookie);
$request->header("User-Agent","Spectre");
$request->header("X-instanceId",$instance->{instanceId});
$request->header("User-Agent","Spectre");
$self->debug("Posting workflow instance ".$instance->{instanceId}." to $url.");
$kernel->post( useragent => 'request', { request => $request, response => $session->postback('workerResponse') });
$kernel->post( useragent => 'shutdown'); # we'll still get the response, we're just done sending the request
$kernel->post('workflow-ua','request', 'workerResponse', $request);
$self->debug("Workflow instance ".$instance->{instanceId}." posted.");
}
@ -375,16 +396,7 @@ sub suspendInstance {
$kernel->yield("deleteInstance",$instanceId);
} else {
$self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("suspensionDelay")." seconds.");
# normally this is taken care of by the returnInstanceToQueue method, but we want to free up the running count
# so that other things can be run while this thing is suspended
if ($self->{_instances}{$instanceId}) {
for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) {
if ($self->{_runningInstances}[$i] eq $instanceId) {
splice(@{$self->{_runningInstances}}, $i, 1);
}
}
}
$kernel->delay_set("returnInstanceToQueue",$self->config->get("suspensionDelay"), $instanceId);
$kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instanceId);
}
}
@ -397,11 +409,13 @@ This method is called when the response from the runWorker() method is received.
=cut
sub workerResponse {
my ($self, $kernel) = @_[OBJECT, KERNEL];
my ($self, $kernel, $requestPacket, $responsePacket) = @_[OBJECT, KERNEL, ARG0, ARG1];
$self->debug("Retrieving response from workflow instance.");
my ($request, $response, $entry) = @{$_[ARG1]};
my $request = $requestPacket->[0];
my $response = $responsePacket->[0];
my $instanceId = $request->header("X-instanceId"); # got to figure out how to get this from the request, cuz the response may die
$self->debug("Response retrieved is for $instanceId.");
$self->removeInstanceFromRunningQueue($instanceId);
if ($response->is_success) {
$self->debug("Response for $instanceId retrieved successfully.");
if ($response->header("Set-Cookie") ne "") {
@ -418,7 +432,7 @@ sub workerResponse {
} elsif ($state eq "complete") {
delete $self->{_errorCount}{$instanceId};
$self->debug("Workflow instance $instanceId ran one of it's activities successfully.");
$kernel->yield("returnInstanceToQueue",$instanceId);
$kernel->yield("returnInstanceToRunnableState",$instanceId);
} elsif ($state eq "disabled") {
delete $self->{_errorCount}{$instanceId};
$self->debug("Workflow instance $instanceId is disabled.");