workflows now run, but with errors, must fix that next
This commit is contained in:
parent
6fc6b00b49
commit
ccf1b3633f
6 changed files with 38 additions and 31 deletions
|
|
@ -106,6 +106,12 @@ sub addWorkflow {
|
|||
runningSince bigint,
|
||||
lastUpdate bigint
|
||||
)");
|
||||
$session->db->write("create table WorkflowInstanceScratch (
|
||||
instanceId varchar(22) binary not null,
|
||||
name varchar(255) not null,
|
||||
value text,
|
||||
primary key (instanceId, name)
|
||||
)");
|
||||
$session->db->write("create table Workflow (
|
||||
workflowId varchar(22) binary not null primary key,
|
||||
title varchar(255) not null default 'Untitled',
|
||||
|
|
|
|||
|
|
@ -76,7 +76,6 @@ A hash reference containing a row of data from the WorkflowInstance table.
|
|||
sub addJob {
|
||||
my ($self, $config, $job) = @_[OBJECT, ARG0, ARG1];
|
||||
$self->debug("Adding workflow instance ".$job->{instanceId}." from ".$config->getFilename." to job queue at priority ".$job->{priority}.".");
|
||||
# job list
|
||||
my $sitename = $config->get("sitename");
|
||||
$self->{_jobs}{$job->{instanceId}} = {
|
||||
sitename=>$sitename->[0],
|
||||
|
|
@ -165,7 +164,8 @@ Removes a workflow job from the processing queue.
|
|||
=cut
|
||||
|
||||
sub deleteJob {
|
||||
my ($self, $instanceId) = @_[OBJECT, ARG0];
|
||||
my ($self, $instanceId,$kernel) = @_[OBJECT, ARG0, KERNEL];
|
||||
$kernel->yield("suspendJob",$instanceId);
|
||||
$self->debug("Deleting workflow instance $instanceId from job queue.");
|
||||
my $priority = $self->{_jobs}{$instanceId}{priority};
|
||||
delete $self->{_jobs}{$instanceId};
|
||||
|
|
@ -242,7 +242,7 @@ sub new {
|
|||
bless $self, $class;
|
||||
my @publicEvents = qw(addJob deleteJob);
|
||||
POE::Session->create(
|
||||
object_states => [ $self => [qw(_start _stop checkJobs loadWorkflows runWorker), @publicEvents] ],
|
||||
object_states => [ $self => [qw(_start _stop checkJobs deleteJob suspendJob loadWorkflows runWorker workerResponse), @publicEvents] ],
|
||||
args=>[\@publicEvents]
|
||||
);
|
||||
}
|
||||
|
|
@ -272,19 +272,14 @@ sub runWorker {
|
|||
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
=head2 suspendJob ( jobId )
|
||||
=head2 suspendJob ( )
|
||||
|
||||
This method puts a running job back into the available jobs pool thusly freeing up a slot in the running jobs pool. This is done when a job has executed a workflow activity, but the entire workflow has not yet completed.
|
||||
|
||||
=head3 jobId
|
||||
|
||||
The job being suspended.
|
||||
|
||||
=cut
|
||||
|
||||
sub suspendJob {
|
||||
my $self = shift;
|
||||
my $instanceId = shift;
|
||||
my ($self, $instanceId) = @_[OBJECT, ARG0];
|
||||
$self->debug("Suspending workflow instance ".$instanceId.".");
|
||||
$self->{_jobs}{$instanceId}{status} = "delay";
|
||||
$self->{_jobs}{$instanceId}{statusDelay} = $self->config->get("delayAfterSuspension") + time();
|
||||
|
|
@ -304,7 +299,7 @@ This method is called when the response from the runWorker() method is received.
|
|||
=cut
|
||||
|
||||
sub workerResponse {
|
||||
my $self = $_[OBJECT];
|
||||
my ($self, $kernel) = @_[OBJECT, KERNEL];
|
||||
$self->debug("Retrieving response from workflow instance job.");
|
||||
my ($request, $response, $entry) = @{$_[ARG1]};
|
||||
my $jobId = $request->header("X-JobId"); # got to figure out how to get this from the request, cuz the response may die
|
||||
|
|
@ -320,29 +315,29 @@ sub workerResponse {
|
|||
my $state = $response->content;
|
||||
if ($state eq "waiting") {
|
||||
$self->debug("Was told to wait on $jobId because we're still waiting on some external event.");
|
||||
$self->suspendJob($jobId);
|
||||
$kernel->yield("suspendJob",$jobId);
|
||||
} elsif ($state eq "complete") {
|
||||
$self->debug("Workflow instance $jobId ran one of it's activities successfully.");
|
||||
$self->suspendJob($jobId);
|
||||
$kernel->yield("suspendJob",$jobId);
|
||||
} elsif ($state eq "disabled") {
|
||||
$self->debug("Workflow instance $jobId is disabled.");
|
||||
$self->deleteJob($jobId);
|
||||
$kernel->yield("deleteJob",$jobId);
|
||||
} elsif ($state eq "done") {
|
||||
$self->debug("Workflow instance $jobId is now complete.");
|
||||
$self->deleteJob($jobId);
|
||||
$kernel->yield("deleteJob",$jobId);
|
||||
} elsif ($state eq "error") {
|
||||
$self->debug("Got an error for $jobId.");
|
||||
$self->suspendJob($jobId);
|
||||
$kernel->yield("suspendJob",$jobId);
|
||||
} else {
|
||||
$self->debug("Something bad happened on the return of $jobId.");
|
||||
$self->suspendJob($jobId);
|
||||
$kernel->yield("suspendJob",$jobId);
|
||||
# something bad happened
|
||||
}
|
||||
} elsif ($response->is_redirect) {
|
||||
$self->debug("Response for $jobId was redirected.");
|
||||
} elsif ($response->is_error) {
|
||||
$self->debug("Response for $jobId had a communications error.");
|
||||
$self->suspendJob($jobId)
|
||||
$kernel->yield("suspendJob",$jobId)
|
||||
# we should probably log something
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -309,21 +309,15 @@ Checks to ensure the requestor is who we think it is, and then executes a workfl
|
|||
|
||||
sub www_runWorkflow {
|
||||
my $session = shift;
|
||||
$session->errorHandler->warn("A");
|
||||
$session->http->setMimeType("text/plain");
|
||||
$session->errorHandler->warn("b");
|
||||
return "error" unless (isInSubnet($session->env->get("REMOTE_ADDR"), $session->config->get("spectreSubnets")));
|
||||
$session->errorHandler->warn("c");
|
||||
my $instanceId = $session->form->param("instanceId");
|
||||
if ($instanceId) {
|
||||
$session->errorHandler->warn("d");
|
||||
my $instance = WebGUI::Workflow::Instance->new($session, $instanceId);
|
||||
if (defined $instance) {
|
||||
$session->errorHandler->warn("e");
|
||||
return $instance->run;
|
||||
}
|
||||
}
|
||||
$session->errorHandler->warn("f");
|
||||
return "error";
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ sub execute {
|
|||
my $self = shift;
|
||||
my $placeholders = shift || [];
|
||||
my $sql = $self->{_sql};
|
||||
$self->sth->execute(@{ $placeholders }) or $self->db->session->errorHandler->fatal("Couldn't execute prepared statement: $sql Root cause: ". $self->errorMessage);
|
||||
$self->sth->execute(@{ $placeholders }) or $self->db->session->errorHandler->fatal("Couldn't execute prepared statement: $sql : With place holders: ".join(", ", @{$placeholders}).". Root cause: ". $self->errorMessage);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -242,7 +242,7 @@ sub requestCommit {
|
|||
my $instance = WebGUI::Workflow::Instance->create($self->session, {
|
||||
workflowId=>$self->get("workflowId"),
|
||||
className=>"WebGUI::VersionTag",
|
||||
method=>"new",
|
||||
methodName=>"new",
|
||||
parameters=>$self->getId
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -214,20 +214,32 @@ sub run {
|
|||
my $workflow = WebGUI::Workflow->new($self->session, $self->get("workflowId"));
|
||||
return "undefined" unless (defined $workflow);
|
||||
return "disabled" unless ($workflow->get("enabled"));
|
||||
my $activity = $workflow->getNextActivity($self->get("currentActivity"));
|
||||
return "done" unless (defined $activity);
|
||||
my $activity = $workflow->getNextActivity($self->get("currentActivityId"));
|
||||
if (defined $activity) {
|
||||
$self->set({"currentActivityId",$activity->getId});
|
||||
} else {
|
||||
$self->delete;
|
||||
return "done";
|
||||
}
|
||||
my $object = {};
|
||||
my $class = $self->get("className");
|
||||
my $method = $self->get("methodName");
|
||||
my $params = $self->get("parameters");
|
||||
if ($class && $method) {
|
||||
$object = eval($class->$method($self->session, $params));
|
||||
my $cmd = "use $class";
|
||||
eval($cmd);
|
||||
if ($@) {
|
||||
$self->session->errorHandler->warn("Error instanciating activity (".$activity->getId.") pass-in object: ".$@);
|
||||
$self->session->errorHandler->warn("Error loading activity class $class: ".$@);
|
||||
return "error";
|
||||
}
|
||||
eval{ $object = $class->$method($self->session, $params) };
|
||||
if ($@) {
|
||||
$self->session->errorHandler->warn("Error instanciating activity (".$activity->getId.") pass-in object: ".$@);
|
||||
return "error";
|
||||
}
|
||||
return $activity->execute($object, $self);
|
||||
}
|
||||
return $activity->execute($object, $self);
|
||||
return $activity->execute(undef, $self);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue