# webgui/lib/WebGUI/Fork.pm
#
# 701 lines
# 19 KiB
# Perl

package WebGUI::Fork;
use warnings;
use strict;
use File::Spec;
use JSON;
use POSIX;
use Config;
use IO::Pipe;
use WebGUI::Session;
use WebGUI::Pluggable;
use Time::HiRes qw(sleep);
=head1 NAME
WebGUI::Fork
=head1 DESCRIPTION
Safely and portably spawn a long running process that you can check the
status of.
=head1 SYNOPSIS
package WebGUI::Some::Class;
sub doWork {
my ($process, $data) = @_;
$process->update("Starting...");
...
$process->update("About half way done...");
...
$process->update("Finished!");
}
sub www_doWork {
my $self = shift;
my $session = $self->session;
my $process = WebGUI::Fork->start(
$session, 'WebGUI::Some::Class', 'doWork', { some => 'data' }
);
# See WebGUI::Operation::Fork
my $pairs = $process->contentPairs('DoWork');
$session->http->setRedirect($self->getUrl($pairs));
return 'redirect';
}
package WebGUI::Operation::Fork::DoWork;
sub handler {
my $process = shift;
my $session = $process->session;
return $session->style->userStyle($process->status);
# or better yet, an ajaxy page that polls.
}
=head1 LEGAL
-------------------------------------------------------------------
WebGUI is Copyright 2001-2009 Plain Black Corporation.
-------------------------------------------------------------------
Please read the legal notices (docs/legal.txt) and the license
(docs/license.txt) that came with this distribution before using
this software.
-------------------------------------------------------------------
http://www.plainblack.com info@plainblack.com
-------------------------------------------------------------------
=head1 METHODS
=cut
#-----------------------------------------------------------------
=head2 canView ($user?)
Returns whether the current user (or the user passed in, if there is one) has
permission to view the status of the fork. By default, only admins can view,
but see setGroup.
=cut
sub canView {
    my ( $self, $candidate ) = @_;
    my $session = $self->session;

    # No explicit user given: fall back to the session's current user.
    my $user = $candidate || $session->user;

    # Anything that is not already a WebGUI::User object is handed to
    # WebGUI::User->new to be turned into one.
    my $isUserObject = eval { $user->isa('WebGUI::User') };
    if ( !$isUserObject ) {
        $user = WebGUI::User->new( $session, $user );
    }

    # Admins, the user who started the fork, and members of the fork's
    # group may view it; checks are made in that order.
    my $admin = $user->isAdmin;
    return $admin if $admin;

    my $owner = $user->userId eq $self->getUserId;
    return $owner if $owner;

    return $user->isInGroup( $self->getGroupId );
}
#-------------------------------------------------------------------
=head2 cleanINC(@INC)
Class method. Returns a new @INC array (but does not set it) to be used for
forked/daemonized processes. Note that you pass in @INC and a new one is
returned, but it doesn't replace the original. You must do that yourself.
=cut
sub cleanINC {
    # Class method: the first argument is the invocant (the class name), not
    # an @INC entry. Peel it off so it is not turned into a bogus absolute
    # path and returned alongside the real directories.
    my ( $class, @inc ) = @_;

    # gotta get rid of hooks until we think of a way to serialize them in
    # forkAndExec (if we ever want to). Serializing coderefs is a tricky
    # business.
    #
    # Returns the remaining (non-reference) entries, each made absolute so
    # they stay valid after the daemon does chdir('/').
    return map { File::Spec->rel2abs($_) } grep { !ref $_ } @inc;
}
#-------------------------------------------------------------------
=head2 contentPairs ($module, $extra)
Returns a bit of query string useful for redirecting to a
WebGUI::Operation::Fork plugin. $module should be the bit that comes after
WebGUI::Operation::Fork, e.g. $process->contentPairs('Foo') should return
something like "op=fork;module=Foo;pid=adlfjafo87ad9f78a7", which will
get dispatched to WebGUI::Operation::Fork::Foo::handler($process).
$extra is an optional hashref that will add further parameters onto the list
of pairs, e.g. { foo => 'bar' } becomes ';foo=bar'
=cut
sub contentPairs {
    my ( $self, $module, $extra ) = @_;
    my $url = $self->session->url;

    # Entries in $extra come last in the list, so they override the three
    # standard keys when the same key is given.
    my %params = (
        op     => 'fork',
        module => $module,
        pid    => $self->getId,
        $extra ? %$extra : (),
    );

    # Emit the pairs in a stable order (standard keys first, then any extra
    # keys sorted) so the generated query string does not vary from call to
    # call with hash ordering.
    my @standard   = qw(op module pid);
    my %isStandard = map { $_ => 1 } @standard;
    my @order      = ( @standard, sort grep { !$isStandard{$_} } keys %params );

    # Both the key and the value are URL-escaped before being joined.
    return join(
        ';',
        map {
            my $k = $_;
            join( '=', map { $url->escape($_) } ( $k, $params{$k} ) );
        } @order
    );
} ## end sub contentPairs
#-----------------------------------------------------------------
=head2 create ( )
Internal class method. Creates a new Fork object and inserts it into the db.
=cut
sub create {
    my ( $class, $session ) = @_;

    # Generate the id for the new row up front so the returned object can
    # carry it.
    my $guid = $session->id->generate;

    # The only column populated at creation time is the initiating user.
    my $row = { userId => $session->user->userId };
    $session->db->setRow( $class->tableName, 'id', $row, $guid );

    my $self = { session => $session, id => $guid };
    return bless $self, $class;
}
#-----------------------------------------------------------------
=head2 daemonize ( $stdin, $sub )
Internal class method. Runs the given $sub in a daemon, and prints $stdin to its
stdin.
=cut
sub daemonize {
    my ( $class, $stdin, $sub ) = @_;

    # Runs $sub in a detached grandchild process whose STDIN yields $stdin.
    # In the calling process this returns (nothing) once the short-lived
    # intermediate child has been reaped; it dies there only if the first
    # fork fails. Neither forked process ever returns from this sub.
    my $pid = fork();
    die "Cannot fork: $!" unless defined $pid;
    if ($pid) {

        # The child process will fork again and exit immediately, so we can
        # wait for it (and thus not have zombie processes).
        waitpid( $pid, 0 );
        return;
    }

    eval {

        # detach from controlling terminal, get us into a new process group
        die "Cannot become session leader: $!" if POSIX::setsid() < 0;

        # Fork again so we never get a controlling terminal
        my $worker   = IO::Pipe->new;
        my $childPid = fork();
        die "Child cannot fork: $!" unless defined $childPid;

        # We don't want to call any destructors, as it would mess with the
        # parent's mysql connection, etc. -- hence POSIX::_exit.
        if ($childPid) {
            $worker->writer;
            $worker->printflush($stdin);
            POSIX::_exit(0);
        }

        # We're now in the final target process. STDIN should be whatever the
        # parent printed to us, and all output should go to /dev/null.
        $worker->reader();
        open STDIN,  '<&', $worker     or die "Cannot dup stdin: $!";
        open STDOUT, '>',  '/dev/null' or die "Cannot write /dev/null: $!";
        open STDERR, '>&', \*STDOUT    or die "Cannot dup stdout: $!";

        # Standard daemon-y things...
        $SIG{HUP} = 'IGNORE';
        chdir '/';
        umask 0;

        # Forcibly close any non-std open file descriptors that remain.
        # $^F is the highest *system* descriptor (normally 2, i.e. STDERR),
        # so start one past it: starting at $^F itself would close the STDERR
        # we just pointed at /dev/null, and the next descriptor opened would
        # then be handed fd 2.
        my $max = POSIX::sysconf(&POSIX::_SC_OPEN_MAX) || 1024;
        POSIX::close($_) for ( $^F + 1 .. $max );

        # Do whatever we're supposed to do
        $sub->();
    };

    # Reached by the daemon when $sub returns or anything above died, and by
    # the intermediate child if setsid or the second fork failed. Exit
    # without running destructors, non-zero if something died.
    POSIX::_exit( $@ ? -1 : 0 );
} ## end sub daemonize
#-----------------------------------------------------------------
=head2 delete ( )
Clean up the information for this process from the database.
=cut
sub delete {
    my ($self) = @_;

    # Remove this fork's row (keyed on its id) from the fork table.
    my $db = $self->session->db;
    return $db->deleteRow( $self->tableName, 'id', $self->getId );
}
#-----------------------------------------------------------------
=head2 endTime ( )
Returns the epoch time indicating when the subroutine passed to run() finished
executing, or undef if it hasn't finished. Note that even if the sub passed
to run dies, an endTime will be recorded.
=cut
sub endTime {
    my ($self) = @_;

    # Thin accessor over get() for the 'endTime' column.
    return $self->get('endTime');
}
#-----------------------------------------------------------------
=head2 error ( $msg )
Call this to record an error status. You probably shouldn't, though -- just
dying from your subroutine will cause this to be set.
=cut
sub error {
    my ( $self, $msg ) = @_;

    # Store the message in the 'error' column via set().
    return $self->set( { error => $msg } );
}
#-----------------------------------------------------------------
=head2 finish ( )
Mark the process as being finished. This is called for you when your
subroutine is finished. If update() wasn't computed on the last call, it will
be computed now.
=cut
sub finish {
    my ($self) = @_;

    # A status callback that update() deferred (because nobody had asked for
    # it yet) is taken off the object and computed now, so the final status
    # is not lost.
    my $deferred = delete $self->{delay};

    my %props = (
        finished => 1,
        ( $deferred ? ( status => scalar $deferred->(), latch => 0 ) : () ),
        endTime  => time(),
        redirect => $self->{redirect},
    );

    return $self->set( \%props );
}
#-----------------------------------------------------------------
=head2 forkAndExec ($request)
Internal method. Forks and execs a new perl process to run $request. This is
used as a fallback if the master daemon runner is not working.
=cut
sub forkAndExec {
    my ( $self, $request ) = @_;
    my $class = ref $self;

    # The request travels to the new interpreter as JSON on its STDIN;
    # runCmd() on the other side slurps and decodes it.
    my $json = JSON::encode_json($request);

    # Re-create our module search path on the command line, then load this
    # class and invoke its runCmd class method.
    my @inc  = map {"-I$_"} $class->cleanINC(@INC);
    my @argv = ( @inc, "-M$class", "-e$class->runCmd()" );

    return $class->daemonize(
        $json,
        sub {
            # The die is caught by daemonize's eval and turns into a non-zero
            # exit of the daemon.
            exec( $Config{perlpath}, @argv ) or die "Could not exec: $!";
        }
    );
}
#-----------------------------------------------------------------
=head2 get ( @keys )
Get data from the database record for this process (returned as a simple list,
not an arrayref). Valid keys are: id, status, error, startTime, endTime,
finished, groupId, userId. They all have more specific accessors, but you can
use this to get several at once if you're very careful. You should probably
use the accessors, though, since some of them have extra logic.
=cut
sub get {
    my ( $self, @keys ) = @_;
    my $db    = $self->session->db;
    my $dbh   = $db->dbh;
    my $table = $dbh->quote_identifier( $self->tableName );

    # With no keys requested, select every column.
    my $columns = '*';
    if (@keys) {
        $columns = join( ',', map { $dbh->quote_identifier($_) } @keys );
    }

    my $id  = $dbh->quote( $self->getId );
    my @row = $db->quickArray("SELECT $columns FROM $table WHERE id = $id");

    # Several values come back as a list; a single value (or none) comes
    # back as one scalar.
    return @row if @row > 1;
    return $row[0];
}
#-----------------------------------------------------------------
=head2 getError ( )
If the process died, this will be set to stringified $@.
=cut
sub getError {
    my ($self) = @_;

    # Thin accessor over get() for the 'error' column.
    return $self->get('error');
}
#-----------------------------------------------------------------
=head2 getGroupId
Returns the group ID (not the actual WebGUI::Group) of users who are allowed
to view this process.
=cut
sub getGroupId {
    my ($self) = @_;
    my $groupId = $self->get('groupId');

    # Fall back to group 3 when no group has been recorded.
    return $groupId ? $groupId : 3;
}
#-----------------------------------------------------------------
=head2 getId ( )
The unique id for this fork. Note: this is NOT the pid, but a WebGUI guid.
=cut
sub getId {
    my ($self) = @_;

    # The id is kept on the object itself; no database access is needed.
    return $self->{id};
}
#-----------------------------------------------------------------
=head2 getStatus()
Signals the fork that it should report its next status, then polls at a
configurable, fractional interval (default: .1 seconds) waiting for the fork
to claim that its status has been updated. Returns the updated status. See
setWait() for a way to change the interval (or disable the waiting procedure
entirely). We will only wait for a maximum of 100 intervals.
=cut
sub getStatus {
    my $self = shift;

    # Always raise the latch so the fork knows somebody wants a fresh
    # status. update() only evaluates a lazily supplied (coderef) status
    # while the latch is set, so skipping this when no wait interval is
    # configured would leave such statuses uncomputed until finish(). This
    # matches the contract documented on setWait(): an interval of 0 still
    # signals the fork, it just doesn't wait for the answer.
    $self->set( { latch => 1 } );

    if ( my $interval = $self->{interval} ) {

        # Poll until the fork lowers the latch (status refreshed) or
        # finishes, giving up after 100 intervals.
        my $polls = 0;
        while ( $polls++ < 100 ) {
            sleep $interval;
            my ( $finished, $latch ) = $self->get( 'finished', 'latch' );
            last if $finished || !$latch;
        }
    }

    # Whatever is recorded now is the best answer we have.
    return $self->get('status');
}
#-----------------------------------------------------------------
=head2 getUserId
Returns the userId of the user who initiated this Fork.
=cut
sub getUserId {
    my ($self) = @_;

    # Thin accessor over get() for the 'userId' column.
    return $self->get('userId');
}
#-----------------------------------------------------------------
=head2 init ( )
Spawn a master process from which Forks will fork(). The intent
is for this to be called once at server startup time, after you've preloaded
modules and before you start listening for requests. Returns a filehandle that
can be used to print requests to the master process, and which you almost
certainly shouldn't use (it's mostly for testing).
=cut
my $pipe; # File-scoped pipe to the master process: assigned by init(), used and cleared on failure by sendRequestToMaster().
sub init {
# Forks a long-lived master process. The caller gets back the write end of a
# pipe to it; the master reads requests off the read end and daemonizes a
# worker for each one. Dies in the caller if the fork fails.
my $class = shift;
$pipe = IO::Pipe->new;
my $pid = fork();
die "Cannot fork: $!" unless defined $pid;
# Parent: keep the write end and hand it back to the caller.
if ($pid) {
$pipe->writer;
return $pipe;
}
# Child (the master): rename the process and take the read end.
$0 = 'webgui-fork-master';
$pipe->reader;
# Requests are NUL-terminated (sendRequestToMaster appends "\x{0}"), so make
# getline and chomp work on NUL-delimited records instead of lines.
local $/ = "\x{0}";
# Drop @INC hooks and make the remaining entries absolute (see cleanINC).
@INC = $class->cleanINC(@INC);
while ( my $request = $pipe->getline ) {
chomp $request;
# Failing to daemonize one request must not take down the master loop.
eval {
$class->daemonize( $request, sub { $class->runCmd } );
};
}
# getline returned false (presumably EOF because the writing side is gone),
# so the master exits.
CORE::exit(0);
} ## end sub init
#-----------------------------------------------------------------
=head2 isFinished ( )
A simple flag indicating that the fork is no longer running.
=cut
sub isFinished {
    my ($self) = @_;

    # Thin accessor over get() for the 'finished' column.
    return $self->get('finished');
}
#-----------------------------------------------------------------
=head2 new ( $session, $id )
Returns an object capable of checking on the status of the fork indicated by
$id. Returns undef if there is no such process.
=cut
sub new {
    my ( $class, $session, $id ) = @_;
    my $db    = $session->db;
    my $table = $db->dbh->quote_identifier( $class->tableName );

    # Only hand back an object if a row with this id actually exists.
    my $found = $db->quickScalar( "SELECT COUNT(*) FROM $table WHERE id = ?", [$id] );

    # An explicit undef (rather than a bare return) is kept on purpose so
    # that list-context callers still receive a single undef element.
    return undef unless $found;

    # Objects built here poll every tenth of a second in getStatus() unless
    # setWait() changes that.
    my $self = { session => $session, id => $id, interval => .1 };
    return bless( $self, $class );
}
#-----------------------------------------------------------------
=head2 session ()
Get the WebGUI::Session this process was created with. Note: this is safe to
call in the child process, as it is a duplicated session (same session id) and
doesn't share any handles with the parent process.
=cut
sub session {
    my ($self) = @_;

    # The session is stored directly on the object.
    return $self->{session};
}
#-----------------------------------------------------------------
=head2 set ($properties)
Updates the database row with the properties given by the $properties hashref.
See get() for a list of valid keys.
=cut
sub set {
    my ( $self, $values ) = @_;

    # The row is keyed on this fork's id; an 'id' entry in $values, listed
    # later, would take precedence over it.
    my $row = { id => $self->getId, %{$values} };

    return $self->session->db->setRow( $self->tableName, 'id', $row );
}
#-----------------------------------------------------------------
=head2 setGroup($groupId)
Allow the given group (in addition to admins) the ability to check on the
status of this process
=cut
sub setGroup {
    my ( $self, $groupId ) = @_;

    # Accept either something with a getId method or a plain id: if calling
    # getId fails or yields nothing, use the argument as it was given.
    my $resolved = eval { $groupId->getId };
    $resolved ||= $groupId;

    return $self->set( { groupId => $resolved } );
}
#-----------------------------------------------------------------
=head2 setRedirect($url)
Allows a redirect to be set for the process after the initial fork. This happens
in the case when a file is to be downloaded after the fork finishes.
=cut
sub setRedirect {
    my ( $self, $url ) = @_;

    # Kept on the object only; finish() is what writes it to the database.
    return $self->{redirect} = $url;
}
#-----------------------------------------------------------------
=head2 request ($module, $subname, $data)
Internal method. Generates a hashref suitable for passing to runRequest.
=cut
sub request {
    my ( $self, $module, $subname, $data ) = @_;
    my $session = $self->session;
    my $config  = $session->config;

    # Everything runRequest() reads on the other side: how to reopen the
    # session, which fork record to update, and what to run with which data.
    my %request = (
        webguiRoot => $config->getWebguiRoot,
        configFile => $config->getFilename,
        sessionId  => $session->getId,
        module     => $module,
        subname    => $subname,
        id         => $self->getId,
        data       => $data,
    );

    return \%request;
}
#-----------------------------------------------------------------
=head2 runCmd ()
Internal class method. Decodes json off of stdin and passes it to runRequest.
=cut
sub runCmd {
    my ($class) = @_;

    # Slurp all of STDIN. The record separator is only undefined inside this
    # do-block, so the job that runs afterwards sees the normal $/.
    my $input = do { local $/ = undef; <STDIN> };

    my $args = JSON::decode_json($input);
    return $class->runRequest($args);
}
#-----------------------------------------------------------------
=head2 runRequest ($hashref)
Internal class method. Expects a hash of arguments describing what to run.
=cut
sub runRequest {
    my ( $class, $args ) = @_;

    # Reopen the session named in the request, passing its session id.
    my $session = WebGUI::Session->open(
        $args->{webguiRoot}, $args->{configFile},
        undef, undef, $args->{sessionId}
    );

    my $forkId = $args->{id};
    my $self   = $class->new( $session, $forkId );
    $self->set( { startTime => time } );

    # Make the worker identifiable in the process table.
    $0 = "webgui-fork-$forkId";

    # Run the job; if it dies, the exception is recorded as the fork's
    # error rather than propagated.
    eval {
        WebGUI::Pluggable::run(
            $args->{module}, $args->{subname},
            [ $self, $args->{data} ]
        );
    };
    if ( my $failure = $@ ) {
        $self->error($failure);
    }

    # Always mark the fork finished, whether or not the job died.
    return $self->finish();
}
#-----------------------------------------------------------------
=head2 sendRequestToMaster ($request)
Internal method. Attempts to send a request to the master daemon runner.
Returns 1 on success and 0 on failure.
=cut
sub sendRequestToMaster {
    my ( $self, $request ) = @_;

    # Requests are framed with a trailing NUL byte.
    my $message = JSON::encode_json($request) . "\x{0}";

    eval {
        die 'pipe' unless $pipe && $pipe->isa('IO::Handle');

        # Turn a SIGPIPE raised while writing into an ordinary exception.
        local $SIG{PIPE} = sub { die 'pipe' };
        $pipe->printflush($message) or die 'pipe';
    };

    if ($@) {

        # The master is unusable: forget the pipe so later calls fail fast,
        # and let the caller fall back to another strategy.
        undef $pipe;
        $self->session->log->error('Problems talking to master daemon process. Please restart the web server.');
        return 0;
    }

    return 1;
}
#-----------------------------------------------------------------
=head2 setWait ( $interval )
Use this to control the pace at which getStatus will poll for updated
statuses. By default, this is a tenth of a second. If you set it to 0,
getStatus will still signal the fork for an update, but will take whatever is
currently recorded as the status and return immediately.
=cut
sub setWait {
    my ( $self, $interval ) = @_;

    # Stored on the object only; getStatus() reads it on its next call.
    return $self->{interval} = $interval;
}
#-----------------------------------------------------------------
=head2 start ( $session, $module, $subname, $data )
Class method. Executes $module::subname in a forked process with ($process,
$data) as its arguments. The only restriction on $data is that it be
serializable by JSON.
=head3 $0
The process name (as it appears in ps) will be set to webgui-fork-$id,
where $id is the value returned by $process->getId. It thus won't look like a
modperl process to anyone monitoring the process table (wremonitor.pl, for
example).
=cut
sub start {
    my ( $class, $session, $module, $subname, $data ) = @_;

    # Record the fork first so it has an id, then describe the job.
    my $self    = $class->create($session);
    my $request = $self->request( $module, $subname, $data );

    # Prefer the master daemon; fall back to fork+exec when it can't be
    # reached.
    unless ( $self->sendRequestToMaster($request) ) {
        $self->forkAndExec($request);
    }

    return $self;
}
#-----------------------------------------------------------------
=head2 startTime ( )
Returns the time this process started running in epoch format.
=cut
sub startTime {
    my ($self) = @_;

    # Thin accessor over get() for the 'startTime' column.
    return $self->get('startTime');
}
#-----------------------------------------------------------------
=head2 tableName ( )
Class method: a constant, for convenience. The name of the table that process
data is stored in.
=cut
sub tableName {
    # Constant: the table all fork records live in.
    return 'Fork';
}
#-----------------------------------------------------------------
=head2 update ( $msg )
Set a new status for the fork. This can be anything, and will overwrite the
old status. JSON is recommended for complex statuses. Optionally, $msg can
be a subroutine that returns the new status -- if your status may take a long
time to compute, you should use this, as you may be able to avoid computing
some (or all) of your status updates, depending on how often they're being
asked for. See the getStatus method for details.
=cut
sub update {
    my ( $self, $msg ) = @_;

    if ( ref $msg eq 'CODE' ) {

        # Lazy status: if nobody has asked for a status (latch is down),
        # remember the callback and skip the work. finish() will run it if
        # it is still pending at the end.
        unless ( $self->get('latch') ) {
            $self->{delay} = $msg;
            return;
        }

        # Somebody is waiting, so compute the status now.
        $msg = $msg->();
    }

    # A concrete status supersedes any callback that was still pending.
    delete $self->{delay};
    return $self->set( { latch => 0, status => $msg } );
}
1;