- Simplified Spectre's initial data load, and reduced memory footprint in the

process.
This commit is contained in:
JT Smith 2007-02-01 18:48:23 +00:00
parent f140fdead4
commit b19bf14389
8 changed files with 159 additions and 73 deletions

View file

@ -16,6 +16,10 @@
- fix: Wiki Error on plainblack.com
- Made exipred sessions workflow activity more fault tollerant.
- Made rss fetching more fault tollerant.
- fix: caps problem in IntSlider (thanks to patspam)
- rfe: added user defined fields to collaboration system rss (Dept of State)
- Simplified Spectre's initial data load, and reduced memory footprint in the
process.
- fix: lack of testing for valid object creation
- fix: No mention of intermediate upgrade step in gotcha's
- fix: A newly released version of Html::Template fixes a bug with global

View file

@ -16,6 +16,8 @@ package Spectre::Admin;
use strict;
use HTTP::Request;
use JSON;
use Log::Log4perl;
use LWP::UserAgent;
use POE;
use POE::Component::IKC::Server;
@ -23,6 +25,7 @@ use POE::Component::IKC::Specifier;
use Spectre::Cron;
use Spectre::Workflow;
#-------------------------------------------------------------------
=head2 _start ( )
@ -37,6 +40,7 @@ sub _start {
my $serviceName = "admin";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
$kernel->delay_set("loadSiteData",3);
}
#-------------------------------------------------------------------
@ -109,7 +113,7 @@ sub error {
#-------------------------------------------------------------------
=head3 getLogger ( )
=head2 getLogger ( )
Returns a reference to the logger.
@ -122,6 +126,51 @@ sub getLogger {
#-------------------------------------------------------------------
=head2 loadSiteData ( )
Fetches the site from each defined site, and loads it into the Workflow and Cron governors.
=cut
sub loadSiteData {
my ( $kernel, $self) = @_[ KERNEL, OBJECT ];
my $configs = WebGUI::Config->readAllConfigs($self->{_config}->getWebguiRoot);
foreach my $key (keys %{$configs}) {
next if $key =~ m/^demo/;
$self->debug("Fetching site data for $key");
my $userAgent = new LWP::UserAgent;
$userAgent->env_proxy;
$userAgent->agent("Spectre");
$userAgent->timeout(30);
my $url = "http://".$configs->{$key}->get("sitename")->[0].":".$self->{_config}->get("webguiPort").$configs->{$key}->get("gateway")."?op=spectreGetSiteData";
my $request = new HTTP::Request (GET => $url);
my $response = $userAgent->request($request);
if ($response->is_error) {
$self->error( "Couldn't connect to WebGUI site $key");
}
else {
my $siteData = {};
eval { $siteData = JSON::jsonToObj($response->content); };
if ($@) {
$self->error("Couldn't fetch Spectre configuration data for $key");
}
else {
$self->debug("Loading workflow data for $key");
foreach my $instance (@{$siteData->{workflow}}) {
$kernel->post("workflow" ,"addInstance", $instance);
}
$self->debug("Loading scheduler data for $key");
foreach my $task (@{$siteData->{cron}}) {
$task->{config} = $key;
$kernel->post("cron", "addJob", $task);
}
}
}
}
}
#-------------------------------------------------------------------
=head2 new ( config [ , debug ] )
Constructor.
@ -152,7 +201,7 @@ sub new {
name => 'Spectre'
);
POE::Session->create(
object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop", "ping"=>"ping"} ],
object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop", "ping"=>"ping", "loadSiteData"=>"loadSiteData"} ],
args=>[["shutdown","ping"]]
);
Spectre::Workflow->new($config, $logger, $debug);
@ -192,7 +241,7 @@ sub runTests {
print "Running connectivity tests.\n";
my $configs = WebGUI::Config->readAllConfigs($config->getWebguiRoot);
foreach my $key (keys %{$configs}) {
next if $config =~ m/^demo/;
next if $key =~ m/^demo/;
print "Testing $key\n";
my $userAgent = new LWP::UserAgent;
$userAgent->env_proxy;

View file

@ -19,8 +19,6 @@ use DateTime;
use HTTP::Request::Common;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use WebGUI::Session;
use WebGUI::Workflow::Cron;
#-------------------------------------------------------------------
@ -36,12 +34,6 @@ sub _start {
my $serviceName = "cron";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
$self->debug("Loading the schedules from all the sites.");
my $configs = WebGUI::Config->readAllConfigs($self->config->getWebguiRoot);
foreach my $config (keys %{$configs}) {
next if $config =~ m/^demo/;
$kernel->yield("loadSchedule", $config);
}
$kernel->yield("checkSchedules");
}
@ -133,7 +125,6 @@ An integer (1,2,3) that determines what priority the workflow should be executed
sub addJob {
my ($self, $params) = @_[OBJECT, ARG0];
return 0 unless ($params->{enabled});
my $id = $params->{config}."-".$params->{taskId};
$self->debug("Adding schedule ".$params->{taskId}." to the queue.");
$params->{schedule} = join(" ", $params->{minuteOfHour}, $params->{hourOfDay}, $params->{dayOfMonth}, $params->{monthOfYear}, $params->{dayOfWeek});
@ -338,38 +329,10 @@ sub getLogger {
return $self->{_logger};
}
#-------------------------------------------------------------------
=head2 loadSchedule ( config )
Loads the workflow schedule from a particular site.
=head3 config
The config filename for the site to load the schedule.
=cut
sub loadSchedule {
my ($kernel, $self, $config) = @_[KERNEL, OBJECT, ARG0];
$self->debug("Loading schedules for $config.");
my $session = WebGUI::Session->open($self->config->getWebguiRoot, $config);
my $result = $session->db->read("select * from WorkflowSchedule");
while (my $data = $result->hashRef) {
my $params = JSON::jsonToObj($data->{parameters});
$data->{parameters} = $params->{parameters};
$data->{config} = $config;
$data->{gateway} = $session->config->get("gateway");
$data->{sitename} = $session->config->get("sitename")->[0];
$kernel->yield("addJob", $data);
}
$result->finish;
$session->close;
}
#-------------------------------------------------------------------
=head2 new ( config, logger, workflow, [ debug ] )
=head2 new ( config, logger, [ debug ] )
Constructor.
@ -381,10 +344,6 @@ A WebGUI::Config object that represents the spectre.conf file.
A reference to the logger object.
=head3 workflow
A reference to the Worfklow session.
=head3 debug
A boolean indicating Spectre should spew forth debug as it runs.
@ -400,7 +359,7 @@ sub new {
bless $self, $class;
my @publicEvents = qw(runJob runJobResponse addJob deleteJob);
POE::Session->create(
object_states => [ $self => [qw(_start _stop runJob runJobResponse addJob deleteJob checkSchedules checkSchedule loadSchedule), @publicEvents] ],
object_states => [ $self => [qw(_start _stop runJob runJobResponse addJob deleteJob checkSchedules checkSchedule), @publicEvents] ],
args=>[\@publicEvents]
);
my $cookies = HTTP::Cookies->new(file => '/tmp/cookies');

View file

@ -18,7 +18,6 @@ use strict;
use HTTP::Request::Common;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use WebGUI::Session;
#-------------------------------------------------------------------
@ -34,12 +33,6 @@ sub _start {
my $serviceName = "workflow";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
$self->debug("Reading workflow configs.");
my $configs = WebGUI::Config->readAllConfigs($self->config->getWebguiRoot);
foreach my $config (keys %{$configs}) {
next if $config =~ m/^demo/;
$kernel->yield("loadWorkflows", $configs->{$config});
}
$kernel->yield("checkInstances");
}
@ -260,24 +253,6 @@ sub getNextInstance {
#-------------------------------------------------------------------
=head2 loadWorkflows ( )
=cut
sub loadWorkflows {
my ($kernel, $self, $config) = @_[KERNEL, OBJECT, ARG0];
$self->debug("Loading workflows for ".$config->getFilename.".");
my $session = WebGUI::Session->open($config->getWebguiRoot, $config->getFilename);
my $result = $session->db->read("select instanceId,priority from WorkflowInstance");
while (my ($id, $priority) = $result->array) {
$kernel->yield("addInstance", {gateway=>$config->get("gateway"), sitename=>$config->get("sitename")->[0], instanceId=>$id, priority=>$priority});
}
$result->finish;
$session->close;
}
#-------------------------------------------------------------------
=head2 new ( config, logger, [ , debug ] )
Constructor. Loads all active workflows from each WebGUI site and begins executing them.
@ -305,7 +280,7 @@ sub new {
bless $self, $class;
my @publicEvents = qw(addInstance deleteInstance);
POE::Session->create(
object_states => [ $self => [qw(_start _stop returnInstanceToRunnableState addInstance checkInstances deleteInstance suspendInstance loadWorkflows runWorker workerResponse), @publicEvents] ],
object_states => [ $self => [qw(_start _stop returnInstanceToRunnableState addInstance checkInstances deleteInstance suspendInstance runWorker workerResponse), @publicEvents] ],
args=>[\@publicEvents]
);
my $cookies = HTTP::Cookies->new(file => '/tmp/cookies');

View file

@ -219,6 +219,7 @@ sub getOperations {
'editSettings' => 'WebGUI::Operation::Settings',
'saveSettings' => 'WebGUI::Operation::Settings',
'spectreGetSiteData' => 'WebGUI::Operation::Spectre',
'spectreTest' => 'WebGUI::Operation::Spectre',
'viewStatistics' => 'WebGUI::Operation::Statistics',

View file

@ -11,8 +11,11 @@ package WebGUI::Operation::Spectre;
#-------------------------------------------------------------------
use strict;
use WebGUI::Utility;
use JSON;
use POE::Component::IKC::ClientLite;
use WebGUI::Utility;
use WebGUI::Workflow::Cron;
use WebGUI::Workflow::Instance;
=head1 NAME
@ -26,6 +29,60 @@ Operations for Spectre.
#-------------------------------------------------------------------
=head2 www_spectreGetSiteData ( )
Checks to ensure the requestor is who we think it is, and then returns a JSON string with worklfow and cron data. We do it in one payload for efficiency.
=cut
sub www_spectreGetSiteData {
my $session = shift;
$session->http->setMimeType("text/json");
$session->http->setCacheControl("none");
my %siteData = ();
if (!isInSubnet($session->env->get("REMOTE_ADDR"), $session->config->get("spectreSubnets"))) {
$session->errorHandler->security("make a Spectre workflow data load request, but we're only allowed to accept requests from "
.join(",",@{$session->config->get("spectreSubnets")}).".");
}
else {
my $sitename = $session->config->get("sitename")->[0];
my $gateway = $session->config->get("gateway");
my $cookieName = $session->config->getCookieName;
my @instances = ();
foreach my $instance (@{WebGUI::Workflow::Instance->getAllInstances($session)}) {
next unless $instance->getWorkflow->get("enabled");
push(@instances, {
instanceId => $instance->getId,
priority => $instance->get("priority"),
cookieName => $cookieName,
gateway => $gateway,
sitename => $sitename,
});
}
$siteData{workflow} = \@instances;
my @schedules = ();
foreach my $task (@{WebGUI::Workflow::Cron->getAllTasks($session)}) {
next unless $task->get("enabled");
push(@schedules, {
taskId => $task->getId,
cookieName => $cookieName,
gateway => $gateway,
sitename => $sitename,
minuteOfHour => $task->get('minuteOfHour'),
hourOfDay => $task->get('hourOfDay'),
dayOfMonth => $task->get('dayOfMonth'),
monthOfYear => $task->get('monthOfYear'),
dayOfWeek => $task->get('dayOfWeek'),
runOnce => $task->get('runOnce'),
});
}
$siteData{cron} = \@schedules;
}
return JSON::objToJson(\%siteData);
}
#-------------------------------------------------------------------
=head2 www_spectreTest ( )
Spectre executes this function to see if WebGUI connectivity is working.

View file

@ -122,6 +122,26 @@ sub get {
return $self->{_data}{$name};
}
#-------------------------------------------------------------------
=head2 getAllTasks ( session )
Returns an array reference of all the schedule objects defined in this system. A class method.
=cut
sub getAllTasks {
my $class = shift;
my $session = shift;
my @schedules = ();
my $rs = $session->db->read("SELECT taskId FROM WorkflowSchedule");
while (my ($taskId) = $rs->array) {
push(@schedules, WebGUI::Workflow::Cron->new($session, $taskId));
}
return \@schedules;
}
#-------------------------------------------------------------------
=head2 getId ( )

View file

@ -142,6 +142,27 @@ sub get {
return $self->{_data}{$name};
}
#-------------------------------------------------------------------
=head2 getAllInstances ( session )
Returns an array reference of all the instance objects defined in this system. A class method.
=cut
sub getAllInstances {
my $class = shift;
my $session = shift;
my @instances = ();
my $rs = $session->db->read("SELECT instanceId FROM WorkflowInstance");
while (my ($instanceId) = $rs->array) {
push(@instances, WebGUI::Workflow::Instance->new($session, $instanceId));
}
return \@instances;
}
#-------------------------------------------------------------------
=head2 getId ( )