spectre/workflows/priorities RFE

This commit is contained in:
James Tolley 2007-06-26 23:48:41 +00:00
parent 794da40e5c
commit 60eeebdba9
10 changed files with 556 additions and 51 deletions

View file

@ -19,6 +19,7 @@ use DateTime;
use HTTP::Request::Common;
use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use JSON 'objToJson';
#-------------------------------------------------------------------
@ -29,12 +30,12 @@ Initializes the scheduler.
=cut
sub _start {
my ($kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
$self->debug("Starting Spectre scheduler.");
my $serviceName = "cron";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
$kernel->yield("checkSchedules");
my ($kernel, $self, $publicEvents) = @_[ KERNEL, OBJECT, ARG0 ];
$self->debug("Starting Spectre scheduler.");
my $serviceName = "cron";
$kernel->alias_set($serviceName);
$kernel->call( IKC => publish => $serviceName, $publicEvents );
$kernel->yield("checkSchedules");
}
#-------------------------------------------------------------------
@ -46,9 +47,9 @@ Gracefully shuts down the scheduler.
=cut
sub _stop {
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Stopping the scheduler.");
undef $self;
my ($kernel, $self) = @_[KERNEL, OBJECT];
$self->debug("Stopping the scheduler.");
undef $self;
}
#-------------------------------------------------------------------
@ -318,7 +319,27 @@ sub getJob {
#-------------------------------------------------------------------
=head3 getLogger ( )
=head2 getJsonStatus ( )
Returns JSON of the jobs.
=cut
sub getJsonStatus {
my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT];
my ($sitename, $rsvp) = @$request;
my %data = ();
for my $key (keys %{ $self->{_jobs} }) {
next unless $self->{_jobs}->{$key}->{sitename} eq $sitename;
$data{$key} = $self->{_jobs}->{$key};
}
$kernel->call(IKC => post => $rsvp, objToJson(\%data));
}
#-------------------------------------------------------------------
=head2 getLogger ( )
Returns a reference to the logger.
@ -357,7 +378,7 @@ sub new {
my $debug = shift;
my $self = {_jobs=>{}, _debug=>$debug, _config=>$config, _logger=>$logger};
bless $self, $class;
my @publicEvents = qw(runJob runJobResponse addJob deleteJob);
my @publicEvents = qw(runJob runJobResponse addJob deleteJob getJsonStatus);
POE::Session->create(
object_states => [ $self => [qw(_start _stop runJob runJobResponse addJob deleteJob checkSchedules checkSchedule), @publicEvents] ],
args=>[\@publicEvents]

View file

@ -20,6 +20,7 @@ use HTTP::Cookies;
use POE qw(Component::Client::HTTP);
use POE::Queue::Array;
use Tie::IxHash;
use JSON 'objToJson';
#-------------------------------------------------------------------
@ -177,6 +178,66 @@ sub deleteInstance {
#-------------------------------------------------------------------
=head2 editWorkflowPriority ( href )
Updates the priority of a given workflow instance.
=head3 href
Contains information about the instance and the new priority.
=head4 instanceId
The id of the instance to update.
=head4 newPriority
The new priority value.
=cut
sub editWorkflowPriority {
my ($self, $request, $kernel, $session ) = @_[OBJECT, ARG0, KERNEL, SESSION];
my ($argsHref, $rsvp) = @$request;
my $instanceId = $argsHref->{instanceId};
my $newPriority = $argsHref->{newPriority};
$self->debug("Updating the priority of $instanceId to $newPriority.");
# I'm guessing that the payload can't change queues on us
my $found = 0;
my $filterCref = sub { shift->{instanceId} eq $instanceId };
for my $getQueueMethod (map "get${_}Queue", qw( Suspended Waiting Running )) {
my $q = $self->$getQueueMethod;
my($itemAref) = $q->peek_items($filterCref); # there should be only one
next unless (ref $itemAref eq 'ARRAY' and @$itemAref);
my($priority, $id, $payload) = @$itemAref;
my $ackPriority = $q->set_priority($id, $filterCref, $newPriority);
if ($ackPriority != $newPriority) {
# return an error
my $error = 'edit priority setting error';
$kernel->call(IKC=>post=>$rsvp, objToJson({message => $error}));
}
$found = 1;
last;
}
if (! $found) {
# return an error message
my $error = 'edit priority instance not found error';
$kernel->call(IKC=>post=>$rsvp, objToJson({message => $error}));
}
else {
# return success message
$kernel->call(IKC=>post=>$rsvp, objToJson({message => 'edit priority success'}));
}
}
#-------------------------------------------------------------------
=head2 error ( output )
Prints out error information if debug is enabled.
@ -198,6 +259,44 @@ sub error {
#-------------------------------------------------------------------
=head2 getJsonStatus ( )
Returns JSON report about the workflow engine.
=cut
sub getJsonStatus {
my ($kernel, $request, $self) = @_[KERNEL,ARG0,OBJECT];
my ($sitename, $rsvp) = @$request;
# only return this site's info
return $kernel->call(IKC=>post=>$rsvp, '{}') unless $sitename;
my %queues = ();
tie %queues, 'Tie::IxHash';
%queues = (
Suspended => $self->getSuspendedQueue,
Waiting => $self->getWaitingQueue,
Running => $self->getRunningQueue,
);
my %output = ();
foreach my $queueName (keys %queues) {
my $queue = $queues{$queueName};
my $count = $queue->get_item_count;
my @instances;
if ($count > 0) {
foreach my $itemAref ($queue->peek_items(sub { shift()->{sitename} eq $sitename })) {
push @instances, $itemAref;
}
}
$output{$queueName} = \@instances;
}
$kernel->call(IKC=>post=>$rsvp, objToJson(\%output));
}
#-------------------------------------------------------------------
=head2 getLogger ( )
Returns a reference to the logger.
@ -340,7 +439,7 @@ sub new {
my $debug = shift;
my $self = {_debug=>$debug, _config=>$config, _logger=>$logger};
bless $self, $class;
my @publicEvents = qw(addInstance deleteInstance getStatus);
my @publicEvents = qw(addInstance deleteInstance editWorkflowPriority getStatus getJsonStatus);
POE::Session->create(
object_states => [ $self => [qw(_start _stop returnInstanceToRunnableState addInstance checkInstances deleteInstance suspendInstance runWorker workerResponse), @publicEvents] ],
args=>[\@publicEvents]
@ -431,10 +530,11 @@ Suspends a workflow instance for a number of seconds defined in the config file,
=cut
sub suspendInstance {
my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL];
$self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds.");
$self->getSuspendedQueue->enqueue("1", $instance);
$kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance);
my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL];
$self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds.");
my $priority = ($instance->{priority} - 1) * 10;
$self->getSuspendedQueue->enqueue($priority, $instance);
$kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance);
}
#-------------------------------------------------------------------