diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index a2e12b6a9..77eb7b94b 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -19,6 +19,7 @@ use HTTP::Request::Common; use HTTP::Cookies; use POE qw(Component::Client::HTTP); use POE::Queue::Array; +use Tie::IxHash; #------------------------------------------------------------------- @@ -78,6 +79,7 @@ The priority (1,2, or 3) that this instance should be run at. sub addInstance { my ($self, $instance) = @_[OBJECT, ARG0]; my $priority = ($instance->{priority} -1) * 10; + $instance->{lastState} = "never run"; $self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$priority."."); $self->getWaitingQueue->enqueue($priority, $instance); } @@ -253,37 +255,52 @@ Returns a formatted text status report about the workflow engine. sub getStatus { my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT]; - my $pattern = "\t%8.8s %-30.30s %-22.22s\n"; - my $summaryPattern = "%24.24s %4d\n"; - my $waiting = $self->getWaitingQueue; - my $waitingCount = $waiting->get_item_count; - my $output = sprintf $summaryPattern, "Workflows Waiting To Run", $waitingCount; - if ($waitingCount > 0) { - $output .= sprintf $pattern, "Priority", "Sitename", "Instance Id"; - foreach my $item ($waiting->peek_items(sub {1})) { - my ($priority, $id, $instance) = @{$item}; - $output .= sprintf $pattern, $priority, $instance->{sitename}, $instance->{instanceId}; + my $pattern = "\t%8.8s %-30.30s %-22.22s %-15.15s\n"; + my $summaryPattern = "%19.19s %4d\n"; + my %queues = (); + tie %queues, 'Tie::IxHash'; + %queues = ( + "Suspended" => $self->getSuspendedQueue, + "Waiting" => $self->getWaitingQueue, + "Running" => $self->getRunningQueue, + ); + my $total = 0; + my $output = ""; + foreach my $queueName (keys %queues) { + my $queue = $queues{$queueName}; + my $count = $queue->get_item_count; + $output .= sprintf $summaryPattern, $queueName." Workflows", $count; + if ($count > 0) { + $output .= sprintf $pattern, "Priority", "Sitename", "Instance Id", "Last State"; + foreach my $item ($queue->peek_items(sub {1})) { + my ($priority, $id, $instance) = @{$item}; + my $originalPriority = ($instance->{priority} - 1) * 10; + $output .= sprintf $pattern, $priority."/".$originalPriority, $instance->{sitename}, $instance->{instanceId}, $instance->{lastState}; + } + $output .= "\n"; } - $output .= "\n"; + $total += $count; } - my $running = $self->getRunningQueue; - my $runningCount = $running->get_item_count; - $output .= sprintf $summaryPattern, "Running Workflows", $runningCount; - if ($runningCount > 0) { - $output .= sprintf $pattern, "Priority", "Sitename", "Instance Id"; - foreach my $item ($running->peek_items(sub {1})) { - my ($priority, $id, $instance) = @{$item}; - $output .= sprintf $pattern, $priority, $instance->{sitename}, $instance->{instanceId}; - } - $output .= "\n"; - } - $output .= sprintf $summaryPattern, "Total Workflows", ($runningCount + $waitingCount); + $output .= sprintf $summaryPattern, "Total Workflows", $total; my ($data, $rsvp) = @$request; $kernel->call(IKC=>post=>$rsvp,$output); } #------------------------------------------------------------------- +=head2 getSuspendedQueue ( ) + +Returns a reference to the queue of workflow instances that have been suspended due to error or wait timeouts. + +=cut + +sub getSuspendedQueue { + my $self = shift; + return $self->{_suspendedQueue}; +} + +#------------------------------------------------------------------- + =head2 getWaitingQueue ( ) Returns a reference to the queue of workflow instances waiting to run. @@ -336,6 +353,7 @@ sub new { ); $self->{_runningQueue} = POE::Queue::Array->new; $self->{_waitingQueue} = POE::Queue::Array->new; + $self->{_suspendedQueue} = POE::Queue::Array->new; } @@ -372,6 +390,13 @@ Returns a workflow instance back to runnable queue. sub returnInstanceToRunnableState { my ($self, $instance) = @_[OBJECT, ARG0]; $self->debug("Returning ".$instance->{instanceId}." to runnable state."); + $self->getSuspendedQueue->remove_items( + sub { + my $payload = shift; + return 1 if ($payload->{instanceId} eq $instance->{instanceId}); + return 0; + } + ); $self->getWaitingQueue->enqueue($instance->{workingPriority}+1, $instance); } @@ -408,6 +433,7 @@ Suspends a workflow instance for a number of seconds defined in the config file, sub suspendInstance { my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL]; $self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds."); + $self->getSuspendedQueue->enqueue("1", $instance); $kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance); } @@ -437,6 +463,7 @@ sub workerResponse { $self->{_cookies}{$instance->{sitename}} = $cookie; } my $state = $response->content; + $instance->{lastState} = $state; if ($state eq "waiting") { $self->debug("Was told to wait on $instanceId because we're still waiting on some external event."); $kernel->yield("suspendInstance",$instance); @@ -458,7 +485,9 @@ sub workerResponse { } } elsif ($response->is_redirect) { $self->error("Response for $instanceId was redirected. This should never happen if configured properly!!!"); + $instance->{lastState} = "redirect"; } elsif ($response->is_error) { + $instance->{lastState} = "comm error"; $self->error("Response for $instanceId had a communications error. ".$response->error_as_HTML); $kernel->yield("suspendInstance",$instance) }