diff --git a/lib/Spectre/Cron.pm b/lib/Spectre/Cron.pm
index d27416dae..8835c9baf 100644
--- a/lib/Spectre/Cron.pm
+++ b/lib/Spectre/Cron.pm
@@ -19,6 +19,7 @@ use DateTime;
use HTTP::Request::Common;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
+use JSON 'objToJson';
#-------------------------------------------------------------------
@@ -29,12 +30,12 @@ Initializes the scheduler.
=cut
sub _start {
- my ($kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
- $self->debug("Starting Spectre scheduler.");
- my $serviceName = "cron";
- $kernel->alias_set($serviceName);
- $kernel->call( IKC => publish => $serviceName, $publicEvents );
- $kernel->yield("checkSchedules");
+ my ($kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
+ $self->debug("Starting Spectre scheduler.");
+ my $serviceName = "cron";
+ $kernel->alias_set($serviceName);
+ $kernel->call( IKC => publish => $serviceName, $publicEvents );
+ $kernel->yield("checkSchedules");
}
#-------------------------------------------------------------------
@@ -46,9 +47,9 @@ Gracefully shuts down the scheduler.
=cut
sub _stop {
- my ($kernel, $self) = @_[KERNEL, OBJECT];
- $self->debug("Stopping the scheduler.");
- undef $self;
+ my ($kernel, $self) = @_[KERNEL, OBJECT];
+ $self->debug("Stopping the scheduler.");
+ undef $self;
}
#-------------------------------------------------------------------
@@ -318,7 +319,27 @@ sub getJob {
#-------------------------------------------------------------------
-=head3 getLogger ( )
+=head2 getJsonStatus ( )
+
+Returns JSON of the jobs.
+
+=cut
+
+sub getJsonStatus {
+ my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT];
+ my ($sitename, $rsvp) = @$request;
+ my %data = ();
+ for my $key (keys %{ $self->{_jobs} }) {
+ next unless $self->{_jobs}->{$key}->{sitename} eq $sitename;
+ $data{$key} = $self->{_jobs}->{$key};
+ }
+ $kernel->call(IKC => post => $rsvp, objToJson(\%data));
+}
+
+
+#-------------------------------------------------------------------
+
+=head2 getLogger ( )
Returns a reference to the logger.
@@ -357,7 +378,7 @@ sub new {
my $debug = shift;
my $self = {_jobs=>{}, _debug=>$debug, _config=>$config, _logger=>$logger};
bless $self, $class;
- my @publicEvents = qw(runJob runJobResponse addJob deleteJob);
+ my @publicEvents = qw(runJob runJobResponse addJob deleteJob getJsonStatus);
POE::Session->create(
object_states => [ $self => [qw(_start _stop runJob runJobResponse addJob deleteJob checkSchedules checkSchedule), @publicEvents] ],
args=>[\@publicEvents]
diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm
index eedcfa599..1b4b23093 100644
--- a/lib/Spectre/Workflow.pm
+++ b/lib/Spectre/Workflow.pm
@@ -20,6 +20,7 @@ use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use POE::Queue::Array;
use Tie::IxHash;
+use JSON 'objToJson';
#-------------------------------------------------------------------
@@ -177,6 +178,66 @@ sub deleteInstance {
#-------------------------------------------------------------------
+=head2 editWorkflowPriority ( href )
+
+Updates the priority of a given workflow instance.
+
+=head3 href
+
+Contains information about the instance and the new priority.
+
+=head4 instanceId
+
+The id of the instance to update.
+
+=head4 newPriority
+
+The new priority value.
+
+=cut
+
+sub editWorkflowPriority {
+ my ($self, $request, $kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION];
+ my ($argsHref, $rsvp) = @$request;
+
+ my $instanceId = $argsHref->{instanceId};
+ my $newPriority = $argsHref->{newPriority};
+
+ $self->debug("Updating the priority of $instanceId to $newPriority.");
+
+ # I'm guessing that the payload can't change queues on us
+ my $found = 0;
+ my $filterCref = sub { shift->{instanceId} eq $instanceId };
+ for my $getQueueMethod (map "get${_}Queue", qw( Suspended Waiting Running )) {
+ my $q = $self->$getQueueMethod;
+ my($itemAref) = $q->peek_items($filterCref); # there should be only one
+
+ next unless (ref $itemAref eq 'ARRAY' and @$itemAref);
+
+ my($priority, $id, $payload) = @$itemAref;
+ my $ackPriority = $q->set_priority($id, $filterCref, $newPriority);
+ if ($ackPriority != $newPriority) {
+ # return an error
+ my $error = 'edit priority setting error';
+ $kernel->call(IKC=>post=>$rsvp, objToJson({message => $error}));
+ }
+ $found = 1;
+ last;
+ }
+
+ if (! $found) {
+ # return an error message
+ my $error = 'edit priority instance not found error';
+ $kernel->call(IKC=>post=>$rsvp, objToJson({message => $error}));
+ }
+ else {
+ # return success message
+ $kernel->call(IKC=>post=>$rsvp, objToJson({message => 'edit priority success'}));
+ }
+}
+
+#-------------------------------------------------------------------
+
=head2 error ( output )
Prints out error information if debug is enabled.
@@ -198,6 +259,44 @@ sub error {
#-------------------------------------------------------------------
+=head2 getJsonStatus ( )
+
+Returns JSON report about the workflow engine.
+
+=cut
+
+sub getJsonStatus {
+ my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT];
+ my ($sitename, $rsvp) = @$request;
+
+ # only return this site's info
+ return $kernel->call(IKC=>post=>$rsvp, '{}') unless $sitename;
+
+ my %queues = ();
+ tie %queues, 'Tie::IxHash';
+ %queues = (
+ Suspended => $self->getSuspendedQueue,
+ Waiting => $self->getWaitingQueue,
+ Running => $self->getRunningQueue,
+ );
+ my %output = ();
+ foreach my $queueName (keys %queues) {
+ my $queue = $queues{$queueName};
+ my $count = $queue->get_item_count;
+ my @instances;
+ if ($count > 0) {
+ foreach my $itemAref ($queue->peek_items(sub { shift()->{sitename} eq $sitename })) {
+ push @instances, $itemAref;
+ }
+ }
+ $output{$queueName} = \@instances;
+ }
+
+ $kernel->call(IKC=>post=>$rsvp, objToJson(\%output));
+}
+
+#-------------------------------------------------------------------
+
=head2 getLogger ( )
Returns a reference to the logger.
@@ -340,7 +439,7 @@ sub new {
my $debug = shift;
my $self = {_debug=>$debug, _config=>$config, _logger=>$logger};
bless $self, $class;
- my @publicEvents = qw(addInstance deleteInstance getStatus);
+ my @publicEvents = qw(addInstance deleteInstance editWorkflowPriority getStatus getJsonStatus);
POE::Session->create(
object_states => [ $self => [qw(_start _stop returnInstanceToRunnableState addInstance checkInstances deleteInstance suspendInstance runWorker workerResponse), @publicEvents] ],
args=>[\@publicEvents]
@@ -431,10 +530,11 @@ Suspends a workflow instance for a number of seconds defined in the config file,
=cut
sub suspendInstance {
- my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL];
- $self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds.");
- $self->getSuspendedQueue->enqueue("1", $instance);
- $kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance);
+ my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL];
+ $self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds.");
+ my $priority = ($instance->{priority} - 1) * 10;
+ $self->getSuspendedQueue->enqueue($priority, $instance);
+ $kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance);
}
#-------------------------------------------------------------------
diff --git a/lib/WebGUI/AdminConsole.pm b/lib/WebGUI/AdminConsole.pm
index d52e5a542..cdcb7fcf7 100644
--- a/lib/WebGUI/AdminConsole.pm
+++ b/lib/WebGUI/AdminConsole.pm
@@ -182,6 +182,15 @@ sub getAdminFunction {
my $self = shift;
my $id = shift;
my $functions = { # at some point in the future we'll need to make this pluggable/configurable
+ "spectre"=>{
+ title=>{
+ id=>"spectre",
+ namespace=>"Spectre"
+ },
+ icon=>"spectre.gif",
+ op=>"spectreStatus",
+ group=>"3"
+ },
"assets"=>{
title=>{
id=>"assets",
diff --git a/lib/WebGUI/Operation.pm b/lib/WebGUI/Operation.pm
index c85d0b041..6ce8d47bf 100644
--- a/lib/WebGUI/Operation.pm
+++ b/lib/WebGUI/Operation.pm
@@ -227,6 +227,7 @@ sub getOperations {
'spectreGetSiteData' => 'WebGUI::Operation::Spectre',
'spectreTest' => 'WebGUI::Operation::Spectre',
+ 'spectreStatus' => 'WebGUI::Operation::Spectre',
'ssoViaSessionId' => 'WebGUI::Operation::SSO',
diff --git a/lib/WebGUI/Operation/Spectre.pm b/lib/WebGUI/Operation/Spectre.pm
index 2c3d0a724..7eebeccd0 100644
--- a/lib/WebGUI/Operation/Spectre.pm
+++ b/lib/WebGUI/Operation/Spectre.pm
@@ -81,6 +81,70 @@ sub www_spectreGetSiteData {
return JSON::objToJson(\%siteData,{autoconv=>0, skipinvalid=>1});
}
+#-------------------------------------------------------------------
+
+=head2 www_spectreStatus ( )
+
+Show information about Spectre's current workload.
+
+=cut
+
+sub www_spectreStatus {
+ my $session = shift;
+
+ return $session->privilege->adminOnly() unless $session->user->isInGroup(3);
+
+ # start to prepare the display
+ my $ac = WebGUI::AdminConsole->new($session, 'spectre');
+ my $i18n = WebGUI::International->new($session, 'Spectre');
+
+ $session->http->setCacheControl("none");
+ unless (isInSubnet($session->env->get("REMOTE_ADDR"), $session->config->get("spectreSubnets"))) {
+ $session->errorHandler->security("make a Spectre workflow runner request, but we're only allowed to accept requests from ".join(",",@{$session->config->get("spectreSubnets")}).".");
+ return "subnet";
+ }
+
+ my $remote = create_ikc_client(
+ port=>$session->config->get("spectrePort"),
+ ip=>$session->config->get("spectreIp"),
+ name=>rand(100000),
+ timeout=>10
+ );
+
+ if (!$remote) {
+ return $ac->render($i18n->get('not running'), $i18n->get('spectre'));
+ }
+
+ my $sitename = $session->config()->get('sitename')->[0];
+ my $workflowResult = $remote->post_respond('workflow/getJsonStatus',$sitename);
+ if (!$workflowResult) {
+ $remote->disconnect();
+ return $ac->render($i18n->get('workflow status error'), $i18n->get('spectre'));
+ }
+
+ my $cronResult = $remote->post_respond('cron/getJsonStatus',$sitename);
+ if (! defined $cronResult) {
+ $remote->disconnect();
+ return $ac->render($i18n->get('cron status error'), $i18n->get('spectre'));
+ }
+
+ my %data = (
+ workflow => jsonToObj($workflowResult),
+ cron => jsonToObj($cronResult),
+ );
+
+ my $workflowCount = @{ $data{workflow}{Suspended} } + @{ $data{workflow}{Waiting} } + @{ $data{workflow}{Running} };
+ my $workflowUrl = $session->url->page('op=showRunningWorkflows');
+ my $cronCount = keys %{ $data{cron} };
+ my $cronUrl = $session->url->page('op=manageCron');
+
+ my $output = $i18n->get('running').'
';
+ $output .= sprintf $i18n->get('workflow header'), $workflowUrl, $workflowCount;
+ $output .= sprintf $i18n->get('cron header'), $cronUrl, $cronCount;
+
+ return $ac->render($output, $i18n->get('spectre'));
+}
+
#-------------------------------------------------------------------
=head2 www_spectreTest ( )
diff --git a/lib/WebGUI/Operation/Workflow.pm b/lib/WebGUI/Operation/Workflow.pm
index df46776ae..8fdb13d50 100644
--- a/lib/WebGUI/Operation/Workflow.pm
+++ b/lib/WebGUI/Operation/Workflow.pm
@@ -19,6 +19,8 @@ use WebGUI::Workflow;
use WebGUI::Workflow::Activity;
use WebGUI::Workflow::Instance;
use WebGUI::Utility;
+use POE::Component::IKC::ClientLite;
+use JSON 'jsonToObj';
=head1 NAME
@@ -231,6 +233,60 @@ sub www_editWorkflow {
return $ac->render($f->print.$addmenu.$steps, 'edit workflow');
}
+#-------------------------------------------------------------------
+
+=head2 www_editWorkflowPriority ( )
+
+Save the submitted new workflow priority.
+
+=cut
+
+sub www_editWorkflowPriority {
+ my $session = shift;
+
+ return $session->privilege->insufficient() unless $session->user->isInGroup(3);
+
+ my $i18n = WebGUI::International->new($session, 'Workflow');
+ my $ac = WebGUI::AdminConsole->new($session,"workflow");
+ $ac->addSubmenuItem($session->url->page("op=showRunningWorkflows"), $i18n->get('show running workflows'));
+ $ac->setHelp('manage workflows', 'Workflow');
+
+ # make sure the input is good
+ my $instanceId = $session->form->get('instanceId') || '';
+ my $newPriority = $session->form->get('newPriority') || '';
+ if (! $instanceId) {
+ my $output = $i18n->get('edit priority bad request');
+ return $ac->render($output, $i18n->get('show running workflows'));
+ }
+
+ # make the request
+ my $remote = create_ikc_client(
+ port=>$session->config->get("spectrePort"),
+ ip=>$session->config->get("spectreIp"),
+ name=>rand(100000),
+ timeout=>10
+ );
+ if (! $remote) {
+ my $output = $i18n->get('edit priority no spectre error');
+ return $ac->render($output, $i18n->get('show running workflows'));
+ }
+
+ my $argHref = {
+ instanceId => $instanceId,
+ newPriority => $newPriority,
+ };
+ my $resultJson = $remote->post_respond('workflow/editWorkflowPriority', $argHref);
+ if (! defined $resultJson) {
+ $remote->disconnect();
+ my $output = $i18n->get('edit priority no info error');
+ return $ac->render($output, $i18n->get('show running workflows'));
+ }
+
+ my $responseHref = jsonToObj($resultJson);
+
+ my $message = $i18n->get($responseHref->{message}) || $i18n->get('edit priority unknown error');
+ return $ac->render($message, $i18n->get('show running workflows'));
+}
#-------------------------------------------------------------------
@@ -397,39 +453,132 @@ Display a list of the running workflow instances.
=cut
sub www_showRunningWorkflows {
- my $session = shift;
- return $session->privilege->insufficient() unless ($session->user->isInGroup("pbgroup000000000000015"));
- my $i18n = WebGUI::International->new($session, "Workflow");
- my $output = '
| '.$title.' | ' - .''.$session->datetime->epochToHuman($runningSince).' | '; - if ($status) { - $output .= '' - .$status.' / '.$session->datetime->epochToHuman($lastUpdate) - .' | '; - } - $output .= ''.$i18n->get("run").' | ' if ($isAdmin); - $output .= "
| $titleHeader | $priorityHeader | $activityHeader | "; + $output .= "$lastStateHeader | $lastRunTimeHeader | |
|---|---|---|---|---|---|
| $title | "; + $output .= qq[$priority/$originalPriority | ]; + $output .= "$lastActivity | "; + $output .= "$instance->{lastState} | "; + $output .= "$lastRunTime | "; + + if ($isAdmin) { + my $run = $i18n->get('run'); + my $href = $session->url->page(qq[op=runWorkflow;instanceId=$instanceId]); + $output .= qq[$run | ]; + } + $output .= "
This screen can help you debug problems with workflows by showing which workflows are currently running. The workflows are shown in a table with the name of the workflow, the date it started running. If the workflow has a defined status, then that status will also be shown, along with the date the workflow's status was last updated.
-The screen will not automatically update. To update the list of running workflows, reload the page.
+This screen can help you debug problems with workflows by showing which workflows are currently running. The workflows are grouped by status, with their names, current and original priorities, current activities (if any), last state, and when the workflow was last run.
+You can edit the priority of workflows by clicking on the priority links and submitting the form that appears. You can also run a workflow by clicking the "Run" link in the right column of the table, if present.
+The screen will not automatically update. To update the list of running workflows, reload the page.
|, lastUpdated => 1151719633, }, @@ -195,6 +196,108 @@ and add activities to it. lastUpdated => 1151721687, }, + 'edit priority success' => { + message => q|Workflow priority updated successfully.|, + context => q||, + lastUpdated => 0, + }, + + 'edit priority instance not found error' => { + message => q|I could not find that workflow. Perhaps it's finished running.|, + context => q||, + lastUpdated => 0, + }, + + 'edit priority cancel' => { + message => q|cancel|, + context => q||, + lastUpdated => 0, + }, + + 'edit priority update priority' => { + message => q|Update Priority|, + context => q||, + lastUpdated => 0, + }, + + 'spectre not running error' => { + message => q|Spectre is not running.