From d95fbffa39a9c3085f3659d02ee987ffa95da12a Mon Sep 17 00:00:00 2001 From: JT Smith Date: Mon, 3 Jul 2006 19:22:14 +0000 Subject: [PATCH] cleaned up some spectre code and fixed a few bugs --- docs/changelog/7.x.x.txt | 1 + lib/Spectre/Admin.pm | 2 +- lib/Spectre/Cron.pm | 142 ++++++++++++++++++------------------ lib/Spectre/Workflow.pm | 4 +- lib/WebGUI/Workflow/Cron.pm | 4 +- 5 files changed, 75 insertions(+), 78 deletions(-) diff --git a/docs/changelog/7.x.x.txt b/docs/changelog/7.x.x.txt index 047536cbb..7a8e74201 100644 --- a/docs/changelog/7.x.x.txt +++ b/docs/changelog/7.x.x.txt @@ -19,4 +19,5 @@ - fix: Edit LDAP Connection - fix: SQL Report w/ pagination and nested queries - fix: Unable to add Web Services Client + - Fixed a bug in spectre where it wasn't using session cookies. diff --git a/lib/Spectre/Admin.pm b/lib/Spectre/Admin.pm index 6ab0c0452..855c89892 100644 --- a/lib/Spectre/Admin.pm +++ b/lib/Spectre/Admin.pm @@ -153,7 +153,7 @@ sub new { args=>[["shutdown","ping"]] ); $self->{_workflow} = Spectre::Workflow->new($config, $logger, $debug); - $self->{_cron} = Spectre::Cron->new($config, $logger, $self->{_workflow}, $debug); + $self->{_cron} = Spectre::Cron->new($config, $logger, $debug); POE::Kernel->run(); } diff --git a/lib/Spectre/Cron.pm b/lib/Spectre/Cron.pm index 0ebb581be..1398d6d4e 100644 --- a/lib/Spectre/Cron.pm +++ b/lib/Spectre/Cron.pm @@ -137,8 +137,9 @@ An integer (1,2,3) that determines what priority the workflow should be executed sub addJob { my ($self, $params) = @_[OBJECT, ARG0]; return 0 unless ($params->{enabled}); + my $id = $params->{config}."-".$params->{taskId}; $self->debug("Adding schedule ".$params->{taskId}." to the queue."); - $self->{_jobs}{$params->{config}}{$params->{taskId}} = { + $self->{_jobs}{$id} = { taskId=>$params->{taskId}, config=>$params->{config}, gateway=>$params->{gateway}, @@ -160,9 +161,9 @@ sub addJob { Compares a schedule with the current time and kicks off an event if necessary. This method should only ever need to be called by checkSchedules(). -=head3 job +=head3 jobId -A job definition created through the addJob() method. +A jobId definition created through the addJob() method. =head3 now @@ -171,12 +172,12 @@ A DateTime object representing the time to compare the schedule with. =cut sub checkSchedule { - my ($kernel, $self, $job, $now) = @_[KERNEL, OBJECT, ARG0, ARG1]; - $self->debug("Checking schedule ".$job->{taskId}." for ".$job->{config}." against the current time."); - my $cron = DateTime::Cron::Simple->new($job->{schedule}); + my ($kernel, $self, $id, $now) = @_[KERNEL, OBJECT, ARG0, ARG1]; + $self->debug("Checking schedule ".$id." against the current time."); + my $cron = DateTime::Cron::Simple->new($self->getJob($id)->{schedule}); if ($cron->validate_time($now)) { - $self->debug("It's time to run ".$job->{taskId}." for ".$job->{config}.". Creating workflow instance."); - $kernel->yield("runJob",$job); + $self->debug("It's time to run ".$id.". Creating workflow instance."); + $kernel->yield("runJob",$id); } } @@ -192,10 +193,8 @@ sub checkSchedules { my ($kernel, $self) = @_[KERNEL, OBJECT]; $self->debug("Checking schedules against current time."); my $now = DateTime->from_epoch(epoch=>time()); - foreach my $config (keys %{$self->{_jobs}}) { - foreach my $taskId (keys %{$self->{_jobs}{$config}}) { - $kernel->yield("checkSchedule", $self->{_jobs}{$config}{$taskId}, $now) - } + foreach my $id (keys %{$self->{_jobs}}) { + $kernel->yield("checkSchedule", $id, $now) } $kernel->delay_set("checkSchedules",60); } @@ -243,21 +242,17 @@ Removes a job from the monitoring queue. A hash reference containing the info needed to delete this job. -=head4 taskId +=head4 id The unique ID for this job. -=head4 config - -The config file name for the site this job belongs to. - =cut sub deleteJob { - my ($self, $params) = @_[OBJECT, ARG0]; - $self->debug("Deleting schedule ".$params->{taskId}." for ".$params->{config}." from queue."); - delete $self->{_errorCount}{$params->{config}}{$params->{taskId}}; - delete $self->{_jobs}{$params->{config}}{$params->{taskId}}; + my ($self, $id) = @_[OBJECT, ARG0]; + $self->debug("Deleting schedule ".$id." from queue."); + delete $self->{_errorCount}{$id}; + delete $self->{_jobs}{$id}; } @@ -282,6 +277,24 @@ sub error { #------------------------------------------------------------------- +=head2 getJob ( id ) + +Returns a hash reference to a job. + +=head3 id + +The unique id of the job to fetch. + +=cut + +sub getJob { + my $self = shift; + my $id = shift; + return $self->{_jobs}{$id}; +} + +#------------------------------------------------------------------- + =head3 getLogger ( ) Returns a reference to the logger. @@ -349,9 +362,8 @@ sub new { my $class = shift; my $config = shift; my $logger = shift; - my $workflowSession = shift; my $debug = shift; - my $self = {_debug=>$debug, _workflowSession=>$workflowSession, _config=>$config, _logger=>$logger}; + my $self = {_jobs=>{}, _debug=>$debug, _config=>$config, _logger=>$logger}; bless $self, $class; my @publicEvents = qw(runJob runJobResponse addJob deleteJob); POE::Session->create( @@ -370,27 +382,27 @@ Calls a worker to execute a cron job. =cut sub runJob { - my ($kernel, $self, $job, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; - $self->debug("Preparing to run a scheduled job ".$job->{taskId}."."); - POE::Component::Client::UserAgent->new; + my ($kernel, $self, $id, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; + $self->debug("Preparing to run a job ".$id."."); + my $job = $self->getJob($id); if ($job->{sitename} eq "" || $job->{config} eq "" || $job->{taskId} eq "") { - $self->error("A scheduled task has corrupt information and is not able to be run. Skipping execution."); - $kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}}) if ($job->{config} ne "" && $job->{taskId} ne ""); - } elsif ($self->{_errorCount}{$job->{config}}{$job->{taskId}} >= 5) { - $self->error("Scheduled task ".$job->{config}." / ".$job->{taskId}." has failed ".$self->{_errorCount}{$job->{config}}{$job->{taskId}}." times in a row and will no longer attempt to execute."); - $kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}}); + $self->error("A job has corrupt information and is not able to be run. Skipping execution."); + $kernel->yield("deleteJob",$id); + } elsif ($self->{_errorCount}{$id} >= 5) { + $self->error("Job ".$id." has failed ".$self->{_errorCount}{$id}." times in a row and will no longer attempt to execute."); + $kernel->yield("deleteJob",$id); } else { + POE::Component::Client::UserAgent->new; my $url = "http://".$job->{sitename}.':'.$self->config->get("webguiPort").$job->{gateway}; my $request = POST $url, [op=>"runCronJob", taskId=>$job->{taskId}]; my $cookie = $self->{_cookies}{$job->{sitename}}; $request->header("Cookie","wgSession=".$cookie) if (defined $cookie); $request->header("User-Agent","Spectre"); - $request->header("X-taskId",$job->{taskId}); - $request->header("X-config",$job->{config}); - $self->debug("Posting schedule job ".$job->{taskId}." to $url."); + $request->header("X-jobId",$id); + $self->debug("Posting job ".$id." to $url."); $kernel->post( useragent => 'request', { request => $request, response => $session->postback('runJobResponse') }); $kernel->post( useragent => 'shutdown'); # we'll still get the response, we're just done sending the request - $self->debug("Cron job ".$job->{taskId}." posted."); + $self->debug("Cron job ".$id." posted."); } } @@ -404,59 +416,43 @@ This method is called when the response from the runJob() method is received. sub runJobResponse { my ($self, $kernel) = @_[OBJECT, KERNEL]; - $self->debug("Retrieving response from scheduled job."); + $self->debug("Retrieving response from job."); my ($request, $response, $entry) = @{$_[ARG1]}; - my $taskId = $request->header("X-taskId"); # got to figure out how to get this from the request, cuz the response may die - my $config = $request->header("X-config"); # got to figure out how to get this from the request, cuz the response may die - $self->debug("Response retrieved is for scheduled task $config / $taskId."); - my $job = $self->{_jobs}{$config}{$taskId}; + my $id = $request->header("X-jobId"); # got to figure out how to get this from the request, cuz the response may die + $self->debug("Response retrieved is for job $id."); if ($response->is_success) { - $self->debug("Response for scheduled task $config / $taskId retrieved successfully."); - if ($response->header("Cookie") ne "") { - $self->debug("Storing cookie for $config / $taskId for later use."); + $self->debug("Response for job $id retrieved successfully."); + if ($response->header("Set-Cookie") ne "") { + $self->debug("Storing cookie for $id for later use."); my $cookie = $response->header("Set-Cookie"); - $cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22})/$1/; - $self->{_cookies}{$job->{sitename}} = $cookie; + $cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22}).*/$1/; + $self->{_cookies}{$self->getJob($id)->{sitename}} = $cookie; } my $state = $response->content; if ($state eq "done") { - delete $self->{_errorCount}{$config}{$taskId}; - $self->debug("Scheduled task $config / $taskId is now complete."); - if ($job->{runOnce}) { - $kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}}); + delete $self->{_errorCount}{$id}; + $self->debug("Job $id is now complete."); + if ($self->getJob($id)->{runOnce}) { + $kernel->yield("deleteJob",$id); } } elsif ($state eq "error") { - $self->{_errorCount}{$config}{$taskId}++; - $self->debug("Got an error response for scheduled task $config / $taskId, will try again in ".$self->config->get("suspensionDelay")." seconds."); - $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job); + $self->{_errorCount}{$id}++; + $self->debug("Got an error response for job $id, will try again in ".$self->config->get("suspensionDelay")." seconds."); + $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$id); } else { - $self->{_errorCount}{$config}{$taskId}++; - $self->error("Something bad happened on the return of scheduled task $config / $taskId, will try again in ".$self->config->get("suspensionDelay")." seconds. ".$response->error_as_HTML); - $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job); + $self->{_errorCount}{$id}++; + $self->error("Something bad happened on the return of job $id, will try again in ".$self->config->get("suspensionDelay")." seconds. ".$response->error_as_HTML); + $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$id); } } elsif ($response->is_redirect) { - $self->error("Response for $config / $taskId was redirected. This should never happen if configured properly!!!"); + $self->error("Response for $id was redirected. This should never happen if configured properly!!!"); } elsif ($response->is_error) { - $self->error("Response for scheduled task $config / $taskId had a communications error. ".$response->error_as_HTML); - $self->{_errorCount}{$config}{$taskId}++; - $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job); + $self->error("Response for job $id had a communications error. ".$response->error_as_HTML); + $self->{_errorCount}{$id}++; + $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$id); } } -#------------------------------------------------------------------- - -=head2 workflowSession ( ) - -Returns a reference to the workflow session. - -=cut - -sub workflowSession { - my $self = shift; - return $self->{_workflowSession}; -} - - 1; diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index d6f4ac574..8f4349b6d 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -393,10 +393,10 @@ sub workerResponse { $self->debug("Response retrieved is for $instanceId."); if ($response->is_success) { $self->debug("Response for $instanceId retrieved successfully."); - if ($response->header("Cookie") ne "") { + if ($response->header("Set-Cookie") ne "") { $self->debug("Storing cookie for $instanceId for later use."); my $cookie = $response->header("Set-Cookie"); - $cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22})/$1/; + $cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22}).*/$1/; $self->{_cookies}{$self->{_instances}{$instanceId}{sitename}} = $cookie; } my $state = $response->content; diff --git a/lib/WebGUI/Workflow/Cron.pm b/lib/WebGUI/Workflow/Cron.pm index 2755e2929..c3a106f28 100644 --- a/lib/WebGUI/Workflow/Cron.pm +++ b/lib/WebGUI/Workflow/Cron.pm @@ -85,7 +85,7 @@ sub delete { my $skipNotify = shift; $self->session->db->deleteRow("WorkflowSchedule","taskId",$self->getId); if ($skipNotify) { - WebGUI::Workflow::Spectre->new($self->session)->notify("cron/deleteJob",{taskId=>$self->getId, config=>$self->session->config->getFilename}); + WebGUI::Workflow::Spectre->new($self->session)->notify("cron/deleteJob", $self->session->config->getFilename."-".$self->getId); } undef $self; } @@ -283,7 +283,7 @@ sub set { $self->{_data}{enabled} = 0 unless ($self->{_data}{workflowId}); my $spectre = WebGUI::Workflow::Spectre->new($self->session); $self->session->db->setRow("WorkflowSchedule","taskId",$self->{_data}); - $spectre->notify("cron/deleteJob",{taskId=>$self->getId,config=>$self->session->config->getFilename}); + $spectre->notify("cron/deleteJob", $self->session->config->getFilename."-".$self->getId); my %params = %{$self->{_data}}; $params{parameters} = $self->get("parameters"); $params{config} = $self->session->config->getFilename;