Move My-Audit to PerlMail::Config.
[PerlMail.git] / perlmail-submit
1 #! /usr/bin/perl
2 #
3 #       $Id$
4
5 use vars qw($VERSION);
6 $VERSION=do { my @r=(q$Revision$=~/\d+/g); sprintf "%d.".("%03d"x$#r),@r; };
7 use strict;
8 use warnings;
9
10 use File::Basename;
11 BEGIN {
12         use lib $ENV{"PERLMAIL_BASEDIR"} || File::Basename::dirname($0);
13         use PerlMail::Config;
14         }
15
16 use Getopt::Long;
17 use DBI;
18 use Carp qw(cluck confess);
19 require IO::Socket::INET;
20 use IO::Handle;
21 use POSIX qw(mktime);
22 use Fcntl qw(:flock);
23
24
# Database setup.
# On entry $DBI_pwd holds the pathname of a file whose first line is the
# database password; it is replaced here by that password.
# The three-argument open() keeps special characters in the pathname from
# being interpreted as an open mode.
open my $pwd_fh,"<",$DBI_pwd or die "open \"$DBI_pwd\": $!";
$DBI_pwd=<$pwd_fh>;
close $pwd_fh or warn "close DBI_pwd: $!";
# An empty file yields undef; do not chomp() that.
chomp $DBI_pwd if defined $DBI_pwd;

# Database handle shared by all the subroutines of this script.
my $DBI=DBI->connect_cached("DBI:mysql:database=$DBI_database;host=","$DBI_user",$DBI_pwd,{
                "PrintError"=>0,        # handled by "RaiseError" below
                "RaiseError"=>1,
                "ShowErrorStatement"=>1,
                "AutoCommit"=>1,
                # DBI reports connect failures in $DBI::errstr, not in $!
                }) or confess "Failed DBI->connect(): ".(defined $DBI::errstr ? $DBI::errstr : "unknown error");
36
# create_table($name,\@cols)
# Drop table $name (any error of the drop is ignored) and create it again
# with the column definitions listed in @cols.
# Returns the result of the "create table" statement.
sub create_table
{
        my $table=shift;
        my $coldefs=shift;

        # Best effort only; presumably the table may not exist yet.
        eval { $DBI->do("drop table $table"); };

        my $columns=join(",",@$coldefs);
        $DBI->do("create table $table ($columns)");
}
45
# initdb()
# (Re)create the table $DB_table with its (retries,id) index, print "done."
# and terminate the process with exit code 0.
sub initdb
{
        my @columns=(
                "id int not null auto_increment primary key",
                "time timestamp not null",      # assume ." default now()"
                "message longtext not null",
                "retries int null default 0",   # null=>done, 0=not yet tried to submit
                );

        create_table($DB_table,\@columns);
        $DBI->do("alter table $DB_table add index (retries,id)");

        print "done.\n";
        exit 0;
}
59
# store()
# Read the whole of STDIN as one message, insert it as a new row of
# $DB_table and print the id assigned to the new row.
sub store
{
        # Slurp mode: the record separator is undefined only inside do{}.
        my $message=do { local $/; <STDIN> };
        close STDIN or cluck "close STDIN: $!";

        my %row=(
                "message"=>$message,
                # assume "retries"=>0,
                );

        # Column names and values are taken in the same key order.
        my @cols=keys(%row);
        my $collist=join(",",@cols);
        my $marks=join(",",("?") x @cols);

        my $sth=$DBI->prepare_cached("insert into $DB_table ($collist) values ($marks)");
        $sth->execute(@row{@cols});
        print $sth->{"mysql_insertid"}."\n";
}
77
# forkoff()
# Fork; the parent exits with code 0 at once and only the child returns
# to the caller.
# Dies (with a stack trace) if fork() fails.
sub forkoff
{
        my $pid=fork();
        confess "fork: $!" if !defined $pid;
        if ($pid) {
                # parent
                # The database handle is inherited by the child.  Keep the
                # destruction of our copy during exit from disconnecting the
                # connection the child continues to use.
                $DBI->{"InactiveDestroy"}=1;
                exit 0;
                }
        # child
}
85
# Number of submitonce() calls made by this process so far.
my $submitonce_run=0;

# submitonce()
# Make one pass over the stored messages that are not yet marked as done
# ("retries is not null") and send each of them to $PeerAddr.
# The first pass of a process takes all such messages; later passes take
# only those never tried before (retries=0).
# The pass is guarded by an exclusive non-blocking lock of $Lock_pathname;
# if the lock is held by somebody else "LOCKED" is printed and the process
# exits with code 0.
# Wire format as used below: the message length, a newline and the message
# are written; a single byte is read back, "1" meaning success.  "BYE\n" is
# written before the connection gets closed.
# Returns "" if no message was processed, "," otherwise.
# Dies if the lock file cannot be opened, if the connection cannot be
# established or if the one byte reply cannot be read.
sub submitonce
{
        $submitonce_run++;
        open my $lock_fh,">>",$Lock_pathname or die "open-append \"$Lock_pathname\": $!";
        if (!flock $lock_fh,LOCK_EX|LOCK_NB) {
                # NEVER unlink here, we are not the lock owning process!
                print "LOCKED\n";
                exit 0;
                }
        my $sth=$DBI->prepare("select id,message from $DB_table where retries is not null"
                        # process only non-problematic mails during rerun
                        .($submitonce_run==1 ? "" : " and retries=0")
                        ." order by retries asc,id asc");
        $sth->execute();
        my $progresschar="";
        STDOUT->autoflush(1);
        my $sock;
        while (my $row=$sth->fetchrow_hashref()) {
                # The attempt is counted before the message is sent; together
                # with the retries=0 filter above this keeps the message out
                # of the later passes of this process.
                $DBI->do("update $DB_table set retries=retries+1 where id=".$row->{"id"});
                if (!$sock) {
                        # Connect lazily; also reconnects after a failed message.
                        $sock=IO::Socket::INET->new(
                                        "PeerAddr"=>$PeerAddr,
                                        "Proto"   =>"tcp",
                                        ) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!";
                        $sock->connected() or confess "socket not connected";
                        }
                $sock->printflush(length($row->{"message"})."\n".$row->{"message"});
                if ($Socket_timeout) {
                        # alarm() returns the seconds left of the PREVIOUS timer
                        # (0 if there was none), so its result must not decide
                        # whether timeout() gets called.
                        # NOTE(review): no SIGALRM handler is visible here, an
                        # expired alarm presumably terminates the process.
                        $sock->timeout($Socket_timeout);
                        alarm $Socket_timeout;
                        }
                my $got;
                my $gotlen=$sock->sysread($got,1);
                confess $row->{"id"}.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $!"
                                if !defined($gotlen) || $gotlen!=1;
                alarm 0;
                if ($got ne "1") {
                        # Prevent mailing errors from cron invoking us etc.
                        #print STDERR "FAIL:".$row->{"id"}."\n";
                        # Drop the connection; the next message opens a new one.
                        undef $sock;
                        }
                else {
                        # retries=null marks the message as done.
                        $DBI->do("update $DB_table set retries=null where id=".$row->{"id"});
                        }
                print $progresschar.$row->{"id"}.($got eq "1" ? "" : "=FAIL");
                $progresschar=",";
                }
        if ($sock) {
                $sock->shutdown(0);     # stopped reading
                $sock->printflush("BYE\n");
                $sock->shutdown(2);     # stopped using
                undef $sock;
                }
        print "\n" if $progresschar;
        # NOTE(review): unlinking a lock file lets a process which has already
        # opened the old file and a process creating a new one both get the
        # lock; left unchanged to keep the lock file from staying around.
        unlink $Lock_pathname;
        close $lock_fh;
        return $progresschar;
}
143
# submit()
# Repeat submitonce() until a pass reports that it processed no message.
sub submit
{
        while (submitonce()) {
                # all the work is done by the condition
                }
}
148
# pending()
# Print each stored message which is not yet marked as done
# ("retries is not null"), ordered by id, followed by a newline.
sub pending
{
        my $query=$DBI->prepare("select message from $DB_table where retries is not null order by id");
        $query->execute();
        # The list assignment is true for each fetched row.
        while (my($message)=$query->fetchrow_array()) {
                print $message,"\n";
                }
}
157
# clean($keyword,$interval)
# Option handler; Getopt::Long passes the option name as $keyword (unused)
# and the option value as $interval.
# First warn about each row whose time lies in the future.  Then, unless
# $interval is empty, act on the rows marked as done ("retries is null")
# which are older than $interval: delete them and print the number of rows
# reported by the statement, or, if $interval starts with "print:", only
# print their ids.
# $interval is a concatenation of <number><unit> terms with the units
# y (12m), m (30d), d (24h), h (60M), M (60s) and s, for example "1h30M".
# Dies if $interval contains anything else.
sub clean
{
my($keyword,$interval)=@_;

        # FIXME: SQL "now()" is raced against the block above
        my $sth=$DBI->prepare("select id,time,retries from $DB_table where time>now()");
        $sth->execute();
        while (my $row=$sth->fetchrow_hashref()) {
                warn "Message time in future: ".join(",",map(
                                "$_=".(!defined $row->{$_} ? "NULL" : $row->{$_})
                                ,keys(%$row)));
                }

        return if $interval eq "";
        my $rest=$interval;
        my $print=($rest=~s/^print://);
        # Reduce each unit to the next smaller one until only seconds remain.
        $rest=~s/(\d+)y/($1*12)."m"/ge;
        $rest=~s/(\d+)m/($1*30)."d"/ge;
        $rest=~s/(\d+)d/($1*24)."h"/ge;
        $rest=~s/(\d+)h/($1*60)."M"/ge;
        $rest=~s/(\d+)M/($1*60)."s"/ge;
        my $sec=0;
        # Strip one term per iteration: with /g all the terms would be removed
        # by a single substitution and only the last one would get summed.
        $sec+=$1 while $rest=~s/(\d+)s//;
        die "Interval parse error; left \"$rest\", parsed: $interval" if $rest ne "";
        $sth=$DBI->prepare(($print ? "select id" : "delete")
                        ." from $DB_table where retries is null and time<from_unixtime(unix_timestamp()-$sec)");
        $sth->execute();
        if (!$print) {
                print $sth->rows()."\n";
                }
        else {
                while (my $row=$sth->fetchrow_hashref()) {
                        print $row->{"id"},"\n";
                        }
                }
}
194
195
# Command line processing: each option invokes its handler as soon as it is
# parsed, so the actions run in the order given on the command line.
# Option names are case sensitive.
$Getopt::Long::ignorecase=0;
GetOptions(
                "initdb"    => \&initdb,
                "store"     => \&store,
                "forkoff"   => \&forkoff,
                "submit"    => \&submit,
                "pending"   => \&pending,
                "clean:s"   => \&clean,
                "V|version" => sub { print "perlmail-submit: $VERSION\n"; exit 0; },
                ) or die;
exit 0;