Allow Passive Analytics to scale to analyzing millions of log entries without nuking MySQL in the process. More thorough test cleanup.

This commit is contained in:
Colin Kuskie 2011-09-25 18:45:19 -07:00
parent ec00e867a9
commit 45b5370469
3 changed files with 101 additions and 66 deletions

View file

@ -51,6 +51,20 @@ sub definition {
return $class->SUPER::definition($session,$definition); return $class->SUPER::definition($session,$definition);
} }
#-------------------------------------------------------------------
=head2 get_statement( session, counter )
Return a statement handle at the desired offset.
=cut
sub get_statement {
my ($session, $logIndex) = @_;
my $deltaSql = q{select SQL_CALC_FOUND_ROWS userId, assetId, url, delta, from_unixtime(timeStamp) as stamp from deltaLog order by timestamp limit ?, 500000};
my $sth = $session->db->read($deltaSql, [$logIndex+0]);
return $sth;
}
#------------------------------------------------------------------- #-------------------------------------------------------------------
@ -85,47 +99,49 @@ sub execute {
my %bucketCache = (); my %bucketCache = ();
##Configure all the SQL ##Configure all the SQL
my $deltaSql = <<"EOSQL1"; my $deltaSth = get_statement($session, $logIndex);
select userId, assetId, url, delta, from_unixtime(timeStamp) as stamp my $total_rows = $session->db->quickScalar('select found_rows()');
from deltaLog order by timestamp limit $logIndex, 1234567890
EOSQL1 my $bucketSth = $session->db->prepare('insert into bucketLog (userId, Bucket, duration, timeStamp) VALUES (?,?,?,?)');
my $deltaSth = $session->db->read($deltaSql);
my $bucketSth = $session->db->prepare('insert into bucketLog (userId, Bucket, duration, timeStamp) VALUES (?,?,?,?)');
##Walk through the log file entries, one by one. Run each entry against ##Walk through the log file entries, one by one. Run each entry against
##all the rules until 1 matches. If it doesn't match any rule, then bin it ##all the rules until 1 matches. If it doesn't match any rule, then bin it
##into the "Other" bucket. ##into the "Other" bucket.
DELTA_ENTRY: while (my $entry = $deltaSth->hashRef()) { DELTA_CHUNK: while (1) {
++$logIndex; DELTA_ENTRY: while (my $entry = $deltaSth->hashRef()) {
my $bucketFound = 0; ++$logIndex;
my $url = $entry->{url}; my $bucketFound = 0;
if (exists $bucketCache{$url}) { my $url = $entry->{url};
$bucketSth->execute([$entry->{userId}, $bucketCache{$url}, $entry->{delta}, $entry->{stamp}]); if (exists $bucketCache{$url}) {
} $bucketSth->execute([$entry->{userId}, $bucketCache{$url}, $entry->{delta}, $entry->{stamp}]);
else {
RULE: foreach my $rule (@rules) {
next RULE unless $url =~ $rule->[1];
# Into the bucket she goes..
$bucketCache{$url} = $rule->[0];
$bucketSth->execute([$entry->{userId}, $rule->[0], $entry->{delta}, $entry->{stamp}]);
$bucketFound = 1;
last RULE;
} }
if (!$bucketFound) { else {
$bucketCache{$url} = 'Other'; RULE: foreach my $rule (@rules) {
$bucketSth->execute([$entry->{userId}, 'Other', $entry->{delta}, $entry->{stamp}]); next RULE unless $url =~ $rule->[1];
}
}
if (time() > $endTime) {
$expired = 1;
last DELTA_ENTRY;
}
}
if ($expired) { # Into the bucket she goes..
$instance->setScratch('logIndex', $logIndex); $bucketCache{$url} = $rule->[0];
return $self->WAITING(1); $bucketSth->execute([$entry->{userId}, $rule->[0], $entry->{delta}, $entry->{stamp}]);
$bucketFound = 1;
last RULE;
}
if (!$bucketFound) {
$bucketCache{$url} = 'Other';
$bucketSth->execute([$entry->{userId}, 'Other', $entry->{delta}, $entry->{stamp}]);
}
}
if (time() > $endTime) {
$expired = 1;
last DELTA_ENTRY;
}
}
if ($expired) {
$instance->setScratch('logIndex', $logIndex);
return $self->WAITING(1);
}
last DELTA_CHUNK if $logIndex >= $total_rows;
$deltaSth = get_statement($session, $logIndex);
} }
my $message = 'Passive analytics is done.'; my $message = 'Passive analytics is done.';
if ($session->setting->get('passiveAnalyticsDeleteDelta')) { if ($session->setting->get('passiveAnalyticsDeleteDelta')) {

View file

@ -49,6 +49,20 @@ sub definition {
return $class->SUPER::definition($session,$definition); return $class->SUPER::definition($session,$definition);
} }
#-------------------------------------------------------------------
=head2 get_statement( session, counter )
Return a statement handle at the desired offset.
=cut
sub get_statement {
my ($session, $counter) = @_;
my $passive = q{select SQL_CALC_FOUND_ROWS * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp limit ?, 500000};
my $sth = $session->db->read($passive, [$counter+0]);
return $sth;
}
#------------------------------------------------------------------- #-------------------------------------------------------------------
@ -72,17 +86,14 @@ sub execute {
my $endTime = time() + $self->getTTL; my $endTime = time() + $self->getTTL;
my $deltaInterval = $self->get('deltaInterval'); my $deltaInterval = $self->get('deltaInterval');
my $passive = q{select * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp};
my $sth;
my $lastUserId; my $lastUserId;
my $lastSessionId; my $lastSessionId;
my $lastTimeStamp; my $lastTimeStamp;
my $lastAssetId; my $lastAssetId;
my $lastUrl; my $lastUrl;
my $counter = $instance->getScratch('counter'); my $counter = $instance->getScratch('counter');
my $sth = get_statement($session, $counter);
if ($counter) { if ($counter) {
$passive .= ' limit '. $counter .', 1234567890';
$sth = $session->db->read($passive);
$lastUserId = $instance->getScratch('lastUserId'); $lastUserId = $instance->getScratch('lastUserId');
$lastSessionId = $instance->getScratch('lastSessionId'); $lastSessionId = $instance->getScratch('lastSessionId');
$lastTimeStamp = $instance->getScratch('lastTimeStamp'); $lastTimeStamp = $instance->getScratch('lastTimeStamp');
@ -90,46 +101,52 @@ sub execute {
$lastUrl = $instance->getScratch('lastUrl'); $lastUrl = $instance->getScratch('lastUrl');
} }
else { else {
$sth = $session->db->read($passive);
my $logLine = $sth->hashRef(); my $logLine = $sth->hashRef();
$lastUserId = $logLine->{userId}; $lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId}; $lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp}; $lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId}; $lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url}; $lastUrl = $logLine->{url};
$session->db->write('delete from deltaLog'); ##Only if we're starting out
} }
$session->db->write('delete from deltaLog'); ##Only if we're starting out my $total_rows = $session->db->quickScalar('select found_rows()');
my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, delta, timeStamp, url) VALUES (?,?,?,?,?)'); my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, delta, timeStamp, url) VALUES (?,?,?,?,?)');
my $expired = 0; my $expired = 0;
LOG_ENTRY: while (my $logLine = $sth->hashRef()) { LOG_CHUNK: while (1) {
$counter++; LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
my $delta = $logLine->{timeStamp} - $lastTimeStamp; $counter++;
if ( $logLine->{userId} eq $lastUserId my $delta = $logLine->{timeStamp} - $lastTimeStamp;
&& $logLine->{sessionId} eq $lastSessionId if ( $logLine->{userId} eq $lastUserId
&& $delta < $deltaInterval ) { && $logLine->{sessionId} eq $lastSessionId
$deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]); && $delta < $deltaInterval ) {
$deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
}
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url};
if (time() > $endTime) {
$instance->setScratch('lastUserId', $lastUserId);
$instance->setScratch('lastSessionId', $lastSessionId);
$instance->setScratch('lastTimeStamp', $lastTimeStamp);
$instance->setScratch('lastAssetId', $lastAssetId);
$instance->setScratch('lastUrl', $lastUrl);
$instance->setScratch('counter', $counter);
$expired = 1;
last LOG_ENTRY;
}
} }
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId}; $sth->finish;
$lastTimeStamp = $logLine->{timeStamp}; if ($expired) {
$lastAssetId = $logLine->{assetId}; return $self->WAITING(1);
$lastUrl = $logLine->{url};
if (time() > $endTime) {
$instance->setScratch('lastUserId', $lastUserId);
$instance->setScratch('lastSessionId', $lastSessionId);
$instance->setScratch('lastTimeStamp', $lastTimeStamp);
$instance->setScratch('lastAssetId', $lastAssetId);
$instance->setScratch('lastUrl', $lastUrl);
$instance->setScratch('counter', $counter);
$expired = 1;
last LOG_ENTRY;
} }
} last LOG_CHUNK if $counter >= $total_rows;
$sth = get_statement($session, $counter);
if ($expired) {
return $self->WAITING(1);
} }
$instance->deleteScratch('lastUserId'); $instance->deleteScratch('lastUserId');

View file

@ -18,6 +18,8 @@ my $session = WebGUI::Test->session;
$session->user({userId => 3}); $session->user({userId => 3});
WebGUI::Test->addToCleanup(SQL => 'delete from passiveLog'); WebGUI::Test->addToCleanup(SQL => 'delete from passiveLog');
WebGUI::Test->addToCleanup(SQL => 'delete from deltaLog');
WebGUI::Test->addToCleanup(SQL => 'delete from bucketLog');
WebGUI::Test->addToCleanup(SQL => 'delete from analyticRule'); WebGUI::Test->addToCleanup(SQL => 'delete from analyticRule');
my $workflow = WebGUI::Workflow->new($session, 'PassiveAnalytics000001'); my $workflow = WebGUI::Workflow->new($session, 'PassiveAnalytics000001');