cleaned up some spectre code and fixed a few bugs

This commit is contained in:
JT Smith 2006-07-03 19:22:14 +00:00
parent 2b9efcb217
commit d95fbffa39
5 changed files with 75 additions and 78 deletions

View file

@ -19,4 +19,5 @@
- fix: Edit LDAP Connection
- fix: SQL Report w/ pagination and nested queries
- fix: Unable to add Web Services Client
- Fixed a bug in spectre where it wasn't using session cookies.

View file

@ -153,7 +153,7 @@ sub new {
args=>[["shutdown","ping"]]
);
$self->{_workflow} = Spectre::Workflow->new($config, $logger, $debug);
$self->{_cron} = Spectre::Cron->new($config, $logger, $self->{_workflow}, $debug);
$self->{_cron} = Spectre::Cron->new($config, $logger, $debug);
POE::Kernel->run();
}

View file

@ -137,8 +137,9 @@ An integer (1,2,3) that determines what priority the workflow should be executed
sub addJob {
my ($self, $params) = @_[OBJECT, ARG0];
return 0 unless ($params->{enabled});
my $id = $params->{config}."-".$params->{taskId};
$self->debug("Adding schedule ".$params->{taskId}." to the queue.");
$self->{_jobs}{$params->{config}}{$params->{taskId}} = {
$self->{_jobs}{$id} = {
taskId=>$params->{taskId},
config=>$params->{config},
gateway=>$params->{gateway},
@ -160,9 +161,9 @@ sub addJob {
Compares a schedule with the current time and kicks off an event if necessary. This method should only ever need to be called by checkSchedules().
=head3 job
=head3 jobId
A job definition created through the addJob() method.
A jobId definition created through the addJob() method.
=head3 now
@ -171,12 +172,12 @@ A DateTime object representing the time to compare the schedule with.
=cut
sub checkSchedule {
my ($kernel, $self, $job, $now) = @_[KERNEL, OBJECT, ARG0, ARG1];
$self->debug("Checking schedule ".$job->{taskId}." for ".$job->{config}." against the current time.");
my $cron = DateTime::Cron::Simple->new($job->{schedule});
my ($kernel, $self, $id, $now) = @_[KERNEL, OBJECT, ARG0, ARG1];
$self->debug("Checking schedule ".$id." against the current time.");
my $cron = DateTime::Cron::Simple->new($self->getJob($id)->{schedule});
if ($cron->validate_time($now)) {
$self->debug("It's time to run ".$job->{taskId}." for ".$job->{config}.". Creating workflow instance.");
$kernel->yield("runJob",$job);
$self->debug("It's time to run ".$id.". Creating workflow instance.");
$kernel->yield("runJob",$id);
}
}
@ -192,10 +193,8 @@ sub checkSchedules {
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Checking schedules against current time.");
my $now = DateTime->from_epoch(epoch=>time());
foreach my $config (keys %{$self->{_jobs}}) {
foreach my $taskId (keys %{$self->{_jobs}{$config}}) {
$kernel->yield("checkSchedule", $self->{_jobs}{$config}{$taskId}, $now)
}
foreach my $id (keys %{$self->{_jobs}}) {
$kernel->yield("checkSchedule", $id, $now)
}
$kernel->delay_set("checkSchedules",60);
}
@ -243,21 +242,17 @@ Removes a job from the monitoring queue.
A hash reference containing the info needed to delete this job.
=head4 taskId
=head4 id
The unique ID for this job.
=head4 config
The config file name for the site this job belongs to.
=cut
sub deleteJob {
my ($self, $params) = @_[OBJECT, ARG0];
$self->debug("Deleting schedule ".$params->{taskId}." for ".$params->{config}." from queue.");
delete $self->{_errorCount}{$params->{config}}{$params->{taskId}};
delete $self->{_jobs}{$params->{config}}{$params->{taskId}};
my ($self, $id) = @_[OBJECT, ARG0];
$self->debug("Deleting schedule ".$id." from queue.");
delete $self->{_errorCount}{$id};
delete $self->{_jobs}{$id};
}
@ -282,6 +277,24 @@ sub error {
#-------------------------------------------------------------------
=head2 getJob ( id )
Returns a hash reference to a job.
=head3 id
The unique id of the job to fetch.
=cut
sub getJob {
my $self = shift;
my $id = shift;
return $self->{_jobs}{$id};
}
#-------------------------------------------------------------------
=head3 getLogger ( )
Returns a reference to the logger.
@ -349,9 +362,8 @@ sub new {
my $class = shift;
my $config = shift;
my $logger = shift;
my $workflowSession = shift;
my $debug = shift;
my $self = {_debug=>$debug, _workflowSession=>$workflowSession, _config=>$config, _logger=>$logger};
my $self = {_jobs=>{}, _debug=>$debug, _config=>$config, _logger=>$logger};
bless $self, $class;
my @publicEvents = qw(runJob runJobResponse addJob deleteJob);
POE::Session->create(
@ -370,27 +382,27 @@ Calls a worker to execute a cron job.
=cut
sub runJob {
my ($kernel, $self, $job, $session) = @_[KERNEL, OBJECT, ARG0, SESSION];
$self->debug("Preparing to run a scheduled job ".$job->{taskId}.".");
POE::Component::Client::UserAgent->new;
my ($kernel, $self, $id, $session) = @_[KERNEL, OBJECT, ARG0, SESSION];
$self->debug("Preparing to run a job ".$id.".");
my $job = $self->getJob($id);
if ($job->{sitename} eq "" || $job->{config} eq "" || $job->{taskId} eq "") {
$self->error("A scheduled task has corrupt information and is not able to be run. Skipping execution.");
$kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}}) if ($job->{config} ne "" && $job->{taskId} ne "");
} elsif ($self->{_errorCount}{$job->{config}}{$job->{taskId}} >= 5) {
$self->error("Scheduled task ".$job->{config}." / ".$job->{taskId}." has failed ".$self->{_errorCount}{$job->{config}}{$job->{taskId}}." times in a row and will no longer attempt to execute.");
$kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}});
$self->error("A job has corrupt information and is not able to be run. Skipping execution.");
$kernel->yield("deleteJob",$id);
} elsif ($self->{_errorCount}{$id} >= 5) {
$self->error("Job ".$id." has failed ".$self->{_errorCount}{$id}." times in a row and will no longer attempt to execute.");
$kernel->yield("deleteJob",$id);
} else {
POE::Component::Client::UserAgent->new;
my $url = "http://".$job->{sitename}.':'.$self->config->get("webguiPort").$job->{gateway};
my $request = POST $url, [op=>"runCronJob", taskId=>$job->{taskId}];
my $cookie = $self->{_cookies}{$job->{sitename}};
$request->header("Cookie","wgSession=".$cookie) if (defined $cookie);
$request->header("User-Agent","Spectre");
$request->header("X-taskId",$job->{taskId});
$request->header("X-config",$job->{config});
$self->debug("Posting schedule job ".$job->{taskId}." to $url.");
$request->header("X-jobId",$id);
$self->debug("Posting job ".$id." to $url.");
$kernel->post( useragent => 'request', { request => $request, response => $session->postback('runJobResponse') });
$kernel->post( useragent => 'shutdown'); # we'll still get the response, we're just done sending the request
$self->debug("Cron job ".$job->{taskId}." posted.");
$self->debug("Cron job ".$id." posted.");
}
}
@ -404,59 +416,43 @@ This method is called when the response from the runJob() method is received.
sub runJobResponse {
my ($self, $kernel) = @_[OBJECT, KERNEL];
$self->debug("Retrieving response from scheduled job.");
$self->debug("Retrieving response from job.");
my ($request, $response, $entry) = @{$_[ARG1]};
my $taskId = $request->header("X-taskId"); # got to figure out how to get this from the request, cuz the response may die
my $config = $request->header("X-config"); # got to figure out how to get this from the request, cuz the response may die
$self->debug("Response retrieved is for scheduled task $config / $taskId.");
my $job = $self->{_jobs}{$config}{$taskId};
my $id = $request->header("X-jobId"); # got to figure out how to get this from the request, cuz the response may die
$self->debug("Response retrieved is for job $id.");
if ($response->is_success) {
$self->debug("Response for scheduled task $config / $taskId retrieved successfully.");
if ($response->header("Cookie") ne "") {
$self->debug("Storing cookie for $config / $taskId for later use.");
$self->debug("Response for job $id retrieved successfully.");
if ($response->header("Set-Cookie") ne "") {
$self->debug("Storing cookie for $id for later use.");
my $cookie = $response->header("Set-Cookie");
$cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22})/$1/;
$self->{_cookies}{$job->{sitename}} = $cookie;
$cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22}).*/$1/;
$self->{_cookies}{$self->getJob($id)->{sitename}} = $cookie;
}
my $state = $response->content;
if ($state eq "done") {
delete $self->{_errorCount}{$config}{$taskId};
$self->debug("Scheduled task $config / $taskId is now complete.");
if ($job->{runOnce}) {
$kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}});
delete $self->{_errorCount}{$id};
$self->debug("Job $id is now complete.");
if ($self->getJob($id)->{runOnce}) {
$kernel->yield("deleteJob",$id);
}
} elsif ($state eq "error") {
$self->{_errorCount}{$config}{$taskId}++;
$self->debug("Got an error response for scheduled task $config / $taskId, will try again in ".$self->config->get("suspensionDelay")." seconds.");
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job);
$self->{_errorCount}{$id}++;
$self->debug("Got an error response for job $id, will try again in ".$self->config->get("suspensionDelay")." seconds.");
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$id);
} else {
$self->{_errorCount}{$config}{$taskId}++;
$self->error("Something bad happened on the return of scheduled task $config / $taskId, will try again in ".$self->config->get("suspensionDelay")." seconds. ".$response->error_as_HTML);
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job);
$self->{_errorCount}{$id}++;
$self->error("Something bad happened on the return of job $id, will try again in ".$self->config->get("suspensionDelay")." seconds. ".$response->error_as_HTML);
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$id);
}
} elsif ($response->is_redirect) {
$self->error("Response for $config / $taskId was redirected. This should never happen if configured properly!!!");
$self->error("Response for $id was redirected. This should never happen if configured properly!!!");
} elsif ($response->is_error) {
$self->error("Response for scheduled task $config / $taskId had a communications error. ".$response->error_as_HTML);
$self->{_errorCount}{$config}{$taskId}++;
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$job);
$self->error("Response for job $id had a communications error. ".$response->error_as_HTML);
$self->{_errorCount}{$id}++;
$kernel->delay_set("runJob",$self->config->get("suspensionDelay"),$id);
}
}
#-------------------------------------------------------------------
=head2 workflowSession ( )
Returns a reference to the workflow session.
=cut
sub workflowSession {
my $self = shift;
return $self->{_workflowSession};
}
1;

View file

@ -393,10 +393,10 @@ sub workerResponse {
$self->debug("Response retrieved is for $instanceId.");
if ($response->is_success) {
$self->debug("Response for $instanceId retrieved successfully.");
if ($response->header("Cookie") ne "") {
if ($response->header("Set-Cookie") ne "") {
$self->debug("Storing cookie for $instanceId for later use.");
my $cookie = $response->header("Set-Cookie");
$cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22})/$1/;
$cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22}).*/$1/;
$self->{_cookies}{$self->{_instances}{$instanceId}{sitename}} = $cookie;
}
my $state = $response->content;

View file

@ -85,7 +85,7 @@ sub delete {
my $skipNotify = shift;
$self->session->db->deleteRow("WorkflowSchedule","taskId",$self->getId);
if ($skipNotify) {
WebGUI::Workflow::Spectre->new($self->session)->notify("cron/deleteJob",{taskId=>$self->getId, config=>$self->session->config->getFilename});
WebGUI::Workflow::Spectre->new($self->session)->notify("cron/deleteJob", $self->session->config->getFilename."-".$self->getId);
}
undef $self;
}
@ -283,7 +283,7 @@ sub set {
$self->{_data}{enabled} = 0 unless ($self->{_data}{workflowId});
my $spectre = WebGUI::Workflow::Spectre->new($self->session);
$self->session->db->setRow("WorkflowSchedule","taskId",$self->{_data});
$spectre->notify("cron/deleteJob",{taskId=>$self->getId,config=>$self->session->config->getFilename});
$spectre->notify("cron/deleteJob", $self->session->config->getFilename."-".$self->getId);
my %params = %{$self->{_data}};
$params{parameters} = $self->get("parameters");
$params{config} = $self->session->config->getFilename;