]> pd.if.org Git - newsd/blob - sendfeed
Let database figure out close time for connection.
[newsd] / sendfeed
1 #!/usr/bin/perl
2
3 use strict;
4 use warnings;
5
6 use DBI;
7 use IO::Socket;
8
9 my $peer = shift;
10
11 my $debug = 0;
12
13 if ($peer eq '-v') {
14         $debug++;
15         $peer = shift;
16 }
17
18 my $dsn = $ENV{'DBI_DSN'};
19 $dsn = 'dbi:Pg:dbname=news' unless defined $dsn;
20
21 my $db = DBI->connect($dsn,'','',{AutoCommit => 0,RaiseError=>1});
22
23 die unless $db;
24
25 $db->do("set CLIENT_ENCODING to 'SQL_ASCII'");
26
27 my $batch = $db->prepare(q{select Q.article as id, multiline(A.header || E'\r\n' || A.body) as content from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null limit ?});
28 my $qsize = $db->prepare(q{select count(Q.article) from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null});
29
30 my $peerinfo = $db->prepare('select coalesce(P.host,P.name), P.port, P.feedtime is null or P.feedtime + P.frequency <= CURRENT_TIMESTAMP as wantfeed from feeds P where P.name = ?');
31
32 my $mark = $db->prepare('update feedq set response = ? where article = ? and peer = ?');
33
34 my $purge = $db->prepare('delete from feedq where (response = 235 or response = 437 or response = 435) and peer = ?');
35 my $clean = $db->prepare('update feedq set response = NULL where response = 436 and peer = ?');
36 my $feedtime = $db->prepare('update feeds set feedtime = CURRENT_TIMESTAMP where host = ?');
37 my $session = $db->prepare(q{insert into sessions (peer, addr, port, incoming, connected, received, refused, rejected, postponed) values (?,?,?,?,timestamp with time zone 'epoch'+? * INTERVAL '1 second' ,?,?,?,?)});
38
39 my $summary = $db->prepare(q{select received, refused, rejected, postponed, connected, duration from session_summary where connected = timestamp with time zone 'epoch' + ? * INTERVAL '1 second'});
40
41 my $connecttime;
42
43 my ($host, $port, $feed);
44 sub login {
45         my ($peer) = @_;
46
47         $peerinfo->execute($peer);
48         ($host, $port, $feed) = $peerinfo->fetchrow_array;
49
50         $qsize->execute($peer);
51         my ($count) = $qsize->fetchrow_array;
52
53         warn "$peer: $count articles in queue\n";
54         if ($count == 0) {
55                 exit 0;
56         }
57
58         if (!$feed) {
59                 warn "no feed due\n";
60                 exit 0;
61         }
62
63         warn "connecting to $peer:$port\n";
64         my $s = IO::Socket::INET->new(
65                 PeerAddr => $host,
66                 PeerPort => $port,
67                 Proto => 'tcp'
68         );
69         die unless $s;
70
71         $host = $s->peerhost();
72         $connecttime = time;
73
74         my $rc = response($s);
75         if ($rc != 200) {
76                 if ($rc == 201) {
77                         warn "posting prohibited on $peer\n";
78                         $s->print("quit\r\n");
79                 }
80                 $s->close();
81                 exit 1;
82         }
83
84         return $s;
85 }
86
87 #my $s = IO::Handle->new(); $s->fdopen(0, 'r'); $s->fdopen(1, 'w');
88
89 #exit 0 unless $feed;
90
91 my $batchsize = 100;
92
93 my $setfeedtime = 1;
94 my $cleanup = 1;
95
96 my $s = login($peer);
97
98 if ($setfeedtime) {
99         $feedtime->execute($peer);
100         $db->commit();
101 }
102
103 # TODO use streaming CHECK/TAKETHIS
104 while (1) {
105         my %r = (435 => 0, 436 => 0, 437 => 0, 235 => 0);
106         $batch->execute($peer, $batchsize);
107         while (my ($id, $data) = $batch->fetchrow_array) {
108                 my $rc = ihave($s, $id, $data);
109                 $r{$rc}++;
110         }
111         last unless $batch->rows;
112
113         $session->execute($peer, $host, $port, 0, $connecttime, 
114                 $r{235}, $r{435}, $r{437}, $r{436});
115
116         warn "checkpointing $peer ", $batch->rows, " articles\n";
117         $db->commit;
118 }
119 warn ":closing $peer:$port\n";
120 warn "quit\n";
121 $s->print("quit\r\n");
122 my $rc = response($s);
123 if ($rc != 205) {
124         warn "server failed to properly respond to close\n";
125 }
126 $s->close();
127
128 if ($cleanup) {
129         my $rv = $purge->execute($peer);
130         warn "deleted $rv articles from queue\n";
131         $rv = $clean->execute($peer);
132         warn "requeued $rv articles\n";
133         $db->commit();
134 }
135
136 $summary->execute($connecttime);
137 while (my @row = $summary->fetchrow_array) {
138         printf(q{%s: %d received, %d refused, %d rejected, %d postponed, %s (%s)}, $peer, @row);
139         print "\n";
140 }
141
142 # TODO check for other/illegal responses
143 sub ihave {
144         my ($s, $id, $data) = @_;
145
146         $s->print("ihave $id\r\n");
147         warn "ihave $id\n" if $debug;
148         my $rc = response($s);
149         if ($rc == 335) {
150                 warn "$peer: sending $id\n" if $debug;
151                 $s->print($data);
152                 $rc = response($s);
153         }
154         update($rc, $id, $peer);
155         return $rc;
156 }
157
158 sub response {
159         my ($s) = @_;
160
161         my $line = $s->getline;
162         warn "> $line" if $debug;
163         $line =~ s/^\s+//; $line =~ s/\s+$//;
164         my ($rc, $msg) = split(/\s+/, $line, 2);
165         if ($rc !~ /^\d\d\d/) {
166                 die "invalid response: $line";
167         }
168         return ($rc, $msg) if wantarray;
169         return $rc;
170 }
171
172 sub update {
173         my ($rc, $id, $peer) = @_;
174         $mark->execute(@_);
175         return 1;
176 }
177 #$s->close();