diff --git a/lib/WebGUI/Workflow.pm b/lib/WebGUI/Workflow.pm new file mode 100644 index 000000000..49ef2bc8b --- /dev/null +++ b/lib/WebGUI/Workflow.pm @@ -0,0 +1,64 @@ +package WebGUI::Workflow; + + +=head1 LEGAL + + ------------------------------------------------------------------- + WebGUI is Copyright 2001-2005 Plain Black Corporation. + ------------------------------------------------------------------- + Please read the legal notices (docs/legal.txt) and the license + (docs/license.txt) that came with this distribution before using + this software. + ------------------------------------------------------------------- + http://www.plainblack.com info@plainblack.com + ------------------------------------------------------------------- + +=cut + +use strict; +use WebGUI::Session; + +=head1 NAME + +Package WebGUI::Workflow + +=head1 DESCRIPTION + +This package provides global utility functions for workflows. + +=head1 SYNOPSIS + + use WebGUI::Workflow; + + $arrayRef = getSchedules(); + +=head1 FUNCTIONS + +These subroutines are available from this package: + +=cut + +#------------------------------------------------------------------- + +=head2 getSchedules + +Returns an array reference of hashes containing the workflow schedule data for this site. + +=cut + +sub getSchedules { + my @schedules; + my $sth = WebGUI::SQL->read("select * from WorkflowSchedule where enabled=1"); + while (my $event = $sth->hashRef) { + my $schedule = join(" ",$event->{minuteOfHour},$event->{hourOfDay},$event->{dayOfMonth},$event->{monthOfYear},$event->{dayOfWeek}); + push(@schedules,{ + schedule=>$schedule, + workflowId=>$event->{workflowId} + }); + } + return \@schedules; +} + +1; + + diff --git a/sbin/spectre.pl b/sbin/spectre.pl index dc6e0c224..225831394 100644 --- a/sbin/spectre.pl +++ b/sbin/spectre.pl @@ -11,6 +11,7 @@ use strict; use warnings; use lib '../lib'; +use DateTime; use DateTime::Cron::Simple; use Getopt::Long; use POE qw(Session); @@ -56,7 +57,7 @@ if ($shutdown) { timeout=>10 ); die $POE::Component::IKC::ClientLite::error unless $remote; - my $result = $remote->post('Spectre/shutdown'); + my $result = $remote->post('scheduler/shutdown'); die $POE::Component::IKC::ClientLite::error unless defined $result; undef $remote; exit; @@ -65,18 +66,26 @@ if ($shutdown) { fork and exit; -POE::Component::IKC::Server->spawn( +create_ikc_server( port => 32133, name => 'Spectre', ); POE::Session->create( inline_states => { - _start => \&serviceStart, - _stop => \&serviceStop, - "shutdown" => \&serviceStop, - initializeWorkflowScheduler => \&initializeWorkflowScheduler, - loadSchedule => \&loadSchedule + _start => \&initializeScheduler, + _stop => \&shutdown, + "shutdown" => \&shutdown, + loadSchedule => \&loadSchedule, + checkSchedule => \&checkSchedule, + checkEvent => \&checkEvent, + } +); + +POE::Session->create( + inline_states => { + _start => \&initializeJobQueue, + _stop => \&shutdown, } ); @@ -85,13 +94,50 @@ exit 0; #------------------------------------------------------------------- -sub initializeWorkflowScheduler { - my ($kernel, $session) = @_[KERNEL, SESSION]; - foreach my $config (keys %{WebGUI::Config::readAllConfigs("..")}) { - $kernel->post($session,"loadSchedule", $config); +sub checkEvent { + my ($kernel, $schedule, $workflowId, $time) = @_[KERNEL, ARG0, ARG1, ARG2]; + my $cron = DateTime::Cron::Simple->new($schedule); + if ($cron->validate_time(DateTime->from_epoch(epoch=>$time))) { + print "Supposed to run task ".$workflowId." now!!\n"; } } +#------------------------------------------------------------------- +sub checkSchedule { + my ($kernel, $heap) = @_[KERNEL, HEAP]; + my $now = time(); + foreach my $config (keys %{$heap->{workflowSchedules}}) { + foreach my $event (@{$heap->{workflowSchedules}{$config}}) { + $kernel->yield("checkEvent",$event->{schedule},$event->{workflowId},$now); + } + } + $kernel->delay_set("checkSchedule",60); +} + +#------------------------------------------------------------------- +sub initializeJobQueue { + print "Starting WebGUI Spectre Job Queue..."; + my $kernel = $_[KERNEL]; + my $serviceName = "queue"; + $kernel->alias_set($serviceName); + $kernel->call( IKC => publish => $serviceName, ["shutdown"] ); + print "OK\n"; +} + +#------------------------------------------------------------------- +sub initializeScheduler { + print "Starting WebGUI Spectre Scheduler..."; + my ( $kernel, $heap) = @_[ KERNEL, HEAP ]; + my $serviceName = "scheduler"; + $kernel->alias_set($serviceName); + $kernel->call( IKC => publish => $serviceName, ["shutdown", "loadSchedule"] ); + foreach my $config (keys %{WebGUI::Config::readAllConfigs("..")}) { + $kernel->yield("loadSchedule", $config); + } + print "OK\n"; + $kernel->yield("checkSchedule"); +} + #------------------------------------------------------------------- sub loadSchedule { my ($heap, $config) = @_[HEAP, ARG0]; @@ -101,24 +147,7 @@ sub loadSchedule { } #------------------------------------------------------------------- -sub serviceShutdown { - my $kernel = $_[KERNEL]; - $kernel->yield("_stop"); -} - -#------------------------------------------------------------------- -sub serviceStart { - print "Starting WebGUI Spectre..."; - my ( $kernel, $heap, $session) = @_[ KERNEL, HEAP, SESSION ]; - my $serviceName = "Spectre"; - $kernel->alias_set($serviceName); - $kernel->call( IKC => publish => $serviceName, ["shutdown", "loadSchedule"] ); - print "OK\n"; - $kernel->post($session, "initializeWorkflowScheduler"); -} - -#------------------------------------------------------------------- -sub serviceStop { +sub shutdown { my $kernel = $_[KERNEL]; print "Stopping WebGUI Spectre..."; if ($session{var}{userId}) {