Streaming response API for contentHandlers is now taking shape
This commit is contained in:
parent
c0abcc3e4a
commit
c7235378d1
5 changed files with 94 additions and 74 deletions
140
lib/WebGUI.pm
140
lib/WebGUI.pm
|
|
@ -31,7 +31,6 @@ use Try::Tiny;
|
|||
|
||||
has root => ( is => 'ro', isa => 'Str', default => '/data/WebGUI' );
|
||||
has site => ( is => 'ro', isa => 'Str', default => 'dev.localhost.localdomain.conf' );
|
||||
has session => ( is => 'rw', isa => 'WebGUI::Session' );
|
||||
has config => ( is => 'rw', isa => 'WebGUI::Config' );
|
||||
|
||||
use overload q(&{}) => sub { shift->psgi_app }, fallback => 1;
|
||||
|
|
@ -75,60 +74,86 @@ sub BUILD {
|
|||
# Instantiate the WebGUI::Config object
|
||||
my $config = WebGUI::Config->new( $self->root, $self->site );
|
||||
$self->config($config);
|
||||
}
|
||||
}
|
||||
|
||||
sub psgi_app {
|
||||
my $self = shift;
|
||||
return $self->{psgi_app} ||= $self->compile_psgi_app;
|
||||
}
|
||||
|
||||
sub new_session {
|
||||
my $self = shift;
|
||||
my $request = shift;
|
||||
|
||||
# determine session id
|
||||
my $sessionId = $request->cookies->{$self->config->getCookieName};
|
||||
|
||||
# Instantiate the session object
|
||||
return WebGUI::Session->open($self->root, $self->config, $request, $sessionId);
|
||||
}
|
||||
|
||||
sub compile_psgi_app {
|
||||
my $self = shift;
|
||||
|
||||
my $catch = [ 500, [ 'Content-Type' => 'text/plain' ], [ "Internal Server Error\n" ] ];
|
||||
|
||||
my $app = sub {
|
||||
my $env = shift;
|
||||
|
||||
my $log = sub {
|
||||
$env->{'psgi.errors'}->print(join '', @_, "\n");
|
||||
};
|
||||
|
||||
return sub {
|
||||
my $callback = shift;
|
||||
my $request = WebGUI::Request->new($env);
|
||||
my $res = $self->dispatch($request);
|
||||
my $session = $self->new_session($request);
|
||||
|
||||
if ( ref $res eq 'WebGUI::Session' ) {
|
||||
my $session = $res;
|
||||
my $response = $session->response;
|
||||
try {
|
||||
$self->handle($request);
|
||||
} catch {
|
||||
$log->( "Error handling request: $_" );
|
||||
$callback->( $catch );
|
||||
return;
|
||||
};
|
||||
|
||||
my $response = $session->response;
|
||||
my $psgi_response = $response->finalize;
|
||||
|
||||
if ( $response->streaming ) {
|
||||
|
||||
try {
|
||||
# Response wants to stream itself, so ask PSGI server for a
|
||||
# streaming writer object by returning the PSGI response, minus the body
|
||||
|
||||
# Anything in the response body gets cleared (should be empty anyway)
|
||||
$response->body([]);
|
||||
my $psgi_response = $response->finalize;
|
||||
|
||||
# Ask PSGI server for a streaming writer object by returning a 2-part
|
||||
# arrayref instead of a 3-part array
|
||||
my $writer = $callback->( [ $psgi_response->[0], $psgi_response->[1] ] );
|
||||
|
||||
# Store the writer object in the WebGUI::Response object
|
||||
$response->writer($writer);
|
||||
|
||||
# ..and let the response stream itself
|
||||
# Now call the callback that does the streaming
|
||||
$response->streamer->($session);
|
||||
|
||||
# And finally, clean up
|
||||
$writer->close;
|
||||
$session->close;
|
||||
|
||||
} catch {
|
||||
if ($response->writer) {
|
||||
# Response has already been started, so log error and close writer
|
||||
warn "error caught after streaming response started";
|
||||
$log->("Error detected after streaming response started");
|
||||
$response->writer->close;
|
||||
} else {
|
||||
$callback->( [ 500, [ 'Content-type: text/html' ], [ 'An error occurred' ] ] );
|
||||
$callback->( $catch );
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
$session->close;
|
||||
};
|
||||
} else {
|
||||
|
||||
# Not streaming, so immediately tell the callback to return
|
||||
# the response. In the future we could use an Event framework here
|
||||
# to make this a non-blocking delayed response.
|
||||
$callback->($res);
|
||||
$session->close;
|
||||
$callback->($psgi_response);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -153,53 +178,35 @@ sub compile_psgi_app {
|
|||
);
|
||||
|
||||
return $app;
|
||||
}
|
||||
}
|
||||
|
||||
sub dispatch {
|
||||
sub handle {
|
||||
my ( $self, $request ) = @_;
|
||||
|
||||
my $config = $self->config;
|
||||
my $session = $request->session;
|
||||
|
||||
# determine session id
|
||||
my $sessionId = $request->cookies->{$config->getCookieName};
|
||||
|
||||
# Instantiate the session object
|
||||
my $session = $self->session( WebGUI::Session->open($self->root, $config, $request, $sessionId) );
|
||||
|
||||
# Short-circuit contentHandlers - for benchmarking PSGI scaffolding vs. modperl
|
||||
# $session->close;
|
||||
# uncomment the following to short-circuit contentHandlers (for benchmarking PSGI scaffolding vs. modperl)
|
||||
# $session->output->print("WebGUI PSGI with contentHandlers short-circuited for benchmarking\n");
|
||||
# return $session->response->finalize;
|
||||
# return;
|
||||
|
||||
# TODO: From here, contentHandlers need to decide if they want to stream the response body:
|
||||
# $session->response->stream( sub { ... } ) # this replaces 'chunked'
|
||||
# or return a psgi response body.
|
||||
#
|
||||
# We use the $session->response->streaming flag to detect if a contentHandler has requested
|
||||
# to use streaming response.
|
||||
#
|
||||
# Otherwise, whatever they return (arrayref or IO::Handle) is used as the psgi response
|
||||
#
|
||||
# Regular assets should use streaming response body, unless they want to send a file
|
||||
# contentHandlers that return text will have that content returned as the response
|
||||
# Alternatively, contentHandlers can stream the response body by calling:
|
||||
# $session->response->stream_write()
|
||||
# inside of a callback registered via:
|
||||
# $session->response->stream( sub { } )
|
||||
# This is generally a good thing to do, unless you want to send a file.
|
||||
|
||||
# uncomment the following to short-circuit contentHandlers with a streaming response:
|
||||
# $session->response->stream(sub {
|
||||
# my $session = shift;
|
||||
# $session->output->print("WebGUI PSGI with contentHandlers short-circuited for benchmarking (streaming)\n");
|
||||
# sleep 1;
|
||||
# $session->output->print("...see?\n");
|
||||
# });
|
||||
# return;
|
||||
|
||||
# Here's an example of what a contentHandler would call to do a streaming response:
|
||||
$session->response->stream(sub {
|
||||
my $session = shift;
|
||||
$session->output->print("WebGUI PSGI with contentHandlers short-circuited for benchmarking (streaming)\n");
|
||||
sleep 1;
|
||||
$session->output->print("...see?\n");
|
||||
});
|
||||
|
||||
# Afterwards, we check $session->response->streaming, and if it is set, return the
|
||||
# WebGUI::Session (since our caller doesn't have a reference to it) TODO - or does it via $request->session->response???
|
||||
|
||||
# TODO: give WebGUI::Req/Res a weak session reference
|
||||
|
||||
if ( $session->response->streaming ) {
|
||||
return $session;
|
||||
}
|
||||
|
||||
for my $handler (@{$config->get("contentHandlers")}) {
|
||||
# TODO: refactor the following loop, find all instances of "chunked" and "empty" in codebase, etc..
|
||||
for my $handler (@{$self->config->get("contentHandlers")}) {
|
||||
my $output = eval { WebGUI::Pluggable::run($handler, "handler", [ $session ] )};
|
||||
if ( my $e = WebGUI::Error->caught ) {
|
||||
$session->errorHandler->error($e->package.":".$e->line." - ".$e->error);
|
||||
|
|
@ -209,14 +216,19 @@ sub dispatch {
|
|||
$session->errorHandler->error( $@ );
|
||||
}
|
||||
else {
|
||||
|
||||
# Stop if the contentHandler is going to stream the response body
|
||||
return if $session->response->streaming;
|
||||
|
||||
# We decide what to do next depending on what the contentHandler returned
|
||||
|
||||
# "chunked" or "empty" means it took care of its own output needs
|
||||
if (defined $output && ( $output eq "chunked" || $output eq "empty" )) {
|
||||
# warn "chunked and empty no longer stream, use session->response->stream() instead";
|
||||
if ($session->errorHandler->canShowDebug()) {
|
||||
$session->output->print($session->errorHandler->showDebug(),1);
|
||||
}
|
||||
last;
|
||||
return;
|
||||
}
|
||||
# non-empty output should be used as the response body
|
||||
elsif (defined $output && $output ne "") {
|
||||
|
|
@ -228,18 +240,16 @@ sub dispatch {
|
|||
if ($session->errorHandler->canShowDebug()) {
|
||||
$session->output->print($session->errorHandler->showDebug(),1);
|
||||
}
|
||||
last;
|
||||
return;
|
||||
}
|
||||
# Keep processing for success codes
|
||||
elsif ($session->http->getStatus < 200 || $session->http->getStatus > 299) {
|
||||
$session->http->sendHeader;
|
||||
last;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$session->close;
|
||||
return $session->response->finalize;
|
||||
return;
|
||||
}
|
||||
|
||||
no Moose;
|
||||
|
|
|
|||
|
|
@ -6,7 +6,9 @@ The WebGUI server response object. See L<Plack::Response>
|
|||
|
||||
=cut
|
||||
|
||||
use strict;
|
||||
use parent qw(Plack::Request);
|
||||
use Plack::Util::Accessor qw(session);
|
||||
use WebGUI::Response;
|
||||
|
||||
=head1 METHODS
|
||||
|
|
@ -23,7 +25,9 @@ See L<WebGUI::Session/response>
|
|||
|
||||
sub new_response {
|
||||
my $self = shift;
|
||||
WebGUI::Response->new(@_);
|
||||
my $response = WebGUI::Response->new(@_);
|
||||
$response->session($self->session);
|
||||
return $response;
|
||||
}
|
||||
|
||||
1;
|
||||
|
|
@ -2,7 +2,7 @@ package WebGUI::Response;
|
|||
use strict;
|
||||
use parent qw(Plack::Response);
|
||||
|
||||
use Plack::Util::Accessor qw(streaming writer streamer);
|
||||
use Plack::Util::Accessor qw(session streaming writer streamer);
|
||||
|
||||
=head2 DESCRIPTION
|
||||
|
||||
|
|
@ -12,9 +12,8 @@ The WebGUI server response object. See of L<Plack::Response>
|
|||
|
||||
sub stream {
|
||||
my $self = shift;
|
||||
my $streamer = shift;
|
||||
$self->streamer(shift);
|
||||
$self->streaming(1);
|
||||
$self->streamer($streamer);
|
||||
}
|
||||
|
||||
sub stream_write {
|
||||
|
|
|
|||
|
|
@ -458,8 +458,11 @@ sub open {
|
|||
my $config = ref $c ? $c : WebGUI::Config->new($webguiRoot,$c);
|
||||
my $self = {_config=>$config }; # TODO - if we store reference here, should we weaken WebGUI->config?
|
||||
bless $self , $class;
|
||||
$self->{_request} = $request if defined $request;
|
||||
$self->{_response} = $request->new_response( 200 ) if defined $request;
|
||||
if (defined $request) {
|
||||
$request->session($self); # hello circular reference
|
||||
$self->{_request} = $request;
|
||||
$self->{_response} = $request->new_response( 200 );
|
||||
}
|
||||
my $sessionId = shift || $request->cookies->{$config->getCookieName} || $self->id->generate;
|
||||
$sessionId = $self->id->generate unless $self->id->valid($sessionId);
|
||||
my $noFuss = shift;
|
||||
|
|
|
|||
|
|
@ -95,10 +95,14 @@ sub print {
|
|||
print $handle $content;
|
||||
}
|
||||
elsif ($self->session->response) {
|
||||
if ($self->session->response->streaming) {
|
||||
$self->session->response->stream_write($content);
|
||||
my $response = $self->session->response;
|
||||
if ($response->streaming) {
|
||||
$response->stream_write($content);
|
||||
} else {
|
||||
|
||||
# Not streaming, so buffer the response instead
|
||||
# warn "buffering output";
|
||||
$response->body([]) unless $response->body && ref $response->body eq 'ARRAY';
|
||||
push @{$response->body}, $content;
|
||||
}
|
||||
}
|
||||
else {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue