changed queue to an array

This commit is contained in:
JT Smith 2009-11-03 12:04:24 -06:00
parent 3958a36271
commit 630f1b0537
2 changed files with 128 additions and 226 deletions

View file

@ -18,7 +18,6 @@ use strict;
use HTTP::Request::Common;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use POE::Queue::Array;
use Tie::IxHash;
use JSON qw/ encode_json /;
@ -31,12 +30,12 @@ Initializes the workflow manager.
=cut
sub _start {
my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
my ( $kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
$self->debug("Starting workflow manager.");
my $serviceName = "workflow";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
$kernel->yield("checkInstances");
my $serviceName = "workflow";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
$kernel->yield("checkInstances");
}
#-------------------------------------------------------------------
@ -81,11 +80,12 @@ sub addInstance {
my ($self, $instance) = @_[OBJECT, ARG0];
if ($instance->{priority} < 1 || $instance->{instanceId} eq "" || $instance->{sitename} eq "") {
$self->error("Can't add workflow instance with missing data: ". $instance->{sitename}." - ".$instance->{instanceId});
} else {
my $priority = ($instance->{priority} -1) * 10;
}
else {
$instance->{workingPriority} = ($instance->{priority} -1) * 10;
$instance->{lastState} = "never run";
$self->debug("Adding workflow instance ".$instance->{instanceId}." from ".$instance->{sitename}." to queue at priority ".$priority.".");
$self->getWaitingQueue->enqueue($priority, $instance);
push @{$self->{queue}}, $instance;
}
}
@ -101,7 +101,6 @@ sub checkInstances {
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Checking to see if we can run anymore instances right now.");
if ($self->countRunningInstances < $self->config->get("maxWorkers")) {
$self->debug("Total workflows waiting to run: ".$self->getWaitingQueue->get_item_count);
my $instance = $self->getNextInstance;
if (defined $instance) {
$kernel->yield("runWorker",$instance);
@ -133,9 +132,14 @@ Returns an integer representing the number of running instances.
sub countRunningInstances {
my $self = shift;
my $instanceCount = $self->getRunningQueue->get_item_count;
$self->debug("There are $instanceCount running instances.");
return $instanceCount;
my $count = 0;
foreach my $instance (@{$self->{queue}}) {
if ($instance->{status} eq 'running') {
$count++;
}
}
$self->debug("There are $count running instances.");
return $count;
}
#-------------------------------------------------------------------
@ -170,14 +174,15 @@ Removes a workflow instance from the processing queue.
sub deleteInstance {
my ($self, $instanceId,$kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION];
$self->debug("Deleting workflow instance $instanceId from queue.");
$self->getWaitingQueue->remove_items(
sub {
my $instance = shift;
return 1 if ($instance->{instanceId} eq $instanceId);
return 0;
}
);
$self->removeInstanceFromRunningQueue($instanceId);
my $index;
foreach my $i (0..$#{$self->{queue}}) {
if ($self->{queue}[$i]{instanceId} eq $instanceId) {
$index = $i;
}
}
if ($index) {
splice @{$self->{queue}}, $index, 1;
}
}
#-------------------------------------------------------------------
@ -209,25 +214,10 @@ sub editWorkflowPriority {
$self->debug("Updating the priority of $instanceId to $newPriority.");
# I'm guessing that the payload can't change queues on us
my $found = 0;
my $filterCref = sub { shift->{instanceId} eq $instanceId };
for my $getQueueMethod (map "get${_}Queue", qw( Suspended Waiting Running )) {
my $q = $self->$getQueueMethod;
my($itemAref) = $q->peek_items($filterCref); # there should be only one
next unless (ref $itemAref eq 'ARRAY' and @$itemAref);
my($priority, $id, $payload) = @$itemAref;
my $ackPriority = $q->set_priority($id, $filterCref, $newPriority);
if ($ackPriority != $newPriority) {
# return an error
my $error = 'edit priority setting error';
$kernel->call(IKC=>post=>$rsvp, encode_json({message => $error}));
}
$found = 1;
last;
}
my $instance = $self->getInstance($instanceId);
$instance->{priority} = $newPriority;
$instance->{workingPriority} = ($instance->{priority} -1) * 10;
$self->updateInstance($instance);
if (! $found) {
# return an error message
@ -253,14 +243,36 @@ The error message to be printed if debug is enabled.
=cut
sub error {
my $self = shift;
my $output = shift;
my ($self, $output) = @_;
if ($self->{_debug}) {
print "WORKFLOW: [Error] ".$output."\n";
}
$self->getLogger->error("WORKFLOW: ".$output);
}
#-------------------------------------------------------------------
=head2 getInstance ( instanceId )
Returns the properties of an instance.
=head3 instanceId
The id of the instance to retrieve.
=cut
sub getInstance {
my ($self, $instanceId) = @_;
foreach my $instance (@{$self->{queue}}) {
if ($instance->{instanceId} eq $instanceId) {
return $instance;
}
}
return undef;
}
#-------------------------------------------------------------------
=head2 getJsonStatus ( )
@ -271,74 +283,8 @@ Returns JSON report about the workflow engine.
sub getJsonStatus {
my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT];
my ($sitename, $rsvp) = @$request;
my %queues = ();
tie %queues, 'Tie::IxHash';
%queues = (
Suspended => $self->getSuspendedQueue,
Waiting => $self->getWaitingQueue,
Running => $self->getRunningQueue,
);
my %output = ();
foreach my $queueName (keys %queues) {
# get the queue name, and how many items it has
my $queue = $queues{$queueName};
my $count = $queue->get_item_count;
# list of instances to be added to the %output structure in the event
# that a site name is provided
my @instances;
# and if there are items in that queue, add them to our data structure
if ($count > 0) {
# if a site name is provided, only process data for that site name,
# and only construct data for that site name
if($sitename ne '') {
foreach my $itemAref ($queue->peek_items(sub { shift()->{sitename} eq $sitename })) {
push @instances, $itemAref;
}
$output{$queueName} = \@instances;
}
# otherwise, process data for all sites.
else {
foreach my $queueItem ($queue->peek_items(sub {1})) {
my($priority, $id, $instance) = @{$queueItem};
# The site's name in the list of %output keys isn't a hashref;
# we haven't seen it yet
if(ref $output{$instance->{sitename}} ne 'HASH') {
$output{$instance->{sitename}} = {};
}
# The queue name in the $output{sitename} hashref isn't an
# arrayref; we haven't seen it yet
if(ref $output{$instance->{sitename}}{$queueName} ne 'ARRAY') {
$output{$instance->{sitename}}{$queueName} = [];
}
# calculate originalPriority separately
$instance->{originalPriority} = ($instance->{priority} - 1) * 10;
# finally, add the instance to the returned data structure
push @{$output{$instance->{sitename}}{$queueName}}, $instance;
}
}
}
# there's no items in this queue, but the version of this call that
# accepts a sitename expects an empty array ref anyway. Give it one.
else {
if($sitename) {
$output{$queueName} = \@instances;
}
}
}
$kernel->call(IKC=>post=>$rsvp, encode_json(\%output));
my ($data, $rsvp) = @$request;
$kernel->call(IKC=>post=>$rsvp, encode_json($self->{queue}));
}
#-------------------------------------------------------------------
@ -365,13 +311,22 @@ Returns the next available instance.
sub getNextInstance {
my $self = shift;
$self->debug("Looking for a workflow instance to run.");
my $waiting = $self->getWaitingQueue;
if ($waiting->get_item_count > 0) {
my ($priority, $id, $instance) = $waiting->dequeue_next;
$instance->{workingPriority} = $priority;
$self->getRunningQueue->enqueue($priority, $instance);
$self->debug("Looks like ".$instance->{instanceId}." at priority $priority would be a good workflow instance to run.");
return $instance;
if (scalar(@{$self->{queue}}) > 0) {
my $lowInstance = {};
my $lowPriority = 999999999999;
my $waitingCount = 0;
foreach my $instance (@{$self->{queue}}) {
next unless $instance->{status} eq 'waiting';
$waiting++;
if ($instance->{workingPriority} > $lowPriority) {
$lowInstance = $instance;
}
}
$self->debug("Total workflows waiting to run: ".$waitingCount);
if ($lowInstance->{instanceId} ne '') {
$self->debug("Looks like ".$lowInstance->{instanceId}." would be a good workflow instance to run.");
return $lowInstance;
}
}
$self->debug("Didn't see any workflow instances to run.");
return undef;
@ -379,19 +334,6 @@ sub getNextInstance {
#-------------------------------------------------------------------
=head2 getRunningQueue ( )
Returns a reference to the queue of workflow instances that are running now.
=cut
sub getRunningQueue {
my $self = shift;
return $self->{_runningQueue};
}
#-------------------------------------------------------------------
=head2 getStatus ( )
Returns a formatted text status report about the workflow engine.
@ -400,61 +342,19 @@ Returns a formatted text status report about the workflow engine.
sub getStatus {
my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT];
my $pattern = "\t%8.8s %-30.30s %-22.22s %-15.15s %-20.20s\n";
my $pattern = "%8.8s %-9.9s %-30.30s %-22.22s %-15.15s %-20.20s\n";
my $summaryPattern = "%19.19s %4d\n";
my %queues = ();
tie %queues, 'Tie::IxHash';
%queues = (
"Suspended" => $self->getSuspendedQueue,
"Waiting" => $self->getWaitingQueue,
"Running" => $self->getRunningQueue,
);
my $total = 0;
my $output = "";
foreach my $queueName (keys %queues) {
my $queue = $queues{$queueName};
my $count = $queue->get_item_count;
$output .= sprintf $summaryPattern, $queueName." Workflows", $count;
if ($count > 0) {
$output .= sprintf $pattern, "Priority", "Sitename", "Instance Id", "Last State", "Last Run Time";
foreach my $item ($queue->peek_items(sub {1})) {
my ($priority, $id, $instance) = @{$item};
my $originalPriority = ($instance->{priority} - 1) * 10;
$output .= sprintf $pattern, $priority."/".$originalPriority, $instance->{sitename}, $instance->{instanceId}, $instance->{lastState}, $instance->{lastRunTime};
}
$output .= "\n";
}
$total += $count;
}
$output .= sprintf $summaryPattern, "Total Workflows", $total;
my ($data, $rsvp) = @$request;
$kernel->call(IKC=>post=>$rsvp,$output);
}
#-------------------------------------------------------------------
=head2 getSuspendedQueue ( )
Returns a reference to the queue of workflow instances that have been suspended due to error or wait timeouts.
=cut
sub getSuspendedQueue {
my $self = shift;
return $self->{_suspendedQueue};
}
#-------------------------------------------------------------------
=head2 getWaitingQueue ( )
Returns a reference to the queue of workflow instances waiting to run.
=cut
sub getWaitingQueue {
my $self = shift;
return $self->{_waitingQueue};
my $output = sprintf $pattern, "Priority", "Status", "Sitename", "Instance Id", "Last State", "Last Run Time";
foreach my $instance (@{$self->{queue}}) {
my $originalPriority = ($instance->{priority} - 1) * 10;
my $priority = $instance->{workingPriority}."/".$originalPriority;
$output .= sprintf $pattern, $priority, $instance->{status}, $instance->{sitename}, $instance->{instanceId}, $instance->{lastState}, $instance->{lastRunTime};
$total++;
}
$output .= sprintf "\n%19.19s %4d\n", "Total Workflows", $total;
my ($data, $rsvp) = @$request;
$kernel->call(IKC=>post=>$rsvp,$output);
}
@ -496,34 +396,10 @@ sub new {
Alias => 'workflow-ua',
CookieJar => $cookies
);
$self->{_runningQueue} = POE::Queue::Array->new;
$self->{_waitingQueue} = POE::Queue::Array->new;
$self->{_suspendedQueue} = POE::Queue::Array->new;
$self->{queue} = [];
}
#-------------------------------------------------------------------
=head2 removeInstanceFromRunningQueue ( )
Removes a workflow instance from the queue that tracks what's running and returns a reference to it.
=cut
sub removeInstanceFromRunningQueue {
my $self = shift;
my $instanceId = shift;
my @items = $self->getRunningQueue->remove_items(
sub {
my $payload = shift;
return 1 if ($payload->{instanceId} eq $instanceId);
return 0;
}
);
my $instance = $items[0][2];
return $instance;
}
#-------------------------------------------------------------------
=head2 returnInstanceToRunnableState ( )
@ -535,14 +411,8 @@ Returns a workflow instance back to runnable queue.
sub returnInstanceToRunnableState {
my ($self, $instance) = @_[OBJECT, ARG0];
$self->debug("Returning ".$instance->{instanceId}." to runnable state.");
$self->getSuspendedQueue->remove_items(
sub {
my $payload = shift;
return 1 if ($payload->{instanceId} eq $instance->{instanceId});
return 0;
}
);
$self->getWaitingQueue->enqueue($instance->{workingPriority}+1, $instance);
$instance->{status} = 'waiting';
$self->updateInstance($instance);
}
#-------------------------------------------------------------------
@ -556,6 +426,10 @@ Calls a worker to execute a workflow activity.
sub runWorker {
my ($kernel, $self, $instance, $session) = @_[KERNEL, OBJECT, ARG0, SESSION];
$self->debug("Preparing to run workflow instance ".$instance->{instanceId}.".");
$self->debug("Incrementing ".$instance->{instanceId}." priority from ".$instance->{workingPriority});
$instance->{workingPriority} = $priority + 1;
$instance->{status} = 'running';
$self->updateInstance($instance);
my $url = "http://".$instance->{sitename}.':'.$self->config->get("webguiPort").$instance->{gateway};
my $request = POST $url, [op=>"runWorkflow", instanceId=>$instance->{instanceId}];
my $cookie = $self->{_cookies}{$instance->{sitename}};
@ -576,14 +450,41 @@ Suspends a workflow instance for a number of seconds defined in the config file,
=cut
sub suspendInstance {
my ($self, $instance, $waitTimeout, $kernel) = @_[OBJECT, ARG0, ARG1, KERNEL];
my ($self, $instance, $waitTimeout) = @_[OBJECT, ARG0, ARG1];
$waitTimeout ||= $self->config->get("suspensionDelay");
$self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$waitTimeout." seconds.");
my $priority = ($instance->{priority} - 1) * 10;
$self->getSuspendedQueue->enqueue($priority, $instance);
$instance->{status} = 'suspended';
$self->updateInstance($instance);
$kernel->delay_set("returnInstanceToRunnableState", $waitTimeout, $instance);
}
#-------------------------------------------------------------------
=head2 updateInstance ( properties )
Updates an instance's properties.
=head3 properties
A hash reference of the properties of the instance.
=cut
sub updateInstance {
my ($self, $instance) = @_;
$self->debug("Updating ".$instance->{instanceId}."'s properties.");
my $index;
foreach my $i (0..$#{$self->{queue}}) {
if ($self->{queue}[$i]{instanceId} eq $instance->{instanceId}) {
$index = $i;
}
}
if ($index) {
$self->{queue}[$index] = $instance;
}
}
#-------------------------------------------------------------------
=head2 workerResponse ( )
@ -596,10 +497,10 @@ sub workerResponse {
my ($self, $kernel, $requestPacket, $responsePacket) = @_[OBJECT, KERNEL, ARG0, ARG1];
$self->debug("Retrieving response from workflow instance.");
my $request = $requestPacket->[0];
my $response = $responsePacket->[0];
my $response = $responsePacket->[0];
my $instanceId = $request->header("X-instanceId"); # got to figure out how to get this from the request, cuz the response may die
$self->debug("Response retrieved is for $instanceId.");
my $instance = $self->removeInstanceFromRunningQueue($instanceId);
my $instance = $self->getInstance($instanceId);
if ($response->is_success) {
$self->debug("Response for $instanceId retrieved successfully.");
if ($response->header("Set-Cookie") ne "") {
@ -613,7 +514,7 @@ sub workerResponse {
$instance->{lastState} = $state;
$instance->{lastRunTime} = localtime(time());
if ($state =~ m/^waiting\s*(\d+)?$/) {
my $waitTime = $1;
my $waitTime = $1;
$self->debug("Was told to wait on $instanceId because we're still waiting on some external event.");
$kernel->yield("suspendInstance",$instance, $waitTime);
}
@ -634,7 +535,7 @@ sub workerResponse {
$kernel->yield("suspendInstance",$instance);
}
else {
$self->error("Something bad happened on the return of $instance->{sitename} - $instanceId. ".$response->error_as_HTML);
$self->error("Something bad happened on the return of $instance->{sitename} - $instanceId. ".$response->code.": ".$response->message);
$kernel->yield("suspendInstance",$instance);
}
}
@ -642,11 +543,12 @@ sub workerResponse {
$self->error("Response for $instance->{sitename} - $instanceId was redirected. This should never happen if configured properly!!!");
$instance->{lastState} = "redirect";
$instance->{lastRunTime} = localtime(time());
$kernel->yield("suspendInstance",$instance)
}
elsif ($response->is_error) {
$instance->{lastState} = "comm error";
$instance->{lastRunTime} = localtime(time());
$self->error("Response for $instance->{sitename} - $instanceId had a communications error. ".$response->error_as_HTML);
$self->error("Response for $instance->{sitename} - $instanceId had a communications error. ".$response->code.": ".$response->message);
$kernel->yield("suspendInstance",$instance)
}
}

View file

@ -294,13 +294,13 @@ sub www_runCronJob {
if ($session->stow->get('singletonWorkflowClash')) {
$session->errorHandler->warn(
"Could not create workflow instance for workflowId '" . $task->get( "workflowId" )
. "': It is a singleton workflow and is still running from the last invocation."
. "' from taskId '".$taskId."': It is a singleton workflow and is still running from the last invocation."
);
return "done";
}
$session->errorHandler->error(
"Could not create workflow instance for workflowId '" . $task->get( "workflowId" )
. "': The result was undefined"
. "' from taskId '".$taskId."': The result was undefined"
);
return "done";
}