converted to hash queue

This commit is contained in:
JT Smith 2009-11-03 14:20:29 -06:00
parent 4e00b2fa03
commit d14a93f399

View file

@ -20,6 +20,7 @@ use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use Tie::IxHash;
use JSON qw/ encode_json /;
use Clone qw(clone);
#-------------------------------------------------------------------
@ -83,9 +84,10 @@ sub addInstance {
}
else {
$instance->{workingPriority} = ($instance->{priority} -1) * 10;
$instance->{lastState} = "never run";
$self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$priority.".");
$self->{queue}{$instance->{instanceId}} = $instance;
$instance->{lastState} = 'never run';
$instance->{status} = 'waiting';
$self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$instance->{workingPriority}.".");
$self->{_queue}{$instance->{instanceId}} = $instance;
}
}
@ -133,7 +135,7 @@ Returns an integer representing the number of running instances.
sub countRunningInstances {
my $self = shift;
my $count = 0;
foreach my $instance (values %{$self->{queue}}) {
foreach my $instance ($self->getInstances) {
if ($instance->{status} eq 'running') {
$count++;
}
@ -174,7 +176,7 @@ Removes a workflow instance from the processing queue.
sub deleteInstance {
my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION];
$self->debug("Deleting workflow instance $instanceId from queue.");
delete $self->{queue}{$instanceId};
delete $self->{_queue}{$instanceId};
}
#-------------------------------------------------------------------
@ -207,19 +209,19 @@ sub editWorkflowPriority {
$self->debug("Updating the priority of $instanceId to $newPriority.");
my $instance = $self->getInstance($instanceId);
$instance->{priority} = $newPriority;
$instance->{workingPriority} = ($instance->{priority} -1) * 10;
$self->updateInstance($instance);
if (! $found) {
if (defined $instance) {
$instance->{priority} = $newPriority;
$instance->{workingPriority} = ($instance->{priority} -1) * 10;
$self->updateInstance($instance);
# return success message
$kernel->call(IKC=>post=>$rsvp, encode_json({message => 'edit priority success'}));
}
else {
# return an error message
my $error = 'edit priority instance not found error';
$kernel->call(IKC=>post=>$rsvp, encode_json({message => $error}));
}
else {
# return success message
$kernel->call(IKC=>post=>$rsvp, encode_json({message => 'edit priority success'}));
}
}
#-------------------------------------------------------------------
@ -256,10 +258,24 @@ The id of the instance to retrieve.
sub getInstance {
my ($self, $instanceId) = @_;
return $self->{queue}{$instanceId};
return clone($self->{_queue}{$instanceId});
}
#-------------------------------------------------------------------
=head2 getInstances ( )
Returns the array of instances from the queue.
=cut
sub getInstances {
my ($self) = @_;
my @instances = values %{$self->{_queue}};
return @{clone(\@instances)};
}
#-------------------------------------------------------------------
=head2 getJsonStatus ( )
@ -271,7 +287,8 @@ Returns JSON report about the workflow engine.
sub getJsonStatus {
my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT];
my ($data, $rsvp) = @$request;
$kernel->call(IKC=>post=>$rsvp, encode_json($self->{queue}));
my @instances = $self->getInstances;
$kernel->call(IKC=>post=>$rsvp, encode_json(\@instances));
}
#-------------------------------------------------------------------
@ -298,13 +315,14 @@ Returns the next available instance.
sub getNextInstance {
my $self = shift;
$self->debug("Looking for a workflow instance to run.");
if (scalar(keys %{$self->{queue}}) > 0) {
my @instances = $self->getInstances;
if (scalar(@instances) > 0) {
my $lowInstance = {};
my $lowPriority = 999999999999;
my $waitingCount = 0;
foreach my $instance (values %{$self->{queue}}) {
foreach my $instance (@instances) {
next unless $instance->{status} eq 'waiting';
$waiting++;
$waitingCount++;
if ($instance->{workingPriority} > $lowPriority) {
$lowInstance = $instance;
}
@ -333,7 +351,7 @@ sub getStatus {
my $summaryPattern = "%19.19s %4d\n";
my $total = 0;
my $output = sprintf $pattern, "Priority", "Status", "Sitename", "Instance Id", "Last State", "Last Run Time";
foreach my $instance (values %{$self->{queue}}) {
foreach my $instance ($self->getInstances) {
my $originalPriority = ($instance->{priority} - 1) * 10;
my $priority = $instance->{workingPriority}."/".$originalPriority;
$output .= sprintf $pattern, $priority, $instance->{status}, $instance->{sitename}, $instance->{instanceId}, $instance->{lastState}, $instance->{lastRunTime};
@ -383,7 +401,7 @@ sub new {
Alias => 'workflow-ua',
CookieJar => $cookies
);
$self->{queue} = {};
$self->{_queue} = {};
}
@ -414,7 +432,7 @@ sub runWorker {
my ($kernel, $self, $instance, $session) = @_[KERNEL, OBJECT, ARG0, SESSION];
$self->debug("Preparing to run workflow instance ".$instance->{instanceId}.".");
$self->debug("Incrementing ".$instance->{instanceId}." priority from ".$instance->{workingPriority});
$instance->{workingPriority} = $priority + 1;
$instance->{workingPriority}++;
$instance->{status} = 'running';
$self->updateInstance($instance);
my $url = "http://".$instance->{sitename}.':'.$self->config->get("webguiPort").$instance->{gateway};
@ -437,7 +455,7 @@ Suspends a workflow instance for a number of seconds defined in the config file,
=cut
sub suspendInstance {
my ($self, $instance, $waitTimeout) = @_[OBJECT, ARG0, ARG1];
my ($self, $kernel, $instance, $waitTimeout) = @_[OBJECT, KERNEL, ARG0, ARG1];
$waitTimeout ||= $self->config->get("suspensionDelay");
$self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$waitTimeout." seconds.");
$instance->{status} = 'suspended';
@ -460,7 +478,7 @@ A hash reference of the properties of the instance.
sub updateInstance {
my ($self, $instance) = @_;
$self->debug("Updating ".$instance->{instanceId}."'s properties.");
$self->{queue}{$instance->{instanceId}};
$self->{_queue}{$instance->{instanceId}};
}
@ -494,7 +512,7 @@ sub workerResponse {
$instance->{lastRunTime} = localtime(time());
if ($state =~ m/^waiting\s*(\d+)?$/) {
my $waitTime = $1;
$self->debug("Was told to wait on $instanceId because we're still waiting on some external event.");
$self->debug("Was told to suspend $instanceId because we're still waiting on some external event.");
$kernel->yield("suspendInstance",$instance, $waitTime);
}
elsif ($state eq "complete") {