# Pick the DBI data source: DBI_DSN from the environment takes precedence,
# otherwise fall back to the "news" PostgreSQL database.
18 my $dsn = $ENV{'DBI_DSN'};
19 if (!defined $dsn) { $dsn = 'dbi:Pg:dbname=news'; }
# Connect with empty user name and password. AutoCommit is off and RaiseError
# is on, so DBI failures die instead of returning error codes.
21 my $db = DBI->connect($dsn, '', '', { AutoCommit => 0, RaiseError => 1 });
# Switch the session's client encoding to SQL_ASCII.
25 $db->do(q{set CLIENT_ENCODING to 'SQL_ASCII'});
# Prepared statements shared by the rest of the script. Every statement that
# takes a peer is bound to the feed *name* (the value held in $peer).

# $batch(peer, limit): ids and full text of queued articles for a peer that
# have no response recorded yet. multiline() is a database-side function not
# shown here; presumably it prepares the text for NNTP transfer -- confirm.
27 my $batch = $db->prepare(q{select Q.article as id, multiline(A.header || E'\r\n' || A.body) as content from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null limit ?});
# $qsize(peer): number of queue entries for a peer with no response yet.
28 my $qsize = $db->prepare(q{select count(Q.article) from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null});
# $peerinfo(name): host (falling back to the feed name when host is NULL),
# port, and a "wantfeed" flag that is true when the peer was never fed or its
# feedtime + frequency has passed.
30 my $peerinfo = $db->prepare('select coalesce(P.host,P.name), P.port, P.feedtime is null or P.feedtime + P.frequency <= CURRENT_TIMESTAMP as wantfeed from feeds P where P.name = ?');
# $mark(response, article, peer): store the response code for a queue entry.
32 my $mark = $db->prepare('update feedq set response = ? where article = ? and peer = ?');
# $purge(peer): remove entries answered with 235, 437 or 435.
34 my $purge = $db->prepare('delete from feedq where (response = 235 or response = 437 or response = 435) and peer = ?');
# $clean(peer): clear 436 responses so those entries are offered again.
35 my $clean = $db->prepare('update feedq set response = NULL where response = 436 and peer = ?');
# $feedtime(name): stamp the feed with the current time. Keyed on the feed
# name, the same column $peerinfo looks the peer up by: the host column may be
# NULL or differ from the name, in which case matching on host would never
# update the row and the peer would always look due for a feed.
36 my $feedtime = $db->prepare('update feeds set feedtime = CURRENT_TIMESTAMP where name = ?');
# $session(peer, addr, port, incoming, connected-epoch-seconds, received,
# refused, rejected, postponed): log one feed session.
37 my $session = $db->prepare(q{insert into sessions (peer, addr, port, incoming, connected, received, refused, rejected, postponed) values (?,?,?,?,timestamp with time zone 'epoch'+? * INTERVAL '1 second' ,?,?,?,?)});
# $summary(connected-epoch-seconds): read back the counters, connect time and
# duration for the session(s) started at that time.
39 my $summary = $db->prepare(q{select received, refused, rejected, postponed, connected, duration from session_summary where connected = timestamp with time zone 'epoch' + ? * INTERVAL '1 second'});
# Connection details for the current peer, filled in from the feeds table.
43 my ($host, $port, $feed);
# Look the peer up by name; $feed receives the "wantfeed" flag computed by the
# $peerinfo query. ($peer is set outside this excerpt.)
47 $peerinfo->execute($peer);
48 ($host, $port, $feed) = $peerinfo->fetchrow_array;
# Count the peer's queue entries that have no response recorded yet and log it.
50 $qsize->execute($peer);
51 my ($count) = $qsize->fetchrow_array;
53 warn "$peer: $count articles in queue\n";
# Open a TCP connection to the peer. The constructor arguments are not visible
# in this excerpt.
63 warn "connecting to $peer:$port\n";
64 my $s = IO::Socket::INET->new(
# From here on $host holds the remote address as reported by the socket, not
# the configured host name.
71 $host = $s->peerhost();
# Read the server's first response after connecting.
74 my $rc = response($s);
# NOTE(review): the condition guarding the next two lines is not visible here;
# presumably it checks the greeting code -- confirm against the full file.
77 warn "posting prohibited on $peer\n";
78 $s->print("quit\r\n");
# Disabled alternative that would talk over stdin/stdout instead of a socket.
87 #my $s = IO::Handle->new(); $s->fdopen(0, 'r'); $s->fdopen(1, 'w');
# Record the current time as this peer's feed time.
99 $feedtime->execute($peer);
103 # TODO use streaming CHECK/TAKETHIS
# Per-session counters keyed by response code. They are written to the sessions
# table below as received (235), refused (435), rejected (437), postponed (436).
105 my %r = (435 => 0, 436 => 0, 437 => 0, 235 => 0);
# Fetch up to $batchsize unanswered queue entries for this peer and offer each
# one to the server. ($batchsize is defined outside this excerpt.)
106 $batch->execute($peer, $batchsize);
107 while (my ($id, $data) = $batch->fetchrow_array) {
108 my $rc = ihave($s, $id, $data);
# Leave the enclosing loop once a batch produced no rows.
# NOTE(review): the lines that update %r and close the while loop are not
# visible in this excerpt.
111 last unless $batch->rows;
# Log the session so far: outgoing (incoming = 0), with the connect time given
# as epoch seconds in $connecttime.
113 $session->execute($peer, $host, $port, 0, $connecttime,
114 $r{235}, $r{435}, $r{437}, $r{436});
116 warn "checkpointing $peer ", $batch->rows, " articles\n";
# End the session: send "quit" and read the server's reply.
119 warn ":closing $peer:$port\n";
121 $s->print("quit\r\n");
122 my $rc = response($s);
# NOTE(review): the check on $rc that guards this warning is not visible here.
124 warn "server failed to properly respond to close\n";
# Remove queue entries the peer answered with 235, 435 or 437 (see $purge).
# DBI's execute() reports "no rows affected" as the string "0E0"; adding 0
# turns that into a plain 0 so the log line reads naturally.
129 my $rv = $purge->execute($peer);
130 warn "deleted ", $rv + 0, " articles from queue\n";
# Clear 436 responses so those entries are offered again on the next run.
131 $rv = $clean->execute($peer);
132 warn "requeued ", $rv + 0, " articles\n";
# Print the session summary for the session that started at $connecttime.
# Each row supplies, in order: received, refused, rejected, postponed,
# connected and duration, which fill the six placeholders after the peer name.
136 $summary->execute($connecttime);
137 while (my @row = $summary->fetchrow_array) {
138 printf(q{%s: %d received, %d refused, %d rejected, %d postponed, %s (%s)}, $peer, @row);
# Offer one article to the server with IHAVE and record the outcome.
# Arguments: the connection handle, the article id, and the article text.
# $peer and $debug come from the enclosing scope.
142 # TODO check for other/illegal responses
144 my ($s, $id, $data) = @_;
# Announce the article and read the server's answer.
146 $s->print("ihave $id\r\n");
147 warn "ihave $id\n" if $debug;
148 my $rc = response($s);
# NOTE(review): the branch that decides whether to send the article body, and
# the send itself, are not visible in this excerpt.
150 warn "$peer: sending $id\n" if $debug;
# Hand the response code to update() together with the article id and peer.
154 update($rc, $id, $peer);
# Read one response line from the server and split it into the status code and
# the remaining message text. Dies if the connection yields no line or the
# line does not start with three digits. In list context returns ($rc, $msg).
# getline returns undef at end of file or on a read error; fail with a clear
# message instead of carrying on with an undefined line.
161 my $line = $s->getline; die "no response: connection closed by peer" unless defined $line;
162 warn "> $line" if $debug;
# Trim surrounding whitespace (including the trailing CR/LF).
163 $line =~ s/^\s+//; $line =~ s/\s+$//;
164 my ($rc, $msg) = split(/\s+/, $line, 2);
165 if ($rc !~ /^\d\d\d/) {
166 die "invalid response: $line";
168 return ($rc, $msg) if wantarray;
# Arguments: response code, article id, peer name (as passed from ihave).
# NOTE(review): the rest of this routine is not visible in this excerpt;
# presumably it stores the response via $mark -- confirm.
173 my ($rc, $id, $peer) = @_;