diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index ce36ff9e5..7e2853126 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -20,6 +20,7 @@ use HTTP::Cookies; use POE qw(Component::Client::HTTP); use Tie::IxHash; use JSON qw/ encode_json /; +use Clone qw(clone); #------------------------------------------------------------------- @@ -83,9 +84,10 @@ sub addInstance { } else { $instance->{workingPriority} = ($instance->{priority} -1) * 10; - $instance->{lastState} = "never run"; - $self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$priority."."); - $self->{queue}{$instance->{instanceId}} = $instance; + $instance->{lastState} = 'never run'; + $instance->{status} = 'waiting'; + $self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$instance->{workingPriority}."."); + $self->{_queue}{$instance->{instanceId}} = $instance; } } @@ -133,7 +135,7 @@ Returns an integer representing the number of running instances. sub countRunningInstances { my $self = shift; my $count = 0; - foreach my $instance (values %{$self->{queue}}) { + foreach my $instance ($self->getInstances) { if ($instance->{status} eq 'running') { $count++; } @@ -174,7 +176,7 @@ Removes a workflow instance from the processing queue. sub deleteInstance { my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION]; $self->debug("Deleting workflow instance $instanceId from queue."); - delete $self->{queue}{$instanceId}; + delete $self->{_queue}{$instanceId}; } #------------------------------------------------------------------- @@ -207,19 +209,19 @@ sub editWorkflowPriority { $self->debug("Updating the priority of $instanceId to $newPriority."); my $instance = $self->getInstance($instanceId); - $instance->{priority} = $newPriority; - $instance->{workingPriority} = ($instance->{priority} -1) * 10; - $self->updateInstance($instance); - if (! $found) { + if (defined $instance) { + $instance->{priority} = $newPriority; + $instance->{workingPriority} = ($instance->{priority} -1) * 10; + $self->updateInstance($instance); + # return success message + $kernel->call(IKC=>post=>$rsvp, encode_json({message => 'edit priority success'})); + } + else { # return an error message my $error = 'edit priority instance not found error'; $kernel->call(IKC=>post=>$rsvp, encode_json({message => $error})); } - else { - # return success message - $kernel->call(IKC=>post=>$rsvp, encode_json({message => 'edit priority success'})); - } } #------------------------------------------------------------------- @@ -256,10 +258,24 @@ The id of the instance to retrieve. sub getInstance { my ($self, $instanceId) = @_; - return $self->{queue}{$instanceId}; + return clone($self->{_queue}{$instanceId}); } +#------------------------------------------------------------------- + +=head2 getInstances ( ) + +Returns the array of instances from the queue. + +=cut + +sub getInstances { + my ($self) = @_; + my @instances = values %{$self->{_queue}}; + return @{clone(\@instances)}; +} + #------------------------------------------------------------------- =head2 getJsonStatus ( ) @@ -271,7 +287,8 @@ Returns JSON report about the workflow engine. sub getJsonStatus { my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT]; my ($data, $rsvp) = @$request; - $kernel->call(IKC=>post=>$rsvp, encode_json($self->{queue})); + my @instances = $self->getInstances; + $kernel->call(IKC=>post=>$rsvp, encode_json(\@instances)); } #------------------------------------------------------------------- @@ -298,13 +315,14 @@ Returns the next available instance. sub getNextInstance { my $self = shift; $self->debug("Looking for a workflow instance to run."); - if (scalar(keys %{$self->{queue}}) > 0) { + my @instances = $self->getInstances; + if (scalar(@instances) > 0) { my $lowInstance = {}; my $lowPriority = 999999999999; my $waitingCount = 0; - foreach my $instance (values %{$self->{queue}}) { + foreach my $instance (@instances) { next unless $instance->{status} eq 'waiting'; - $waiting++; + $waitingCount++; if ($instance->{workingPriority} > $lowPriority) { $lowInstance = $instance; } @@ -333,7 +351,7 @@ sub getStatus { my $summaryPattern = "%19.19s %4d\n"; my $total = 0; my $output = sprintf $pattern, "Priority", "Status", "Sitename", "Instance Id", "Last State", "Last Run Time"; - foreach my $instance (values %{$self->{queue}}) { + foreach my $instance ($self->getInstances) { my $originalPriority = ($instance->{priority} - 1) * 10; my $priority = $instance->{workingPriority}."/".$originalPriority; $output .= sprintf $pattern, $priority, $instance->{status}, $instance->{sitename}, $instance->{instanceId}, $instance->{lastState}, $instance->{lastRunTime}; @@ -383,7 +401,7 @@ sub new { Alias => 'workflow-ua', CookieJar => $cookies ); - $self->{queue} = {}; + $self->{_queue} = {}; } @@ -414,7 +432,7 @@ sub runWorker { my ($kernel, $self, $instance, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; $self->debug("Preparing to run workflow instance ".$instance->{instanceId}."."); $self->debug("Incrementing ".$instance->{instanceId}." priority from ".$instance->{workingPriority}); - $instance->{workingPriority} = $priority + 1; + $instance->{workingPriority}++; $instance->{status} = 'running'; $self->updateInstance($instance); my $url = "http://".$instance->{sitename}.':'.$self->config->get("webguiPort").$instance->{gateway}; @@ -437,7 +455,7 @@ Suspends a workflow instance for a number of seconds defined in the config file, =cut sub suspendInstance { - my ($self, $instance, $waitTimeout) = @_[OBJECT, ARG0, ARG1]; + my ($self, $kernel, $instance, $waitTimeout) = @_[OBJECT, KERNEL, ARG0, ARG1]; $waitTimeout ||= $self->config->get("suspensionDelay"); $self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$waitTimeout." seconds."); $instance->{status} = 'suspended'; @@ -460,7 +478,7 @@ A hash reference of the properties of the instance. sub updateInstance { my ($self, $instance) = @_; $self->debug("Updating ".$instance->{instanceId}."'s properties."); - $self->{queue}{$instance->{instanceId}}; + $self->{_queue}{$instance->{instanceId}}; } @@ -494,7 +512,7 @@ sub workerResponse { $instance->{lastRunTime} = localtime(time()); if ($state =~ m/^waiting\s*(\d+)?$/) { my $waitTime = $1; - $self->debug("Was told to wait on $instanceId because we're still waiting on some external event."); + $self->debug("Was told to suspend $instanceId because we're still waiting on some external event."); $kernel->yield("suspendInstance",$instance, $waitTime); } elsif ($state eq "complete") {