- Added a wait timeout parameter to the WAITING method, so that Spectre

doesn't have to check something that the workflow activity knows it will 
   have to wait on for a while.
 - Added --stop and --start aliases to spectre.pl.
This commit is contained in:
JT Smith 2008-11-19 01:15:31 +00:00
parent 7349633d63
commit bc5fd3c727
21 changed files with 89 additions and 44 deletions

View file

@ -1,5 +1,9 @@
7.6.4
- Brand new Survey system. Make sure to export your old results as they will
- Added a wait timeout parameter to the WAITING method, so that Spectre
doesn't have to check something that the workflow activity knows it will
have to wait on for a while.
- Added --stop and --start aliases to spectre.pl.
- New pluggable framework added for cutsomizing WebGUI's Account system
- New Profile Account module added providing a better interface for users to view and update their profile
- New Inbox Account module added providing a better interface into WebGUI's various messaging systems

View file

@ -35,7 +35,7 @@
# What port should we connect to WebGUI on? If you've configured
# WebGUI yourself, then this should probably be 80. If you're using
# the WebGUI Runtime Environment, 81 might be a better choice to go
# the WebGUI Runtime Environment, 8081 might be a better choice to go
# directly at the mod_perl server.
"webguiPort" : 80,

View file

@ -576,11 +576,12 @@ Suspends a workflow instance for a number of seconds defined in the config file,
=cut
sub suspendInstance {
my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL];
$self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds.");
my ($self, $instance, $waitTimeout, $kernel) = @_[OBJECT, ARG0, ARG1, KERNEL];
$waitTimeout ||= $self->config->get("suspensionDelay");
$self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$waitTimeout." seconds.");
my $priority = ($instance->{priority} - 1) * 10;
$self->getSuspendedQueue->enqueue($priority, $instance);
$kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance);
$kernel->delay_set("returnInstanceToRunnableState", $waitTimeout, $instance);
}
#-------------------------------------------------------------------
@ -611,30 +612,38 @@ sub workerResponse {
my $state = $response->content;
$instance->{lastState} = $state;
$instance->{lastRunTime} = localtime(time());
if ($state eq "waiting") {
if ($state =~ m/^waiting\s*(\d+)?$/) {
my $waitTime = $1;
$self->debug("Was told to wait on $instanceId because we're still waiting on some external event.");
$kernel->yield("suspendInstance",$instance);
} elsif ($state eq "complete") {
$kernel->yield("suspendInstance",$instance, $waitTime);
}
elsif ($state eq "complete") {
$self->debug("Workflow instance $instanceId ran one of it's activities successfully.");
$kernel->yield("returnInstanceToRunnableState",$instance);
} elsif ($state eq "disabled") {
}
elsif ($state eq "disabled") {
$self->debug("Workflow instance $instanceId is disabled.");
$kernel->yield("suspendInstance",$instance);
} elsif ($state eq "done") {
}
elsif ($state eq "done") {
$self->debug("Workflow instance $instanceId is now complete.");
$kernel->yield("deleteInstance",$instanceId);
} elsif ($state eq "error") {
}
elsif ($state eq "error") {
$self->debug("Got an error response for $instanceId.");
$kernel->yield("suspendInstance",$instance);
} else {
}
else {
$self->error("Something bad happened on the return of $instance->{sitename} - $instanceId. ".$response->error_as_HTML);
$kernel->yield("suspendInstance",$instance);
}
} elsif ($response->is_redirect) {
}
elsif ($response->is_redirect) {
$self->error("Response for $instance->{sitename} - $instanceId was redirected. This should never happen if configured properly!!!");
$instance->{lastState} = "redirect";
$instance->{lastRunTime} = localtime(time());
} elsif ($response->is_error) {
}
elsif ($response->is_error) {
$instance->{lastState} = "comm error";
$instance->{lastRunTime} = localtime(time());
$self->error("Response for $instance->{sitename} - $instanceId had a communications error. ".$response->error_as_HTML);

View file

@ -51,15 +51,22 @@ A constant to be sent to Spectre informing it that this activity did not execute
sub ERROR { return "error" };
=head2 WAITING
=head2 WAITING ( [ waitTime ] )
A constant to be sent to Spectre informing it that this actiivty is
waiting for some other event to be triggered. This is also used for
long running activities to be released by Spectre and to be requeued.
=head3 waitTime
Instead of sending the constant it will set a time to wait before running the workflow again. Can be any number of seconds 1 or higher.
=cut
sub WAITING { return "waiting" };
sub WAITING {
my ($class, $waitTime) = @_;
return "waiting $waitTime";
}
=head1 METHODS

View file

@ -381,7 +381,7 @@ sub execute {
if ($currentVersionTag) {
$currentVersionTag->setWorking;
}
return $self->WAITING;
return $self->WAITING(1);
}
my $eventData = shift @$eventList;
my $recur = $eventData->{recur};

View file

@ -119,7 +119,7 @@ sub execute {
}
}
# taking too long, give up
return $self->WAITING if (time() - $start > 50);
return $self->WAITING(1) if (time() - $start > 50);
}
# kill temporary files
@ -149,7 +149,7 @@ sub recurseFileSystem {
foreach my $file (@filelist) {
unless ($file eq "." || $file eq "..") {
# taking too long, time to abort
return $self->WAITING if (time() - $start > 50);
return $self->WAITING(1) if (time() - $start > 50);
# must search for children
$self->recurseFileSystem($start, $path."/".$file);

View file

@ -73,8 +73,9 @@ sub execute {
my $completion = $versionTag->commit({timeout => $self->getTTL});
if ($completion == 1) {
return $self->COMPLETE;
} elsif ($completion == 2) {
return $self->WAITING;
}
elsif ($completion == 2) {
return $self->WAITING(1);
}
return $self->ERROR;
}

View file

@ -94,7 +94,7 @@ sub execute {
}
if ((time() - $time) > $ttl) {
$sth->finish;
return $self->WAITING;
return $self->WAITING(1);
}
}

View file

@ -116,7 +116,7 @@ sub execute {
pause:
$instance->setScratch(DELETE_FILES_SCRATCH, Storable::freeze(\@files));
$instance->setScratch(PRUNE_DIRS_SCRATCH, Storable::freeze(\@dirs));
return $self->WAITING;
return $self->WAITING(1);
}
1;

View file

@ -91,7 +91,7 @@ sub execute {
}
if (time() - $start > $ttl) {
$pending->finish;
return $self->WAITING;
return $self->WAITING(1);
}
}
return $self->COMPLETE;

View file

@ -91,7 +91,7 @@ sub execute {
if (time() - $start > $ttl) {
$items->finish;
$log->('Ran out of time. Will have to expire the rest later.');
return $self->WAITING;
return $self->WAITING(1);
}
}
$log->info('No more EMS items to expire.');

View file

@ -112,7 +112,7 @@ sub execute {
# if there are urls left, we need to process again
if (scalar(@$assets) > 0) {
$instance->setScratch("syndicatedassets", JSON->new->encode($assets));
return $self->WAITING;
return $self->WAITING(1);
}
# if we've completed the list, clean up

View file

@ -119,7 +119,7 @@ sub execute {
$instance->setScratch('LowStockMessage', $message);
$instance->setScratch('LowStockLast', $counter);
$instance->setScratch('LowStockBelow', $belowThreshold);
return $self->WAITING;
return $self->WAITING(1);
}
$instance->deleteScratch('LowStockMessage');

View file

@ -115,7 +115,7 @@ sub execute {
$log->info("Ran out of time, will pick up with revision $version when we start again.");
$instance->setScratch("purgeOldAssetsLastRevisionDate", $version);
$sth->finish;
return $self->WAITING;
return $self->WAITING(1);
}
}
return $self->COMPLETE;

View file

@ -100,13 +100,13 @@ sub execute {
# give up if we're taking too long
if (time - $start > 120) {
$sth->finish;
return $self->WAITING;
return $self->WAITING(1);
}
}
# If there are more messages waiting to be purged, return WAITING
if ( $sth->rows >= $limit ) {
return $self->WAITING;
return $self->WAITING(1);
}
else {
return $self->COMPLETE;

View file

@ -194,7 +194,7 @@ sub execute {
# Update approval status
$instance->setScratch( "status", "notified" );
return $self->WAITING;
return $self->WAITING(60*20);
}
# Second and subsequent times, check status
# Tag is denied
@ -217,7 +217,7 @@ sub execute {
}
# If we haven't done anything, spin the wheel again
return $self->WAITING;
return $self->WAITING(60*60);
}
#----------------------------------------------------------------------------

View file

@ -198,7 +198,7 @@ sub execute {
if (time() - $time > 50) {
$eh->info("Oops. Ran out of time. Will continue building newsletters in a bit.");
$subscriptionResultSet->finish;
return $self->WAITING;
return $self->WAITING(1);
}
}
return $self->COMPLETE;

View file

@ -184,7 +184,7 @@ sub execute {
$link->unbind if defined $link;
$instance->setScratch('ldapSelectIndex', $index);
$sth->finish;
return $self->WAITING;
return $self->WAITING(1);
}
}

View file

@ -82,7 +82,8 @@ sub execute {
my $versionTag = shift;
my $session = $self->session;
my $urlOfSingleAsset = "";
$session->log->warn('a');
#By default, we'll make it so that things happen now.
my $time = $session->datetime->time();
@ -93,19 +94,25 @@ sub execute {
elsif ($self->get("type") eq "endTime") {
$time = $versionTag->get("endTime");
}
$session->log->warn('b');
#Turn start or end time into an epoch value
my $dt = WebGUI::DateTime->new($session,$time);
$session->log->warn('c');
#Get the current UTC time
my $now = WebGUI::DateTime->new($session,$session->datetime->time());
$session->log->warn('d');
#Workflow is complete if the time has passed.
if($now->epoch >= $dt->epoch) {
return $self->COMPLETE;
}
return $self->WAITING;
$session->log->warn('e');
$session->log->warn($dt->epoch - $now->epoch);
return $self->WAITING($dt->epoch - $now->epoch);
}

View file

@ -80,8 +80,10 @@ sub execute {
# do some work here, whatever this activity is supposed to do
# Workflow is finished
return $self->COMPLETE;
# Or needs to be run again to finish processing
#return $self->WAITING;
# Or we ran out of time, run again ASAP
return $self->WAITING(1);
# Or we're waiting on some external process to complete, wait an hour
#return $self->WAITING(60*60);
# Or encountered an error and cannot finish
#return $self->ERROR;
}

View file

@ -37,7 +37,9 @@ GetOptions(
'help'=>\$help,
'ping'=>\$ping,
'shutdown'=>\$shutdown,
'stop'=>\$shutdown,
'daemon'=>\$daemon,
'start'=>\$daemon,
'debug' =>\$debug,
'status' => \$status,
'run' => \$run,
@ -75,17 +77,22 @@ if ($shutdown) {
die $POE::Component::IKC::ClientLite::error unless defined $result;
$remote->disconnect;
undef $remote;
} elsif ($ping) {
}
elsif ($ping) {
my $res = ping();
print "Spectre is Alive!\n" unless $res;
print "Spectre is not responding.\n".$res if $res;
} elsif ($status) {
}
elsif ($status) {
print getStatusReport();
} elsif ($test) {
}
elsif ($test) {
Spectre::Admin->runTests($config);
} elsif ($run) {
}
elsif ($run) {
Spectre::Admin->new($config, $debug);
} elsif ($daemon) {
}
elsif ($daemon) {
if (!ping()) {
die "Spectre is already running.\n";
}
@ -140,9 +147,9 @@ spectre - WebGUI's workflow and scheduling.
=head1 SYNOPSIS
spectre {--daemon|--run} [--debug]
spectre {--daemon | --start | --run} [--debug]
spectre --shutdown
spectre --shutdown | --stop
spectre --ping
@ -199,11 +206,19 @@ where B<IP-address> is the IP address and B<Port> is the port number
where Spectre should be listening for connections on according to
B<spectre.conf>.
=item B<--start>
Alias for --daemon.
=item B<--status>
Shows a summary of Spectre's internal status. The summary contains
a tally of suspended, waiting and running WebGUI Workflows.
=item B<--stop>
Alias for --shutdown.
=item B<--test>
Tests whether Spectre can connect to WebGUI. Both Spectre