#!/usr/bin/perl use strict; use warnings; use DBI; use IO::Socket; my $peer = shift; my $debug = 0; if ($peer eq '-v') { $debug++; $peer = shift; } my $dsn = $ENV{'DBI_DSN'}; $dsn = 'dbi:Pg:dbname=news' unless defined $dsn; my $db = DBI->connect($dsn,'','',{AutoCommit => 0,RaiseError=>1}); die unless $db; $db->do("set CLIENT_ENCODING to 'SQL_ASCII'"); my $batch = $db->prepare(q{select Q.article as id, multiline(A.header || E'\r\n' || A.body) as content from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null limit ?}); my $qsize = $db->prepare(q{select count(Q.article) from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null}); my $peerinfo = $db->prepare('select coalesce(P.host,P.name), P.port, P.feedtime is null or P.feedtime + P.frequency <= CURRENT_TIMESTAMP as wantfeed from feeds P where P.name = ?'); my $mark = $db->prepare('update feedq set response = ? where article = ? and peer = ?'); my $purge = $db->prepare('delete from feedq where (response = 235 or response = 437 or response = 435) and peer = ?'); my $clean = $db->prepare('update feedq set response = NULL where response = 436 and peer = ?'); my $feedtime = $db->prepare('update feeds set feedtime = CURRENT_TIMESTAMP where host = ?'); my $session = $db->prepare(q{insert into sessions (peer, addr, port, incoming, connected, received, refused, rejected, postponed) values (?,?,?,?,timestamp with time zone 'epoch'+? * INTERVAL '1 second' ,?,?,?,?)}); my $summary = $db->prepare(q{select received, refused, rejected, postponed, connected, duration from session_summary where connected = timestamp with time zone 'epoch' + ? * INTERVAL '1 second'}); my $connecttime; my ($host, $port, $feed); sub login { my ($peer) = @_; $peerinfo->execute($peer); ($host, $port, $feed) = $peerinfo->fetchrow_array; $qsize->execute($peer); my ($count) = $qsize->fetchrow_array; warn "$peer: $count articles in queue\n"; if ($count == 0) { exit 0; } if (!$feed) { warn "no feed due\n"; exit 0; } warn "connecting to $peer:$port\n"; my $s = IO::Socket::INET->new( PeerAddr => $host, PeerPort => $port, Proto => 'tcp' ); die unless $s; $host = $s->peerhost(); $connecttime = time; my $rc = response($s); if ($rc != 200) { if ($rc == 201) { warn "posting prohibited on $peer\n"; $s->print("quit\r\n"); } $s->close(); exit 1; } return $s; } #my $s = IO::Handle->new(); $s->fdopen(0, 'r'); $s->fdopen(1, 'w'); #exit 0 unless $feed; my $batchsize = 100; my $setfeedtime = 1; my $cleanup = 1; my $s = login($peer); if ($setfeedtime) { $feedtime->execute($peer); $db->commit(); } # TODO use streaming CHECK/TAKETHIS while (1) { my %r = (435 => 0, 436 => 0, 437 => 0, 235 => 0); $batch->execute($peer, $batchsize); while (my ($id, $data) = $batch->fetchrow_array) { my $rc = ihave($s, $id, $data); $r{$rc}++; } last unless $batch->rows; $session->execute($peer, $host, $port, 0, $connecttime, $r{235}, $r{435}, $r{437}, $r{436}); warn "checkpointing $peer ", $batch->rows, " articles\n"; $db->commit; } warn ":closing $peer:$port\n"; warn "quit\n"; $s->print("quit\r\n"); my $rc = response($s); if ($rc != 205) { warn "server failed to properly respond to close\n"; } $s->close(); if ($cleanup) { my $rv = $purge->execute($peer); warn "deleted $rv articles from queue\n"; $rv = $clean->execute($peer); warn "requeued $rv articles\n"; $db->commit(); } $summary->execute($connecttime); while (my @row = $summary->fetchrow_array) { printf(q{%s: %d received, %d refused, %d rejected, %d postponed, %s (%s)}, $peer, @row); print "\n"; } # TODO check for other/illegal responses sub ihave { my ($s, $id, $data) = @_; $s->print("ihave $id\r\n"); warn "ihave $id\n" if $debug; my $rc = response($s); if ($rc == 335) { warn "$peer: sending $id\n" if $debug; $s->print($data); $rc = response($s); } update($rc, $id, $peer); return $rc; } sub response { my ($s) = @_; my $line = $s->getline; warn "> $line" if $debug; $line =~ s/^\s+//; $line =~ s/\s+$//; my ($rc, $msg) = split(/\s+/, $line, 2); if ($rc !~ /^\d\d\d/) { die "invalid response: $line"; } return ($rc, $msg) if wantarray; return $rc; } sub update { my ($rc, $id, $peer) = @_; $mark->execute(@_); return 1; } #$s->close();