enhanced the status screen even more

This commit is contained in:
JT Smith 2007-02-02 04:50:15 +00:00
parent 533a128d52
commit 3122e9e58f

View file

@ -19,6 +19,7 @@ use HTTP::Request::Common;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use POE::Queue::Array;
use Tie::IxHash;
#-------------------------------------------------------------------
@ -78,6 +79,7 @@ The priority (1,2, or 3) that this instance should be run at.
sub addInstance {
my ($self, $instance) = @_[OBJECT, ARG0];
my $priority = ($instance->{priority} -1) * 10;
$instance->{lastState} = "never run";
$self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$priority.".");
$self->getWaitingQueue->enqueue($priority, $instance);
}
@ -253,37 +255,52 @@ Returns a formatted text status report about the workflow engine.
sub getStatus {
my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT];
my $pattern = "\t%8.8s %-30.30s %-22.22s\n";
my $summaryPattern = "%24.24s %4d\n";
my $waiting = $self->getWaitingQueue;
my $waitingCount = $waiting->get_item_count;
my $output = sprintf $summaryPattern, "Workflows Waiting To Run", $waitingCount;
if ($waitingCount > 0) {
$output .= sprintf $pattern, "Priority", "Sitename", "Instance Id";
foreach my $item ($waiting->peek_items(sub {1})) {
my ($priority, $id, $instance) = @{$item};
$output .= sprintf $pattern, $priority, $instance->{sitename}, $instance->{instanceId};
my $pattern = "\t%8.8s %-30.30s %-22.22s %-15.15s\n";
my $summaryPattern = "%19.19s %4d\n";
my %queues = ();
tie %queues, 'Tie::IxHash';
%queues = (
"Suspended" => $self->getSuspendedQueue,
"Waiting" => $self->getWaitingQueue,
"Running" => $self->getRunningQueue,
);
my $total = 0;
my $output = "";
foreach my $queueName (keys %queues) {
my $queue = $queues{$queueName};
my $count = $queue->get_item_count;
$output .= sprintf $summaryPattern, $queueName." Workflows", $count;
if ($count > 0) {
$output .= sprintf $pattern, "Priority", "Sitename", "Instance Id", "Last State";
foreach my $item ($queue->peek_items(sub {1})) {
my ($priority, $id, $instance) = @{$item};
my $originalPriority = ($instance->{priority} - 1) * 10;
$output .= sprintf $pattern, $priority."/".$originalPriority, $instance->{sitename}, $instance->{instanceId}, $instance->{lastState};
}
$output .= "\n";
}
$output .= "\n";
$total += $count;
}
my $running = $self->getRunningQueue;
my $runningCount = $running->get_item_count;
$output .= sprintf $summaryPattern, "Running Workflows", $runningCount;
if ($runningCount > 0) {
$output .= sprintf $pattern, "Priority", "Sitename", "Instance Id";
foreach my $item ($running->peek_items(sub {1})) {
my ($priority, $id, $instance) = @{$item};
$output .= sprintf $pattern, $priority, $instance->{sitename}, $instance->{instanceId};
}
$output .= "\n";
}
$output .= sprintf $summaryPattern, "Total Workflows", ($runningCount + $waitingCount);
$output .= sprintf $summaryPattern, "Total Workflows", $total;
my ($data, $rsvp) = @$request;
$kernel->call(IKC=>post=>$rsvp,$output);
}
#-------------------------------------------------------------------
=head2 getSuspendedQueue ( )
Returns a reference to the queue of workflow instances that have been suspended due to error or wait timeouts.
=cut
sub getSuspendedQueue {
my $self = shift;
return $self->{_suspendedQueue};
}
#-------------------------------------------------------------------
=head2 getWaitingQueue ( )
Returns a reference to the queue of workflow instances waiting to run.
@ -336,6 +353,7 @@ sub new {
);
$self->{_runningQueue} = POE::Queue::Array->new;
$self->{_waitingQueue} = POE::Queue::Array->new;
$self->{_suspendedQueue} = POE::Queue::Array->new;
}
@ -372,6 +390,13 @@ Returns a workflow instance back to runnable queue.
sub returnInstanceToRunnableState {
my ($self, $instance) = @_[OBJECT, ARG0];
$self->debug("Returning ".$instance->{instanceId}." to runnable state.");
$self->getSuspendedQueue->remove_items(
sub {
my $payload = shift;
return 1 if ($payload->{instanceId} eq $instance->{instanceId});
return 0;
}
);
$self->getWaitingQueue->enqueue($instance->{workingPriority}+1, $instance);
}
@ -408,6 +433,7 @@ Suspends a workflow instance for a number of seconds defined in the config file,
sub suspendInstance {
my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL];
$self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds.");
$self->getSuspendedQueue->enqueue("1", $instance);
$kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance);
}
@ -437,6 +463,7 @@ sub workerResponse {
$self->{_cookies}{$instance->{sitename}} = $cookie;
}
my $state = $response->content;
$instance->{lastState} = $state;
if ($state eq "waiting") {
$self->debug("Was told to wait on $instanceId because we're still waiting on some external event.");
$kernel->yield("suspendInstance",$instance);
@ -458,7 +485,9 @@ sub workerResponse {
}
} elsif ($response->is_redirect) {
$self->error("Response for $instanceId was redirected. This should never happen if configured properly!!!");
$instance->{lastState} = "redirect";
} elsif ($response->is_error) {
$instance->{lastState} = "comm error";
$self->error("Response for $instanceId had a communications error. ".$response->error_as_HTML);
$kernel->yield("suspendInstance",$instance)
}