diff --git a/docs/gotcha.txt b/docs/gotcha.txt index 8c2750fdb..9c9b742d2 100644 --- a/docs/gotcha.txt +++ b/docs/gotcha.txt @@ -32,6 +32,8 @@ save you many hours of grief. * The following perl modules are now required: MIME::Tools + POE + POE::Component::IKC::Server diff --git a/lib/Spectre/Admin.pm b/lib/Spectre/Admin.pm new file mode 100644 index 000000000..3dce52ee9 --- /dev/null +++ b/lib/Spectre/Admin.pm @@ -0,0 +1,89 @@ +package Spectre::Admin; + +=head1 LEGAL + + ------------------------------------------------------------------- + WebGUI is Copyright 2001-2006 Plain Black Corporation. + ------------------------------------------------------------------- + Please read the legal notices (docs/legal.txt) and the license + (docs/license.txt) that came with this distribution before using + this software. + ------------------------------------------------------------------- + http://www.plainblack.com info@plainblack.com + ------------------------------------------------------------------- + +=cut + +use strict; +use POE; +use POE::Component::IKC::Server; +use POE::Component::IKC::Specifier; + +#------------------------------------------------------------------- + +=head2 _start ( ) + +Initializes the admin interface. + +=cut + +sub _start { + print "Starting WebGUI Spectre Admin..."; + my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ]; + my $serviceName = "admin"; + $kernel->alias_set($serviceName); + $kernel->call( IKC => publish => $serviceName, $publicEvents ); + my $configs = WebGUI::Config->readAllConfigs($self->{_webguiRoot}); + print "OK\n"; +} + +#------------------------------------------------------------------- + +=head2 _stop ( ) + +Gracefully shuts down the admin interface. + +=cut + +sub _stop { + my ($kernel, $self) = @_[KERNEL, OBJECT]; + print "Stopping WebGUI Admin..."; + undef $self; + $kernel->stop; + print "OK\n"; +} + +#------------------------------------------------------------------- + +=head2 new ( webguiRoot ) + +Constructor. + +=head3 webguiRoot + +The path to the root of the WebGUI installation. + +=cut + +sub new { + my $class = shift; + my $webguiRoot = shift; + my $self = {_webguiRoot=>$webguiRoot}; + bless $self, $class; + create_ikc_server( + port => 32133, + name => 'Spectre', + ); + POE::Session->create( + object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop"} ], + args=>[["shutdown"]] + ); + $self->{_cron} = Spectre::Cron->new($webguiRoot); + $self->{_workflow} = Spectre::Workflow->new($webguiRoot); + POE::Kernel->run(); +} + + + +1; + diff --git a/lib/Spectre/Cron.pm b/lib/Spectre/Cron.pm index f9e473ecf..7694cc5fa 100644 --- a/lib/Spectre/Cron.pm +++ b/lib/Spectre/Cron.pm @@ -17,8 +17,48 @@ package Spectre::Cron; use strict; use DateTime; use DateTime::Cron::Simple; +use POE; use WebGUI::Session; +#------------------------------------------------------------------- + +=head2 _start ( ) + +Initializes the scheduler. + +=cut + +sub _start { + print "Starting WebGUI Spectre Scheduler..."; + my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ]; + my $serviceName = "scheduler"; + $kernel->alias_set($serviceName); + $kernel->call( IKC => publish => $serviceName, $publicEvents ); + my $configs = WebGUI::Config->readAllConfigs($self->{_webguiRoot}); + foreach my $config (keys %{$configs}) { + $kernel->yield("loadSchedule", $config); + } + print "OK\n"; + $kernel->yield("checkSchedules"); +} + +#------------------------------------------------------------------- + +=head2 _stop ( ) + +Gracefully shuts down the scheduler. + +=cut + +sub _stop { + my ($kernel, $self) = @_[KERNEL, OBJECT]; + print "Stopping WebGUI Spectre Scheduler..."; + undef $self; + print "OK\n"; +} + + + #------------------------------------------------------------------- =head2 addJob ( config, job ) @@ -36,11 +76,10 @@ A hash reference containing the properties of the job from the WorkflowSchedule =cut sub addJob { - my $self = shift; - my $config = shift; - my $job = shift; + my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1]; return 0 unless ($job->{enabled}); $self->{_jobs}{$job->{jobId}} = { + jobId=>$job->{jobId}, config=>$config, schedule=>join(" ", $job->{minuteOfHour}, $job->{hourOfDay}, $job->{dayOfMonth}, $job->{monthOfYear}, $job->{dayOfWeek}), workflowId=>$job->{workflowId} @@ -48,6 +87,30 @@ sub addJob { } +#------------------------------------------------------------------- + +=head2 checkSchedule ( job, now ) + +Compares a schedule with the current time and kicks off an event if necessary. This method should only ever need to be called by checkSchedules(). + +=head3 job + +A job definition created through the addJob() method. + +=head3 now + +A DateTime object representing the time to compare the schedule with. + +=cut + +sub checkSchedule { + my ($kernel, $self, $job, $now) = @_[KERNEL, OBJECT, ARG0, ARG1]; + my $cron = DateTime::Cron::Simple->new($job->{schedule}); + if ($cron->validate_time($now)) { + # kick off an event here once we know what that api looks like + } +} + #------------------------------------------------------------------- =head2 checkSchedules ( ) @@ -57,13 +120,10 @@ Checks all the schedules of the jobs in the queue and triggers a workflow if a s =cut sub checkSchedules { - my $self = shift + my ($kernel, $self) = @_[KERNEL, OBJECT]; my $now = DateTime->from_epoch(epoch=>time()); foreach my $jobId (keys %{$self->{_jobs}}) { - my $cron = DateTime::Cron::Simple->new($self->{_jobs}{$jobId}{schedule}); - if ($cron->validate_time($now) { - # kick off an event here once we know what that api looks like - } + $kernel->yield("checkSchedule", $self->{_jobs}{$jobId}, $now) } } @@ -81,30 +141,38 @@ The unique id of the job to remove. =cut sub deleteJob { - my $self = shift; - delete $self->{_jobs}{shift}; + my ($self, $jobId) = @_[OBJECT, ARG0]; + delete $self->{_jobs}{$jobId}; } #------------------------------------------------------------------- -=head2 DESTROY ( ) +=head2 loadSchedule ( config ) -Deconstructor. +Loads the workflow schedule from a particular site. + +=head3 config + +The config filename for the site to load the schedule. =cut -sub DESTROY { - my $self = shift; - undef $self; +sub loadSchedule { + my ($kernel, $self, $config) = @_[KERNEL, OBJECT, ARG0]; + my $session = WebGUI::Session->open($self->{_webguiRoot}, $config); + my $result = $session->db->read("select * from WorkflowSchedule"); + while (my $data = $result->hashRef) { + $kernel->yield("addJob",$config, $data); + } + $session->close; } - #------------------------------------------------------------------- =head2 new ( webguiRoot ) -Constructor. Loads all schedules from WebGUI sites into it's job queue. +Constructor. =head3 webguiRoot @@ -117,17 +185,14 @@ sub new { my $webguiRoot = shift; my $self = {_webguiRoot=>$webguiRoot}; bless $self, $class; - my $configs = WebGUI::Config->readAllConfigs($webguiRoot); - foreach my $config (keys %{$configs}) { - my $session = WebGUI::Session->open($webguiRoot, $config); - my $result = $session->db->read("select * from WorkflowSchedule"); - while (my $data = $result->hashRef) { - $self->addJob($config, $data); - } - $session->close; - } + my @publicEvents = qw(addJob deleteJob); + POE::Session->create( + object_states => [ $self => [qw(_start _stop checkEvents checkEvent loadSchedule), @publicEvents] ], + args=>[\@publicEvents] + ); } + 1; diff --git a/lib/Spectre/ProcessManager.pm b/lib/Spectre/ProcessManager.pm deleted file mode 100644 index a0b7e7e99..000000000 --- a/lib/Spectre/ProcessManager.pm +++ /dev/null @@ -1,117 +0,0 @@ -package Spectre::ProcessManager; - -=head1 LEGAL - - ------------------------------------------------------------------- - WebGUI is Copyright 2001-2006 Plain Black Corporation. - ------------------------------------------------------------------- - Please read the legal notices (docs/legal.txt) and the license - (docs/license.txt) that came with this distribution before using - this software. - ------------------------------------------------------------------- - http://www.plainblack.com info@plainblack.com - ------------------------------------------------------------------- - -=cut - -use strict; -use Proc::Background; - -#------------------------------------------------------------------- - -=head2 cleanUp ( ) - -Cleans up the process list by clearning out old processes that have completed. - -=cut - -sub cleanUp { - my $self = shift; - my @newList = (); - foreach my $process (@{$self->{_processList}}) { - push(@newList, $process) if ($process->alive); - } - $self->{_processList} = \@newList; -} - -#------------------------------------------------------------------- - -=head2 createProcess ( command ) - -Spawns a new process. - -=head3 command - -The commandline to execute under the new process. - -=cut - -sub createProcess { - my $self = shift; - my $command = shift; - my $process = Proc::Background->new({'die_upon_destroy' => 1}, $command); - push(@{$self->{_processList}}, $process); -} - -#------------------------------------------------------------------- - -=head2 DESTROY ( ) - -Deconstructor. - -=cut - -sub DESTROY { - my $self = shift; - $self->killAllProcesses; - undef $self; -} - -#------------------------------------------------------------------- - -=head2 getProcessCount ( ) - -Returns an integer representing the number of processes currently running. This runs cleanUp() before counting to ensure an accurate count. - -=cut - -sub getProcessCount { - my $self = shift; - $self->cleanUp; - return scalar(@{$self->{_processList}}); -} - - -#------------------------------------------------------------------- - -=head2 killAllProcesses ( ) - -Kills all of the running processes. - -=cut - -sub killAllProcesses { - my $self = shift; - foreach my $process (@{$self->{_processList}}) { - $process->die; - } - $self->{_processList} = (); -} - - -#------------------------------------------------------------------- - -=head2 new ( ) - -Constructor. - -=cut - -sub new { - my $class = shift; - bless {_processList} => ()}, $class; -} - -1; - - diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index 2d2d74a2e..7cb7b54ed 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -15,6 +15,45 @@ package Spectre::Workflow; =cut use strict; +use POE; + +#------------------------------------------------------------------- + +=head2 _start ( ) + +Initializes the workflow manager. + +=cut + +sub _start { + print "Starting WebGUI Spectre Workflow Manager..."; + my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ]; + my $serviceName = "workflow"; + $kernel->alias_set($serviceName); + $kernel->call( IKC => publish => $serviceName, $publicEvents ); + my $configs = WebGUI::Config->readAllConfigs($self->{_webguiRoot}); + foreach my $config (keys %{$configs}) { + $kernel->yield("loadWorkflows", $config); + } + print "OK\n"; + $kernel->yield("checkJobs"); +} + +#------------------------------------------------------------------- + +=head2 _stop ( ) + +Gracefully shuts down the workflow manager. + +=cut + +sub _stop { + my ($kernel, $self) = @_[KERNEL, OBJECT]; + print "Stopping WebGUI Spectre Workflow Manager..."; + undef $self; + print "OK\n"; +} + #------------------------------------------------------------------- @@ -34,10 +73,9 @@ A hash reference containing a row of data from the WorkflowInstance table. =cut sub addJob { - my $self = shift; - my $config = shift; - my $job = shift; + my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1]; $self->{"_priority".$job->{priority}}{$job->{instanceId}} = { + instanceId=>$job->{instanceId}, config=>$config, status=>"waiting" }; @@ -45,6 +83,36 @@ sub addJob { #------------------------------------------------------------------- +=head2 checkJobs ( ) + +Checks to see if there are any open job slots available, and if there are assigns a new job to be run to fill it. + +=cut + +sub checkJobs { + my ($kernel, $self) = @_[KERNEL, OBJECT]; + if ($self->countRunningJobs < 5) { + my $job = $self->getNextJob; + $job->{status} = "running"; + $kernel->yield("runJob",$job); + } +} + +#------------------------------------------------------------------- + +=head2 countRunningJobs ( ) + +Returns an integer representing the number of running jobs. + +=cut + +sub countRunningJobs { + my $self = shift; + return scalar(@{$self->{_runningJobs}}); +} + +#------------------------------------------------------------------- + =head2 deleteJob ( instanceId ) Removes a workflow job from the processing queue. @@ -52,24 +120,10 @@ Removes a workflow job from the processing queue. =cut sub deleteJob { - my $self = shift; - delete $self->{_priority1}{shift}; - delete $self->{_priority2}{shift}; - delete $self->{_priority3}{shift}; -} - - -#------------------------------------------------------------------- - -=head2 DESTROY ( ) - -Deconstructor. - -=cut - -sub DESTROY { - my $self = shift; - undef $self; + my ($self, $instanceId) = @_[OBJECT, ARG0]; + delete $self->{_priority1}{$instanceId}; + delete $self->{_priority2}{$instanceId}; + delete $self->{_priority3}{$instanceId}; } @@ -83,8 +137,9 @@ sub getNextJob { my $self = shift; foreach my $priority (1..3) { foreach my $instanceId (keys %{$self->{"_priority".$priority}}) { - if ($self->{"_priority".$priority}{status} eq "waiting") { - + if ($self->{"_priority".$priority}{$instanceId}{status} eq "waiting") { + return $self->{"_priority".$priority}{$instanceId}; + } } } return undef; @@ -92,6 +147,22 @@ sub getNextJob { #------------------------------------------------------------------- +=head2 loadWorkflows ( ) + +=cut + +sub loadWorkflows { + my ($kernel, $self, $config) = @_[KERNEL, OBJECT, ARG0]; + my $session = WebGUI::Session->open($self->{_webguiRoot}, $config); + my $result = $session->db->read("select * from WorkflowInstance"); + while (my $data = $result->hashRef) { + $kernel->yield("addJob", $config, $data); + } + $session->close; +} + +#------------------------------------------------------------------- + =head2 new ( webguiRoot ) Constructor. Loads all active workflows from each WebGUI site and begins executing them. @@ -107,18 +178,13 @@ sub new { my $webguiRoot = shift; my $self = {_webguiRoot=>$webguiRoot}; bless $self, $class; - my $configs = WebGUI::Config->readAllConfigs($webguiRoot); - foreach my $config (keys %{$configs}) { - my $session = WebGUI::Session->open($webguiRoot, $config); - my $result = $session->db->read("select * from WorkflowInstance"); - while (my $data = $result->hashRef) { - $self->addJob($config, $data); - } - $session->close; - } + my @publicEvents = qw(addJob deleteJob); + POE::Session->create( + object_states => [ $self => [qw(_start _stop checkJobs loadWorkflows runJob), @publicEvents] ], + args=>[\@publicEvents] + ); } - 1; diff --git a/sbin/spectre.pl b/sbin/spectre.pl new file mode 100644 index 000000000..e4ffa8b80 --- /dev/null +++ b/sbin/spectre.pl @@ -0,0 +1,228 @@ +#------------------------------------------------------------------- +# WebGUI is Copyright 2001-2005 Plain Black Corporation. +#------------------------------------------------------------------- +# Please read the legal notices (docs/legal.txt) and the license +# (docs/license.txt) that came with this distribution before using +# this software. +#------------------------------------------------------------------- +# http://www.plainblack.com info@plainblack.com +#------------------------------------------------------------------- + +use strict; +use warnings; +use lib '../lib'; +use DateTime; +use DateTime::Cron::Simple; +use Getopt::Long; +use POE qw(Session); +use POE::Component::IKC::ClientLite; +use POE::Component::JobQueue; +use WebGUI::Session; +use WebGUI::Workflow; + +$|=1; # disable output buffering +my $help; +my $shutdown; + +GetOptions( + 'help'=>\$help, + 'shutdown'=>\$shutdown + ); + +if ($help) { + print <32133, + ip=>'127.0.0.1', + name=>rand(100000), + timeout=>10 + ); + die $POE::Component::IKC::ClientLite::error unless $remote; + my $result = $remote->post('scheduler/shutdown'); + die $POE::Component::IKC::ClientLite::error unless defined $result; + undef $remote; + exit; +} + +fork and exit; + + +create_ikc_server( + port => 32133, + name => 'Spectre', + ); + +POE::Session->create( + inline_states => { + _start => \&initializeScheduler, + _stop => \&shutdown, + "shutdown" => \&shutdown, + loadSchedule => \&loadSchedule, + checkSchedule => \&checkSchedule, + checkEvent => \&checkEvent, + } + ); + +POE::Session->create( + inline_states => { + _start => \&initializeJobQueue, + _stop => \&shutdown, + } + ); + +POE::Component::JobQueue->spawn ( + Alias => 'queuer', + WorkerLimit => 10, + Worker => \&spawnWorker, + Passive => { + Prioritizer => \&prioritizeJobs, + }, + ); + +POE::Kernel->run(); +exit 0; + + +#------------------------------------------------------------------- +sub checkEvent { + my ($kernel, $schedule, $workflowId, $time) = @_[KERNEL, ARG0, ARG1, ARG2]; + my $cron = DateTime::Cron::Simple->new($schedule); + if ($cron->validate_time(DateTime->from_epoch(epoch=>$time))) { + print "Supposed to run task ".$workflowId." now!!\n"; + } +} + +#------------------------------------------------------------------- +sub checkSchedule { + my ($kernel, $heap) = @_[KERNEL, HEAP]; + my $now = time(); + foreach my $config (keys %{$heap->{workflowSchedules}}) { + foreach my $event (@{$heap->{workflowSchedules}{$config}}) { + $kernel->yield("checkEvent",$event->{schedule},$event->{workflowId},$now); + } + } + $kernel->delay_set("checkSchedule",60); +} + +#------------------------------------------------------------------- +sub initializeJobQueue { + print "Starting WebGUI Spectre Job Queue..."; + my $kernel = $_[KERNEL]; + my $serviceName = "queue"; + $kernel->alias_set($serviceName); + $kernel->call( IKC => publish => $serviceName, ["shutdown"] ); + print "OK\n"; + foreach my $config (keys %{WebGUI::Config::readAllConfigs("..")}) { + $kernel->yield("loadJobs", $config); + } +} + +#------------------------------------------------------------------- +sub initializeScheduler { + print "Starting WebGUI Spectre Scheduler..."; + my ( $kernel, $heap) = @_[ KERNEL, HEAP ]; + my $serviceName = "scheduler"; + $kernel->alias_set($serviceName); + $kernel->call( IKC => publish => $serviceName, ["shutdown", "loadSchedule"] ); + foreach my $config (keys %{WebGUI::Config::readAllConfigs("..")}) { + $kernel->yield("loadSchedule", $config); + } + print "OK\n"; + $kernel->yield("checkSchedule"); +} + +#------------------------------------------------------------------- +sub loadJobs { + my ($heap, $config) = @_[HEAP, ARG0]; + sessionOpen($config); +} + +#------------------------------------------------------------------- +sub loadSchedule { + my ($heap, $config) = @_[HEAP, ARG0]; + sessionOpen($config); + $heap->{workflowSchedules}{$config} = WebGUI::Workflow::getSchedules(); + sessionClose(); +} + +#------------------------------------------------------------------- +sub performJob { + +} + +#------------------------------------------------------------------- +sub prioritizeJobs { + return 1; # FIFO queue, but let's add priorities at some point +} + +#------------------------------------------------------------------- +sub sessionOpen { + WebGUI::Session::open("..",shift); + WebGUI::Session::refreshUserInfo("pbuser_________spectre"); +} + +#------------------------------------------------------------------- +sub sessionClose { + WebGUI::Session::end(); + WebGUI::Session::close(); +} + +#------------------------------------------------------------------- +sub shutdown { + my $kernel = $_[KERNEL]; + print "Stopping WebGUI Spectre..."; + if ($session{var}{userId}) { + sessionClose(); + } + print "OK\n"; + $kernel->stop; +} + +#------------------------------------------------------------------- +sub spawnWorker { + my ($postback, @jobParams) = @_; + POE::Session->create ( + inline_states => { + _start => \&startWorker, + _stop => \&stopWorker, + performJob => \&performJob + }, + args => [ + $postback, + @jobParams, + ], + ); +} + +#------------------------------------------------------------------- +sub startWorker { + +} + +#------------------------------------------------------------------- +sub stopWorker { + +} + + + + + diff --git a/sbin/testEnvironment.pl b/sbin/testEnvironment.pl index 5c2fe22ab..39a8f31e3 100644 --- a/sbin/testEnvironment.pl +++ b/sbin/testEnvironment.pl @@ -78,9 +78,8 @@ checkModule("Parse::PlainConfig",1.1); checkModule("XML::RSSLite",0.11); checkModule("JSON",0.991); checkModule("Finance::Quote",1.08); -#checkModule("POE",0.3202); -#checkModule("POE::Component::IKC::Server",0.18); -#checkModule("POE::Component::JobQueue",0.5402); +checkModule("POE",0.3202); +checkModule("POE::Component::IKC::Server",0.18); checkModule("Data::Structure::Util",0.11); checkModule("Apache2::Request",2.06);