--- /dev/null
+drop schema if exists nntp cascade;
+create schema nntp;
+
+set search_path to nntp,public;
+
+-- TODO could probably just do a nested replace in straight sql
+create or replace function wildmat_to_re(wm text) returns text as $$
+declare
+ re text;
+begin
+ re := regexp_replace(wm, E'\\.', E'\\.','g'); -- first string will be
+ -- interpreted as a regex, second as just a string
+ re := regexp_replace(re, E'\\?', E'.','g');
+ re := regexp_replace(re, E'\\*', E'.*','g');
+ return re;
+end;
+$$ language 'plpgsql';
+
+create or replace function wildmat_regex(wm text) returns text as $$
+declare
+ skip_init_neg boolean;
+ negated boolean;
+ repat text;
+ sql text;
+ pat text;
+begin
+ sql := '';
+ skip_init_neg := true;
+
+ for pat in select p from regexp_split_to_table(wm, E',') as p loop
+ if skip_init_neg and pat ~ E'^!' then continue; end if;
+ skip_init_neg := false;
+ negated := (pat ~ E'^!');
+ pat := regexp_replace(pat, E'^!', '');
+ repat := wildmat_to_re(pat);
+ if not negated then
+ sql := sql || '|' || repat;
+ else
+ sql := regexp_replace(sql, E'^\\|', '');
+ sql := '(^(?!' || repat || ')(' || sql || ')$)';
+ end if;
+ end loop;
+
+ sql := regexp_replace(sql, E'^\\|', '');
+ if not negated then
+ sql := '^(' || sql || ')$';
+ end if;
+
+ return sql;
+end;
+$$ language 'plpgsql';
+
+-- TODO put the rest of the mandatory overview fields here?
+create table articles (
+ id text primary key,
+ header text not null,
+ body text, -- not null?
+ peer text,
+ received timestamp with time zone default CURRENT_TIMESTAMP,
+ newsgroups text, -- TODO array?
+ expires timestamp with time zone,
+ "date" timestamp with time zone,
+ -- hdr headers
+ lines integer, -- actual lines, see the overview table for Lines hdr
+ bytes integer,
+ control boolean not null default 'f'
+);
+-- check id like '<%' and id like '%>'
+
+create table moderated_posts (
+ id text primary key,
+ article text,
+ received timestamp with time zone default CURRENT_TIMESTAMP
+);
+
+create or replace function multiline(content text) returns text as $$
+select regexp_replace(
+ regexp_replace(
+ regexp_replace($1, E'^\\.', E'..', 'gn'), -- dot stuff
+ E'\\r?\\n', E'\r\n', 'g'), -- lines separated by \r\n
+ E'(\\r\\n)?$', E'\r\n\.\r\n'); -- terminated by \r\n.\r\n
+$$ language 'sql';
+
+create or replace function demultiline(content text) returns text as $$
+select regexp_replace(
+ regexp_replace(
+ regexp_replace($1,
+ E'\\r\\n\\.\\r\\n$', E'\r\n'), -- terminated by \r\n.\r\n
+ E'\\r\\n', E'\n', 'g'), -- lines separated by \r\n
+ E'^\\.\\.', E'.', 'gn'); -- undot stuff
+$$ language 'sql';
+
+create table overview (
+ article text references articles on delete cascade
+ deferrable initially deferred,
+ header text, -- or metadata, but it has a unique name
+ value text,
+ unique(article,header)
+);
+
+create table header_order (
+ header text primary key,
+ ord integer unique not null
+);
+
+insert into header_order (values
+('Subject',1),
+('From',2),
+('Date',3),
+('Message-ID',4),
+('References',5),
+(':bytes',6),
+(':lines',7),
+('Xref', 8)
+);
+
+create view articleover as
+select id, array_to_string(array_agg(regexp_replace(value,E'\t',' ', 'g')),E'\t') as overview from (
+select
+A.id, ORD.header,
+case when ord.ord <= 7 then coalesce(O.value,'') else coalesce(ORD.header || ': ' || nullif(O.value, ''), '') end as value
+from articles A cross join header_order ord
+left join overview O on O.article = A.id and O.header = ORD.header
+order by ord.ord
+) as foo
+group by id
+;
+
+-- TODO trigger to make an insert an update if the unique constraint
+-- would be violated
+
+-- From
+-- Date
+-- Newsgroups
+-- Subject
+-- Message-ID
+-- Path
+
+-- Optional
+-- Reply-To
+-- Sender
+-- Followup-To
+-- Expires
+-- References
+-- Control
+-- Distribution
+-- Organization
+-- Keywords
+-- Summary
+-- Approved
+-- Lines
+-- Xref
+
+create table newsgroups (
+ newsgroup text primary key,
+ posting char(1) not null default 'y',
+ moderator text default '%s@moderators.isc.org',
+ description text,
+ high integer default 0,
+ low integer default 1,
+ created timestamp default CURRENT_TIMESTAMP,
+ creator text,
+ active boolean not null default true,
+ local boolean not null default false
+);
+
+create or replace function newsgroups_update_trig() returns trigger as $$
+begin
+ if TG_OP = 'UPDATE' and NEW.active = false then
+ if NEW.active != OLD.active then
+ insert into log (message) values ('rmgroup ' || NEW.newsgroup);
+ end if;
+ end if;
+ if TG_OP = 'INSERT' and NEW.active = true then
+ insert into log (message) values ('newgroup ' || NEW.newsgroup);
+ end if;
+ return NEW;
+end;
+$$ language 'plpgsql';
+
+create trigger newsgroups_maintenance_log_trigger after update or insert on newsgroups
+for each row execute procedure newsgroups_update_trig();
+
+create table expiration (
+ wildmat text primary key,
+ wildmatre text,
+ moderated boolean, -- true = mod only, false = not mod, null=any
+ crossposts boolean not null default false, -- also expire all crossposts
+ retention interval not null default '30 days', -- how long to keep from receipt
+ max interval not null default '90 days',
+ min interval not null default '7 days'
+);
+
+create table xpost (
+ article text references articles on delete cascade
+ deferrable initially deferred,
+ -- deferrable lets us do the xposts first on new
+ -- article insertion
+ newsgroup text references newsgroups on delete cascade,
+ number integer,
+ unique(newsgroup,number)
+);
+-- the following index makes calculating average cross-posting much
+-- faster.
+create index xpost_article_index on xpost(article);
+
+create view xref as
+select article, array_to_string(array_agg(newsgroup||':'||number),' ') as xref
+from xpost group by article
+;
+
+create or replace function headers(article text)
+returns table (header text, value text) as $$
+ select
+ substring(h from E'^([^:]+):'),
+ substring(h from E'(?n)^[^:]+: (.+?)$')
+ from
+ regexp_split_to_table(
+ regexp_replace($1, E'\r?\n$', ''),
+ E'\r?\n(?!\\s)') as h
+ ;
+$$ language 'sql' immutable strict;
+
+create or replace function strip_header(headers text, strip text)
+returns text as $$
+select array_to_string(array_agg(header || ': ' || value),E'\r\n') || E'\r\n'
+from (
+ select * from headers($1)
+ where upper(header) != upper($2)
+) as base
+;$$ language 'sql';
+
+create or replace function line_count(content text) returns integer as $$
+select octet_length(regexp_replace(regexp_replace($1,E'\\n$', ''), E'[^\\n]', '','g'))+1;
+$$ language 'sql';
+
+create or replace function header_value(headers text, want text)
+returns text as $$
+select value from headers($1) where lower(header) = lower($2);
+$$ language 'sql';
+
+create or replace function header_add(headers text, h text, v text) returns text as $$
+select $1 || $2 || ': ' || $3 || E'\r\n';
+$$ language 'sql';
+
+create or replace function header_replace(headers text, hdr text, content text)
+returns text as $$
+ select array_to_string(array_agg(header || ': ' || value),E'\r\n') || E'\r\n' from (
+ select
+ case when lower(header) = lower($2) then $2 else header end,
+ case when lower(header) = lower($2) then $3 else value end
+ from headers($1)
+ ) as base
+ ;
+$$ language 'sql';
+
+create or replace function rfc_date(d text) returns timestamptz as $$
+declare
+ noct text;
+ simple text;
+ match text[];
+begin
+ simple := E'(\\d\\d?)\\s+(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\\s+(\\d\\d(?:\\d\\d)?)\\s+(\\d\\d?:\\d\\d(?::\\d\\d)?)\\s+([+-]\\d\\d\\d\\d|\\w{3})';
+ if d ~ simple then
+ match := regexp_matches(d, simple);
+ begin
+ -- was getting +2400 as time zone
+ return array_to_string(match, ' ')::timestamptz;
+ exception when others then
+ raise NOTICE 'date parse error: % %', SQLSTATE, SQLERRM;
+ return null;
+ end;
+ end if;
+ raise NOTICE 'date regex mismatch %', d;
+
+ return null;
+end;
+$$ language 'plpgsql';
+
+create or replace function article_post() returns trigger as $$
+declare
+ id text;
+ ng text;
+ xrefhdr text;
+ host text;
+ xrefcur text;
+begin
+ xrefhdr := header_value(NEW.header, 'Path');
+ host := substring(xrefhdr from E'^\\s*([^!]+)');
+
+ id := header_value(NEW.header, 'Message-ID');
+ ng := header_value(NEW.header, 'Newsgroups');
+ xrefcur := header_value(NEW.header, 'Xref');
+
+ -- path
+ -- subject
+ -- date
+
+ insert into xpost (article, newsgroup)
+ select
+ id, GROUPS.newsgroup from
+ regexp_split_to_table(ng, E'\\s*,\\s*') as G
+ inner join newsgroups GROUPS on GROUPS.newsgroup = btrim(G)
+ ;
+
+ xrefhdr := (select xref from xref where article = id);
+
+ if xrefcur is not null then
+ NEW.header :=
+ header_replace(NEW.header, 'Xref', host || ' ' ||xrefhdr);
+ else
+ NEW.header :=
+ header_add(NEW.header, 'Xref', host || ' ' ||xrefhdr);
+ end if;
+
+ if NEW.header is null then
+ raise exception 'null header H';
+ end if;
+
+ NEW.newsgroups = ng;
+ -- Two for the logical crlf between the header and body
+ -- There will be three more for the .\r\n at the end
+ -- TODO need to decide on how to store header and body
+ NEW.bytes = octet_length(NEW.header) + octet_length(NEW.body) + 2;
+ NEW.lines = line_count(NEW.body);
+
+ if NEW.header is null then
+ raise exception 'null header a';
+ end if;
+
+ begin
+ -- TODO improve this regular expression
+ NEW.expires = regexp_replace(header_value(header, 'Expires'),E'\\([^\\)]*\\)', ' ')::timestamptz;
+ exception
+ when OTHERS then NEW.expires = NULL;
+ end;
+
+ if NEW.header is null then
+ raise exception 'null header b';
+ end if;
+
+ -- Overview headers
+ -- TODO just do all headers? what about dups?
+ insert into overview (article, header, value)
+ select NEW.id, HO.header, H.value
+ from headers(NEW.header) H
+ inner join
+ header_order HO on upper(HO.header) = upper(H.header)
+ union
+ values (NEW.id, ':bytes', NEW.bytes::text),
+ (NEW.id, ':lines', NEW.lines::text)
+ ;
+ -- qw(Subject From Date Message-ID References :bytes :lines Xref);
+
+ return NEW;
+end;
+$$ language 'plpgsql';
+
+create trigger article_post_trigger before insert on articles
+for each row execute procedure article_post();
+
+create or replace function xpost_trigger() returns trigger as $$
+declare
+ hiwater integer;
+begin
+ select high + 1 from newsgroups where newsgroup = NEW.newsgroup
+ for update into hiwater;
+ NEW.number := hiwater;
+ update newsgroups set high = high + 1 where newsgroup = NEW.newsgroup;
+ return NEW;
+end;
+$$ language 'plpgsql';
+create trigger highwater before insert on xpost
+ for each row execute procedure xpost_trigger();
+
+create or replace function highwater_trigger() returns trigger as $$
+begin
+ update newsgroups set high = NEW.number where newsgroup = NEW.newsgroup;
+ return NEW;
+end;
+$$ language plpgsql;
+
+-- TODO this function breaks when the last article is deleted
+-- need to look up rfc to determine proper behavior
+create or replace function lowwater_trigger() returns trigger as $$
+begin
+ update newsgroups set low = (select coalesce(min(number),0) from xpost where newsgroup = OLD.newsgroup) where newsgroup = OLD.newsgroup;
+ return OLD;
+end;
+$$ language plpgsql;
+
+--create trigger highwater after insert on xpost
+-- for each row execute procedure highwater_trigger();
+create trigger lowwater after delete on xpost
+ for each row execute procedure lowwater_trigger();
+
+-- who do i forward articles to
+create table feeds (
+ name text primary key,
+ host text,
+ port integer default 119,
+ enabled boolean not null default true,
+ stream boolean not null default false, -- use CHECK/TAKETHIS
+ groups text default '*', -- wildmat
+ wildmat text default '*',
+ wildmatre text default '.', -- parsed wildmat
+ noxposts text,
+ noxpostsre text,
+ distribution text,
+ distributionre text,
+ -- TODO feed moderated/unmoderated
+ -- see http://www.faqs.org/docs/linux_network/x18341.html
+ maxsize integer, -- maximum article size to feed
+ localonly boolean default false,
+ path text, -- regular expression default host?
+ frequency interval default '1 hour'::interval, -- int secs?
+ feedtime timestamp with time zone
+);
+-- todo check(port >= 0 and port <= 65535)
+
+create table sessions (
+ peer text,
+ addr inet,
+ port integer,
+ incoming boolean not null default true,
+ connected timestamp with time zone,
+ closed timestamp with time zone default clock_timestamp(),
+ received integer, -- 239, 235
+ refused integer, -- 435
+ rejected integer, -- 439, 437
+ postponed integer -- 436
+);
+-- offered == all those together
+create view session_summary as select peer, addr, port, connected, sum(received) as received, sum(refused) as refused, sum(rejected) as rejected, sum(postponed) as postponed, coalesce(max(closed), clock_timestamp()) - connected as duration from sessions group by peer, addr, port, connected;
+
+
+create table feedq (
+ peer text references feeds on delete cascade,
+ article text references articles on delete cascade,
+ response integer
+);
+
+create function feedto(newsgroups text, host text) returns boolean as $$
+begin
+ return (select
+ true =
+ any (select regexp_split_to_table(newsgroups, E',')
+ ~ F.wildmatre)
+ and
+ not true =
+ any (select regexp_split_to_table(newsgroups, E',')
+ ~ F.noxpostsre)
+ from feeds F
+ where F.host = host
+ )
+ ;
+end;
+$$ language 'plpgsql';
+
+create or replace function feedout() returns trigger as $$
+begin
+ insert into feedq
+ select coalesce(F.host, F.name), NEW.id
+ from feeds F
+ where true = any (select regexp_split_to_table(NEW.newsgroups, E',')
+ ~ F.wildmatre)
+ -- TODO add in noxposts
+ and coalesce(F.host,F.name) != NEW.peer
+ and F.enabled
+ ;
+ if FOUND then
+ NOTIFY feedq;
+ end if;
+ return NEW;
+end;
+$$ language 'plpgsql';
+
+create trigger feedpeer_trigger after insert on articles
+for each row execute procedure feedout();
+
+create table log (
+ ts timestamp with time zone default CURRENT_TIMESTAMP,
+ client inet,
+ port integer,
+ pid integer,
+ priority integer,
+ facility text,
+ message text
+);
--- /dev/null
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use DBI;
+use IO::Socket;
+
+my $peer = shift;
+
+my $debug = 0;
+
+if ($peer eq '-v') {
+ $debug++;
+ $peer = shift;
+}
+
+my $dsn = $ENV{'DBI_DSN'};
+$dsn = 'dbi:Pg:dbname=news' unless defined $dsn;
+
+my $db = DBI->connect($dsn,'','',{AutoCommit => 0,RaiseError=>1});
+
+die unless $db;
+
+$db->do("set CLIENT_ENCODING to 'SQL_ASCII'");
+
+my $batch = $db->prepare(q{select Q.article as id, multiline(A.header || E'\r\n' || A.body) as content from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null limit ?});
+my $qsize = $db->prepare(q{select count(Q.article) from feedq Q left join articles A on A.id = Q.article where Q.peer = ? and response is null});
+
+my $peerinfo = $db->prepare('select coalesce(P.host,P.name), P.port, P.feedtime is null or P.feedtime + P.frequency <= CURRENT_TIMESTAMP as wantfeed from feeds P where P.name = ?');
+
+my $mark = $db->prepare('update feedq set response = ? where article = ? and peer = ?');
+
+my $purge = $db->prepare('delete from feedq where (response = 235 or response = 437 or response = 435) and peer = ?');
+my $clean = $db->prepare('update feedq set response = NULL where response = 436 and peer = ?');
+my $feedtime = $db->prepare('update feeds set feedtime = CURRENT_TIMESTAMP where host = ?');
+my $session = $db->prepare(q{insert into sessions (peer, addr, port, incoming, connected, received, refused, rejected, postponed) values (?,?,?,?,timestamp with time zone 'epoch'+? * INTERVAL '1 second' ,?,?,?,?)});
+
+my $summary = $db->prepare(q{select received, refused, rejected, postponed, connected, duration from session_summary where connected = timestamp with time zone 'epoch' + ? * INTERVAL '1 second'});
+
+my $connecttime;
+
+my ($host, $port, $feed);
+sub login {
+ my ($peer) = @_;
+
+ $peerinfo->execute($peer);
+ ($host, $port, $feed) = $peerinfo->fetchrow_array;
+
+ $qsize->execute($peer);
+ my ($count) = $qsize->fetchrow_array;
+
+ warn "$peer: $count articles in queue\n";
+ if ($count == 0) {
+ exit 0;
+ }
+
+ if (!$feed) {
+ warn "no feed due\n";
+ exit 0;
+ }
+
+ warn "connecting to $peer:$port\n";
+ my $s = IO::Socket::INET->new(
+ PeerAddr => $host,
+ PeerPort => $port,
+ Proto => 'tcp'
+ );
+ die unless $s;
+
+ $host = $s->peerhost();
+ $connecttime = time;
+
+ my $rc = response($s);
+ if ($rc != 200) {
+ if ($rc == 201) {
+ warn "posting prohibited on $peer\n";
+ $s->print("quit\r\n");
+ }
+ $s->close();
+ exit 1;
+ }
+
+ return $s;
+}
+
+#my $s = IO::Handle->new(); $s->fdopen(0, 'r'); $s->fdopen(1, 'w');
+
+#exit 0 unless $feed;
+
+my $batchsize = 100;
+
+my $setfeedtime = 1;
+my $cleanup = 1;
+
+my $s = login($peer);
+
+if ($setfeedtime) {
+ $feedtime->execute($peer);
+ $db->commit();
+}
+
+# TODO use streaming CHECK/TAKETHIS
+while (1) {
+ my %r = (435 => 0, 436 => 0, 437 => 0, 235 => 0);
+ $batch->execute($peer, $batchsize);
+ while (my ($id, $data) = $batch->fetchrow_array) {
+ my $rc = ihave($s, $id, $data);
+ $r{$rc}++;
+ }
+ last unless $batch->rows;
+
+ $session->execute($peer, $host, $port, 0, $connecttime,
+ $r{235}, $r{435}, $r{437}, $r{436});
+
+ warn "checkpointing $peer ", $batch->rows, " articles\n";
+ $db->commit;
+}
+warn ":closing $peer:$port\n";
+warn "quit\n";
+$s->print("quit\r\n");
+my $rc = response($s);
+if ($rc != 205) {
+ warn "server failed to properly respond to close\n";
+}
+$s->close();
+
+if ($cleanup) {
+ my $rv = $purge->execute($peer);
+ warn "deleted $rv articles from queue\n";
+ $rv = $clean->execute($peer);
+ warn "requeued $rv articles\n";
+ $db->commit();
+}
+
+$summary->execute($connecttime);
+while (my @row = $summary->fetchrow_array) {
+ printf(q{%s: %d received, %d refused, %d rejected, %d postponed, %s (%s)}, $peer, @row);
+ print "\n";
+}
+
+# TODO check for other/illegal responses
+sub ihave {
+ my ($s, $id, $data) = @_;
+
+ $s->print("ihave $id\r\n");
+ warn "ihave $id\n" if $debug;
+ my $rc = response($s);
+ if ($rc == 335) {
+ warn "$peer: sending $id\n" if $debug;
+ $s->print($data);
+ $rc = response($s);
+ }
+ update($rc, $id, $peer);
+ return $rc;
+}
+
+sub response {
+ my ($s) = @_;
+
+ my $line = $s->getline;
+ warn "> $line" if $debug;
+ $line =~ s/^\s+//; $line =~ s/\s+$//;
+ my ($rc, $msg) = split(/\s+/, $line, 2);
+ if ($rc !~ /^\d\d\d/) {
+ die "invalid response: $line";
+ }
+ return ($rc, $msg) if wantarray;
+ return $rc;
+}
+
+sub update {
+ my ($rc, $id, $peer) = @_;
+ $mark->execute(@_);
+ return 1;
+}
+#$s->close();