From 45b5370469d87a29dd108d9aa4a719d652a44617 Mon Sep 17 00:00:00 2001
From: Colin Kuskie
Date: Sun, 25 Sep 2011 18:45:19 -0700
Subject: [PATCH] Allow Passive Analytics to scale to analyzing millions of
 log entries without nuking MySQL in the process. More thorough test
 cleanup.

---
Review amendments (this section is ignored by git am):
 * SummarizePassiveAnalytics: found_rows() is now read directly after the
   chunk select rather than after "delete from deltaLog".
 * SummarizePassiveAnalytics: the first passiveLog row, which seeds the
   last* values, is now counted, so the chunk/resume offsets no longer
   point one row back and re-process the final row.
 * Both activities: a chunk that returns no rows ends the chunk loop
   instead of re-querying forever.
 * BucketPassiveAnalytics: the chunk statement handle is finished before
   returning or fetching the next chunk, as Summarize already does.
 * get_statement: an undefined offset is treated as 0; POD expanded.
 * Indentation and blank context lines were re-flowed during review, and
   the post-image blob ids on the "index" lines are those of the original
   patch (not recomputed).

 .../Activity/BucketPassiveAnalytics.pm        | 94 +++++++++++++-------
 .../Activity/SummarizePassiveAnalytics.pm     | 90 ++++++++++++------
 t/Workflow/Activity/BucketPassiveAnalytics.t  |  2 +
 3 files changed, 120 insertions(+), 66 deletions(-)

diff --git a/lib/WebGUI/Workflow/Activity/BucketPassiveAnalytics.pm b/lib/WebGUI/Workflow/Activity/BucketPassiveAnalytics.pm
index 0bea4a932..430d7a275 100644
--- a/lib/WebGUI/Workflow/Activity/BucketPassiveAnalytics.pm
+++ b/lib/WebGUI/Workflow/Activity/BucketPassiveAnalytics.pm
@@ -51,6 +51,24 @@ sub definition {
     return $class->SUPER::definition($session,$definition);
 }
 
+#-------------------------------------------------------------------
+
+=head2 get_statement( session, logIndex )
+
+Return a statement handle for one chunk (at most 500000 rows) of deltaLog
+entries, ordered by timestamp and starting at row offset logIndex.  The
+select uses SQL_CALC_FOUND_ROWS, so running "select found_rows()" directly
+afterwards gives the number of rows the select matches without the limit.
+
+=cut
+
+sub get_statement {
+    my ($session, $logIndex) = @_;
+    my $deltaSql = q{select SQL_CALC_FOUND_ROWS userId, assetId, url, delta, from_unixtime(timeStamp) as stamp from deltaLog order by timestamp limit ?, 500000};
+    ##Numify the offset; an undefined offset means start at row 0.
+    my $sth = $session->db->read($deltaSql, [($logIndex || 0) + 0]);
+    return $sth;
+}
 
 #-------------------------------------------------------------------
 
@@ -85,47 +103,53 @@ sub execute {
     my %bucketCache = ();
 
     ##Configure all the SQL
-    my $deltaSql = <<"EOSQL1";
-select userId, assetId, url, delta, from_unixtime(timeStamp) as stamp
-    from deltaLog order by timestamp limit $logIndex, 1234567890
-EOSQL1
-    my $deltaSth = $session->db->read($deltaSql);
-    my $bucketSth = $session->db->prepare('insert into bucketLog (userId, Bucket, duration, timeStamp) VALUES (?,?,?,?)');
+    my $deltaSth = get_statement($session, $logIndex);
+    my $total_rows = $session->db->quickScalar('select found_rows()');
+
+    my $bucketSth = $session->db->prepare('insert into bucketLog (userId, Bucket, duration, timeStamp) VALUES (?,?,?,?)');
 
     ##Walk through the log file entries, one by one. Run each entry against
     ##all the rules until 1 matches. If it doesn't match any rule, then bin it
     ##into the "Other" bucket.
-    DELTA_ENTRY: while (my $entry = $deltaSth->hashRef()) {
-        ++$logIndex;
-        my $bucketFound = 0;
-        my $url = $entry->{url};
-        if (exists $bucketCache{$url}) {
-            $bucketSth->execute([$entry->{userId}, $bucketCache{$url}, $entry->{delta}, $entry->{stamp}]);
-        }
-        else {
-            RULE: foreach my $rule (@rules) {
-                next RULE unless $url =~ $rule->[1];
-
-                # Into the bucket she goes..
-                $bucketCache{$url} = $rule->[0];
-                $bucketSth->execute([$entry->{userId}, $rule->[0], $entry->{delta}, $entry->{stamp}]);
-                $bucketFound = 1;
-                last RULE;
+    DELTA_CHUNK: while (1) {
+        my $chunkStart = $logIndex; ##Used to detect a chunk that returned no rows
+        DELTA_ENTRY: while (my $entry = $deltaSth->hashRef()) {
+            ++$logIndex;
+            my $bucketFound = 0;
+            my $url = $entry->{url};
+            if (exists $bucketCache{$url}) {
+                $bucketSth->execute([$entry->{userId}, $bucketCache{$url}, $entry->{delta}, $entry->{stamp}]);
             }
-            if (!$bucketFound) {
-                $bucketCache{$url} = 'Other';
-                $bucketSth->execute([$entry->{userId}, 'Other', $entry->{delta}, $entry->{stamp}]);
-            }
-        }
-        if (time() > $endTime) {
-            $expired = 1;
-            last DELTA_ENTRY;
-        }
-    }
+            else {
+                RULE: foreach my $rule (@rules) {
+                    next RULE unless $url =~ $rule->[1];
 
-    if ($expired) {
-        $instance->setScratch('logIndex', $logIndex);
-        return $self->WAITING(1);
+                    # Into the bucket she goes..
+                    $bucketCache{$url} = $rule->[0];
+                    $bucketSth->execute([$entry->{userId}, $rule->[0], $entry->{delta}, $entry->{stamp}]);
+                    $bucketFound = 1;
+                    last RULE;
+                }
+                if (!$bucketFound) {
+                    $bucketCache{$url} = 'Other';
+                    $bucketSth->execute([$entry->{userId}, 'Other', $entry->{delta}, $entry->{stamp}]);
+                }
+            }
+            if (time() > $endTime) {
+                $expired = 1;
+                last DELTA_ENTRY;
+            }
+        }
+
+        ##Release this chunk's handle before returning or reading the next chunk
+        $deltaSth->finish;
+        if ($expired) {
+            $instance->setScratch('logIndex', $logIndex);
+            return $self->WAITING(1);
+        }
+        ##Done when every row is processed; an empty chunk also ends the loop so it can never spin
+        last DELTA_CHUNK if $logIndex >= $total_rows || $logIndex == $chunkStart;
+        $deltaSth = get_statement($session, $logIndex);
     }
     my $message = 'Passive analytics is done.';
     if ($session->setting->get('passiveAnalyticsDeleteDelta')) {
diff --git a/lib/WebGUI/Workflow/Activity/SummarizePassiveAnalytics.pm b/lib/WebGUI/Workflow/Activity/SummarizePassiveAnalytics.pm
index fcfb181a5..5b30bb9b1 100644
--- a/lib/WebGUI/Workflow/Activity/SummarizePassiveAnalytics.pm
+++ b/lib/WebGUI/Workflow/Activity/SummarizePassiveAnalytics.pm
@@ -49,6 +49,25 @@ sub definition {
     return $class->SUPER::definition($session,$definition);
 }
 
+#-------------------------------------------------------------------
+
+=head2 get_statement( session, counter )
+
+Return a statement handle for one chunk (at most 500000 rows) of passiveLog
+entries whose userId is not '1', ordered by userId, sessionId and timeStamp
+and starting at row offset counter.  The select uses SQL_CALC_FOUND_ROWS, so
+running "select found_rows()" directly afterwards gives the number of rows
+the select matches without the limit.
+
+=cut
+
+sub get_statement {
+    my ($session, $counter) = @_;
+    my $passive = q{select SQL_CALC_FOUND_ROWS * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp limit ?, 500000};
+    ##Numify the offset; an undefined or empty counter means start at row 0.
+    my $sth = $session->db->read($passive, [($counter || 0) + 0]);
+    return $sth;
+}
 
 #-------------------------------------------------------------------
 
@@ -72,17 +91,17 @@ sub execute {
     my $endTime = time() + $self->getTTL;
     my $deltaInterval = $self->get('deltaInterval');
 
-    my $passive = q{select * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp};
-    my $sth;
     my $lastUserId;
     my $lastSessionId;
     my $lastTimeStamp;
     my $lastAssetId;
     my $lastUrl;
     my $counter = $instance->getScratch('counter');
+    my $sth = get_statement($session, $counter);
+    ##found_rows() is only dependable for the statement right after the
+    ##SQL_CALC_FOUND_ROWS select, so read it before any other SQL runs.
+    my $total_rows = $session->db->quickScalar('select found_rows()');
     if ($counter) {
-        $passive .= ' limit '. $counter .', 1234567890';
-        $sth = $session->db->read($passive);
         $lastUserId = $instance->getScratch('lastUserId');
         $lastSessionId = $instance->getScratch('lastSessionId');
         $lastTimeStamp = $instance->getScratch('lastTimeStamp');
@@ -90,46 +109,55 @@ sub execute {
         $lastUrl = $instance->getScratch('lastUrl');
     }
     else {
-        $sth = $session->db->read($passive);
         my $logLine = $sth->hashRef();
         $lastUserId = $logLine->{userId};
         $lastSessionId = $logLine->{sessionId};
         $lastTimeStamp = $logLine->{timeStamp};
         $lastAssetId = $logLine->{assetId};
         $lastUrl = $logLine->{url};
+        ##The first row only seeds the last* values, but it has been read, so
+        ##count it; otherwise every later chunk offset points one row back.
+        $counter = defined $logLine ? 1 : 0;
+        $session->db->write('delete from deltaLog'); ##Only if we're starting out
     }
 
-    $session->db->write('delete from deltaLog'); ##Only if we're starting out
     my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, delta, timeStamp, url) VALUES (?,?,?,?,?)');
 
     my $expired = 0;
-    LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
-        $counter++;
-        my $delta = $logLine->{timeStamp} - $lastTimeStamp;
-        if ( $logLine->{userId} eq $lastUserId
-          && $logLine->{sessionId} eq $lastSessionId
-          && $delta < $deltaInterval ) {
-            $deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
+    LOG_CHUNK: while (1) {
+        my $chunkStart = $counter; ##Used to detect a chunk that returned no rows
+        LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
+            $counter++;
+            my $delta = $logLine->{timeStamp} - $lastTimeStamp;
+            if ( $logLine->{userId} eq $lastUserId
+              && $logLine->{sessionId} eq $lastSessionId
+              && $delta < $deltaInterval ) {
+                $deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
+            }
+            $lastUserId = $logLine->{userId};
+            $lastSessionId = $logLine->{sessionId};
+            $lastTimeStamp = $logLine->{timeStamp};
+            $lastAssetId = $logLine->{assetId};
+            $lastUrl = $logLine->{url};
+            if (time() > $endTime) {
+                $instance->setScratch('lastUserId', $lastUserId);
+                $instance->setScratch('lastSessionId', $lastSessionId);
+                $instance->setScratch('lastTimeStamp', $lastTimeStamp);
+                $instance->setScratch('lastAssetId', $lastAssetId);
+                $instance->setScratch('lastUrl', $lastUrl);
+                $instance->setScratch('counter', $counter);
+                $expired = 1;
+                last LOG_ENTRY;
+            }
         }
-        $lastUserId = $logLine->{userId};
-        $lastSessionId = $logLine->{sessionId};
-        $lastTimeStamp = $logLine->{timeStamp};
-        $lastAssetId = $logLine->{assetId};
-        $lastUrl = $logLine->{url};
-        if (time() > $endTime) {
-            $instance->setScratch('lastUserId', $lastUserId);
-            $instance->setScratch('lastSessionId', $lastSessionId);
-            $instance->setScratch('lastTimeStamp', $lastTimeStamp);
-            $instance->setScratch('lastAssetId', $lastAssetId);
-            $instance->setScratch('lastUrl', $lastUrl);
-            $instance->setScratch('counter', $counter);
-            $expired = 1;
-            last LOG_ENTRY;
+
+        $sth->finish;
+        if ($expired) {
+            return $self->WAITING(1);
         }
-    }
-
-    if ($expired) {
-        return $self->WAITING(1);
+        ##Done when every row is processed; an empty chunk also ends the loop so it can never spin
+        last LOG_CHUNK if $counter >= $total_rows || $counter == $chunkStart;
+        $sth = get_statement($session, $counter);
     }
 
     $instance->deleteScratch('lastUserId');
diff --git a/t/Workflow/Activity/BucketPassiveAnalytics.t b/t/Workflow/Activity/BucketPassiveAnalytics.t
index d4fade6e4..a0f728260 100644
--- a/t/Workflow/Activity/BucketPassiveAnalytics.t
+++ b/t/Workflow/Activity/BucketPassiveAnalytics.t
@@ -18,6 +18,8 @@ my $session = WebGUI::Test->session;
 $session->user({userId => 3});
 
 WebGUI::Test->addToCleanup(SQL => 'delete from passiveLog');
+WebGUI::Test->addToCleanup(SQL => 'delete from deltaLog');
+WebGUI::Test->addToCleanup(SQL => 'delete from bucketLog');
 WebGUI::Test->addToCleanup(SQL => 'delete from analyticRule');
 
 my $workflow = WebGUI::Workflow->new($session, 'PassiveAnalytics000001');