adding debug to spectre

This commit is contained in:
JT Smith 2006-03-16 23:32:18 +00:00
parent baf9a200af
commit 71b2634741
5 changed files with 215 additions and 51 deletions

View file

@ -30,12 +30,11 @@ Initializes the admin interface.
=cut
sub _start {
print "Starting WebGUI Spectre Admin...";
my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
$self->debug("Starting Spectre administrative manager.");
my $serviceName = "admin";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
print "OK\n";
}
#-------------------------------------------------------------------
@ -48,15 +47,45 @@ Gracefully shuts down the admin interface.
sub _stop {
my ($kernel, $self) = @_[KERNEL, OBJECT];
print "Stopping Spectre...";
$self->debug("Stopping Spectre administrative manager.");
undef $self;
$kernel->stop;
print "OK\n";
}
#-------------------------------------------------------------------
=head2 new ( config )
=head2 config ()
Returns a reference to the config object.
=cut
sub config {
my $self = shift;
return $self->{_config};
}
#-------------------------------------------------------------------
=head2 debug ( output )
Prints out debug information if debug is enabled.
=head3
=cut
sub debug {
my $self = shift;
my $output = shift;
if ($self->{_debug}) {
print "ADMIN: ".$output."\n";
}
}
#-------------------------------------------------------------------
=head2 new ( config [ , debug ] )
Constructor.
@ -64,12 +93,17 @@ Constructor.
A WebGUI::Config object that represents the spectre.conf file.
=head3 debug
A boolean indicating Spectre should spew forth debug as it runs.
=cut
sub new {
my $class = shift;
my $config = shift;
my $self = {_config=>$config};
my $debug = shift;
my $self = {_debug=>$debug, _config=>$config};
bless $self, $class;
create_ikc_server(
port => $config->get("port"),
@ -79,8 +113,8 @@ sub new {
object_states => [ $self => {_start=>"_start", _stop=>"_stop", "shutdown"=>"_stop"} ],
args=>[["shutdown"]]
);
$self->{_cron} = Spectre::Cron->new($config);
$self->{_workflow} = Spectre::Workflow->new($config);
$self->{_cron} = Spectre::Cron->new($config, $debug);
$self->{_workflow} = Spectre::Workflow->new($config, $debug);
POE::Kernel->run();
}

View file

@ -31,16 +31,16 @@ Initializes the scheduler.
=cut
sub _start {
print "Starting WebGUI Spectre Scheduler...";
my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
$self->debug("Starting Spectre scheduler.");
my $serviceName = "scheduler";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
my $configs = WebGUI::Config->readAllConfigs($self->{_config}->getWebguiRoot);
$self->debug("Loading the schedules from all the sites.");
my $configs = WebGUI::Config->readAllConfigs($self->config->getWebguiRoot);
foreach my $config (keys %{$configs}) {
$kernel->yield("loadSchedule", $config);
}
print "OK\n";
$kernel->yield("checkSchedules");
}
@ -54,9 +54,8 @@ Gracefully shuts down the scheduler.
sub _stop {
my ($kernel, $self) = @_[KERNEL, OBJECT];
print "Stopping WebGUI Spectre Scheduler...";
$self->debug("Stopping the scheduler.");
undef $self;
print "OK\n";
}
@ -80,8 +79,9 @@ A hash reference containing the properties of the job from the WorkflowSchedule
sub addJob {
my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1];
return 0 unless ($job->{enabled});
$self->{_jobs}{$job->{jobId}} = {
jobId=>$job->{jobId},
$self->debug("Adding schedule ".$job->{taskId}." to the queue.");
$self->{_jobs}{$job->{taskId}} = {
taskId=>$job->{taskId},
config=>$config,
schedule=>join(" ", $job->{minuteOfHour}, $job->{hourOfDay}, $job->{dayOfMonth}, $job->{monthOfYear}, $job->{dayOfWeek}),
runOnce=>$job->{runOnce},
@ -112,19 +112,32 @@ A DateTime object representing the time to compare the schedule with.
sub checkSchedule {
my ($kernel, $self, $job, $now) = @_[KERNEL, OBJECT, ARG0, ARG1];
$self->debug("Checking schedule ".$job->{taskId}." against the current time.");
my $cron = DateTime::Cron::Simple->new($job->{schedule});
if ($cron->validate_time($now)) {
my $session = WebGUI::Session->open($self->{_config}->getWebguiRoot, $job->{config});
WebGUI::Workflow::Instance->create($session, {
$self->debug("It's time to run ".$job->{taskId}.". Creating workflow instance.");
my $session = WebGUI::Session->open($self->config->getWebguiRoot, $job->{config});
my $instance = WebGUI::Workflow::Instance->create($session, {
workflowId=>$job->{workflowId},
className=>$job->{className},
methodName=>$job->{methodName},
parameters=>$job->{parameters},
priority=>$job->{priority}
});
if (defined $instance) {
$self->debug("Created workflow instance ".$instance->getId.".");
} else {
$self->debug("Something bad happened. Couldn't create workflow instance for schedule ".$job->{taskId}.".");
}
if ($job->{runOnce}) {
my $cron = WebGUI::Workflow::Cron->new($session, $job->{jobId});
$cron->delete if defined $cron;
$self->debug("Schedule ".$job->{taskId}." is only supposed to run once.");
my $cron = WebGUI::Workflow::Cron->new($session, $job->{taskId});
if (defined $cron) {
$self->debug("Deleting schedule from database.");
$cron->delete;
} else {
$self->debug("Couldn't instanciate schedule ".$job->{taskId}." in order to delete it.");
}
}
$session->close;
}
@ -140,9 +153,10 @@ Checks all the schedules of the jobs in the queue and triggers a workflow if a s
sub checkSchedules {
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Checking schedules against current time.");
my $now = DateTime->from_epoch(epoch=>time());
foreach my $jobId (keys %{$self->{_jobs}}) {
$kernel->yield("checkSchedule", $self->{_jobs}{$jobId}, $now)
foreach my $taskId (keys %{$self->{_jobs}}) {
$kernel->yield("checkSchedule", $self->{_jobs}{$taskId}, $now)
}
$kernel->delay_set("checkSchedules",60);
}
@ -150,19 +164,51 @@ sub checkSchedules {
#-------------------------------------------------------------------
=head2 deleteJob ( jobId )
=head2 config
Returns a reference to the config object.
=cut
sub config {
my $self = shift;
return $self->{_config};
}
#-------------------------------------------------------------------
=head2 debug ( output )
Prints out debug information if debug is enabled.
=head3
=cut
sub debug {
my $self = shift;
my $output = shift;
if ($self->{_debug}) {
print "CRON: ".$output."\n";
}
}
#-------------------------------------------------------------------
=head2 deleteJob ( taskId )
Removes a job from the monitoring queue.
=head3 jobId
=head3 taskId
The unique id of the job to remove.
=cut
sub deleteJob {
my ($self, $jobId) = @_[OBJECT, ARG0];
delete $self->{_jobs}{$jobId};
my ($self, $taskId) = @_[OBJECT, ARG0];
$self->debug("Deleting schedule $taskId from queue.");
delete $self->{_jobs}{$taskId};
}
@ -180,7 +226,8 @@ The config filename for the site to load the schedule.
sub loadSchedule {
my ($kernel, $self, $config) = @_[KERNEL, OBJECT, ARG0];
my $session = WebGUI::Session->open($self->{_config}->getWebguiRoot, $config);
$self->debug("Loading schedules for $config.");
my $session = WebGUI::Session->open($self->config->getWebguiRoot, $config);
my $result = $session->db->read("select * from WorkflowSchedule");
while (my $data = $result->hashRef) {
$kernel->yield("addJob",$config, $data);
@ -198,12 +245,17 @@ Constructor.
A WebGUI::Config object that represents the spectre.conf file.
=head3 debug
A boolean indicating Spectre should spew forth debug as it runs.
=cut
sub new {
my $class = shift;
my $config = shift;
my $self = {_config=>$config};
my $debug = shift;
my $self = {_debug=>$debug, _config=>$config};
bless $self, $class;
my @publicEvents = qw(addJob deleteJob);
POE::Session->create(

View file

@ -29,16 +29,16 @@ Initializes the workflow manager.
=cut
sub _start {
print "Starting WebGUI Spectre Workflow Manager...";
my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
$self->debug("Starting workflow manager.");
my $serviceName = "workflow";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
my $configs = WebGUI::Config->readAllConfigs($self->{_config}->getWebguiRoot);
$self->debug("Reading workflow configs.");
my $configs = WebGUI::Config->readAllConfigs($self->config->getWebguiRoot);
foreach my $config (keys %{$configs}) {
$kernel->yield("loadWorkflows", $config);
}
print "OK\n";
$kernel->yield("checkJobs");
}
@ -51,10 +51,9 @@ Gracefully shuts down the workflow manager.
=cut
sub _stop {
my ($kernel, $self) = @_[KERNEL, OBJECT];
print "Stopping WebGUI Spectre Workflow Manager...";
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Stopping workflow manager.");
undef $self;
print "OK\n";
}
@ -77,6 +76,7 @@ A hash reference containing a row of data from the WorkflowInstance table.
sub addJob {
my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1];
$self->debug->("Adding workflow instance ".$job->{instanceId}." from ".$config." to job queue at priority ".$job->{priority}.".");
# job list
$self->{_jobs}{$job->{instanceId}} = {
instanceId=>$job->{instanceId},
@ -97,7 +97,8 @@ Checks to see if there are any open job slots available, and if there are assign
sub checkJobs {
my ($kernel, $self) = @_[KERNEL, OBJECT];
if ($self->countRunningJobs < $self->{_config}->get("maxWorkers")) {
$self->debug("Checking to see if we can run anymore jobs right now.");
if ($self->countRunningJobs < $self->config->get("maxWorkers")) {
my $job = $self->getNextJob;
if (defined $job) {
$job->{status} = "running";
@ -109,6 +110,19 @@ sub checkJobs {
#-------------------------------------------------------------------
=head2 config
Returns a reference to the config object.
=cut
sub config {
my $self = shift;
return $self->{_config};
}
#-------------------------------------------------------------------
=head2 countRunningJobs ( )
Returns an integer representing the number of running jobs.
@ -118,7 +132,27 @@ Returns an integer representing the number of running jobs.
sub countRunningJobs {
my $self = shift;
my $runningJobs = $self->{_runningJobs} || [];
return scalar(@{$runningJobs});
my $jobCount = scalar(@{$runningJobs});
$self->debug("There are $jobCount running jobs.");
return $jobCount;
}
#-------------------------------------------------------------------
=head2 debug ( output )
Prints out debug information if debug is enabled.
=head3
=cut
sub debug {
my $self = shift;
my $output = shift;
if ($self->{_debug}) {
print "WORKFLOW: ".$output."\n";
}
}
#-------------------------------------------------------------------
@ -131,6 +165,7 @@ Removes a workflow job from the processing queue.
sub deleteJob {
my ($self, $instanceId) = @_[OBJECT, ARG0];
$self->debug("Deleting workflow instance $instanceId from job queue.");
my $priority = $self->{_jobs}{$instanceId}{priority};
delete $self->{_jobs}{$instanceId};
for (my $i=0; $i < scalar(@{$self->{"_priority".$priority}}); $i++) {
@ -148,13 +183,16 @@ sub deleteJob {
sub getNextJob {
my $self = shift;
$self->debug("Looking for a workflow instance to execute.");
foreach my $priority (1..3) {
foreach my $job (@{$self->{"_priority".$priority}}) {
if ($job->{status} eq "waiting") {
$self->debug("Looks like ".$job->{instanceId}." would be a good workflow instance to run.");
return $job;
}
}
}
$self->debug("Didn't see any workflow instances to run.");
return undef;
}
@ -166,7 +204,8 @@ sub getNextJob {
sub loadWorkflows {
my ($kernel, $self, $config) = @_[KERNEL, OBJECT, ARG0];
my $session = WebGUI::Session->open($self->{_config}->getWebguiRoot, $config);
$self->debug("Loading workflows for ".$config.".");
my $session = WebGUI::Session->open($self->config->getWebguiRoot, $config);
my $result = $session->db->read("select * from WorkflowInstance");
while (my $data = $result->hashRef) {
$kernel->yield("addJob", $config, $data);
@ -176,7 +215,7 @@ sub loadWorkflows {
#-------------------------------------------------------------------
=head2 new ( config )
=head2 new ( config [ , debug ] )
Constructor. Loads all active workflows from each WebGUI site and begins executing them.
@ -184,12 +223,17 @@ Constructor. Loads all active workflows from each WebGUI site and begins executi
The path to the root of the WebGUI installation.
=head3 debug
A boolean indicating Spectre should spew forth debug as it runs.
=cut
sub new {
my $class = shift;
my $config = shift;
my $self = {_config=>$config};
my $debug = shift;
my $self = {_debug=>$debug, _config=>$config};
bless $self, $class;
my @publicEvents = qw(addJob deleteJob);
POE::Session->create(
@ -208,6 +252,7 @@ Calls a worker to execute a workflow activity.
sub runWorker {
my ($kernel, $self, $job, $session) = @_[KERNEL, OBJECT, ARG0, SESSION];
$self->debug("Preparing to run workflow instance ".$job->{instanceId}.".");
POE::Component::Client::UserAgent->new;
my $url = $job->{sitename}.'/'.$job->{gateway};
$url =~ s/\/\//\//g;
@ -216,18 +261,20 @@ sub runWorker {
'do'=>'runWorkflow',
instanceId=>$job->{instanceId},
};
my $cipher = Crypt::Blowfish->new($self->{_config}->get("cryptoKey"));
my $cipher = Crypt::Blowfish->new($self->config->get("cryptoKey"));
my $request = HTTP::Request->new(POST => $url, Content => { op=>"spectre", payload=>$cipher->encrypt(objToJson($payload)) });
my $cookie = $self->{_cookies}{$job->{sitename}};
$request->header("Cookie","wgSession=".$cookie) if (defined $cookie);
$request->header("User-Agent","Spectre");
$request->header("X-JobId",$job->{instanceId});
$self->debug("Posting workflow instance ".$job->{instanceId}.".");
$kernel->post( useragent => 'request', { request => $request, response => $session->postback('workerResponse') });
$self->debug("Workflow instance ".$job->{instanceId}." posted.");
}
#-------------------------------------------------------------------
=head2 suspendJob ( jobId ) {
=head2 suspendJob ( jobId )
This method puts a running job back into the available jobs pool thusly freeing up a slot in the running jobs pool. This is done when a job has executed a workflow activity, but the entire workflow has not yet completed.
@ -240,6 +287,7 @@ The job being suspended.
sub suspendJob {
my $self = shift;
my $instanceId = shift;
$self->debug("Suspending workflow instance ".$instanceId.".");
$self->{_jobs}{$instanceId}{status} = "waiting";
for (my $i=0; $i < scalar(@{$self->{_runningJobs}}); $i++) {
if ($self->{_runningJobs}[$i]{instanceId} eq $instanceId) {
@ -258,28 +306,44 @@ This method is called when the response from the runWorker() method is received.
sub workerResponse {
my $self = $_[OBJECT];
$self->debug("Retrieving response from workflow instance job.");
my ($request, $response, $entry) = @{$_[ARG1]};
my $jobId = $request->header("X-JobId"); # got to figure out how to get this from the request, cuz the response may die
$self->debug("Response retrieved is for $jobId.");
if ($response->is_success) {
$self->debug("Response for $jobId retrieved successfully.");
if ($response->header("Cookie") ne "") {
$self->debug("Storing cookie for $jobId for later use.");
my $cookie = $response->header("Set-Cookie");
$cookie =~ s/wgSession=([a-zA-Z0-9\_\-]{22})/$1/;
$self->{_cookies}{$self->{_jobs}{$jobId}{sitename}} = $cookie;
}
my $cipher = Crypt::Blowfish->new($self->{_config}->get("cryptoKey"));
my $cipher = Crypt::Blowfish->new($self->config->get("cryptoKey"));
my $payload = jsonToObj($cipher->decrypt($response->content));
my $state = $payload->{state};
if ($state eq "continue") {
$self->suspendJob($jobId);
} elsif ($state eq "done") {
if ($state eq "waiting") {
$self->debug("Was told to wait on $jobId because we're still waiting on some external event.");
$self->suspendJob($jobId, $self->config->get("waitTime"));
} elsif ($state eq "complete") {
$self->debug("Workflow instance $jobId ran one of it's activities successfully.");
} elsif ($state eq "disabled") {
$self->debug("Workflow instance $jobId is disabled.");
$self->deleteJob($jobId);
} elsif ($state eq "done") {
$self->debug("Workflow instance $jobId is now complete.");
$self->deleteJob($jobId);
} elsif ($state eq "error") {
$self->debug("Got an error for $jobId.");
$self->suspendJob($jobId, $self->config->get("waitTime"));
} else {
$self->suspendJob($jobId);
$self->debug("Something bad happened on the return of $jobId, so we're suspending it's run for a while.");
$self->suspendJob($jobId, $self->config->get("waitTime"));
# something bad happened
}
} elsif ($response->is_redirect) {
# nothing to do, cuz we're following the redirect to see what happens
$self->debug("Response for $jobId was redirected.");
} elsif ($response->is_error) {
$self->debug("Response for $jobId had a communications error.");
$self->suspendJob($jobId)
# we should probably log something
}

View file

@ -202,8 +202,10 @@ Executes the next iteration in this workflow. Returns a status code based upon w
undefined The workflow doesn't exist.
disabled The workflow is disabled.
complete Workflow has completely run it's course.
done Workflow has completely run it's course.
error Something bad happened. Try again later.
complete The activity completed successfully, you may run the next one.
waiting The activity is waiting on an external event such as user input.
=cut
@ -213,7 +215,7 @@ sub run {
return "undefined" unless (defined $workflow);
return "disabled" unless ($workflow->get("enabled"));
my $activity = $workflow->getNextActivity($self->get("currentActivity"));
return "complete" unless (defined $activity);
return "done" unless (defined $activity);
my $object = {};
my $class = $self->get("className");
my $method = $self->get("methodName");

View file

@ -20,14 +20,18 @@ $|=1; # disable output buffering
my $help;
my $shutdown;
my $daemon;
my $run;
my $debug;
GetOptions(
'help'=>\$help,
'shutdown'=>\$shutdown,
'daemon'=>\$daemon
'daemon'=>\$daemon,
'debug' =>\$debug,
'run' => \$run
);
if ($help || !($shutdown||$daemon)) {
if ($help || !($shutdown||$daemon||$run)) {
print <<STOP;
S.P.E.C.T.R.E. is the Supervisor of Perplexing Event-handling Contraptions for
@ -41,6 +45,12 @@ if ($help || !($shutdown||$daemon)) {
--daemon Starts the Spectre server.
--debug If specified at startup, Spectre will provide verbose
debug to standard out so that you can see exactly what
it's doing.
--run Starts Spectre without forking it as a daemon.
--shutdown Stops the running Spectre server.
STOP
@ -71,8 +81,10 @@ if ($shutdown) {
my $result = $remote->post('admin/shutdown');
die $POE::Component::IKC::ClientLite::error unless defined $result;
undef $remote;
} elsif ($run) {
Spectre::Admin->new($config, $debug);
} elsif ($daemon) {
fork and exit;
Spectre::Admin->new($config);
Spectre::Admin->new($config, $debug);
}