From bc5fd3c727a998bb56660246b5ce1f27f66736c7 Mon Sep 17 00:00:00 2001 From: JT Smith Date: Wed, 19 Nov 2008 01:15:31 +0000 Subject: [PATCH] - Added a wait timeout parameter to the WAITING method, so that Spectre doesn't have to check something that the workflow activity knows it will have to wait on for a while. - Added --stop and --start aliases to spectre.pl. --- docs/changelog/7.x.x.txt | 4 +++ etc/spectre.conf.original | 2 +- lib/Spectre/Workflow.pm | 33 ++++++++++++------- lib/WebGUI/Workflow/Activity.pm | 11 +++++-- .../Workflow/Activity/CalendarUpdateFeeds.pm | 2 +- .../Workflow/Activity/CleanTempStorage.pm | 4 +-- .../Workflow/Activity/CommitVersionTag.pm | 5 +-- .../Activity/DeleteExpiredSessions.pm | 2 +- .../Workflow/Activity/DeleteExportedFiles.pm | 2 +- .../Activity/DenyUnansweredFriends.pm | 2 +- .../Workflow/Activity/ExpireEmsCartItems.pm | 2 +- .../Workflow/Activity/GetSyndicatedContent.pm | 2 +- .../Workflow/Activity/NotifyAboutLowStock.pm | 2 +- .../Activity/PurgeOldAssetRevisions.pm | 2 +- .../Activity/PurgeOldInboxMessages.pm | 4 +-- .../Activity/RequestApprovalForVersionTag.pm | 4 +-- .../Workflow/Activity/SendNewsletters.pm | 2 +- .../Workflow/Activity/SyncProfilesToLdap.pm | 2 +- lib/WebGUI/Workflow/Activity/WaitUntil.pm | 11 +++++-- .../Workflow/Activity/_activity.skeleton | 6 ++-- sbin/spectre.pl | 29 ++++++++++++---- 21 files changed, 89 insertions(+), 44 deletions(-) diff --git a/docs/changelog/7.x.x.txt b/docs/changelog/7.x.x.txt index 434a2c7c1..7400f9cce 100644 --- a/docs/changelog/7.x.x.txt +++ b/docs/changelog/7.x.x.txt @@ -1,5 +1,9 @@ 7.6.4 - Brand new Survey system. Make sure to export your old results as they will + - Added a wait timeout parameter to the WAITING method, so that Spectre + doesn't have to check something that the workflow activity knows it will + have to wait on for a while. + - Added --stop and --start aliases to spectre.pl. - New pluggable framework added for cutsomizing WebGUI's Account system - New Profile Account module added providing a better interface for users to view and update their profile - New Inbox Account module added providing a better interface into WebGUI's various messaging systems diff --git a/etc/spectre.conf.original b/etc/spectre.conf.original index 91275c33f..62971e4a1 100644 --- a/etc/spectre.conf.original +++ b/etc/spectre.conf.original @@ -35,7 +35,7 @@ # What port should we connect to WebGUI on? If you've configured # WebGUI yourself, then this should probably be 80. If you're using -# the WebGUI Runtime Environment, 81 might be a better choice to go +# the WebGUI Runtime Environment, 8081 might be a better choice to go # directly at the mod_perl server. "webguiPort" : 80, diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index 6abf16812..f7aa3cf9f 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -576,11 +576,12 @@ Suspends a workflow instance for a number of seconds defined in the config file, =cut sub suspendInstance { - my ($self, $instance, $kernel) = @_[OBJECT, ARG0, KERNEL]; - $self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$self->config->get("suspensionDelay")." seconds."); + my ($self, $instance, $waitTimeout, $kernel) = @_[OBJECT, ARG0, ARG1, KERNEL]; + $waitTimeout ||= $self->config->get("suspensionDelay"); + $self->debug("Suspending workflow instance ".$instance->{instanceId}." for ".$waitTimeout." seconds."); my $priority = ($instance->{priority} - 1) * 10; $self->getSuspendedQueue->enqueue($priority, $instance); - $kernel->delay_set("returnInstanceToRunnableState",$self->config->get("suspensionDelay"), $instance); + $kernel->delay_set("returnInstanceToRunnableState", $waitTimeout, $instance); } #------------------------------------------------------------------- @@ -611,30 +612,38 @@ sub workerResponse { my $state = $response->content; $instance->{lastState} = $state; $instance->{lastRunTime} = localtime(time()); - if ($state eq "waiting") { + if ($state =~ m/^waiting\s*(\d+)?$/) { + my $waitTime = $1; $self->debug("Was told to wait on $instanceId because we're still waiting on some external event."); - $kernel->yield("suspendInstance",$instance); - } elsif ($state eq "complete") { + $kernel->yield("suspendInstance",$instance, $waitTime); + } + elsif ($state eq "complete") { $self->debug("Workflow instance $instanceId ran one of it's activities successfully."); $kernel->yield("returnInstanceToRunnableState",$instance); - } elsif ($state eq "disabled") { + } + elsif ($state eq "disabled") { $self->debug("Workflow instance $instanceId is disabled."); $kernel->yield("suspendInstance",$instance); - } elsif ($state eq "done") { + } + elsif ($state eq "done") { $self->debug("Workflow instance $instanceId is now complete."); $kernel->yield("deleteInstance",$instanceId); - } elsif ($state eq "error") { + } + elsif ($state eq "error") { $self->debug("Got an error response for $instanceId."); $kernel->yield("suspendInstance",$instance); - } else { + } + else { $self->error("Something bad happened on the return of $instance->{sitename} - $instanceId. ".$response->error_as_HTML); $kernel->yield("suspendInstance",$instance); } - } elsif ($response->is_redirect) { + } + elsif ($response->is_redirect) { $self->error("Response for $instance->{sitename} - $instanceId was redirected. This should never happen if configured properly!!!"); $instance->{lastState} = "redirect"; $instance->{lastRunTime} = localtime(time()); - } elsif ($response->is_error) { + } + elsif ($response->is_error) { $instance->{lastState} = "comm error"; $instance->{lastRunTime} = localtime(time()); $self->error("Response for $instance->{sitename} - $instanceId had a communications error. ".$response->error_as_HTML); diff --git a/lib/WebGUI/Workflow/Activity.pm b/lib/WebGUI/Workflow/Activity.pm index aa7e50a4e..595af08c3 100644 --- a/lib/WebGUI/Workflow/Activity.pm +++ b/lib/WebGUI/Workflow/Activity.pm @@ -51,15 +51,22 @@ A constant to be sent to Spectre informing it that this activity did not execute sub ERROR { return "error" }; -=head2 WAITING +=head2 WAITING ( [ waitTime ] ) A constant to be sent to Spectre informing it that this actiivty is waiting for some other event to be triggered. This is also used for long running activities to be released by Spectre and to be requeued. +=head3 waitTime + +Instead of sending the constant it will set a time to wait before running the workflow again. Can be any number of seconds 1 or higher. + =cut -sub WAITING { return "waiting" }; +sub WAITING { + my ($class, $waitTime) = @_; + return "waiting $waitTime"; +} =head1 METHODS diff --git a/lib/WebGUI/Workflow/Activity/CalendarUpdateFeeds.pm b/lib/WebGUI/Workflow/Activity/CalendarUpdateFeeds.pm index b53b5e4d0..b00306fec 100755 --- a/lib/WebGUI/Workflow/Activity/CalendarUpdateFeeds.pm +++ b/lib/WebGUI/Workflow/Activity/CalendarUpdateFeeds.pm @@ -381,7 +381,7 @@ sub execute { if ($currentVersionTag) { $currentVersionTag->setWorking; } - return $self->WAITING; + return $self->WAITING(1); } my $eventData = shift @$eventList; my $recur = $eventData->{recur}; diff --git a/lib/WebGUI/Workflow/Activity/CleanTempStorage.pm b/lib/WebGUI/Workflow/Activity/CleanTempStorage.pm index e5eac8bef..51bcbe225 100644 --- a/lib/WebGUI/Workflow/Activity/CleanTempStorage.pm +++ b/lib/WebGUI/Workflow/Activity/CleanTempStorage.pm @@ -119,7 +119,7 @@ sub execute { } } # taking too long, give up - return $self->WAITING if (time() - $start > 50); + return $self->WAITING(1) if (time() - $start > 50); } # kill temporary files @@ -149,7 +149,7 @@ sub recurseFileSystem { foreach my $file (@filelist) { unless ($file eq "." || $file eq "..") { # taking too long, time to abort - return $self->WAITING if (time() - $start > 50); + return $self->WAITING(1) if (time() - $start > 50); # must search for children $self->recurseFileSystem($start, $path."/".$file); diff --git a/lib/WebGUI/Workflow/Activity/CommitVersionTag.pm b/lib/WebGUI/Workflow/Activity/CommitVersionTag.pm index a84771899..b795eb669 100644 --- a/lib/WebGUI/Workflow/Activity/CommitVersionTag.pm +++ b/lib/WebGUI/Workflow/Activity/CommitVersionTag.pm @@ -73,8 +73,9 @@ sub execute { my $completion = $versionTag->commit({timeout => $self->getTTL}); if ($completion == 1) { return $self->COMPLETE; - } elsif ($completion == 2) { - return $self->WAITING; + } + elsif ($completion == 2) { + return $self->WAITING(1); } return $self->ERROR; } diff --git a/lib/WebGUI/Workflow/Activity/DeleteExpiredSessions.pm b/lib/WebGUI/Workflow/Activity/DeleteExpiredSessions.pm index b6dd12d13..ceeec2d9b 100644 --- a/lib/WebGUI/Workflow/Activity/DeleteExpiredSessions.pm +++ b/lib/WebGUI/Workflow/Activity/DeleteExpiredSessions.pm @@ -94,7 +94,7 @@ sub execute { } if ((time() - $time) > $ttl) { $sth->finish; - return $self->WAITING; + return $self->WAITING(1); } } diff --git a/lib/WebGUI/Workflow/Activity/DeleteExportedFiles.pm b/lib/WebGUI/Workflow/Activity/DeleteExportedFiles.pm index 11f227353..6e42a2654 100644 --- a/lib/WebGUI/Workflow/Activity/DeleteExportedFiles.pm +++ b/lib/WebGUI/Workflow/Activity/DeleteExportedFiles.pm @@ -116,7 +116,7 @@ sub execute { pause: $instance->setScratch(DELETE_FILES_SCRATCH, Storable::freeze(\@files)); $instance->setScratch(PRUNE_DIRS_SCRATCH, Storable::freeze(\@dirs)); - return $self->WAITING; + return $self->WAITING(1); } 1; diff --git a/lib/WebGUI/Workflow/Activity/DenyUnansweredFriends.pm b/lib/WebGUI/Workflow/Activity/DenyUnansweredFriends.pm index 3f908b8f6..03238bee8 100644 --- a/lib/WebGUI/Workflow/Activity/DenyUnansweredFriends.pm +++ b/lib/WebGUI/Workflow/Activity/DenyUnansweredFriends.pm @@ -91,7 +91,7 @@ sub execute { } if (time() - $start > $ttl) { $pending->finish; - return $self->WAITING; + return $self->WAITING(1); } } return $self->COMPLETE; diff --git a/lib/WebGUI/Workflow/Activity/ExpireEmsCartItems.pm b/lib/WebGUI/Workflow/Activity/ExpireEmsCartItems.pm index b9403c900..cf9b01dca 100644 --- a/lib/WebGUI/Workflow/Activity/ExpireEmsCartItems.pm +++ b/lib/WebGUI/Workflow/Activity/ExpireEmsCartItems.pm @@ -91,7 +91,7 @@ sub execute { if (time() - $start > $ttl) { $items->finish; $log->('Ran out of time. Will have to expire the rest later.'); - return $self->WAITING; + return $self->WAITING(1); } } $log->info('No more EMS items to expire.'); diff --git a/lib/WebGUI/Workflow/Activity/GetSyndicatedContent.pm b/lib/WebGUI/Workflow/Activity/GetSyndicatedContent.pm index 397a92c47..ed79386b4 100644 --- a/lib/WebGUI/Workflow/Activity/GetSyndicatedContent.pm +++ b/lib/WebGUI/Workflow/Activity/GetSyndicatedContent.pm @@ -112,7 +112,7 @@ sub execute { # if there are urls left, we need to process again if (scalar(@$assets) > 0) { $instance->setScratch("syndicatedassets", JSON->new->encode($assets)); - return $self->WAITING; + return $self->WAITING(1); } # if we've completed the list, clean up diff --git a/lib/WebGUI/Workflow/Activity/NotifyAboutLowStock.pm b/lib/WebGUI/Workflow/Activity/NotifyAboutLowStock.pm index e3ab1ed69..f31451c35 100644 --- a/lib/WebGUI/Workflow/Activity/NotifyAboutLowStock.pm +++ b/lib/WebGUI/Workflow/Activity/NotifyAboutLowStock.pm @@ -119,7 +119,7 @@ sub execute { $instance->setScratch('LowStockMessage', $message); $instance->setScratch('LowStockLast', $counter); $instance->setScratch('LowStockBelow', $belowThreshold); - return $self->WAITING; + return $self->WAITING(1); } $instance->deleteScratch('LowStockMessage'); diff --git a/lib/WebGUI/Workflow/Activity/PurgeOldAssetRevisions.pm b/lib/WebGUI/Workflow/Activity/PurgeOldAssetRevisions.pm index 785b4bd59..965f6b0e3 100644 --- a/lib/WebGUI/Workflow/Activity/PurgeOldAssetRevisions.pm +++ b/lib/WebGUI/Workflow/Activity/PurgeOldAssetRevisions.pm @@ -115,7 +115,7 @@ sub execute { $log->info("Ran out of time, will pick up with revision $version when we start again."); $instance->setScratch("purgeOldAssetsLastRevisionDate", $version); $sth->finish; - return $self->WAITING; + return $self->WAITING(1); } } return $self->COMPLETE; diff --git a/lib/WebGUI/Workflow/Activity/PurgeOldInboxMessages.pm b/lib/WebGUI/Workflow/Activity/PurgeOldInboxMessages.pm index b5745b7b5..e73dda3b6 100644 --- a/lib/WebGUI/Workflow/Activity/PurgeOldInboxMessages.pm +++ b/lib/WebGUI/Workflow/Activity/PurgeOldInboxMessages.pm @@ -100,13 +100,13 @@ sub execute { # give up if we're taking too long if (time - $start > 120) { $sth->finish; - return $self->WAITING; + return $self->WAITING(1); } } # If there are more messages waiting to be purged, return WAITING if ( $sth->rows >= $limit ) { - return $self->WAITING; + return $self->WAITING(1); } else { return $self->COMPLETE; diff --git a/lib/WebGUI/Workflow/Activity/RequestApprovalForVersionTag.pm b/lib/WebGUI/Workflow/Activity/RequestApprovalForVersionTag.pm index 5135eea90..603fbb06b 100644 --- a/lib/WebGUI/Workflow/Activity/RequestApprovalForVersionTag.pm +++ b/lib/WebGUI/Workflow/Activity/RequestApprovalForVersionTag.pm @@ -194,7 +194,7 @@ sub execute { # Update approval status $instance->setScratch( "status", "notified" ); - return $self->WAITING; + return $self->WAITING(60*20); } # Second and subsequent times, check status # Tag is denied @@ -217,7 +217,7 @@ sub execute { } # If we haven't done anything, spin the wheel again - return $self->WAITING; + return $self->WAITING(60*60); } #---------------------------------------------------------------------------- diff --git a/lib/WebGUI/Workflow/Activity/SendNewsletters.pm b/lib/WebGUI/Workflow/Activity/SendNewsletters.pm index 724b34a95..2c6716edb 100644 --- a/lib/WebGUI/Workflow/Activity/SendNewsletters.pm +++ b/lib/WebGUI/Workflow/Activity/SendNewsletters.pm @@ -198,7 +198,7 @@ sub execute { if (time() - $time > 50) { $eh->info("Oops. Ran out of time. Will continue building newsletters in a bit."); $subscriptionResultSet->finish; - return $self->WAITING; + return $self->WAITING(1); } } return $self->COMPLETE; diff --git a/lib/WebGUI/Workflow/Activity/SyncProfilesToLdap.pm b/lib/WebGUI/Workflow/Activity/SyncProfilesToLdap.pm index 6032d30da..32e8df85a 100644 --- a/lib/WebGUI/Workflow/Activity/SyncProfilesToLdap.pm +++ b/lib/WebGUI/Workflow/Activity/SyncProfilesToLdap.pm @@ -184,7 +184,7 @@ sub execute { $link->unbind if defined $link; $instance->setScratch('ldapSelectIndex', $index); $sth->finish; - return $self->WAITING; + return $self->WAITING(1); } } diff --git a/lib/WebGUI/Workflow/Activity/WaitUntil.pm b/lib/WebGUI/Workflow/Activity/WaitUntil.pm index 1bd52598c..3d11ee72e 100644 --- a/lib/WebGUI/Workflow/Activity/WaitUntil.pm +++ b/lib/WebGUI/Workflow/Activity/WaitUntil.pm @@ -82,7 +82,8 @@ sub execute { my $versionTag = shift; my $session = $self->session; my $urlOfSingleAsset = ""; - +$session->log->warn('a'); + #By default, we'll make it so that things happen now. my $time = $session->datetime->time(); @@ -93,19 +94,25 @@ sub execute { elsif ($self->get("type") eq "endTime") { $time = $versionTag->get("endTime"); } +$session->log->warn('b'); #Turn start or end time into an epoch value my $dt = WebGUI::DateTime->new($session,$time); +$session->log->warn('c'); #Get the current UTC time my $now = WebGUI::DateTime->new($session,$session->datetime->time()); +$session->log->warn('d'); #Workflow is complete if the time has passed. if($now->epoch >= $dt->epoch) { return $self->COMPLETE; } - return $self->WAITING; +$session->log->warn('e'); + $session->log->warn($dt->epoch - $now->epoch); + + return $self->WAITING($dt->epoch - $now->epoch); } diff --git a/lib/WebGUI/Workflow/Activity/_activity.skeleton b/lib/WebGUI/Workflow/Activity/_activity.skeleton index 9d62c41bc..61fb641d1 100644 --- a/lib/WebGUI/Workflow/Activity/_activity.skeleton +++ b/lib/WebGUI/Workflow/Activity/_activity.skeleton @@ -80,8 +80,10 @@ sub execute { # do some work here, whatever this activity is supposed to do # Workflow is finished return $self->COMPLETE; - # Or needs to be run again to finish processing - #return $self->WAITING; + # Or we ran out of time, run again ASAP + return $self->WAITING(1); + # Or we're waiting on some external process to complete, wait an hour + #return $self->WAITING(60*60); # Or encountered an error and cannot finish #return $self->ERROR; } diff --git a/sbin/spectre.pl b/sbin/spectre.pl index 56e06e09d..3783261f9 100644 --- a/sbin/spectre.pl +++ b/sbin/spectre.pl @@ -37,7 +37,9 @@ GetOptions( 'help'=>\$help, 'ping'=>\$ping, 'shutdown'=>\$shutdown, + 'stop'=>\$shutdown, 'daemon'=>\$daemon, + 'start'=>\$daemon, 'debug' =>\$debug, 'status' => \$status, 'run' => \$run, @@ -75,17 +77,22 @@ if ($shutdown) { die $POE::Component::IKC::ClientLite::error unless defined $result; $remote->disconnect; undef $remote; -} elsif ($ping) { +} +elsif ($ping) { my $res = ping(); print "Spectre is Alive!\n" unless $res; print "Spectre is not responding.\n".$res if $res; -} elsif ($status) { +} +elsif ($status) { print getStatusReport(); -} elsif ($test) { +} +elsif ($test) { Spectre::Admin->runTests($config); -} elsif ($run) { +} +elsif ($run) { Spectre::Admin->new($config, $debug); -} elsif ($daemon) { +} +elsif ($daemon) { if (!ping()) { die "Spectre is already running.\n"; } @@ -140,9 +147,9 @@ spectre - WebGUI's workflow and scheduling. =head1 SYNOPSIS - spectre {--daemon|--run} [--debug] + spectre {--daemon | --start | --run} [--debug] - spectre --shutdown + spectre --shutdown | --stop spectre --ping @@ -199,11 +206,19 @@ where B is the IP address and B is the port number where Spectre should be listening for connections on according to B. +=item B<--start> + +Alias for --daemon. + =item B<--status> Shows a summary of Spectre's internal status. The summary contains a tally of suspended, waiting and running WebGUI Workflows. +=item B<--stop> + +Alias for --shutdown. + =item B<--test> Tests whether Spectre can connect to WebGUI. Both Spectre