From 524ea87d3664761860f3f9cb9ae71519c9c25090 Mon Sep 17 00:00:00 2001 From: JT Smith Date: Sun, 19 Mar 2006 20:25:52 +0000 Subject: [PATCH] cron officially works!!! --- lib/Spectre/Admin.pm | 11 +-- lib/Spectre/Cron.pm | 164 +++++++++++++++++++++++++------- lib/WebGUI/Workflow/Cron.pm | 18 +++- lib/WebGUI/Workflow/Instance.pm | 4 +- 4 files changed, 146 insertions(+), 51 deletions(-) diff --git a/lib/Spectre/Admin.pm b/lib/Spectre/Admin.pm index 0195fdcf3..0c3c8d396 100644 --- a/lib/Spectre/Admin.pm +++ b/lib/Spectre/Admin.pm @@ -110,18 +110,13 @@ sub new { name => 'Spectre', ); POE::Session->create( - object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop","test"=>"test"} ], - args=>[["shutdown","test"]] + object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop"} ], + args=>[["shutdown"]] ); - $self->{_cron} = Spectre::Cron->new($config, $debug); $self->{_workflow} = Spectre::Workflow->new($config, $debug); + $self->{_cron} = Spectre::Cron->new($config, $self->{_workflow}, $debug); POE::Kernel->run(); } -sub test { - my $arg = $_[ARG1]; -use JSON; - print objToJson($arg); -} diff --git a/lib/Spectre/Cron.pm b/lib/Spectre/Cron.pm index 7bdb9542b..e3d2bd416 100644 --- a/lib/Spectre/Cron.pm +++ b/lib/Spectre/Cron.pm @@ -33,7 +33,7 @@ Initializes the scheduler. sub _start { my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ]; $self->debug("Starting Spectre scheduler."); - my $serviceName = "scheduler"; + my $serviceName = "cron"; $kernel->alias_set($serviceName); $kernel->call( IKC => publish => $serviceName, $publicEvents ); $self->debug("Loading the schedules from all the sites."); @@ -62,35 +62,92 @@ sub _stop { #------------------------------------------------------------------- -=head2 addJob ( config, job ) +=head2 addJob ( params ) Adds a job to the cron monitoring queue. -=head3 config +=head3 params -The filename of the configuration file for the site this job belongs to. +A hash reference containing data about the job. -=head3 job +=head4 taskId -A hash reference containing the properties of the job from the WorkflowSchedule table. +The unique id for the cron job. + +=head4 sitename + +The sitename that the job belongs to. + +=head4 config + +The name of the config file of the site that the job belongs to. + +=head4 enabled + +A boolean indicating whether the job is enabled or not. + +=head4 minuteOfHour + +Part of the schedule. + +=head4 hourOfDay + +Part of the schedule. + +=head4 dayOfMonth + +Part of the schedule. + +=head4 monthOfYear + +Part of the schedule. + +=head4 dayOfWeek + +Part of the schedule. + +=head4 runOnce + +A boolean indicating whether this cron should be executed more than once. + +=head4 workflowId + +The ID of the workflow that should be kicked off when the time is right. + +=head4 className + +The class name of the object to be created to be passed in to the workflow. + +=head4 methodName + +THe method name of the object to be created to be passed in to the workflow. + +=head4 parameters + +The parameters of the object to be created to be passed in to the workflow. + +=head4 priority + +An integer (1,2,3) that determines what priority the workflow should be executed at. =cut sub addJob { - my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1]; - return 0 unless ($job->{enabled}); - $self->debug("Adding schedule ".$job->{taskId}." to the queue."); - $self->{_jobs}{$job->{taskId}} = { - taskId=>$job->{taskId}, - config=>$config, - schedule=>join(" ", $job->{minuteOfHour}, $job->{hourOfDay}, $job->{dayOfMonth}, $job->{monthOfYear}, $job->{dayOfWeek}), - runOnce=>$job->{runOnce}, - workflowId=>$job->{workflowId}, - className=>$job->{className}, - methodName=>$job->{methodName}, - parameters=>$job->{parameters}, - priority=>$job->{priority} - } + my ($self, $params) = @_[OBJECT, ARG0]; + return 0 unless ($params->{enabled}); + $self->debug("Adding schedule ".$params->{taskId}." to the queue."); + $self->{_jobs}{$params->{config}}{$params->{taskId}} = { + taskId=>$params->{taskId}, + config=>$params->{config}, + sitename=>$params->{sitename}, + schedule=>join(" ", $params->{minuteOfHour}, $params->{hourOfDay}, $params->{dayOfMonth}, $params->{monthOfYear}, $params->{dayOfWeek}), + runOnce=>$params->{runOnce}, + workflowId=>$params->{workflowId}, + className=>$params->{className}, + methodName=>$params->{methodName}, + parameters=>$params->{parameters}, + priority=>$params->{priority} + }; } @@ -112,32 +169,35 @@ A DateTime object representing the time to compare the schedule with. sub checkSchedule { my ($kernel, $self, $job, $now) = @_[KERNEL, OBJECT, ARG0, ARG1]; - $self->debug("Checking schedule ".$job->{taskId}." against the current time."); + $self->debug("Checking schedule ".$job->{taskId}." for ".$job->{config}." against the current time."); my $cron = DateTime::Cron::Simple->new($job->{schedule}); if ($cron->validate_time($now)) { - $self->debug("It's time to run ".$job->{taskId}.". Creating workflow instance."); + $self->debug("It's time to run ".$job->{taskId}." for ".$job->{config}.". Creating workflow instance."); my $session = WebGUI::Session->open($self->config->getWebguiRoot, $job->{config}); my $instance = WebGUI::Workflow::Instance->create($session, { workflowId=>$job->{workflowId}, className=>$job->{className}, methodName=>$job->{methodName}, parameters=>$job->{parameters}, - priority=>$job->{priority} + priority=>$job->{priority}, + notifySpectre=>0 }); if (defined $instance) { $self->debug("Created workflow instance ".$instance->getId."."); + $kernel->post($self->workflowSession, "addInstance", {instanceId=>$instance->getId, priority=>$job->{priority}, sitename=>}); } else { - $self->debug("Something bad happened. Couldn't create workflow instance for schedule ".$job->{taskId}."."); + $self->debug("Something bad happened. Couldn't create workflow instance for schedule ".$job->{taskId}." for ".$job->{config}."."); } if ($job->{runOnce}) { - $self->debug("Schedule ".$job->{taskId}." is only supposed to run once."); + $self->debug("Schedule ".$job->{taskId}." for ".$job->{config}." is only supposed to run once."); my $cron = WebGUI::Workflow::Cron->new($session, $job->{taskId}); if (defined $cron) { $self->debug("Deleting schedule from database."); - $cron->delete; + $cron->delete(1); } else { $self->debug("Couldn't instanciate schedule ".$job->{taskId}." in order to delete it."); } + $kernel->yield("deleteJob",{config=>$job->{config}, taskId=>$job->{taskId}}); } $session->close; } @@ -155,8 +215,10 @@ sub checkSchedules { my ($kernel, $self) = @_[KERNEL, OBJECT]; $self->debug("Checking schedules against current time."); my $now = DateTime->from_epoch(epoch=>time()); - foreach my $taskId (keys %{$self->{_jobs}}) { - $kernel->yield("checkSchedule", $self->{_jobs}{$taskId}, $now) + foreach my $config (keys %{$self->{_jobs}}) { + foreach my $taskId (keys %{$self->{_jobs}{$config}}) { + $kernel->yield("checkSchedule", $self->{_jobs}{$config}{$taskId}, $now) + } } $kernel->delay_set("checkSchedules",60); } @@ -195,20 +257,28 @@ sub debug { #------------------------------------------------------------------- -=head2 deleteJob ( taskId ) +=head2 deleteJob ( params ) Removes a job from the monitoring queue. -=head3 taskId +=head3 params -The unique id of the job to remove. +A hash reference containing the info needed to delete this job. + +=head4 taskId + +The unique ID for this job. + +=head4 config + +The config file name for the site this job belongs to. =cut sub deleteJob { - my ($self, $taskId) = @_[OBJECT, ARG0]; - $self->debug("Deleting schedule $taskId from queue."); - delete $self->{_jobs}{$taskId}; + my ($self, $params) = @_[OBJECT, ARG0]; + $self->debug("Deleting schedule ".$params->{taskId}." for ".$params->{config}." from queue."); + delete $self->{_jobs}{$params->{config}}{$params->{taskId}}; } @@ -230,7 +300,9 @@ sub loadSchedule { my $session = WebGUI::Session->open($self->config->getWebguiRoot, $config); my $result = $session->db->read("select * from WorkflowSchedule"); while (my $data = $result->hashRef) { - $kernel->yield("addJob",$config, $data); + $data->{config} = $config; + $data->{sitename} = $session->config->get("sitename")->[0]; + $kernel->yield("addJob", $data); } $session->close; } @@ -245,6 +317,10 @@ Constructor. A WebGUI::Config object that represents the spectre.conf file. +=head3 workflowSession + +A reference to the Worfklow session. + =head3 debug A boolean indicating Spectre should spew forth debug as it runs. @@ -254,17 +330,31 @@ A boolean indicating Spectre should spew forth debug as it runs. sub new { my $class = shift; my $config = shift; + my $workflowSession = shift; my $debug = shift; - my $self = {_debug=>$debug, _config=>$config}; + my $self = {_debug=>$debug, _workflowSession=>$workflowSession, _config=>$config}; bless $self, $class; my @publicEvents = qw(addJob deleteJob); POE::Session->create( - object_states => [ $self => [qw(_start _stop checkSchedules checkSchedule loadSchedule), @publicEvents] ], + object_states => [ $self => [qw(_start _stop addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ], args=>[\@publicEvents] ); } +#------------------------------------------------------------------- + +=head2 workflowSession ( ) + +Returns a reference to the workflow session. + +=cut + +sub workflowSession { + my $self = shift; + return $self->{_workflowSession}; +} + 1; diff --git a/lib/WebGUI/Workflow/Cron.pm b/lib/WebGUI/Workflow/Cron.pm index cd26760a7..d40ac5022 100644 --- a/lib/WebGUI/Workflow/Cron.pm +++ b/lib/WebGUI/Workflow/Cron.pm @@ -70,16 +70,23 @@ sub create { #------------------------------------------------------------------- -=head2 delete ( ) +=head2 delete ( [ skipNotify ] ) Removes this job from the schedule. +=head3 skipNotify + +A boolean indicating whether to skip spectre notification of this event. + =cut sub delete { my $self = shift; + my $skipNotify = shift; $self->session->db->deleteRow("WorkflowSchedule","taskId",$self->getId); - WebGUI::Workflow::Spectre->new($self->session)->notify("cron/deleteJob",$self->getId); + if ($skipNotify) { + WebGUI::Workflow::Spectre->new($self->session)->notify("cron/deleteJob",{taskId=>$self->getId, config=>$self->session->config->getFilename}); + } undef $self; } @@ -276,8 +283,11 @@ sub set { $self->{_data}{enabled} = 0 unless ($self->{_data}{workflowId}); my $spectre = WebGUI::Workflow::Spectre->new($self->session); $self->session->db->setRow("WorkflowSchedule","taskId",$self->{_data}); - $spectre->notify("cron/deleteJob",$self->getId); - $spectre->notify("cron/addJob",$self->session->config->getFilename, $self->{_data}); + $spectre->notify("cron/deleteJob",{taskId=>$self->getId,config=>$self->session->config->getFilename}); + my %params = %{$self->{_data}}; + $params{config} = $self->session->config->getFilename; + $params{sitename} = $self->session->config->get("sitename")->[0]; + $spectre->notify("cron/addJob", \%params); } diff --git a/lib/WebGUI/Workflow/Instance.pm b/lib/WebGUI/Workflow/Instance.pm index 0b7cb5932..cd460da4e 100644 --- a/lib/WebGUI/Workflow/Instance.pm +++ b/lib/WebGUI/Workflow/Instance.pm @@ -68,7 +68,7 @@ sub create { return undef if ($isSerial && $count); my $instanceId = $session->db->setRow("WorkflowInstance","instanceId",{instanceId=>"new", runningSince=>time()}, $id); my $self = $class->new($session, $instanceId); - $properties->{priority} ||= 2; + $properties->{notifySpectre} = 1 unless ($properties->{notifySpectre} eq "0"); $self->set($properties); return $self; } @@ -306,7 +306,7 @@ sub set { $self->{_data}{currentActivityId} = (exists $properties->{currentActivityId}) ? $properties->{currentActivityId} : $self->{_data}{currentActivityId}; $self->{_data}{lastUpdate} = time(); $self->session->db->setRow("WorkflowInstance","instanceId",$self->{_data}); - if ($properties->{priority}) { + if ($properties->{priority} && $properties->{notifySpectre}) { my $spectre = WebGUI::Workflow::Spectre->new($self->session); $spectre->notify("workflow/deleteInstance",$self->getId); $spectre->notify("workflow/addInstance", {sitename=>$self->session->config->get("sitename")->[0], instanceId=>$self->getId, priority=>$self->{_data}{priority}});