]> pd.if.org Git - newsd/commitdiff
Initial commit
authorNathan Wagner <nw@hydaspes.if.org>
Wed, 30 Nov 2011 20:44:37 +0000 (15:44 -0500)
committerNathan Wagner <nw@hydaspes.if.org>
Wed, 30 Nov 2011 20:44:37 +0000 (15:44 -0500)
Makefile [new file with mode: 0644]
convactive [new file with mode: 0755]
convnewsgroups [new file with mode: 0755]
newsd [new file with mode: 0755]
schema.sql [new file with mode: 0644]
sendfeed [new file with mode: 0755]

diff --git a/Makefile b/Makefile
new file mode 100644 (file)
index 0000000..2d12af3
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,60 @@
+PSQL=psql -q -1 --set ON_ERROR_STOP=1 -d news
+
+default:       test
+
+all: postgres
+
+configdl:      active newsgroups control.ctl
+
+reset: stop preserve start
+
+load: stop newsdb schema activesync start
+
+active newsgroups:
+       wget -N ftp://ftp.isc.org/pub/usenet/CONFIG/$@.bz2
+       bzip2 -dfk $@.bz2
+
+control.ctl:
+       wget -N ftp://ftp.isc.org/pub/usenet/CONFIG/$@
+
+newsdb:
+       -dropdb news
+       createdb -O news -T template0 -E SQL_ASCII news
+       psql -d news -c 'alter database news set search_path to nntp,public'
+       psql -d news -c 'alter database news set client_min_messages to warning'
+
+preserve:
+       sudo su -c 'psql -1 --set ON_ERROR_STOP=1 -U news -d news -f preserve.sql' news
+
+activesync:    loadactive loadnewsgroups
+
+loadactive:    active
+       cat active | DBI_DSN='dbi:Pg:dbname=news' perl convactive
+       touch $@
+
+loadnewsgroups:        newsgroups
+       cat newsgroups |DBI_DSN='dbi:Pg:dbname=news' perl convnewsgroups
+       touch $@
+
+testserver:
+       rm -f news.log
+       PERL5LIB=./Net-Server-NNTP/lib ./newsd conf_file=test.conf
+
+test:
+       prove t/*.t
+
+schema:
+       (echo 'set role news;'; cat schema.sql) | $(PSQL) 
+
+startsingle:
+       sudo ./newsd log_level=3 server_type=Single
+
+start:
+       sudo ./newsd log_level=2
+
+stop:
+       -sudo sh -c "kill `cat /var/run/news/newsd.pid`"
+
+restart: stop start
+
+.PHONY:        active newsgroups control.ctl configdl
diff --git a/convactive b/convactive
new file mode 100755 (executable)
index 0000000..dd147c0
--- /dev/null
@@ -0,0 +1,42 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use DBI;
+
+# TODO cmd line option to actually delete the groups
+
+# TODO use LWP::UserAgent to fetch a file and load it
+
+$\="\n";
+my $db = DBI->connect(undef,undef,undef,{RaiseError=>1,AutoCommit=>0});
+
+$db->do('create temp table newnewsgroups (like newsgroups including defaults)');
+$db->do('create index newnewsgroups_idx on newnewsgroups(newsgroup)');
+
+my $i = $db->prepare('insert into newnewsgroups (newsgroup,posting,creator) values (?,?,?)');
+my $rmgroup = $db->prepare('update newsgroups N set active = false where local != true and not exists (select newsgroup from newnewsgroups where newsgroup = N.newsgroup)');
+my $newgroup = $db->prepare('insert into newsgroups select * from newnewsgroups N where not exists (select 1 from newsgroups where newsgroup = N.newsgroup)');
+
+while (<>) {
+       chomp;
+       my ($group, undef, undef, $mod) = split(/\s+/, $_);
+       $i->execute($group,$mod,'iscactivefile');
+}
+$rmgroup->execute();
+$newgroup->execute();
+
+$db->commit;
+
+__END__
+aaa.inu-chan 0000000000 0000000001 m
+ab.arnet 0000000000 0000000001 m
+ab.general 0000000000 0000000001 y
+ab.jobs 0000000000 0000000001 y
+ab.politics 0000000000 0000000001 y
+abg.allgemein 0000000000 0000000001 y
+abg.amiga 0000000000 0000000001 y
+abg.comp 0000000000 0000000001 y
+abg.diskussion 0000000000 0000000001 y
+abg.english 0000000000 0000000001 y
diff --git a/convnewsgroups b/convnewsgroups
new file mode 100755 (executable)
index 0000000..f2ee16b
--- /dev/null
@@ -0,0 +1,32 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use DBI;
+
+$\="\n";
+
+my $db = DBI->connect(undef,undef,undef,{RaiseError=>1,AutoCommit=>0});
+
+#$db->do("set client_encoding to 'LATIN1'");
+my $i = $db->prepare('update newsgroups set description = ? where newsgroup = ?');
+
+while (<>) {
+       chomp;
+       my ($group, $desc) = split(/\s+/, $_, 2);
+       $i->execute($desc,$group);
+}
+$db->commit;
+
+__END__
+aaa.inu-chan 0000000000 0000000001 m
+ab.arnet 0000000000 0000000001 m
+ab.general 0000000000 0000000001 y
+ab.jobs 0000000000 0000000001 y
+ab.politics 0000000000 0000000001 y
+abg.allgemein 0000000000 0000000001 y
+abg.amiga 0000000000 0000000001 y
+abg.comp 0000000000 0000000001 y
+abg.diskussion 0000000000 0000000001 y
+abg.english 0000000000 0000000001 y
diff --git a/newsd b/newsd
new file mode 100755 (executable)
index 0000000..2a93085
--- /dev/null
+++ b/newsd
@@ -0,0 +1,8 @@
+#!/usr/bin/env perl
+
+use strict;
+use warnings;
+
+use Net::Server::NNTP::Postgres;
+
+Net::Server::NNTP::Postgres->new()->run();
diff --git a/schema.sql b/schema.sql
new file mode 100644 (file)
index 0000000..826bc85
--- /dev/null
@@ -0,0 +1,489 @@
+drop schema if exists nntp cascade;
+create schema nntp;
+
+set search_path to nntp,public;
+
+-- TODO could probably just do a nested replace in straight sql
+create or replace function wildmat_to_re(wm text) returns text as $$
+declare
+       re text;
+begin
+       re := regexp_replace(wm, E'\\.', E'\\.','g'); -- first string will be
+       -- interpreted as a regex, second as just a string
+       re := regexp_replace(re, E'\\?', E'.','g');
+       re := regexp_replace(re, E'\\*', E'.*','g');
+       return re;
+end;
+$$ language 'plpgsql';
+
+create or replace function wildmat_regex(wm text) returns text as $$
+declare
+       skip_init_neg boolean;
+       negated boolean;
+       repat text;
+       sql     text;
+       pat     text;
+begin
+       sql := '';
+       skip_init_neg := true;
+
+       for pat in select p from regexp_split_to_table(wm, E',') as p loop
+       if skip_init_neg and pat ~ E'^!' then continue; end if;
+               skip_init_neg := false;
+               negated := (pat ~ E'^!');
+               pat := regexp_replace(pat, E'^!', '');
+               repat := wildmat_to_re(pat);
+               if not negated then
+                       sql := sql || '|' || repat;
+               else
+                       sql := regexp_replace(sql, E'^\\|', '');
+                       sql := '(^(?!' || repat || ')(' || sql || ')$)';
+               end if;
+       end loop;
+
+       sql := regexp_replace(sql, E'^\\|', '');
+       if not negated then
+               sql := '^(' || sql || ')$';
+       end if;
+
+       return sql;
+end;
+$$ language 'plpgsql';
+
+-- TODO put the rest of the mandatory overview fields here?
+create table articles (
+       id      text primary key,
+       header  text not null,
+       body    text, -- not null?
+       peer    text,
+       received        timestamp with time zone default CURRENT_TIMESTAMP,
+       newsgroups      text, -- TODO array?
+       expires timestamp with time zone,
+       "date"  timestamp with time zone,
+       -- hdr headers
+       lines   integer, -- actual lines, see the overview table for Lines hdr
+       bytes   integer,
+       control boolean not null default 'f'
+);
+-- check id like '<%' and id like '%>'
+
+create table moderated_posts (
+       id      text primary key,
+       article text,
+       received        timestamp with time zone default CURRENT_TIMESTAMP
+);
+
+create or replace function multiline(content text) returns text as $$
+select regexp_replace(
+       regexp_replace(
+               regexp_replace($1, E'^\\.', E'..', 'gn'), -- dot stuff
+               E'\\r?\\n', E'\r\n', 'g'), -- lines separated by \r\n
+       E'(\\r\\n)?$', E'\r\n\.\r\n'); -- terminated by \r\n.\r\n
+$$ language 'sql';
+
+create or replace function demultiline(content text) returns text as $$
+select regexp_replace(
+       regexp_replace(
+               regexp_replace($1,
+       E'\\r\\n\\.\\r\\n$', E'\r\n'), -- terminated by \r\n.\r\n
+               E'\\r\\n', E'\n', 'g'), -- lines separated by \r\n
+                       E'^\\.\\.', E'.', 'gn'); -- undot stuff
+$$ language 'sql';
+
+create table overview (
+       article text references articles on delete cascade
+       deferrable initially deferred,
+       header  text, -- or metadata, but it has a unique name
+       value   text,
+       unique(article,header)
+);
+
+create table header_order (
+       header  text primary key,
+       ord     integer unique not null
+);
+
+insert into header_order (values
+('Subject',1),
+('From',2),
+('Date',3),
+('Message-ID',4),
+('References',5),
+(':bytes',6),
+(':lines',7),
+('Xref', 8)
+);
+
+create view articleover as
+select id, array_to_string(array_agg(regexp_replace(value,E'\t',' ', 'g')),E'\t') as overview from (
+select
+A.id, ORD.header,
+case when ord.ord <= 7 then coalesce(O.value,'') else coalesce(ORD.header || ': ' || nullif(O.value, ''), '') end as value
+from articles A cross join header_order ord
+left join overview O on O.article = A.id and O.header = ORD.header
+order by ord.ord
+) as foo
+group by id
+;
+
+-- TODO trigger to make an insert an update if the unique constraint
+-- would be violated
+
+-- From
+-- Date
+-- Newsgroups
+-- Subject
+-- Message-ID
+-- Path
+
+-- Optional
+-- Reply-To
+-- Sender
+-- Followup-To
+-- Expires
+-- References
+-- Control
+-- Distribution
+-- Organization
+-- Keywords
+-- Summary
+-- Approved
+-- Lines
+-- Xref
+
+create table newsgroups (
+       newsgroup       text primary key,
+       posting char(1) not null default 'y',
+       moderator       text default '%s@moderators.isc.org',
+       description     text,
+       high    integer default 0,
+       low     integer default 1,
+       created timestamp default CURRENT_TIMESTAMP,
+       creator text,
+       active  boolean not null default true,
+       local   boolean not null default false
+);
+
+create or replace function newsgroups_update_trig() returns trigger as $$
+begin
+       if TG_OP = 'UPDATE' and NEW.active = false then
+              if NEW.active != OLD.active then
+               insert into log (message) values ('rmgroup ' || NEW.newsgroup);
+       end if;
+       end if;
+       if TG_OP = 'INSERT' and NEW.active = true then
+               insert into log (message) values ('newgroup ' || NEW.newsgroup);
+       end if;
+       return NEW;
+end;
+$$ language 'plpgsql';
+
+create trigger newsgroups_maintenance_log_trigger after update or insert on newsgroups
+for each row execute procedure newsgroups_update_trig();
+
+create table expiration (
+       wildmat text primary key,
+       wildmatre text,
+       moderated       boolean, -- true = mod only, false = not mod, null=any
+       crossposts      boolean not null default false, -- also expire all crossposts
+       retention       interval not null default '30 days', -- how long to keep from receipt
+       max     interval not null default '90 days',
+       min     interval not null default '7 days'
+);
+
+create table xpost (
+       article text references articles on delete cascade
+               deferrable initially deferred,
+               -- deferrable lets us do the xposts first on new
+               -- article insertion
+       newsgroup       text references newsgroups on delete cascade,
+       number  integer,
+       unique(newsgroup,number)
+);
+-- the following index makes calculating average cross-posting much
+-- faster.
+create index xpost_article_index on xpost(article);
+
+create view xref as
+select article, array_to_string(array_agg(newsgroup||':'||number),' ') as xref
+from xpost group by article
+;
+
+create or replace function headers(article text)
+returns table (header text, value text)  as $$
+       select
+       substring(h from E'^([^:]+):'),
+        substring(h from E'(?n)^[^:]+: (.+?)$')
+        from
+        regexp_split_to_table(
+               regexp_replace($1, E'\r?\n$', ''),
+       E'\r?\n(?!\\s)') as h
+       ;
+$$ language 'sql' immutable strict;
+
+create or replace function strip_header(headers text, strip text)
+returns text as $$
+select array_to_string(array_agg(header || ': ' || value),E'\r\n') || E'\r\n'
+from (
+        select * from headers($1)
+        where upper(header) != upper($2)
+) as base
+;$$ language 'sql';
+
+create or replace function line_count(content text) returns integer as $$
+select octet_length(regexp_replace(regexp_replace($1,E'\\n$', ''), E'[^\\n]', '','g'))+1;
+$$ language 'sql';
+
+create or replace function header_value(headers text, want text)
+returns text as $$
+select value from headers($1) where lower(header) = lower($2);
+$$ language 'sql';
+
+create or replace function header_add(headers text, h text, v text) returns text as $$
+select $1 || $2 || ': ' || $3 || E'\r\n';
+$$ language 'sql';
+
+create or replace function header_replace(headers text, hdr text, content text)
+returns text as $$
+       select array_to_string(array_agg(header || ': ' || value),E'\r\n') || E'\r\n' from (
+       select
+               case when lower(header) = lower($2) then $2 else header end,
+               case when lower(header) = lower($2) then $3 else value end
+               from headers($1)
+       ) as base
+       ;
+$$ language 'sql';
+
+create or replace function rfc_date(d text) returns timestamptz as $$
+declare
+       noct text;
+       simple text;
+       match   text[];
+begin
+       simple := E'(\\d\\d?)\\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+(\\d\\d(?:\\d\\d)?)\\s+(\\d\\d?:\\d\\d(?::\\d\\d)?)\\s+([+-]\\d\\d\\d\\d|\\w{3})';
+       if d ~ simple then
+               match := regexp_matches(d, simple);
+               begin
+                       -- was getting +2400 as time zone
+               return array_to_string(match, ' ')::timestamptz;
+               exception when others then
+                       raise NOTICE 'date parse error: % %', SQLSTATE, SQLERRM;
+                       return null;
+               end;
+       end if;
+       raise NOTICE 'date regex mismatch %', d;
+
+       return null;
+end;
+$$ language 'plpgsql';
+
+create or replace function article_post() returns trigger as $$
+declare
+       id text;
+       ng text;
+       xrefhdr text;
+       host text;
+       xrefcur text;
+begin
+       xrefhdr := header_value(NEW.header, 'Path');
+       host := substring(xrefhdr from E'^\\s*([^!]+)');
+
+       id := header_value(NEW.header, 'Message-ID');
+       ng := header_value(NEW.header, 'Newsgroups');
+       xrefcur := header_value(NEW.header, 'Xref');
+
+       -- path
+       -- subject
+       -- date
+
+       insert into xpost (article, newsgroup)
+       select
+       id, GROUPS.newsgroup from
+       regexp_split_to_table(ng, E'\\s*,\\s*') as G
+       inner join newsgroups GROUPS on GROUPS.newsgroup = btrim(G)
+       ;
+
+       xrefhdr := (select xref from xref where article = id);
+
+       if xrefcur is not null then
+               NEW.header :=
+               header_replace(NEW.header, 'Xref', host || ' ' ||xrefhdr);
+       else
+               NEW.header :=
+               header_add(NEW.header, 'Xref', host || ' ' ||xrefhdr);
+       end if;
+
+       if NEW.header is null then
+               raise exception 'null header H';
+       end if;
+
+       NEW.newsgroups = ng;
+       -- Two for the logical crlf between the header and body
+       -- There will be three more for the .\r\n at the end
+       -- TODO need to decide on how to store header and body
+       NEW.bytes = octet_length(NEW.header) + octet_length(NEW.body) + 2;
+       NEW.lines = line_count(NEW.body);
+
+       if NEW.header is null then
+               raise exception 'null header a';
+       end if;
+
+       begin
+               -- TODO improve this regular expression
+               NEW.expires = regexp_replace(header_value(header, 'Expires'),E'\\([^\\)]*\\)', ' ')::timestamptz;
+       exception
+               when OTHERS then NEW.expires = NULL;
+       end;
+
+       if NEW.header is null then
+               raise exception 'null header b';
+       end if;
+
+       -- Overview headers
+       -- TODO just do all headers?  what about dups?
+       insert into overview (article, header, value)
+       select NEW.id, HO.header, H.value
+       from headers(NEW.header) H
+       inner join
+       header_order HO on upper(HO.header) = upper(H.header)
+       union
+       values (NEW.id, ':bytes', NEW.bytes::text),
+       (NEW.id, ':lines', NEW.lines::text)
+       ;
+       -- qw(Subject From Date Message-ID References :bytes :lines Xref);
+
+       return NEW;
+end;
+$$ language 'plpgsql';
+
+create trigger article_post_trigger before insert on articles
+for each row execute procedure article_post();
+
+create or replace function xpost_trigger() returns trigger as $$
+declare
+       hiwater integer;
+begin
+       select high + 1 from newsgroups where newsgroup = NEW.newsgroup
+       for update into hiwater;
+       NEW.number := hiwater;
+       update newsgroups set high = high + 1 where newsgroup = NEW.newsgroup;
+       return NEW;
+end;
+$$ language 'plpgsql';
+create trigger highwater before insert on xpost
+       for each row execute procedure xpost_trigger();
+
+create or replace function highwater_trigger() returns trigger as $$
+begin
+       update newsgroups set high = NEW.number where newsgroup = NEW.newsgroup;
+       return NEW;
+end;
+$$ language plpgsql;
+
+-- TODO this function breaks when the last article is deleted
+-- need to look up rfc to determine proper behavior
+create or replace function lowwater_trigger() returns trigger as $$
+begin
+       update newsgroups set low = (select coalesce(min(number),0) from xpost where newsgroup = OLD.newsgroup) where newsgroup = OLD.newsgroup;
+       return OLD;
+end;
+$$ language plpgsql;
+
+--create trigger highwater after insert on xpost
+--     for each row execute procedure highwater_trigger();
+create trigger lowwater after delete on xpost
+       for each row execute procedure lowwater_trigger();
+
+-- who do i forward articles to
+create table feeds (
+       name    text primary key,
+       host    text,
+       port    integer default 119,
+       enabled boolean not null default true,
+       stream  boolean not null default false, -- use CHECK/TAKETHIS
+       groups  text default '*', -- wildmat
+       wildmat text default '*',
+       wildmatre       text default '.', -- parsed wildmat
+       noxposts        text,
+       noxpostsre      text,
+       distribution    text,
+       distributionre  text,
+       -- TODO feed moderated/unmoderated
+               -- see http://www.faqs.org/docs/linux_network/x18341.html
+       maxsize integer, -- maximum article size to feed
+       localonly       boolean default false,
+       path    text, -- regular expression default host?
+       frequency       interval default '1 hour'::interval, -- int secs?
+       feedtime        timestamp with time zone
+);
+-- todo check(port >= 0 and port <= 65535)
+
+create table sessions (
+       peer    text,
+       addr    inet,
+       port    integer,
+       incoming        boolean not null default true,
+       connected       timestamp with time zone,
+       closed  timestamp with time zone default clock_timestamp(),
+       received        integer, -- 239, 235
+       refused         integer, -- 435
+       rejected        integer, -- 439, 437
+       postponed       integer -- 436
+);
+-- offered == all those together
+create view session_summary as select peer, addr, port, connected, sum(received) as received, sum(refused) as refused, sum(rejected) as rejected, sum(postponed) as postponed, coalesce(max(closed), clock_timestamp()) - connected as duration from sessions group by peer, addr, port, connected;
+
+
+create table feedq (
+       peer    text references feeds on delete cascade,
+       article text references articles on delete cascade,
+       response        integer
+);
+
+create function feedto(newsgroups text, host text) returns boolean as $$
+begin
+       return (select
+               true =
+               any (select regexp_split_to_table(newsgroups, E',')
+                       ~ F.wildmatre)
+               and
+               not true = 
+               any (select regexp_split_to_table(newsgroups, E',')
+                       ~ F.noxpostsre)
+               from feeds F
+               where F.host = host
+       )
+       ;
+end;
+$$ language 'plpgsql';
+
+create or replace function feedout() returns trigger as $$
+begin
+       insert into feedq
+       select coalesce(F.host, F.name), NEW.id
+       from feeds F
+       where true = any (select regexp_split_to_table(NEW.newsgroups, E',')
+                       ~ F.wildmatre)
+       -- TODO add in noxposts
+       and coalesce(F.host,F.name) != NEW.peer
+       and F.enabled
+       ;
+       if FOUND then
+               NOTIFY feedq;
+       end if;
+       return NEW;
+end;
+$$ language 'plpgsql';
+
+create trigger feedpeer_trigger after insert on articles
+for each row execute procedure feedout();
+
+create table log (
+       ts      timestamp with time zone default CURRENT_TIMESTAMP,
+       client  inet,
+       port    integer,
+       pid     integer,
+       priority        integer,
+       facility        text,
+       message text
+);
diff --git a/sendfeed b/sendfeed
new file mode 100755 (executable)
index 0000000..71139ce
--- /dev/null
+++ b/sendfeed
@@ -0,0 +1,177 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use DBI;
+use IO::Socket;
+
+my $peer = shift;
+
+my $debug = 0;
+
+if ($peer eq '-v') {
+       $debug++;
+       $peer = shift;
+}
+
+my $dsn = $ENV{'DBI_DSN'};
+$dsn = 'dbi:Pg:dbname=news' unless defined $dsn;
+
+my $db = DBI->connect($dsn,'','',{AutoCommit => 0,RaiseError=>1});
+
+die unless $db;
+
+$db->do("set CLIENT_ENCODING to 'SQL_ASCII'");
+
+my $batch = $db->prepare(q{select Q.article as id, multiline(A.header || E'\r\n' || A.body) as content from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null limit ?});
+my $qsize = $db->prepare(q{select count(Q.article) from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null});
+
+my $peerinfo = $db->prepare('select coalesce(P.host,P.name), P.port, P.feedtime is null or P.feedtime + P.frequency <= CURRENT_TIMESTAMP as wantfeed from feeds P where P.name = ?');
+
+my $mark = $db->prepare('update feedq set response = ? where article = ? and peer = ?');
+
+my $purge = $db->prepare('delete from feedq where (response = 235 or response = 437 or response = 435) and peer = ?');
+my $clean = $db->prepare('update feedq set response = NULL where response = 436 and peer = ?');
+my $feedtime = $db->prepare('update feeds set feedtime = CURRENT_TIMESTAMP where host = ?');
+my $session = $db->prepare(q{insert into sessions (peer, addr, port, incoming, connected, received, refused, rejected, postponed) values (?,?,?,?,timestamp with time zone 'epoch'+? * INTERVAL '1 second' ,?,?,?,?)});
+
+my $summary = $db->prepare(q{select received, refused, rejected, postponed, connected, duration from session_summary where connected = timestamp with time zone 'epoch' + ? * INTERVAL '1 second'});
+
+my $connecttime;
+
+my ($host, $port, $feed);
+sub login {
+       my ($peer) = @_;
+
+       $peerinfo->execute($peer);
+       ($host, $port, $feed) = $peerinfo->fetchrow_array;
+
+       $qsize->execute($peer);
+       my ($count) = $qsize->fetchrow_array;
+
+       warn "$peer: $count articles in queue\n";
+       if ($count == 0) {
+               exit 0;
+       }
+
+       if (!$feed) {
+               warn "no feed due\n";
+               exit 0;
+       }
+
+       warn "connecting to $peer:$port\n";
+       my $s = IO::Socket::INET->new(
+               PeerAddr => $host,
+               PeerPort => $port,
+               Proto => 'tcp'
+       );
+       die unless $s;
+
+       $host = $s->peerhost();
+       $connecttime = time;
+
+       my $rc = response($s);
+       if ($rc != 200) {
+               if ($rc == 201) {
+                       warn "posting prohibited on $peer\n";
+                       $s->print("quit\r\n");
+               }
+               $s->close();
+               exit 1;
+       }
+
+       return $s;
+}
+
+#my $s = IO::Handle->new(); $s->fdopen(0, 'r'); $s->fdopen(1, 'w');
+
+#exit 0 unless $feed;
+
+my $batchsize = 100;
+
+my $setfeedtime = 1;
+my $cleanup = 1;
+
+my $s = login($peer);
+
+if ($setfeedtime) {
+       $feedtime->execute($peer);
+       $db->commit();
+}
+
+# TODO use streaming CHECK/TAKETHIS
+while (1) {
+       my %r = (435 => 0, 436 => 0, 437 => 0, 235 => 0);
+       $batch->execute($peer, $batchsize);
+       while (my ($id, $data) = $batch->fetchrow_array) {
+               my $rc = ihave($s, $id, $data);
+               $r{$rc}++;
+       }
+       last unless $batch->rows;
+
+       $session->execute($peer, $host, $port, 0, $connecttime, 
+               $r{235}, $r{435}, $r{437}, $r{436});
+
+       warn "checkpointing $peer ", $batch->rows, " articles\n";
+       $db->commit;
+}
+warn ":closing $peer:$port\n";
+warn "quit\n";
+$s->print("quit\r\n");
+my $rc = response($s);
+if ($rc != 205) {
+       warn "server failed to properly respond to close\n";
+}
+$s->close();
+
+if ($cleanup) {
+       my $rv = $purge->execute($peer);
+       warn "deleted $rv articles from queue\n";
+       $rv = $clean->execute($peer);
+       warn "requeued $rv articles\n";
+       $db->commit();
+}
+
+$summary->execute($connecttime);
+while (my @row = $summary->fetchrow_array) {
+       printf(q{%s: %d received, %d refused, %d rejected, %d postponed, %s (%s)}, $peer, @row);
+       print "\n";
+}
+
+# TODO check for other/illegal responses
+sub ihave {
+       my ($s, $id, $data) = @_;
+
+       $s->print("ihave $id\r\n");
+       warn "ihave $id\n" if $debug;
+       my $rc = response($s);
+       if ($rc == 335) {
+               warn "$peer: sending $id\n" if $debug;
+               $s->print($data);
+               $rc = response($s);
+       }
+       update($rc, $id, $peer);
+       return $rc;
+}
+
+sub response {
+       my ($s) = @_;
+
+       my $line = $s->getline;
+       warn "> $line" if $debug;
+       $line =~ s/^\s+//; $line =~ s/\s+$//;
+       my ($rc, $msg) = split(/\s+/, $line, 2);
+       if ($rc !~ /^\d\d\d/) {
+               die "invalid response: $line";
+       }
+       return ($rc, $msg) if wantarray;
+       return $rc;
+}
+
+sub update {
+       my ($rc, $id, $peer) = @_;
+       $mark->execute(@_);
+       return 1;
+}
+#$s->close();