cron officially works!!!

This commit is contained in:
JT Smith 2006-03-19 20:25:52 +00:00
parent 18e8de5856
commit 524ea87d36
4 changed files with 146 additions and 51 deletions

View file

@ -110,18 +110,13 @@ sub new {
name => 'Spectre',
);
POE::Session->create(
object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop","test"=>"test"} ],
args=>[["shutdown","test"]]
object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop"} ],
args=>[["shutdown"]]
);
$self->{_cron} = Spectre::Cron->new($config, $debug);
$self->{_workflow} = Spectre::Workflow->new($config, $debug);
$self->{_cron} = Spectre::Cron->new($config, $self->{_workflow}, $debug);
POE::Kernel->run();
}
sub test {
my $arg = $_[ARG1];
use JSON;
print objToJson($arg);
}

View file

@ -33,7 +33,7 @@ Initializes the scheduler.
sub _start {
my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
$self->debug("Starting Spectre scheduler.");
my $serviceName = "scheduler";
my $serviceName = "cron";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
$self->debug("Loading the schedules from all the sites.");
@ -62,35 +62,92 @@ sub _stop {
#-------------------------------------------------------------------
=head2 addJob ( config, job )
=head2 addJob ( params )
Adds a job to the cron monitoring queue.
=head3 config
=head3 params
The filename of the configuration file for the site this job belongs to.
A hash reference containing data about the job.
=head3 job
=head4 taskId
A hash reference containing the properties of the job from the WorkflowSchedule table.
The unique id for the cron job.
=head4 sitename
The sitename that the job belongs to.
=head4 config
The name of the config file of the site that the job belongs to.
=head4 enabled
A boolean indicating whether the job is enabled or not.
=head4 minuteOfHour
Part of the schedule.
=head4 hourOfDay
Part of the schedule.
=head4 dayOfMonth
Part of the schedule.
=head4 monthOfYear
Part of the schedule.
=head4 dayOfWeek
Part of the schedule.
=head4 runOnce
A boolean indicating whether this cron should be executed more than once.
=head4 workflowId
The ID of the workflow that should be kicked off when the time is right.
=head4 className
The class name of the object to be created to be passed in to the workflow.
=head4 methodName
THe method name of the object to be created to be passed in to the workflow.
=head4 parameters
The parameters of the object to be created to be passed in to the workflow.
=head4 priority
An integer (1,2,3) that determines what priority the workflow should be executed at.
=cut
sub addJob {
my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1];
return 0 unless ($job->{enabled});
$self->debug("Adding schedule ".$job->{taskId}." to the queue.");
$self->{_jobs}{$job->{taskId}} = {
taskId=>$job->{taskId},
config=>$config,
schedule=>join(" ", $job->{minuteOfHour}, $job->{hourOfDay}, $job->{dayOfMonth}, $job->{monthOfYear}, $job->{dayOfWeek}),
runOnce=>$job->{runOnce},
workflowId=>$job->{workflowId},
className=>$job->{className},
methodName=>$job->{methodName},
parameters=>$job->{parameters},
priority=>$job->{priority}
}
my ($self, $params) = @_[OBJECT, ARG0];
return 0 unless ($params->{enabled});
$self->debug("Adding schedule ".$params->{taskId}." to the queue.");
$self->{_jobs}{$params->{config}}{$params->{taskId}} = {
taskId=>$params->{taskId},
config=>$params->{config},
sitename=>$params->{sitename},
schedule=>join(" ", $params->{minuteOfHour}, $params->{hourOfDay}, $params->{dayOfMonth}, $params->{monthOfYear}, $params->{dayOfWeek}),
runOnce=>$params->{runOnce},
workflowId=>$params->{workflowId},
className=>$params->{className},
methodName=>$params->{methodName},
parameters=>$params->{parameters},
priority=>$params->{priority}
};
}
@ -112,32 +169,35 @@ A DateTime object representing the time to compare the schedule with.
sub checkSchedule {
my ($kernel, $self, $job, $now) = @_[KERNEL, OBJECT, ARG0, ARG1];
$self->debug("Checking schedule ".$job->{taskId}." against the current time.");
$self->debug("Checking schedule ".$job->{taskId}." for ".$job->{config}." against the current time.");
my $cron = DateTime::Cron::Simple->new($job->{schedule});
if ($cron->validate_time($now)) {
$self->debug("It's time to run ".$job->{taskId}.". Creating workflow instance.");
$self->debug("It's time to run ".$job->{taskId}." for ".$job->{config}.". Creating workflow instance.");
my $session = WebGUI::Session->open($self->config->getWebguiRoot, $job->{config});
my $instance = WebGUI::Workflow::Instance->create($session, {
workflowId=>$job->{workflowId},
className=>$job->{className},
methodName=>$job->{methodName},
parameters=>$job->{parameters},
priority=>$job->{priority}
priority=>$job->{priority},
notifySpectre=>0
});
if (defined $instance) {
$self->debug("Created workflow instance ".$instance->getId.".");
$kernel->post($self->workflowSession, "addInstance", {instanceId=>$instance->getId, priority=>$job->{priority}, sitename=>});
} else {
$self->debug("Something bad happened. Couldn't create workflow instance for schedule ".$job->{taskId}.".");
$self->debug("Something bad happened. Couldn't create workflow instance for schedule ".$job->{taskId}." for ".$job->{config}.".");
}
if ($job->{runOnce}) {
$self->debug("Schedule ".$job->{taskId}." is only supposed to run once.");
$self->debug("Schedule ".$job->{taskId}." for ".$job->{config}." is only supposed to run once.");
my $cron = WebGUI::Workflow::Cron->new($session, $job->{taskId});
if (defined $cron) {
$self->debug("Deleting schedule from database.");
$cron->delete;
$cron->delete(1);
} else {
$self->debug("Couldn't instanciate schedule ".$job->{taskId}." in order to delete it.");
}
$kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}});
}
$session->close;
}
@ -155,8 +215,10 @@ sub checkSchedules {
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Checking schedules against current time.");
my $now = DateTime->from_epoch(epoch=>time());
foreach my $taskId (keys %{$self->{_jobs}}) {
$kernel->yield("checkSchedule", $self->{_jobs}{$taskId}, $now)
foreach my $config (keys %{$self->{_jobs}}) {
foreach my $taskId (keys %{$self->{_jobs}{$config}}) {
$kernel->yield("checkSchedule", $self->{_jobs}{$config}{$taskId}, $now)
}
}
$kernel->delay_set("checkSchedules",60);
}
@ -195,20 +257,28 @@ sub debug {
#-------------------------------------------------------------------
=head2 deleteJob ( taskId )
=head2 deleteJob ( params )
Removes a job from the monitoring queue.
=head3 taskId
=head3 params
The unique id of the job to remove.
A hash reference containing the info needed to delete this job.
=head4 taskId
The unique ID for this job.
=head4 config
The config file name for the site this job belongs to.
=cut
sub deleteJob {
my ($self, $taskId) = @_[OBJECT, ARG0];
$self->debug("Deleting schedule $taskId from queue.");
delete $self->{_jobs}{$taskId};
my ($self, $params) = @_[OBJECT, ARG0];
$self->debug("Deleting schedule ".$params->{taskId}." for ".$params->{config}." from queue.");
delete $self->{_jobs}{$params->{config}}{$params->{taskId}};
}
@ -230,7 +300,9 @@ sub loadSchedule {
my $session = WebGUI::Session->open($self->config->getWebguiRoot, $config);
my $result = $session->db->read("select * from WorkflowSchedule");
while (my $data = $result->hashRef) {
$kernel->yield("addJob",$config, $data);
$data->{config} = $config;
$data->{sitename} = $session->config->get("sitename")->[0];
$kernel->yield("addJob", $data);
}
$session->close;
}
@ -245,6 +317,10 @@ Constructor.
A WebGUI::Config object that represents the spectre.conf file.
=head3 workflowSession
A reference to the Worfklow session.
=head3 debug
A boolean indicating Spectre should spew forth debug as it runs.
@ -254,17 +330,31 @@ A boolean indicating Spectre should spew forth debug as it runs.
sub new {
my $class = shift;
my $config = shift;
my $workflowSession = shift;
my $debug = shift;
my $self = {_debug=>$debug, _config=>$config};
my $self = {_debug=>$debug, _workflowSession=>$workflowSession, _config=>$config};
bless $self, $class;
my @publicEvents = qw(addJob deleteJob);
POE::Session->create(
object_states => [ $self => [qw(_start _stop checkSchedules checkSchedule loadSchedule), @publicEvents] ],
object_states => [ $self => [qw(_start _stop addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ],
args=>[\@publicEvents]
);
}
#-------------------------------------------------------------------
=head2 workflowSession ( )
Returns a reference to the workflow session.
=cut
sub workflowSession {
my $self = shift;
return $self->{_workflowSession};
}
1;

View file

@ -70,16 +70,23 @@ sub create {
#-------------------------------------------------------------------
=head2 delete ( )
=head2 delete ( [ skipNotify ] )
Removes this job from the schedule.
=head3 skipNotify
A boolean indicating whether to skip spectre notification of this event.
=cut
sub delete {
my $self = shift;
my $skipNotify = shift;
$self->session->db->deleteRow("WorkflowSchedule","taskId",$self->getId);
WebGUI::Workflow::Spectre->new($self->session)->notify("cron/deleteJob",$self->getId);
if ($skipNotify) {
WebGUI::Workflow::Spectre->new($self->session)->notify("cron/deleteJob",{taskId=>$self->getId, config=>$self->session->config->getFilename});
}
undef $self;
}
@ -276,8 +283,11 @@ sub set {
$self->{_data}{enabled} = 0 unless ($self->{_data}{workflowId});
my $spectre = WebGUI::Workflow::Spectre->new($self->session);
$self->session->db->setRow("WorkflowSchedule","taskId",$self->{_data});
$spectre->notify("cron/deleteJob",$self->getId);
$spectre->notify("cron/addJob",$self->session->config->getFilename, $self->{_data});
$spectre->notify("cron/deleteJob",{taskId=>$self->getId,config=>$self->session->config->getFilename});
my %params = %{$self->{_data}};
$params{config} = $self->session->config->getFilename;
$params{sitename} = $self->session->config->get("sitename")->[0];
$spectre->notify("cron/addJob", \%params);
}

View file

@ -68,7 +68,7 @@ sub create {
return undef if ($isSerial && $count);
my $instanceId = $session->db->setRow("WorkflowInstance","instanceId",{instanceId=>"new", runningSince=>time()}, $id);
my $self = $class->new($session, $instanceId);
$properties->{priority} ||= 2;
$properties->{notifySpectre} = 1 unless ($properties->{notifySpectre} eq "0");
$self->set($properties);
return $self;
}
@ -306,7 +306,7 @@ sub set {
$self->{_data}{currentActivityId} = (exists $properties->{currentActivityId}) ? $properties->{currentActivityId} : $self->{_data}{currentActivityId};
$self->{_data}{lastUpdate} = time();
$self->session->db->setRow("WorkflowInstance","instanceId",$self->{_data});
if ($properties->{priority}) {
if ($properties->{priority} && $properties->{notifySpectre}) {
my $spectre = WebGUI::Workflow::Spectre->new($self->session);
$spectre->notify("workflow/deleteInstance",$self->getId);
$spectre->notify("workflow/addInstance", {sitename=>$self->session->config->get("sitename")->[0], instanceId=>$self->getId, priority=>$self->{_data}{priority}});