From 5548da29a3f0435caa1490078388b38eaaa5cfec Mon Sep 17 00:00:00 2001 From: JT Smith Date: Tue, 18 Apr 2006 22:23:15 +0000 Subject: [PATCH] better cron job handling for spectre --- lib/Spectre/Cron.pm | 123 ++++++++++++++++++++++++++--------- lib/WebGUI/Operation.pm | 1 + lib/WebGUI/Operation/Cron.pm | 38 +++++++++++ lib/WebGUI/Session/Http.pm | 2 +- 4 files changed, 133 insertions(+), 31 deletions(-) diff --git a/lib/Spectre/Cron.pm b/lib/Spectre/Cron.pm index c17ac223c..f96c7ff1e 100644 --- a/lib/Spectre/Cron.pm +++ b/lib/Spectre/Cron.pm @@ -17,10 +17,11 @@ package Spectre::Cron; use strict; use DateTime; use DateTime::Cron::Simple; +use HTTP::Request::Common; use POE; +use POE::Component::Client::UserAgent; use WebGUI::Session; use WebGUI::Workflow::Cron; -use WebGUI::Workflow::Instance; #------------------------------------------------------------------- @@ -173,33 +174,7 @@ sub checkSchedule { my $cron = DateTime::Cron::Simple->new($job->{schedule}); if ($cron->validate_time($now)) { $self->debug("It's time to run ".$job->{taskId}." for ".$job->{config}.". Creating workflow instance."); - my $session = WebGUI::Session->open($self->config->getWebguiRoot, $job->{config}); - my $instance = WebGUI::Workflow::Instance->create($session, { - workflowId=>$job->{workflowId}, - className=>$job->{className}, - methodName=>$job->{methodName}, - parameters=>$job->{parameters}, - priority=>$job->{priority}, - notifySpectre=>0 - }); - if (defined $instance) { - $self->debug("Created workflow instance ".$instance->getId."."); - $kernel->post($self->workflowSession, "addInstance", {instanceId=>$instance->getId, priority=>$job->{priority}, sitename=>$job->{sitename}}); - } else { - $self->debug("Something bad happened. Couldn't create workflow instance for schedule ".$job->{taskId}." for ".$job->{config}."."); - } - if ($job->{runOnce}) { - $self->debug("Schedule ".$job->{taskId}." for ".$job->{config}." is only supposed to run once."); - my $cron = WebGUI::Workflow::Cron->new($session, $job->{taskId}); - if (defined $cron) { - $self->debug("Deleting schedule from database."); - $cron->delete(1); - } else { - $self->debug("Couldn't instanciate schedule ".$job->{taskId}." in order to delete it."); - } - $kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}}); - } - $session->close; + $kernel->yield("runJob",$job); } } @@ -283,6 +258,25 @@ sub deleteJob { } +#------------------------------------------------------------------- + +=head2 error ( output ) + +Prints out error information if debug is enabled. + +=head3 + +=cut + +sub error { + my $self = shift; + my $output = shift; + if ($self->{_debug}) { + print "CRON: ".$output."\n"; + } + $self->getLogger->error("CRON: ".$output); +} + #------------------------------------------------------------------- =head3 getLogger ( ) @@ -355,14 +349,83 @@ sub new { my $debug = shift; my $self = {_debug=>$debug, _workflowSession=>$workflowSession, _config=>$config, _logger=>$logger}; bless $self, $class; - my @publicEvents = qw(addJob deleteJob); + my @publicEvents = qw(runJob runJobResponse addJob deleteJob); POE::Session->create( - object_states => [ $self => [qw(_start _stop addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ], + object_states => [ $self => [qw(_start _stop runJob runJobResponse addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ], args=>[\@publicEvents] ); } +#------------------------------------------------------------------- + +=head2 runJob ( ) + +Calls a worker to execute a cron job. + +=cut + +sub runJob { + my ($kernel, $self, $job, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; + $self->debug("Preparing to run a scheduled job ".$job->{taskId}."."); + POE::Component::Client::UserAgent->new; + my $url = "http://".$job->{sitename}.':'.$self->config->get("webguiPort").'/'; + my $request = POST $url, [op=>"runCronJob", taskId=>$job->{taskId}]; + my $cookie = $self->{_cookies}{$job->{sitename}}; + $request->header("Cookie","wgSession=".$cookie) if (defined $cookie); + $request->header("User-Agent","Spectre"); + $request->header("X-taskId",$job->{taskId}); + $self->debug("Posting schedule job ".$job->{taskId}." to $url."); + $kernel->post( useragent => 'request', { request => $request, response => $session->postback('runJobResponse') }); + $self->debug("Cron job ".$job->{taskId}." posted."); +} + +#------------------------------------------------------------------- + +=head2 runJobResponse ( ) + +This method is called when the response from the runJob() method is received. + +=cut + +sub runJobResponse { + my ($self, $kernel) = @_[OBJECT, KERNEL]; + $self->debug("Retrieving response from scheduled job."); + my ($request, $response, $entry) = @{$_[ARG1]}; + my $taskId = $request->header("X-taskId"); # got to figure out how to get this from the request, cuz the response may die + $self->debug("Response retrieved is for scheduled task $taskId."); + my $job = $self->{_jobs}{$taskId}; + if ($response->is_success) { + $self->debug("Response for scheduled task $taskId retrieved successfully."); + if ($response->header("Cookie") ne "") { + $self->debug("Storing cookie for $taskId for later use."); + my $cookie = $response->header("Set-Cookie"); + $cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22})/$1/; + $self->{_cookies}{$self->{_jobs}{$taskId}{sitename}} = $cookie; + } + my $state = $response->content; + if ($state eq "done") { + $self->debug("Scheduled task $taskId is now complete."); + if ($job->{runOnce}) { + $kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}}); + } + } elsif ($state eq "error") { + $self->debug("Got an error response for scheduled task $taskId, will try again in ".$self->config->get("suspensionDelay")." seconds."); + $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job); + } else { + $self->error("Something bad happened on the return of scheduled task $taskId, will try again in ".$self->config->get("suspensionDelay").". ".$response->error_as_HTML); + $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job); + } + } elsif ($response->is_redirect) { + $self->debug("Response for $taskId was redirected."); + } elsif ($response->is_error) { + $self->error("Response for scheduled task $taskId had a communications error. ".$response->error_as_HTML); + $kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job); + # we should probably log something + } +} + + #------------------------------------------------------------------- =head2 workflowSession ( ) diff --git a/lib/WebGUI/Operation.pm b/lib/WebGUI/Operation.pm index c0c76c5df..14924f26e 100644 --- a/lib/WebGUI/Operation.pm +++ b/lib/WebGUI/Operation.pm @@ -120,6 +120,7 @@ sub getOperations { 'editCronJobSave' => 'WebGUI::Operation::Cron', 'deleteCronJob' => 'WebGUI::Operation::Cron', 'manageCron' => 'WebGUI::Operation::Cron', + 'runCronJob' => 'WebGUI::Operation::Cron', 'copyDatabaseLink' => 'WebGUI::Operation::DatabaseLink', 'deleteDatabaseLink' => 'WebGUI::Operation::DatabaseLink', diff --git a/lib/WebGUI/Operation/Cron.pm b/lib/WebGUI/Operation/Cron.pm index 9f7b0040c..940ac8b8b 100644 --- a/lib/WebGUI/Operation/Cron.pm +++ b/lib/WebGUI/Operation/Cron.pm @@ -16,6 +16,8 @@ use WebGUI::AdminConsole; use WebGUI::HTMLForm; use WebGUI::International; use WebGUI::Workflow::Cron; +use WebGUI::Workflow::Instance; +use WebGUI::Utility; =head1 NAME @@ -239,4 +241,40 @@ sub www_manageCron { return $ac->render($output); } + +#------------------------------------------------------------------- + +=head2 www_runCronJob ( ) + +Checks to ensure the requestor is who we think it is, and then executes a cron job and returns the results. + +=cut + +sub www_runCronJob { + my $session = shift; + $session->http->setMimeType("text/plain"); + unless (isInSubnet($session->env->get("REMOTE_ADDR"), $session->config->get("spectreSubnets"))) { + $session->errorHandler->security("make a Spectre cron job runner request, but we're only allowed to + accept requests from ".join(",",@{$session->config->get("spectreSubnets")})."."); + return "error"; + } + my $taskId = $session->form->param("taskId"); + if ($taskId) { + my $task = WebGUI::Workflow::Cron->new($session, $taskId); + return "done" unless (defined $task); + my $instance = WebGUI::Workflow::Instance->create($session, { + workflowId=>$task->get("workflowId"), + className=>$task->get("className"), + methodName=>$task->get("methodName"), + parameters=>$task->get("parameters"), + priority=>$task->get("priority"), + }); + $task->delete(1) if ($task->get("runOnce")); + return "done"; + } + $session->errorHandler->warn("No task ID passed to cron job runner."); + return "error"; +} + + 1; diff --git a/lib/WebGUI/Session/Http.pm b/lib/WebGUI/Session/Http.pm index 935f31be7..c4136a016 100644 --- a/lib/WebGUI/Session/Http.pm +++ b/lib/WebGUI/Session/Http.pm @@ -169,7 +169,7 @@ sub sendHeader { $self->session->request->status(301); } else { $self->session->request->content_type($self->{_http}{mimetype} || "text/html"); - $self->session->request->set_last_modified($self->{_http}{lastModified} || time()); +# $self->session->request->set_last_modified($self->{_http}{lastModified} || time()); if ($self->session->setting->get("preventProxyCache")) { $self->session->request->headers_out->set(Expires => "-1d"); }