better cron job handling for spectre
This commit is contained in:
parent
10a1289bac
commit
5548da29a3
4 changed files with 133 additions and 31 deletions
|
|
@ -17,10 +17,11 @@ package Spectre::Cron;
|
|||
use strict;
|
||||
use DateTime;
|
||||
use DateTime::Cron::Simple;
|
||||
use HTTP::Request::Common;
|
||||
use POE;
|
||||
use POE::Component::Client::UserAgent;
|
||||
use WebGUI::Session;
|
||||
use WebGUI::Workflow::Cron;
|
||||
use WebGUI::Workflow::Instance;
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
|
|
@ -173,33 +174,7 @@ sub checkSchedule {
|
|||
my $cron = DateTime::Cron::Simple->new($job->{schedule});
|
||||
if ($cron->validate_time($now)) {
|
||||
$self->debug("It's time to run ".$job->{taskId}." for ".$job->{config}.". Creating workflow instance.");
|
||||
my $session = WebGUI::Session->open($self->config->getWebguiRoot, $job->{config});
|
||||
my $instance = WebGUI::Workflow::Instance->create($session, {
|
||||
workflowId=>$job->{workflowId},
|
||||
className=>$job->{className},
|
||||
methodName=>$job->{methodName},
|
||||
parameters=>$job->{parameters},
|
||||
priority=>$job->{priority},
|
||||
notifySpectre=>0
|
||||
});
|
||||
if (defined $instance) {
|
||||
$self->debug("Created workflow instance ".$instance->getId.".");
|
||||
$kernel->post($self->workflowSession, "addInstance", {instanceId=>$instance->getId, priority=>$job->{priority}, sitename=>$job->{sitename}});
|
||||
} else {
|
||||
$self->debug("Something bad happened. Couldn't create workflow instance for schedule ".$job->{taskId}." for ".$job->{config}.".");
|
||||
}
|
||||
if ($job->{runOnce}) {
|
||||
$self->debug("Schedule ".$job->{taskId}." for ".$job->{config}." is only supposed to run once.");
|
||||
my $cron = WebGUI::Workflow::Cron->new($session, $job->{taskId});
|
||||
if (defined $cron) {
|
||||
$self->debug("Deleting schedule from database.");
|
||||
$cron->delete(1);
|
||||
} else {
|
||||
$self->debug("Couldn't instanciate schedule ".$job->{taskId}." in order to delete it.");
|
||||
}
|
||||
$kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}});
|
||||
}
|
||||
$session->close;
|
||||
$kernel->yield("runJob",$job);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -283,6 +258,25 @@ sub deleteJob {
|
|||
}
|
||||
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
=head2 error ( output )
|
||||
|
||||
Prints out error information if debug is enabled.
|
||||
|
||||
=head3
|
||||
|
||||
=cut
|
||||
|
||||
sub error {
|
||||
my $self = shift;
|
||||
my $output = shift;
|
||||
if ($self->{_debug}) {
|
||||
print "CRON: ".$output."\n";
|
||||
}
|
||||
$self->getLogger->error("CRON: ".$output);
|
||||
}
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
=head3 getLogger ( )
|
||||
|
|
@ -355,14 +349,83 @@ sub new {
|
|||
my $debug = shift;
|
||||
my $self = {_debug=>$debug, _workflowSession=>$workflowSession, _config=>$config, _logger=>$logger};
|
||||
bless $self, $class;
|
||||
my @publicEvents = qw(addJob deleteJob);
|
||||
my @publicEvents = qw(runJob runJobResponse addJob deleteJob);
|
||||
POE::Session->create(
|
||||
object_states => [ $self => [qw(_start _stop addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ],
|
||||
object_states => [ $self => [qw(_start _stop runJob runJobResponse addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ],
|
||||
args=>[\@publicEvents]
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
=head2 runJob ( )
|
||||
|
||||
Calls a worker to execute a cron job.
|
||||
|
||||
=cut
|
||||
|
||||
sub runJob {
|
||||
my ($kernel, $self, $job, $session) = @_[KERNEL, OBJECT, ARG0, SESSION];
|
||||
$self->debug("Preparing to run a scheduled job ".$job->{taskId}.".");
|
||||
POE::Component::Client::UserAgent->new;
|
||||
my $url = "http://".$job->{sitename}.':'.$self->config->get("webguiPort").'/';
|
||||
my $request = POST $url, [op=>"runCronJob", taskId=>$job->{taskId}];
|
||||
my $cookie = $self->{_cookies}{$job->{sitename}};
|
||||
$request->header("Cookie","wgSession=".$cookie) if (defined $cookie);
|
||||
$request->header("User-Agent","Spectre");
|
||||
$request->header("X-taskId",$job->{taskId});
|
||||
$self->debug("Posting schedule job ".$job->{taskId}." to $url.");
|
||||
$kernel->post( useragent => 'request', { request => $request, response => $session->postback('runJobResponse') });
|
||||
$self->debug("Cron job ".$job->{taskId}." posted.");
|
||||
}
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
=head2 runJobResponse ( )
|
||||
|
||||
This method is called when the response from the runJob() method is received.
|
||||
|
||||
=cut
|
||||
|
||||
sub runJobResponse {
|
||||
my ($self, $kernel) = @_[OBJECT, KERNEL];
|
||||
$self->debug("Retrieving response from scheduled job.");
|
||||
my ($request, $response, $entry) = @{$_[ARG1]};
|
||||
my $taskId = $request->header("X-taskId"); # got to figure out how to get this from the request, cuz the response may die
|
||||
$self->debug("Response retrieved is for scheduled task $taskId.");
|
||||
my $job = $self->{_jobs}{$taskId};
|
||||
if ($response->is_success) {
|
||||
$self->debug("Response for scheduled task $taskId retrieved successfully.");
|
||||
if ($response->header("Cookie") ne "") {
|
||||
$self->debug("Storing cookie for $taskId for later use.");
|
||||
my $cookie = $response->header("Set-Cookie");
|
||||
$cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22})/$1/;
|
||||
$self->{_cookies}{$self->{_jobs}{$taskId}{sitename}} = $cookie;
|
||||
}
|
||||
my $state = $response->content;
|
||||
if ($state eq "done") {
|
||||
$self->debug("Scheduled task $taskId is now complete.");
|
||||
if ($job->{runOnce}) {
|
||||
$kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}});
|
||||
}
|
||||
} elsif ($state eq "error") {
|
||||
$self->debug("Got an error response for scheduled task $taskId, will try again in ".$self->config->get("suspensionDelay")." seconds.");
|
||||
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job);
|
||||
} else {
|
||||
$self->error("Something bad happened on the return of scheduled task $taskId, will try again in ".$self->config->get("suspensionDelay").". ".$response->error_as_HTML);
|
||||
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job);
|
||||
}
|
||||
} elsif ($response->is_redirect) {
|
||||
$self->debug("Response for $taskId was redirected.");
|
||||
} elsif ($response->is_error) {
|
||||
$self->error("Response for scheduled task $taskId had a communications error. ".$response->error_as_HTML);
|
||||
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job);
|
||||
# we should probably log something
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
=head2 workflowSession ( )
|
||||
|
|
|
|||
|
|
@ -120,6 +120,7 @@ sub getOperations {
|
|||
'editCronJobSave' => 'WebGUI::Operation::Cron',
|
||||
'deleteCronJob' => 'WebGUI::Operation::Cron',
|
||||
'manageCron' => 'WebGUI::Operation::Cron',
|
||||
'runCronJob' => 'WebGUI::Operation::Cron',
|
||||
|
||||
'copyDatabaseLink' => 'WebGUI::Operation::DatabaseLink',
|
||||
'deleteDatabaseLink' => 'WebGUI::Operation::DatabaseLink',
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ use WebGUI::AdminConsole;
|
|||
use WebGUI::HTMLForm;
|
||||
use WebGUI::International;
|
||||
use WebGUI::Workflow::Cron;
|
||||
use WebGUI::Workflow::Instance;
|
||||
use WebGUI::Utility;
|
||||
|
||||
=head1 NAME
|
||||
|
||||
|
|
@ -239,4 +241,40 @@ sub www_manageCron {
|
|||
return $ac->render($output);
|
||||
}
|
||||
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
=head2 www_runCronJob ( )
|
||||
|
||||
Checks to ensure the requestor is who we think it is, and then executes a cron job and returns the results.
|
||||
|
||||
=cut
|
||||
|
||||
sub www_runCronJob {
|
||||
my $session = shift;
|
||||
$session->http->setMimeType("text/plain");
|
||||
unless (isInSubnet($session->env->get("REMOTE_ADDR"), $session->config->get("spectreSubnets"))) {
|
||||
$session->errorHandler->security("make a Spectre cron job runner request, but we're only allowed to
|
||||
accept requests from ".join(",",@{$session->config->get("spectreSubnets")}).".");
|
||||
return "error";
|
||||
}
|
||||
my $taskId = $session->form->param("taskId");
|
||||
if ($taskId) {
|
||||
my $task = WebGUI::Workflow::Cron->new($session, $taskId);
|
||||
return "done" unless (defined $task);
|
||||
my $instance = WebGUI::Workflow::Instance->create($session, {
|
||||
workflowId=>$task->get("workflowId"),
|
||||
className=>$task->get("className"),
|
||||
methodName=>$task->get("methodName"),
|
||||
parameters=>$task->get("parameters"),
|
||||
priority=>$task->get("priority"),
|
||||
});
|
||||
$task->delete(1) if ($task->get("runOnce"));
|
||||
return "done";
|
||||
}
|
||||
$session->errorHandler->warn("No task ID passed to cron job runner.");
|
||||
return "error";
|
||||
}
|
||||
|
||||
|
||||
1;
|
||||
|
|
|
|||
|
|
@ -169,7 +169,7 @@ sub sendHeader {
|
|||
$self->session->request->status(301);
|
||||
} else {
|
||||
$self->session->request->content_type($self->{_http}{mimetype} || "text/html");
|
||||
$self->session->request->set_last_modified($self->{_http}{lastModified} || time());
|
||||
# $self->session->request->set_last_modified($self->{_http}{lastModified} || time());
|
||||
if ($self->session->setting->get("preventProxyCache")) {
|
||||
$self->session->request->headers_out->set(Expires => "-1d");
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue