diff --git a/docs/upgrades/upgrade_6.8.7-6.99.0.pl b/docs/upgrades/upgrade_6.8.7-6.99.0.pl index 057851ea6..ec86e45ea 100644 --- a/docs/upgrades/upgrade_6.8.7-6.99.0.pl +++ b/docs/upgrades/upgrade_6.8.7-6.99.0.pl @@ -106,6 +106,12 @@ sub addWorkflow { runningSince bigint, lastUpdate bigint )"); + $session->db->write("create table WorkflowInstanceScratch ( + instanceId varchar(22) binary not null, + name varchar(255) not null, + value text, + primary key (instanceId, name) + )"); $session->db->write("create table Workflow ( workflowId varchar(22) binary not null primary key, title varchar(255) not null default 'Untitled', diff --git a/lib/Spectre/Workflow.pm b/lib/Spectre/Workflow.pm index dfac9e0c2..bd4c46723 100644 --- a/lib/Spectre/Workflow.pm +++ b/lib/Spectre/Workflow.pm @@ -76,7 +76,6 @@ A hash reference containing a row of data from the WorkflowInstance table. sub addJob { my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1]; $self->debug("Adding workflow instance ".$job->{instanceId}." from ".$config->getFilename." to job queue at priority ".$job->{priority}."."); - # job list my $sitename = $config->get("sitename"); $self->{_jobs}{$job->{instanceId}} = { sitename=>$sitename->[0], @@ -165,7 +164,8 @@ Removes a workflow job from the processing queue. =cut sub deleteJob { - my ($self, $instanceId) = @_[OBJECT, ARG0]; + my ($self, $instanceId,$kernel) = @_[OBJECT, ARG0, KERNEL]; + $kernel->yield("suspendJob",$instanceId); $self->debug("Deleting workflow instance $instanceId from job queue."); my $priority = $self->{_jobs}{$instanceId}{priority}; delete $self->{_jobs}{$instanceId}; @@ -242,7 +242,7 @@ sub new { bless $self, $class; my @publicEvents = qw(addJob deleteJob); POE::Session->create( - object_states => [ $self => [qw(_start _stop checkJobs loadWorkflows runWorker), @publicEvents] ], + object_states => [ $self => [qw(_start _stop checkJobs deleteJob suspendJob loadWorkflows runWorker workerResponse), @publicEvents] ], args=>[\@publicEvents] ); } @@ -272,19 +272,14 @@ sub runWorker { #------------------------------------------------------------------- -=head2 suspendJob ( jobId ) +=head2 suspendJob ( ) This method puts a running job back into the available jobs pool thusly freeing up a slot in the running jobs pool. This is done when a job has executed a workflow activity, but the entire workflow has not yet completed. -=head3 jobId - -The job being suspended. - =cut sub suspendJob { - my $self = shift; - my $instanceId = shift; + my ($self, $instanceId) = @_[OBJECT, ARG0]; $self->debug("Suspending workflow instance ".$instanceId."."); $self->{_jobs}{$instanceId}{status} = "delay"; $self->{_jobs}{$instanceId}{statusDelay} = $self->config->get("delayAfterSuspension") + time(); @@ -304,7 +299,7 @@ This method is called when the response from the runWorker() method is received. =cut sub workerResponse { - my $self = $_[OBJECT]; + my ($self, $kernel) = @_[OBJECT, KERNEL]; $self->debug("Retrieving response from workflow instance job."); my ($request, $response, $entry) = @{$_[ARG1]}; my $jobId = $request->header("X-JobId"); # got to figure out how to get this from the request, cuz the response may die @@ -320,29 +315,29 @@ sub workerResponse { my $state = $response->content; if ($state eq "waiting") { $self->debug("Was told to wait on $jobId because we're still waiting on some external event."); - $self->suspendJob($jobId); + $kernel->yield("suspendJob",$jobId); } elsif ($state eq "complete") { $self->debug("Workflow instance $jobId ran one of it's activities successfully."); - $self->suspendJob($jobId); + $kernel->yield("suspendJob",$jobId); } elsif ($state eq "disabled") { $self->debug("Workflow instance $jobId is disabled."); - $self->deleteJob($jobId); + $kernel->yield("deleteJob",$jobId); } elsif ($state eq "done") { $self->debug("Workflow instance $jobId is now complete."); - $self->deleteJob($jobId); + $kernel->yield("deleteJob",$jobId); } elsif ($state eq "error") { $self->debug("Got an error for $jobId."); - $self->suspendJob($jobId); + $kernel->yield("suspendJob",$jobId); } else { $self->debug("Something bad happened on the return of $jobId."); - $self->suspendJob($jobId); + $kernel->yield("suspendJob",$jobId); # something bad happened } } elsif ($response->is_redirect) { $self->debug("Response for $jobId was redirected."); } elsif ($response->is_error) { $self->debug("Response for $jobId had a communications error."); - $self->suspendJob($jobId) + $kernel->yield("suspendJob",$jobId) # we should probably log something } } diff --git a/lib/WebGUI/Operation/Workflow.pm b/lib/WebGUI/Operation/Workflow.pm index cf3c5c097..7a5d62416 100644 --- a/lib/WebGUI/Operation/Workflow.pm +++ b/lib/WebGUI/Operation/Workflow.pm @@ -309,21 +309,15 @@ Checks to ensure the requestor is who we think it is, and then executes a workfl sub www_runWorkflow { my $session = shift; -$session->errorHandler->warn("A"); $session->http->setMimeType("text/plain"); -$session->errorHandler->warn("b"); return "error" unless (isInSubnet($session->env->get("REMOTE_ADDR"), $session->config->get("spectreSubnets"))); -$session->errorHandler->warn("c"); my $instanceId = $session->form->param("instanceId"); if ($instanceId) { -$session->errorHandler->warn("d"); my $instance = WebGUI::Workflow::Instance->new($session, $instanceId); if (defined $instance) { -$session->errorHandler->warn("e"); return $instance->run; } } -$session->errorHandler->warn("f"); return "error"; } diff --git a/lib/WebGUI/SQL/ResultSet.pm b/lib/WebGUI/SQL/ResultSet.pm index bde882507..6b2556c56 100644 --- a/lib/WebGUI/SQL/ResultSet.pm +++ b/lib/WebGUI/SQL/ResultSet.pm @@ -117,7 +117,7 @@ sub execute { my $self = shift; my $placeholders = shift || []; my $sql = $self->{_sql}; - $self->sth->execute(@{ $placeholders }) or $self->db->session->errorHandler->fatal("Couldn't execute prepared statement: $sql Root cause: ". $self->errorMessage); + $self->sth->execute(@{ $placeholders }) or $self->db->session->errorHandler->fatal("Couldn't execute prepared statement: $sql : With place holders: ".join(", ", @{$placeholders}).". Root cause: ". $self->errorMessage); } diff --git a/lib/WebGUI/VersionTag.pm b/lib/WebGUI/VersionTag.pm index 37aa01cab..9722302d6 100644 --- a/lib/WebGUI/VersionTag.pm +++ b/lib/WebGUI/VersionTag.pm @@ -242,7 +242,7 @@ sub requestCommit { my $instance = WebGUI::Workflow::Instance->create($self->session, { workflowId=>$self->get("workflowId"), className=>"WebGUI::VersionTag", - method=>"new", + methodName=>"new", parameters=>$self->getId }); } diff --git a/lib/WebGUI/Workflow/Instance.pm b/lib/WebGUI/Workflow/Instance.pm index a07630930..1a7fe080d 100644 --- a/lib/WebGUI/Workflow/Instance.pm +++ b/lib/WebGUI/Workflow/Instance.pm @@ -214,20 +214,32 @@ sub run { my $workflow = WebGUI::Workflow->new($self->session, $self->get("workflowId")); return "undefined" unless (defined $workflow); return "disabled" unless ($workflow->get("enabled")); - my $activity = $workflow->getNextActivity($self->get("currentActivity")); - return "done" unless (defined $activity); + my $activity = $workflow->getNextActivity($self->get("currentActivityId")); + if (defined $activity) { + $self->set({"currentActivityId",$activity->getId}); + } else { + $self->delete; + return "done"; + } my $object = {}; my $class = $self->get("className"); my $method = $self->get("methodName"); my $params = $self->get("parameters"); if ($class && $method) { - $object = eval($class->$method($self->session, $params)); + my $cmd = "use $class"; + eval($cmd); if ($@) { - $self->session->errorHandler->warn("Error instanciating activity (".$activity->getId.") pass-in object: ".$@); + $self->session->errorHandler->warn("Error loading activity class $class: ".$@); return "error"; } + eval{ $object = $class->$method($self->session, $params) }; + if ($@) { + $self->session->errorHandler->warn("Error instanciating activity (".$activity->getId.") pass-in object: ".$@); + return "error"; + } + return $activity->execute($object, $self); } - return $activity->execute($object, $self); + return $activity->execute(undef, $self); }