From 630f1b0537f59cff2ae2a5cab1b6b868bda86744 Mon Sep 17 00:00:00 2001 From: JT Smith Date: Tue, 3 Nov 2009 12:04:24 -0600 Subject: [PATCH] changed queue to an array --- lib/Spectre/Workflow.pm | 350 +++++++++++++---------------------- lib/WebGUI/Operation/Cron.pm | 4 +- 2 files changed, 128 insertions(+), 226 deletions(-) diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index fccb983f3..e1b76c26d 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -18,7 +18,6 @@ use strict; use HTTP::Request::Common; use HTTP::Cookies; use POE qw(Component::Client::HTTP); -use POE::Queue::Array; use Tie::IxHash; use JSON qw/ encode_json /; @@ -31,12 +30,12 @@ Initializes the workflow manager. =cut sub _start { - my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ]; + my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ]; $self->debug("Starting workflow manager."); - my $serviceName = "workflow"; - $kernel->alias_set($serviceName); - $kernel->call( IKC => publish => $serviceName, $publicEvents ); - $kernel->yield("checkInstances"); + my $serviceName = "workflow"; + $kernel->alias_set($serviceName); + $kernel->call( IKC => publish => $serviceName, $publicEvents ); + $kernel->yield("checkInstances"); } #------------------------------------------------------------------- @@ -81,11 +80,12 @@ sub addInstance { my ($self, $instance) = @_[OBJECT, ARG0]; if ($instance->{priority} < 1 || $instance->{instanceId} eq "" || $instance->{sitename} eq "") { $self->error("Can't add workflow instance with missing data: ". $instance->{sitename}." - ".$instance->{instanceId}); - } else { - my $priority = ($instance->{priority} -1) * 10; + } + else { + $instance->{workingPriority} = ($instance->{priority} -1) * 10; $instance->{lastState} = "never run"; $self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$priority."."); - $self->getWaitingQueue->enqueue($priority, $instance); + push @{$self->{queue}}, $instance; } } @@ -101,7 +101,6 @@ sub checkInstances { my ($kernel, $self) = @_[KERNEL, OBJECT]; $self->debug("Checking to see if we can run anymore instances right now."); if ($self->countRunningInstances < $self->config->get("maxWorkers")) { - $self->debug("Total workflows waiting to run: ".$self->getWaitingQueue->get_item_count); my $instance = $self->getNextInstance; if (defined $instance) { $kernel->yield("runWorker",$instance); @@ -133,9 +132,14 @@ Returns an integer representing the number of running instances. sub countRunningInstances { my $self = shift; - my $instanceCount = $self->getRunningQueue->get_item_count; - $self->debug("There are $instanceCount running instances."); - return $instanceCount; + my $count = 0; + foreach my $instance (@{$self->{queue}}) { + if ($instance->{status} eq 'running') { + $count++; + } + } + $self->debug("There are $count running instances."); + return $count; } #------------------------------------------------------------------- @@ -170,14 +174,15 @@ Removes a workflow instance from the processing queue. sub deleteInstance { my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION]; $self->debug("Deleting workflow instance $instanceId from queue."); - $self->getWaitingQueue->remove_items( - sub { - my $instance = shift; - return 1 if ($instance->{instanceId} eq $instanceId); - return 0; - } - ); - $self->removeInstanceFromRunningQueue($instanceId); + my $index; + foreach my $i (0..$#{$self->{queue}}) { + if ($self->{queue}[$i]{instanceId} eq $instanceId) { + $index = $i; + } + } + if ($index) { + splice @{$self->{queue}}, $index, 1; + } } #------------------------------------------------------------------- @@ -209,25 +214,10 @@ sub editWorkflowPriority { $self->debug("Updating the priority of $instanceId to $newPriority."); - # I'm guessing that the payload can't change queues on us - my $found = 0; - my $filterCref = sub { shift->{instanceId} eq $instanceId }; - for my $getQueueMethod (map "get${_}Queue", qw( Suspended Waiting Running )) { - my $q = $self->$getQueueMethod; - my($itemAref) = $q->peek_items($filterCref); # there should be only one - - next unless (ref $itemAref eq 'ARRAY' and @$itemAref); - - my($priority, $id, $payload) = @$itemAref; - my $ackPriority = $q->set_priority($id, $filterCref, $newPriority); - if ($ackPriority != $newPriority) { - # return an error - my $error = 'edit priority setting error'; - $kernel->call(IKC=>post=>$rsvp, encode_json({message => $error})); - } - $found = 1; - last; - } + my $instance = $self->getInstance($instanceId); + $instance->{priority} = $newPriority; + $instance->{workingPriority} = ($instance->{priority} -1) * 10; + $self->updateInstance($instance); if (! $found) { # return an error message @@ -253,14 +243,36 @@ The error message to be printed if debug is enabled. =cut sub error { - my $self = shift; - my $output = shift; + my ($self, $output) = @_; if ($self->{_debug}) { print "WORKFLOW: [Error] ".$output."\n"; } $self->getLogger->error("WORKFLOW: ".$output); } +#------------------------------------------------------------------- + +=head2 getInstance ( instanceId ) + +Returns the properties of an instance. + +=head3 instanceId + +The id of the instance to retrieve. + +=cut + +sub getInstance { + my ($self, $instanceId) = @_; + foreach my $instance (@{$self->{queue}}) { + if ($instance->{instanceId} eq $instanceId) { + return $instance; + } + } + return undef; +} + + #------------------------------------------------------------------- =head2 getJsonStatus ( ) @@ -271,74 +283,8 @@ Returns JSON report about the workflow engine. sub getJsonStatus { my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT]; - my ($sitename, $rsvp) = @$request; - - my %queues = (); - tie %queues, 'Tie::IxHash'; - %queues = ( - Suspended => $self->getSuspendedQueue, - Waiting => $self->getWaitingQueue, - Running => $self->getRunningQueue, - ); - - my %output = (); - foreach my $queueName (keys %queues) { - - # get the queue name, and how many items it has - my $queue = $queues{$queueName}; - my $count = $queue->get_item_count; - - # list of instances to be added to the %output structure in the event - # that a site name is provided - my @instances; - - # and if there are items in that queue, add them to our data structure - if ($count > 0) { - - # if a site name is provided, only process data for that site name, - # and only construct data for that site name - if($sitename ne '') { - foreach my $itemAref ($queue->peek_items(sub { shift()->{sitename} eq $sitename })) { - push @instances, $itemAref; - } - $output{$queueName} = \@instances; - } - - # otherwise, process data for all sites. - else { - foreach my $queueItem ($queue->peek_items(sub {1})) { - my($priority, $id, $instance) = @{$queueItem}; - - # The site's name in the list of %output keys isn't a hashref; - # we haven't seen it yet - if(ref $output{$instance->{sitename}} ne 'HASH') { - $output{$instance->{sitename}} = {}; - } - - # The queue name in the $output{sitename} hashref isn't an - # arrayref; we haven't seen it yet - if(ref $output{$instance->{sitename}}{$queueName} ne 'ARRAY') { - $output{$instance->{sitename}}{$queueName} = []; - } - - # calculate originalPriority separately - $instance->{originalPriority} = ($instance->{priority} - 1) * 10; - - # finally, add the instance to the returned data structure - push @{$output{$instance->{sitename}}{$queueName}}, $instance; - } - } - } - # there's no items in this queue, but the version of this call that - # accepts a sitename expects an empty array ref anyway. Give it one. - else { - if($sitename) { - $output{$queueName} = \@instances; - } - } - } - - $kernel->call(IKC=>post=>$rsvp, encode_json(\%output)); + my ($data, $rsvp) = @$request; + $kernel->call(IKC=>post=>$rsvp, encode_json($self->{queue})); } #------------------------------------------------------------------- @@ -365,13 +311,22 @@ Returns the next available instance. sub getNextInstance { my $self = shift; $self->debug("Looking for a workflow instance to run."); - my $waiting = $self->getWaitingQueue; - if ($waiting->get_item_count > 0) { - my ($priority, $id, $instance) = $waiting->dequeue_next; - $instance->{workingPriority} = $priority; - $self->getRunningQueue->enqueue($priority, $instance); - $self->debug("Looks like ".$instance->{instanceId}." at priority $priority would be a good workflow instance to run."); - return $instance; + if (scalar(@{$self->{queue}}) > 0) { + my $lowInstance = {}; + my $lowPriority = 999999999999; + my $waitingCount = 0; + foreach my $instance (@{$self->{queue}}) { + next unless $instance->{status} eq 'waiting'; + $waiting++; + if ($instance->{workingPriority} > $lowPriority) { + $lowInstance = $instance; + } + } + $self->debug("Total workflows waiting to run: ".$waitingCount); + if ($lowInstance->{instanceId} ne '') { + $self->debug("Looks like ".$lowInstance->{instanceId}." would be a good workflow instance to run."); + return $lowInstance; + } } $self->debug("Didn't see any workflow instances to run."); return undef; @@ -379,19 +334,6 @@ sub getNextInstance { #------------------------------------------------------------------- -=head2 getRunningQueue ( ) - -Returns a reference to the queue of workflow instances that are running now. - -=cut - -sub getRunningQueue { - my $self = shift; - return $self->{_runningQueue}; -} - -#------------------------------------------------------------------- - =head2 getStatus ( ) Returns a formatted text status report about the workflow engine. @@ -400,61 +342,19 @@ Returns a formatted text status report about the workflow engine. sub getStatus { my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT]; - my $pattern = "\t%8.8s %-30.30s %-22.22s %-15.15s %-20.20s\n"; + my $pattern = "%8.8s %-9.9s %-30.30s %-22.22s %-15.15s %-20.20s\n"; my $summaryPattern = "%19.19s %4d\n"; - my %queues = (); - tie %queues, 'Tie::IxHash'; - %queues = ( - "Suspended" => $self->getSuspendedQueue, - "Waiting" => $self->getWaitingQueue, - "Running" => $self->getRunningQueue, - ); my $total = 0; - my $output = ""; - foreach my $queueName (keys %queues) { - my $queue = $queues{$queueName}; - my $count = $queue->get_item_count; - $output .= sprintf $summaryPattern, $queueName." Workflows", $count; - if ($count > 0) { - $output .= sprintf $pattern, "Priority", "Sitename", "Instance Id", "Last State", "Last Run Time"; - foreach my $item ($queue->peek_items(sub {1})) { - my ($priority, $id, $instance) = @{$item}; - my $originalPriority = ($instance->{priority} - 1) * 10; - $output .= sprintf $pattern, $priority."/".$originalPriority, $instance->{sitename}, $instance->{instanceId}, $instance->{lastState}, $instance->{lastRunTime}; - } - $output .= "\n"; - } - $total += $count; - } - $output .= sprintf $summaryPattern, "Total Workflows", $total; - my ($data, $rsvp) = @$request; - $kernel->call(IKC=>post=>$rsvp,$output); -} - -#------------------------------------------------------------------- - -=head2 getSuspendedQueue ( ) - -Returns a reference to the queue of workflow instances that have been suspended due to error or wait timeouts. - -=cut - -sub getSuspendedQueue { - my $self = shift; - return $self->{_suspendedQueue}; -} - -#------------------------------------------------------------------- - -=head2 getWaitingQueue ( ) - -Returns a reference to the queue of workflow instances waiting to run. - -=cut - -sub getWaitingQueue { - my $self = shift; - return $self->{_waitingQueue}; + my $output = sprintf $pattern, "Priority", "Status", "Sitename", "Instance Id", "Last State", "Last Run Time"; + foreach my $instance (@{$self->{queue}}) { + my $originalPriority = ($instance->{priority} - 1) * 10; + my $priority = $instance->{workingPriority}."/".$originalPriority; + $output .= sprintf $pattern, $priority, $instance->{status}, $instance->{sitename}, $instance->{instanceId}, $instance->{lastState}, $instance->{lastRunTime}; + $total++; + } + $output .= sprintf "\n%19.19s %4d\n", "Total Workflows", $total; + my ($data, $rsvp) = @$request; + $kernel->call(IKC=>post=>$rsvp,$output); } @@ -496,34 +396,10 @@ sub new { Alias => 'workflow-ua', CookieJar => $cookies ); - $self->{_runningQueue} = POE::Queue::Array->new; - $self->{_waitingQueue} = POE::Queue::Array->new; - $self->{_suspendedQueue} = POE::Queue::Array->new; + $self->{queue} = []; } -#------------------------------------------------------------------- - -=head2 removeInstanceFromRunningQueue ( ) - -Removes a workflow instance from the queue that tracks what's running and returns a reference to it. - -=cut - -sub removeInstanceFromRunningQueue { - my $self = shift; - my $instanceId = shift; - my @items = $self->getRunningQueue->remove_items( - sub { - my $payload = shift; - return 1 if ($payload->{instanceId} eq $instanceId); - return 0; - } - ); - my $instance = $items[0][2]; - return $instance; -} - #------------------------------------------------------------------- =head2 returnInstanceToRunnableState ( ) @@ -535,14 +411,8 @@ Returns a workflow instance back to runnable queue. sub returnInstanceToRunnableState { my ($self, $instance) = @_[OBJECT, ARG0]; $self->debug("Returning ".$instance->{instanceId}." to runnable state."); - $self->getSuspendedQueue->remove_items( - sub { - my $payload = shift; - return 1 if ($payload->{instanceId} eq $instance->{instanceId}); - return 0; - } - ); - $self->getWaitingQueue->enqueue($instance->{workingPriority}+1, $instance); + $instance->{status} = 'waiting'; + $self->updateInstance($instance); } #------------------------------------------------------------------- @@ -556,6 +426,10 @@ Calls a worker to execute a workflow activity. sub runWorker { my ($kernel, $self, $instance, $session) = @_[KERNEL, OBJECT, ARG0, SESSION]; $self->debug("Preparing to run workflow instance ".$instance->{instanceId}."."); + $self->debug("Incrementing ".$instance->{instanceId}." priority from ".$instance->{workingPriority}); + $instance->{workingPriority} = $priority + 1; + $instance->{status} = 'running'; + $self->updateInstance($instance); my $url = "http://".$instance->{sitename}.':'.$self->config->get("webguiPort").$instance->{gateway}; my $request = POST $url, [op=>"runWorkflow", instanceId=>$instance->{instanceId}]; my $cookie = $self->{_cookies}{$instance->{sitename}}; @@ -576,14 +450,41 @@ Suspends a workflow instance for a number of seconds defined in the config file, =cut sub suspendInstance { - my ($self, $instance, $waitTimeout, $kernel) = @_[OBJECT, ARG0, ARG1, KERNEL]; + my ($self, $instance, $waitTimeout) = @_[OBJECT, ARG0, ARG1]; $waitTimeout ||= $self->config->get("suspensionDelay"); $self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$waitTimeout." seconds."); - my $priority = ($instance->{priority} - 1) * 10; - $self->getSuspendedQueue->enqueue($priority, $instance); + $instance->{status} = 'suspended'; + $self->updateInstance($instance); $kernel->delay_set("returnInstanceToRunnableState", $waitTimeout, $instance); } +#------------------------------------------------------------------- + +=head2 updateInstance ( properties ) + +Updates an instance's properties. + +=head3 properties + +A hash reference of the properties of the instance. + +=cut + +sub updateInstance { + my ($self, $instance) = @_; + $self->debug("Updating ".$instance->{instanceId}."'s properties."); + my $index; + foreach my $i (0..$#{$self->{queue}}) { + if ($self->{queue}[$i]{instanceId} eq $instance->{instanceId}) { + $index = $i; + } + } + if ($index) { + $self->{queue}[$index] = $instance; + } +} + + #------------------------------------------------------------------- =head2 workerResponse ( ) @@ -596,10 +497,10 @@ sub workerResponse { my ($self, $kernel, $requestPacket, $responsePacket) = @_[OBJECT, KERNEL, ARG0, ARG1]; $self->debug("Retrieving response from workflow instance."); my $request = $requestPacket->[0]; - my $response = $responsePacket->[0]; + my $response = $responsePacket->[0]; my $instanceId = $request->header("X-instanceId"); # got to figure out how to get this from the request, cuz the response may die $self->debug("Response retrieved is for $instanceId."); - my $instance = $self->removeInstanceFromRunningQueue($instanceId); + my $instance = $self->getInstance($instanceId); if ($response->is_success) { $self->debug("Response for $instanceId retrieved successfully."); if ($response->header("Set-Cookie") ne "") { @@ -613,7 +514,7 @@ sub workerResponse { $instance->{lastState} = $state; $instance->{lastRunTime} = localtime(time()); if ($state =~ m/^waiting\s*(\d+)?$/) { - my $waitTime = $1; + my $waitTime = $1; $self->debug("Was told to wait on $instanceId because we're still waiting on some external event."); $kernel->yield("suspendInstance",$instance, $waitTime); } @@ -634,7 +535,7 @@ sub workerResponse { $kernel->yield("suspendInstance",$instance); } else { - $self->error("Something bad happened on the return of $instance->{sitename} - $instanceId. ".$response->error_as_HTML); + $self->error("Something bad happened on the return of $instance->{sitename} - $instanceId. ".$response->code.": ".$response->message); $kernel->yield("suspendInstance",$instance); } } @@ -642,11 +543,12 @@ sub workerResponse { $self->error("Response for $instance->{sitename} - $instanceId was redirected. This should never happen if configured properly!!!"); $instance->{lastState} = "redirect"; $instance->{lastRunTime} = localtime(time()); + $kernel->yield("suspendInstance",$instance) } elsif ($response->is_error) { $instance->{lastState} = "comm error"; $instance->{lastRunTime} = localtime(time()); - $self->error("Response for $instance->{sitename} - $instanceId had a communications error. ".$response->error_as_HTML); + $self->error("Response for $instance->{sitename} - $instanceId had a communications error. ".$response->code.": ".$response->message); $kernel->yield("suspendInstance",$instance) } } diff --git a/lib/WebGUI/Operation/Cron.pm b/lib/WebGUI/Operation/Cron.pm index 7dff1314b..a0da869c2 100644 --- a/lib/WebGUI/Operation/Cron.pm +++ b/lib/WebGUI/Operation/Cron.pm @@ -294,13 +294,13 @@ sub www_runCronJob { if ($session->stow->get('singletonWorkflowClash')) { $session->errorHandler->warn( "Could not create workflow instance for workflowId '" . $task->get( "workflowId" ) - . "': It is a singleton workflow and is still running from the last invocation." + . "' from taskId '".$taskId."': It is a singleton workflow and is still running from the last invocation." ); return "done"; } $session->errorHandler->error( "Could not create workflow instance for workflowId '" . $task->get( "workflowId" ) - . "': The result was undefined" + . "' from taskId '".$taskId."': The result was undefined" ); return "done"; }