From 35e0a63262c48fc4cfe3cf4864d1ed7d2010894f Mon Sep 17 00:00:00 2001 From: JT Smith Date: Thu, 20 Jul 2006 00:55:05 +0000 Subject: [PATCH] - fix: spectre - fix: Spectre tries to delete the same workflow instance twice - Fixed part of the Spectre memory leak. See gotcha.txt for details. --- docs/changelog/7.x.x.txt | 3 ++ docs/gotcha.txt | 10 +++++ lib/Spectre/Admin.pm | 2 +- lib/Spectre/Cron.pm | 19 +++++--- lib/Spectre/Workflow.pm | 78 +++++++++++++++++++-------------- lib/WebGUI/Operation/Spectre.pm | 1 + lib/WebGUI/Workflow/Instance.pm | 14 ++++-- lib/WebGUI/Workflow/Spectre.pm | 12 ++--- sbin/spectre.pl | 2 + sbin/testEnvironment.pl | 2 +- 10 files changed, 93 insertions(+), 50 deletions(-) diff --git a/docs/changelog/7.x.x.txt b/docs/changelog/7.x.x.txt index 0a4b5ed55..f470a2ee4 100644 --- a/docs/changelog/7.x.x.txt +++ b/docs/changelog/7.x.x.txt @@ -37,6 +37,9 @@ - Added runOnLogin and runOnLogout config file properties to Authentication to allow for running an external script on successful login or logout. - fix: spectre + - fix: Spectre tries to delete the same workflow instance twice + - Fixed part of the Spectre memory leak. See gotcha.txt for details. + 7.0.1 - fix: User profile field "Department" needs i18n diff --git a/docs/gotcha.txt b/docs/gotcha.txt index e219ee4a9..390bec935 100644 --- a/docs/gotcha.txt +++ b/docs/gotcha.txt @@ -8,6 +8,14 @@ versions. Be sure to heed the warnings contained herein as they will save you many hours of grief. +7.0.2 +-------------------------------------------------------------------- + + * We've cut the memory leak in half. It turns out we had a bad perl + module that was causing a big part of the memory leak. As such, + you now need to install POE::Component::Client::HTTP. + + 7.0.0 -------------------------------------------------------------------- @@ -16,6 +24,7 @@ save you many hours of grief. every so often to clear out the memory. We recommend setting up a cron job to restart it once per day. + 6.99.5 -------------------------------------------------------------------- @@ -25,6 +34,7 @@ save you many hours of grief. to be accurate. If you use LDAP groups in WebGUI, be sure and check each one to make sure they are assigned to the appropriate LDAP Link. + 6.99.4 -------------------------------------------------------------------- diff --git a/lib/Spectre/Admin.pm b/lib/Spectre/Admin.pm index 3c1f7480c..d7fd978f4 100644 --- a/lib/Spectre/Admin.pm +++ b/lib/Spectre/Admin.pm @@ -150,7 +150,7 @@ sub new { create_ikc_server( ip => $config->get("ip"), port => $config->get("port"), - name => 'Spectre', + name => 'Spectre' ); POE::Session->create( object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop", "ping"=>"ping"} ], diff --git a/lib/Spectre/Cron.pm b/lib/Spectre/Cron.pm index d087731ac..16e1a73a0 100644 --- a/lib/Spectre/Cron.pm +++ b/lib/Spectre/Cron.pm @@ -18,8 +18,8 @@ use strict; use DateTime; use DateTime::Cron::Simple; use HTTP::Request::Common; -use POE; -use POE::Component::Client::UserAgent; +use HTTP::Cookies; +use POE qw(Component::Client::HTTP); use WebGUI::Session; use WebGUI::Workflow::Cron; @@ -370,6 +370,12 @@ sub new { object_states => [ $self => [qw(_start _stop runJob runJobResponse addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ], args=>[\@publicEvents] ); + my $cookies = HTTP::Cookies->new(file => '/tmp/cookies'); + POE::Component::Client::HTTP->spawn( + Agent => 'Spectre', + Alias => 'cron-ua', + CookieJar => $cookies + ); } #------------------------------------------------------------------- @@ -391,7 +397,6 @@ sub runJob { $self->error("Job ".$id." has failed ".$self->{_errorCount}{$id}." times in a row and will no longer attempt to execute."); $kernel->yield("deleteJob",$id); } else { - POE::Component::Client::UserAgent->new; my $url = "http://".$job->{sitename}.':'.$self->config->get("webguiPort").$job->{gateway}; my $request = POST $url, [op=>"runCronJob", taskId=>$job->{taskId}]; my $cookie = $self->{_cookies}{$job->{sitename}}; @@ -399,8 +404,7 @@ sub runJob { $request->header("User-Agent","Spectre"); $request->header("X-jobId",$id); $self->debug("Posting job ".$id." to $url."); - $kernel->post( useragent => 'request', { request => $request, response => $session->postback('runJobResponse') }); - $kernel->post( useragent => 'shutdown'); # we'll still get the response, we're just done sending the request + $kernel->post('cron-ua','request', 'runJobResponse', $request); $self->debug("Cron job ".$id." posted."); } } @@ -414,9 +418,10 @@ This method is called when the response from the runJob() method is received. =cut sub runJobResponse { - my ($self, $kernel) = @_[OBJECT, KERNEL]; + my ($self, $kernel, $requestPacket, $responsePacket) = @_[OBJECT, KERNEL, ARG0, ARG1]; $self->debug("Retrieving response from job."); - my ($request, $response, $entry) = @{$_[ARG1]}; + my $request = $requestPacket->[0]; + my $response = $responsePacket->[0]; my $id = $request->header("X-jobId"); # got to figure out how to get this from the request, cuz the response may die $self->debug("Response retrieved is for job $id."); if ($response->is_success) { diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index d45cf7d5c..e4f110397 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -16,8 +16,8 @@ package Spectre::Workflow; use strict; use HTTP::Request::Common; -use POE; -use POE::Component::Client::UserAgent; +use HTTP::Cookies; +use POE qw(Component::Client::HTTP); use WebGUI::Session; #------------------------------------------------------------------- @@ -101,11 +101,14 @@ sub addInstance { Checks to see if there are any open instance slots available, and if there are assigns a new instance to be run to fill it. =cut +use POE::API::Peek; sub checkInstances { my ($kernel, $self) = @_[KERNEL, OBJECT]; $self->debug("Checking to see if we can run anymore instances right now."); if ($self->countRunningInstances < $self->config->get("maxWorkers")) { + my $api = POE::API::Peek->new; + $self->debug("POE SESSIONS: ".$api->session_count); $self->debug("Total workflows waiting to run: ".scalar(keys %{$self->{_instances}})); $self->debug("Priority 1 count: ".scalar(@{$self->{_priority1}})); $self->debug("Priority 2 count: ".scalar(@{$self->{_priority2}})); @@ -153,7 +156,7 @@ Returns an integer representing the number of running instances. sub countRunningInstances { my $self = shift; - my $runningInstances = $self->{_runningInstances} || []; + my $runningInstances = $self->{_runningInstances}; my $instanceCount = scalar(@{$runningInstances}); $self->debug("There are $instanceCount running instances."); return $instanceCount; @@ -190,8 +193,8 @@ Removes a workflow instance from the processing queue. sub deleteInstance { my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION]; - $kernel->call($session, "returnInstanceToQueue",$instanceId); - $self->debug("Deleting workflow instance $instanceId from instance queue."); + $self->debug("Deleting workflow instance $instanceId from queue."); + $self->removeInstanceFromRunningQueue($instanceId); if ($self->{_instances}{$instanceId}) { my $priority = $self->{_instances}{$instanceId}{priority}; unless ($priority) { @@ -306,33 +309,53 @@ sub new { my $config = shift; my $logger = shift; my $debug = shift; - my $self = {_priority1=>[], _priority2=>[], _priority3=>[], _debug=>$debug, _config=>$config, _logger=>$logger}; + my $self = {_runningInstances=>[], _priority1=>[], _priority2=>[], _priority3=>[], _debug=>$debug, _config=>$config, _logger=>$logger}; bless $self, $class; my @publicEvents = qw(addInstance deleteInstance); POE::Session->create( - object_states => [ $self => [qw(_start _stop returnInstanceToQueue addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ], + object_states => [ $self => [qw(_start _stop returnInstanceToRunnableState addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ], args=>[\@publicEvents] ); + my $cookies = HTTP::Cookies->new(file => '/tmp/cookies'); + POE::Component::Client::HTTP->spawn( + Agent => 'Spectre', + Alias => 'workflow-ua', + CookieJar => $cookies + ); } #------------------------------------------------------------------- -=head2 returnInstanceToQueue ( ) +=head2 removeInstanceFromRunningQueue ( ) + +Removes a workflow instance from the queue that tracks what's running. + +=cut + +sub removeInstanceFromRunningQueue { + my $self = shift; + my $instanceId = shift; + return undef unless defined $instanceId; + for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) { + if ($self->{_runningInstances}[$i] eq $instanceId) { + splice(@{$self->{_runningInstances}}, $i, 1); + } + } +} + +#------------------------------------------------------------------- + +=head2 returnInstanceToRunnableState ( ) Returns a workflow instance back to runnable queue. =cut -sub returnInstanceToQueue { +sub returnInstanceToRunnableState { my ($self, $instanceId) = @_[OBJECT, ARG0]; - $self->debug("Returning ".$instanceId." to runnable queue."); + $self->debug("Returning ".$instanceId." to runnable state."); if ($self->{_instances}{$instanceId}) { $self->{_instances}{$instanceId}{status} = "waiting"; - for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) { - if ($self->{_runningInstances}[$i] eq $instanceId) { - splice(@{$self->{_runningInstances}}, $i, 1); - } - } } } @@ -347,16 +370,14 @@ Calls a worker to execute a workflow activity. sub runWorker { my ($kernel, $self, $instance, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; $self->debug("Preparing to run workflow instance ".$instance->{instanceId}."."); - POE::Component::Client::UserAgent->new; my $url = "http://".$instance->{sitename}.':'.$self->config->get("webguiPort").$instance->{gateway}; my $request = POST $url, [op=>"runWorkflow", instanceId=>$instance->{instanceId}]; my $cookie = $self->{_cookies}{$instance->{sitename}}; $request->header("Cookie","wgSession=".$cookie) if (defined $cookie); - $request->header("User-Agent","Spectre"); $request->header("X-instanceId",$instance->{instanceId}); + $request->header("User-Agent","Spectre"); $self->debug("Posting workflow instance ".$instance->{instanceId}." to $url."); - $kernel->post( useragent => 'request', { request => $request, response => $session->postback('workerResponse') }); - $kernel->post( useragent => 'shutdown'); # we'll still get the response, we're just done sending the request + $kernel->post('workflow-ua','request', 'workerResponse', $request); $self->debug("Workflow instance ".$instance->{instanceId}." posted."); } @@ -375,16 +396,7 @@ sub suspendInstance { $kernel->yield("deleteInstance",$instanceId); } else { $self->debug("Suspending workflow instance ".$instanceId." for ".$self->config->get("suspensionDelay")." seconds."); - # normally this is taken care of by the returnInstanceToQueue method, but we want to free up the running count - # so that other things can be run while this thing is suspended - if ($self->{_instances}{$instanceId}) { - for (my $i=0; $i < scalar(@{$self->{_runningInstances}}); $i++) { - if ($self->{_runningInstances}[$i] eq $instanceId) { - splice(@{$self->{_runningInstances}}, $i, 1); - } - } - } - $kernel->delay_set("returnInstanceToQueue",$self->config->get("suspensionDelay"), $instanceId); + $kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instanceId); } } @@ -397,11 +409,13 @@ This method is called when the response from the runWorker() method is received. =cut sub workerResponse { - my ($self, $kernel) = @_[OBJECT, KERNEL]; + my ($self, $kernel, $requestPacket, $responsePacket) = @_[OBJECT, KERNEL, ARG0, ARG1]; $self->debug("Retrieving response from workflow instance."); - my ($request, $response, $entry) = @{$_[ARG1]}; + my $request = $requestPacket->[0]; + my $response = $responsePacket->[0]; my $instanceId = $request->header("X-instanceId"); # got to figure out how to get this from the request, cuz the response may die $self->debug("Response retrieved is for $instanceId."); + $self->removeInstanceFromRunningQueue($instanceId); if ($response->is_success) { $self->debug("Response for $instanceId retrieved successfully."); if ($response->header("Set-Cookie") ne "") { @@ -418,7 +432,7 @@ sub workerResponse { } elsif ($state eq "complete") { delete $self->{_errorCount}{$instanceId}; $self->debug("Workflow instance $instanceId ran one of it's activities successfully."); - $kernel->yield("returnInstanceToQueue",$instanceId); + $kernel->yield("returnInstanceToRunnableState",$instanceId); } elsif ($state eq "disabled") { delete $self->{_errorCount}{$instanceId}; $self->debug("Workflow instance $instanceId is disabled."); diff --git a/lib/WebGUI/Operation/Spectre.pm b/lib/WebGUI/Operation/Spectre.pm index 91fd07f53..3ff45f80d 100644 --- a/lib/WebGUI/Operation/Spectre.pm +++ b/lib/WebGUI/Operation/Spectre.pm @@ -46,6 +46,7 @@ sub www_spectreTest { name=>rand(100000), timeout=>10 ); + $remote->disconnect; # Can't perform this test until I get smarter. =) #return "spectre" unless $remote; #my $result = $remote->post_respond('admin/ping'); diff --git a/lib/WebGUI/Workflow/Instance.pm b/lib/WebGUI/Workflow/Instance.pm index 90a4ac40c..30a44cba4 100644 --- a/lib/WebGUI/Workflow/Instance.pm +++ b/lib/WebGUI/Workflow/Instance.pm @@ -65,23 +65,29 @@ sub create { my $instanceId = $session->db->setRow("WorkflowInstance","instanceId",{instanceId=>"new", runningSince=>time()}); my $self = $class->new($session, $instanceId); $properties->{notifySpectre} = 1 unless ($properties->{notifySpectre} eq "0"); + $properties->{newlyCreated} = 1; $self->set($properties); return $self; } #------------------------------------------------------------------- -=head2 delete ( ) +=head2 delete ( [ skipNotify ] ) Removes this instance. +=head3 skipNotify + +A boolean, that if true will not notify Spectre of the delete. + =cut sub delete { my $self = shift; + my $skipNotify = shift; $self->session->db->write("delete from WorkflowInstanceScratch where instanceId=?",[$self->getId]); $self->session->db->deleteRow("WorkflowInstance","instanceId",$self->getId); - WebGUI::Workflow::Spectre->new($self->session)->notify("workflow/deleteInstance",$self->getId); + WebGUI::Workflow::Spectre->new($self->session)->notify("workflow/deleteInstance",$self->getId) unless ($skipNotify); undef $self; } @@ -254,7 +260,7 @@ sub run { } my $activity = $workflow->getNextActivity($self->get("currentActivityId")); unless (defined $activity) { - $self->delete; + $self->delete(1); return "done"; } $self->session->errorHandler->info("Running workflow activity ".$activity->getId.", which is a ".(ref $activity).", for instance ".$self->getId."."); @@ -373,7 +379,7 @@ sub set { $self->session->db->setRow("WorkflowInstance","instanceId",$self->{_data}); if ($properties->{notifySpectre}) { my $spectre = WebGUI::Workflow::Spectre->new($self->session); - $spectre->notify("workflow/deleteInstance",$self->getId); + $spectre->notify("workflow/deleteInstance",$self->getId) unless ($properties->{newlyCreated}); $spectre->notify("workflow/addInstance", {gateway=>$self->session->config->get("gateway"), sitename=>$self->session->config->get("sitename")->[0], instanceId=>$self->getId, priority=>$self->{_data}{priority}}); } } diff --git a/lib/WebGUI/Workflow/Spectre.pm b/lib/WebGUI/Workflow/Spectre.pm index 1a38961b7..2d70d9f2a 100644 --- a/lib/WebGUI/Workflow/Spectre.pm +++ b/lib/WebGUI/Workflow/Spectre.pm @@ -69,21 +69,23 @@ A scalar, array reference, or hash reference of data to pass to Spectre. sub notify { my $self = shift; my $module = shift; - my $params = shift;; + my $params = shift; + my ($config, $error) = $self->session->quick("config", "errorHandler"); my $remote = create_ikc_client( - port=>$self->session->config->get("spectrePort"), - ip=>$self->session->config->get("spectreIp"), + port=>$config->get("spectrePort"), + ip=>$config->get("spectreIp"), name=>rand(100000), timeout=>10 ); if (defined $remote) { my $result = $remote->post($module, $params); unless (defined $result) { - $self->session->errorHandler->warn("Couldn't send command to Spectre because ".$POE::Component::IKC::ClientLite::error); + $error->warn("Couldn't send command to Spectre because ".$POE::Component::IKC::ClientLite::error); } + $remote->disconnect; undef $remote; } else { - $self->session->errorHandler->warn("Couldn't connect to Spectre because ".$POE::Component::IKC::ClientLite::error); + $error->warn("Couldn't connect to Spectre because ".$POE::Component::IKC::ClientLite::error); } } diff --git a/sbin/spectre.pl b/sbin/spectre.pl index dbd23ac4e..2f5bda4bf 100644 --- a/sbin/spectre.pl +++ b/sbin/spectre.pl @@ -84,6 +84,7 @@ if ($shutdown) { die $POE::Component::IKC::ClientLite::error unless $remote; my $result = $remote->post('admin/shutdown'); die $POE::Component::IKC::ClientLite::error unless defined $result; + $remote->disconnect; undef $remote; } elsif ($ping) { my $res = ping(); @@ -110,6 +111,7 @@ sub ping { return $POE::Component::IKC::ClientLite::error unless $remote; my $result = $remote->post_respond('admin/ping'); return $POE::Component::IKC::ClientLite::error unless defined $result; + $remote->disconnect; undef $remote; return 0 if ($result eq "pong"); return 1; diff --git a/sbin/testEnvironment.pl b/sbin/testEnvironment.pl index 76b7d9e7e..c9e15b971 100644 --- a/sbin/testEnvironment.pl +++ b/sbin/testEnvironment.pl @@ -85,7 +85,7 @@ checkModule("Net::Subnets",0.21); checkModule("Finance::Quote",1.08); checkModule("POE",0.3202); checkModule("POE::Component::IKC::Server",0.18); -checkModule("POE::Component::Client::UserAgent", 0.06); +checkModule("POE::Component::Client::HTTP", 0.77); checkModule("Data::Structure::Util",0.11); checkModule("Apache2::Request",2.06); checkModule("Cache::Memcached",1.15,2);