Separate Config space.
[PerlMail.git] / perlmail-submit
1 #! /usr/bin/perl
2 #
3 #       $Id$
4
5 use vars qw($VERSION);
6 $VERSION=do { my @r=(q$Revision$=~/\d+/g); sprintf "%d.".("%03d"x$#r),@r; };
7 use strict;
8 use warnings;
9
10 use PerlMail::Config;
11
12 use Getopt::Long;
13 use DBI;
14 use Carp qw(cluck confess);
15 require IO::Socket::INET;
16 use IO::Handle;
17 use POSIX qw(mktime);
18 use Fcntl qw(:flock);
19
20
21 open DBI_PWD,$DBI_pwd or die "open \"$DBI_pwd\": $!";
22 $DBI_pwd=<DBI_PWD>;
23 close DBI_PWD or warn "close DBI_pwd: $!";
24 chomp $DBI_pwd;
25
26 my $DBI=DBI->connect_cached("DBI:mysql:database=$DBI_database;host=","$DBI_user",$DBI_pwd,{
27                 "PrintError"=>0,        # handled by "RaiseError" below
28                 "RaiseError"=>1,
29                 "ShowErrorStatement"=>1,
30                 "AutoCommit"=>1,
31                 }) or confess "Failed DBI->connect(): $!";
32
33 # $name,@$cols
34 sub create_table
35 {
36 my($name,$cols)=@_;
37
38         eval { $DBI->do("drop table $name"); };
39         $DBI->do("create table $name (".join(",",@$cols).")");
40 }
41
42 sub initdb
43 {
44         create_table($DB_table,[
45                                         "id int not null auto_increment primary key",
46                                         "time timestamp not null",      # assume ." default now()"
47                                         "message longtext not null",
48                                         "retries int null default 0",   # null=>done, 0=not yet tried to submit
49                                         ],
50                         );
51         $DBI->do("alter table $DB_table add index (retries,id)");
52         print "done.\n";
53         exit 0;
54 }
55
56 sub store
57 {
58         my $message;
59         {
60                 local $/;
61                 $message=<STDIN>;
62                 }
63         close STDIN or cluck "close STDIN: $!";
64         my %row=(
65                         "message"=>$message,
66                         # assume "retries"=>0,
67                         );
68         my $prep=$DBI->prepare_cached("insert into $DB_table (".join(",",keys(%row)).")"
69                         ." values (".join(",",map("?",keys(%row))).")");
70         $prep->execute(values(%row));
71         print $prep->{"mysql_insertid"}."\n";
72 }
73
74 sub forkoff
75 {
76         my $pid=fork();
77         confess if !defined $pid;
78         exit 0 if $pid; # parent
79         # child
80 }
81
82 my $submitonce_run=0;
83 sub submitonce
84 {
85         $submitonce_run++;
86         local *LOCK;
87         open LOCK,">>$Lock_pathname" or die "open-append \"$Lock_pathname\": $!";
88         if (!flock LOCK,LOCK_EX|LOCK_NB) {
89                 # NEVER unlink here, we are not the lock owning process!
90                 print "LOCKED\n";
91                 exit 0;
92                 }
93         my $sth=$DBI->prepare("select id,message from $DB_table where retries is not null"
94                         # process only non-problematic mails during rerun
95                         .($submitonce_run==1 ? "" : " and retries=0")
96                         ." order by retries asc,id asc");
97         $sth->execute();
98         my $progresschar="";
99         autoflush STDOUT 1;
100         my $sock;
101         while (my $row=$sth->fetchrow_hashref()) {
102                 $DBI->do("update $DB_table set retries=retries+1 where id=".$row->{"id"});
103                 if (!$sock) {
104                         $sock=IO::Socket::INET->new(
105                                         "PeerAddr"=>$PeerAddr,
106                                         "Proto"   =>"tcp",
107                                         ) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!";
108                         $sock->connected() or confess "socket not connected";
109                         }
110                 $sock->printflush(length($row->{"message"})."\n".$row->{"message"});
111                 alarm $Socket_timeout and $sock->timeout($Socket_timeout) if $Socket_timeout;
112                 my $got;
113                 my $gotlen=$sock->sysread($got,1);
114                 confess $row->{"id"}.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $!"
115                                 if !defined($gotlen) || $gotlen!=1;
116                 alarm 0;
117                 if ($got ne "1") {
118                         # Prevent mailing errors from cron invoking us etc.
119                         #print STDERR "FAIL:".$row->{"id"}."\n";
120                         undef $sock;
121                         }
122                 else {
123                         $DBI->do("update $DB_table set retries=null where id=".$row->{"id"});
124                         }
125                 print $progresschar.$row->{"id"}.($got eq "1" ? "" : "=FAIL");
126                 $progresschar=",";
127                 }
128         if ($sock) {
129                 $sock->shutdown(0);     # stopped reading
130                 $sock->printflush("BYE\n");
131                 $sock->shutdown(2);     # stopped using
132                 undef $sock;
133                 }
134         print "\n" if $progresschar;
135         unlink $Lock_pathname;
136         close LOCK;
137         return $progresschar;
138 }
139
140 sub submit
141 {
142         1 while submitonce();
143 }
144
145 sub pending
146 {
147         my $sth=$DBI->prepare("select message from $DB_table where retries is not null order by id");
148         $sth->execute();
149         while (my $row=$sth->fetchrow_hashref()) {
150                 print $row->{"message"},"\n";
151                 }
152 }
153
154 sub clean
155 {
156 my($keyword,$interval)=@_;
157
158         # FIXME: SQL "now()" is raced against the block above
159         my $sth=$DBI->prepare("select id,time,retries from $DB_table where time>now()");
160         $sth->execute();
161         while (my $row=$sth->fetchrow_hashref()) {
162                 warn "Message time in future: ".join(",",map(
163                                 "$_=".(!defined $row->{$_} ? "NULL" : $row->{$_})
164                                 ,keys(%$row)));
165                 }
166
167         return if $interval eq "";
168         local $_=$interval;
169         my $print=s/^print://;
170         s/(\d+)y/($1*12)."m"/ge;
171         s/(\d+)m/($1*30)."d"/ge;
172         s/(\d+)d/($1*24)."h"/ge;
173         s/(\d+)h/($1*60)."M"/ge;
174         s/(\d+)M/($1*60)."s"/ge;
175         my $sec=0;
176         $sec+=$1 while s/(\d+)s//g;
177         die "Interval parse error; left \"$_\", parsed: $interval" if $_ ne "";
178         $sth=$DBI->prepare(($print ? "select id" : "delete")
179                         ." from $DB_table where retries is null and time<from_unixtime(unix_timestamp()-$sec)");
180         $sth->execute();
181         if (!$print) {
182                 print $sth->rows()."\n";
183                 }
184         else {
185                 while (my $row=$sth->fetchrow_hashref()) {
186                         print $row->{"id"},"\n";
187                         }
188                 }
189 }
190
191
192 $Getopt::Long::ignorecase=0;
193 die if !GetOptions(
194                   "initdb" ,\&initdb,
195                   "store"  ,\&store,
196                   "forkoff",\&forkoff,
197                   "submit" ,\&submit,
198                   "pending",\&pending,
199                   "clean:s",\&clean,
200                 "V|version",sub { print "perlmail-submit: $VERSION\n"; exit 0; },
201                 );
202 exit 0;