From c7235378d1248b579084a4de0a295269df9ef5f2 Mon Sep 17 00:00:00 2001 From: Patrick Donelan Date: Wed, 7 Apr 2010 01:02:01 -0400 Subject: [PATCH] Streaming response API for contentHandlers is now taking shape --- lib/WebGUI.pm | 140 +++++++++++++++++++---------------- lib/WebGUI/Request.pm | 6 +- lib/WebGUI/Response.pm | 5 +- lib/WebGUI/Session.pm | 7 +- lib/WebGUI/Session/Output.pm | 10 ++- 5 files changed, 94 insertions(+), 74 deletions(-) diff --git a/lib/WebGUI.pm b/lib/WebGUI.pm index 2337566f3..fe116d57b 100644 --- a/lib/WebGUI.pm +++ b/lib/WebGUI.pm @@ -31,7 +31,6 @@ use Try::Tiny; has root => ( is => 'ro', isa => 'Str', default => '/data/WebGUI' ); has site => ( is => 'ro', isa => 'Str', default => 'dev.localhost.localdomain.conf' ); -has session => ( is => 'rw', isa => 'WebGUI::Session' ); has config => ( is => 'rw', isa => 'WebGUI::Config' ); use overload q(&{}) => sub { shift->psgi_app }, fallback => 1; @@ -75,60 +74,86 @@ sub BUILD { # Instantiate the WebGUI::Config object my $config = WebGUI::Config->new( $self->root, $self->site ); $self->config($config); -} +} sub psgi_app { my $self = shift; return $self->{psgi_app} ||= $self->compile_psgi_app; } +sub new_session { + my $self = shift; + my $request = shift; + + # determine session id + my $sessionId = $request->cookies->{$self->config->getCookieName}; + + # Instantiate the session object + return WebGUI::Session->open($self->root, $self->config, $request, $sessionId); +} + sub compile_psgi_app { my $self = shift; + my $catch = [ 500, [ 'Content-Type' => 'text/plain' ], [ "Internal Server Error\n" ] ]; + my $app = sub { my $env = shift; + my $log = sub { + $env->{'psgi.errors'}->print(join '', @_, "\n"); + }; + return sub { my $callback = shift; my $request = WebGUI::Request->new($env); - my $res = $self->dispatch($request); + my $session = $self->new_session($request); - if ( ref $res eq 'WebGUI::Session' ) { - my $session = $res; - my $response = $session->response; + try { + $self->handle($request); + } catch { + $log->( "Error handling request: $_" ); + $callback->( $catch ); + return; + }; + + my $response = $session->response; + my $psgi_response = $response->finalize; + + if ( $response->streaming ) { try { - # Response wants to stream itself, so ask PSGI server for a - # streaming writer object by returning the PSGI response, minus the body - - # Anything in the response body gets cleared (should be empty anyway) - $response->body([]); - my $psgi_response = $response->finalize; - + # Ask PSGI server for a streaming writer object by returning a 2-part + # arrayref instead of a 3-part array my $writer = $callback->( [ $psgi_response->[0], $psgi_response->[1] ] ); # Store the writer object in the WebGUI::Response object $response->writer($writer); - # ..and let the response stream itself + # Now call the callback that does the streaming $response->streamer->($session); + # And finally, clean up $writer->close; - $session->close; + } catch { if ($response->writer) { # Response has already been started, so log error and close writer - warn "error caught after streaming response started"; + $log->("Error detected after streaming response started"); $response->writer->close; } else { - $callback->( [ 500, [ 'Content-type: text/html' ], [ 'An error occurred' ] ] ); + $callback->( $catch ); } - } + } finally { + $session->close; + }; } else { + # Not streaming, so immediately tell the callback to return # the response. In the future we could use an Event framework here # to make this a non-blocking delayed response. - $callback->($res); + $session->close; + $callback->($psgi_response); } } }; @@ -153,53 +178,35 @@ sub compile_psgi_app { ); return $app; -} +} -sub dispatch { +sub handle { my ( $self, $request ) = @_; - my $config = $self->config; + my $session = $request->session; - # determine session id - my $sessionId = $request->cookies->{$config->getCookieName}; - - # Instantiate the session object - my $session = $self->session( WebGUI::Session->open($self->root, $config, $request, $sessionId) ); - - # Short-circuit contentHandlers - for benchmarking PSGI scaffolding vs. modperl -# $session->close; + # uncomment the following to short-circuit contentHandlers (for benchmarking PSGI scaffolding vs. modperl) # $session->output->print("WebGUI PSGI with contentHandlers short-circuited for benchmarking\n"); -# return $session->response->finalize; +# return; - # TODO: From here, contentHandlers need to decide if they want to stream the response body: - # $session->response->stream( sub { ... } ) # this replaces 'chunked' - # or return a psgi response body. - # - # We use the $session->response->streaming flag to detect if a contentHandler has requested - # to use streaming response. - # - # Otherwise, whatever they return (arrayref or IO::Handle) is used as the psgi response - # - # Regular assets should use streaming response body, unless they want to send a file + # contentHandlers that return text will have that content returned as the response + # Alternatively, contentHandlers can stream the response body by calling: + # $session->response->stream_write() + # inside of a callback registered via: + # $session->response->stream( sub { } ) + # This is generally a good thing to do, unless you want to send a file. + + # uncomment the following to short-circuit contentHandlers with a streaming response: +# $session->response->stream(sub { +# my $session = shift; +# $session->output->print("WebGUI PSGI with contentHandlers short-circuited for benchmarking (streaming)\n"); +# sleep 1; +# $session->output->print("...see?\n"); +# }); +# return; - # Here's an example of what a contentHandler would call to do a streaming response: - $session->response->stream(sub { - my $session = shift; - $session->output->print("WebGUI PSGI with contentHandlers short-circuited for benchmarking (streaming)\n"); - sleep 1; - $session->output->print("...see?\n"); - }); - - # Afterwards, we check $session->response->streaming, and if it is set, return the - # WebGUI::Session (since our caller doesn't have a reference to it) TODO - or does it via $request->session->response??? - - # TODO: give WebGUI::Req/Res a weak session reference - - if ( $session->response->streaming ) { - return $session; - } - - for my $handler (@{$config->get("contentHandlers")}) { + # TODO: refactor the following loop, find all instances of "chunked" and "empty" in codebase, etc.. + for my $handler (@{$self->config->get("contentHandlers")}) { my $output = eval { WebGUI::Pluggable::run($handler, "handler", [ $session ] )}; if ( my $e = WebGUI::Error->caught ) { $session->errorHandler->error($e->package.":".$e->line." - ".$e->error); @@ -209,14 +216,19 @@ sub dispatch { $session->errorHandler->error( $@ ); } else { + + # Stop if the contentHandler is going to stream the response body + return if $session->response->streaming; + # We decide what to do next depending on what the contentHandler returned # "chunked" or "empty" means it took care of its own output needs if (defined $output && ( $output eq "chunked" || $output eq "empty" )) { +# warn "chunked and empty no longer stream, use session->response->stream() instead"; if ($session->errorHandler->canShowDebug()) { $session->output->print($session->errorHandler->showDebug(),1); } - last; + return; } # non-empty output should be used as the response body elsif (defined $output && $output ne "") { @@ -228,18 +240,16 @@ sub dispatch { if ($session->errorHandler->canShowDebug()) { $session->output->print($session->errorHandler->showDebug(),1); } - last; + return; } # Keep processing for success codes elsif ($session->http->getStatus < 200 || $session->http->getStatus > 299) { $session->http->sendHeader; - last; + return; } } } - - $session->close; - return $session->response->finalize; + return; } no Moose; diff --git a/lib/WebGUI/Request.pm b/lib/WebGUI/Request.pm index 873462102..0d3c9bc06 100644 --- a/lib/WebGUI/Request.pm +++ b/lib/WebGUI/Request.pm @@ -6,7 +6,9 @@ The WebGUI server response object. See L =cut +use strict; use parent qw(Plack::Request); +use Plack::Util::Accessor qw(session); use WebGUI::Response; =head1 METHODS @@ -23,7 +25,9 @@ See L sub new_response { my $self = shift; - WebGUI::Response->new(@_); + my $response = WebGUI::Response->new(@_); + $response->session($self->session); + return $response; } 1; \ No newline at end of file diff --git a/lib/WebGUI/Response.pm b/lib/WebGUI/Response.pm index dbd3a29dc..6c0e15fc9 100644 --- a/lib/WebGUI/Response.pm +++ b/lib/WebGUI/Response.pm @@ -2,7 +2,7 @@ package WebGUI::Response; use strict; use parent qw(Plack::Response); -use Plack::Util::Accessor qw(streaming writer streamer); +use Plack::Util::Accessor qw(session streaming writer streamer); =head2 DESCRIPTION @@ -12,9 +12,8 @@ The WebGUI server response object. See of L sub stream { my $self = shift; - my $streamer = shift; + $self->streamer(shift); $self->streaming(1); - $self->streamer($streamer); } sub stream_write { diff --git a/lib/WebGUI/Session.pm b/lib/WebGUI/Session.pm index 43dc23601..a012560c0 100644 --- a/lib/WebGUI/Session.pm +++ b/lib/WebGUI/Session.pm @@ -458,8 +458,11 @@ sub open { my $config = ref $c ? $c : WebGUI::Config->new($webguiRoot,$c); my $self = {_config=>$config }; # TODO - if we store reference here, should we weaken WebGUI->config? bless $self , $class; - $self->{_request} = $request if defined $request; - $self->{_response} = $request->new_response( 200 ) if defined $request; + if (defined $request) { + $request->session($self); # hello circular reference + $self->{_request} = $request; + $self->{_response} = $request->new_response( 200 ); + } my $sessionId = shift || $request->cookies->{$config->getCookieName} || $self->id->generate; $sessionId = $self->id->generate unless $self->id->valid($sessionId); my $noFuss = shift; diff --git a/lib/WebGUI/Session/Output.pm b/lib/WebGUI/Session/Output.pm index 82e31d09e..c148dc62c 100644 --- a/lib/WebGUI/Session/Output.pm +++ b/lib/WebGUI/Session/Output.pm @@ -95,10 +95,14 @@ sub print { print $handle $content; } elsif ($self->session->response) { - if ($self->session->response->streaming) { - $self->session->response->stream_write($content); + my $response = $self->session->response; + if ($response->streaming) { + $response->stream_write($content); } else { - + # Not streaming, so buffer the response instead + # warn "buffering output"; + $response->body([]) unless $response->body && ref $response->body eq 'ARRAY'; + push @{$response->body}, $content; } } else {