Fix (workaround) storing of /^From /m messages to UNIX mbox files.
[PerlMail.git] / perlmail-submit
1 #! /usr/bin/perl
2
3 #       $Id$
4 # Copyright (C) 2002-2003 Jan Kratochvil <project-PerlMail@jankratochvil.net>
5
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20
21 use vars qw($VERSION);
22 $VERSION=do { my @r=(q$Revision$=~/\d+/g); sprintf "%d.".("%03d"x$#r),@r; };
23 use strict;
24 use warnings;
25
26 use File::Basename;
27 BEGIN {
28         use lib $ENV{"PERLMAIL_BASEDIR"} || File::Basename::dirname($0);
29         use PerlMail::Config;
30         }
31
32 use Getopt::Long;
33 use DBI;
34 use Carp qw(cluck confess);
35 require IO::Socket::INET;
36 use IO::Handle;
37 use POSIX qw(mktime);
38 use Fcntl qw(:flock);
39
40
41 open DBI_PWD,$DBI_pwd or die "open \"$DBI_pwd\": $!";
42 $DBI_pwd=<DBI_PWD>;
43 close DBI_PWD or warn "close DBI_pwd: $!";
44 chomp $DBI_pwd;
45
46 my $DBI;
47 sub DBI
48 {
49         return $DBI if $DBI;
50         $DBI=DBI->connect("DBI:mysql:database=$DBI_database;host=","$DBI_user",$DBI_pwd,{
51                 "PrintError"=>0,        # handled by "RaiseError" below
52                 "RaiseError"=>1,
53                 "ShowErrorStatement"=>1,
54                 "AutoCommit"=>1,
55                 }) or confess "Failed DBI->connect(): $!";
56         return $DBI;
57 }
58
59 # $name,@$cols
60 sub create_table
61 {
62 my($name,$cols)=@_;
63
64         eval { DBI()->do("drop table $name"); };
65         DBI()->do("create table $name (".join(",",@$cols).")");
66 }
67
68 sub initdb
69 {
70         create_table($DB_table,[
71                                         "id int not null auto_increment primary key",
72                                         "time timestamp not null",      # assume ." default now()"
73                                         "message longtext not null",
74                                         "retries int null default 0",   # null=>done, 0=not yet tried to submit
75                                         ],
76                         );
77         DBI()->do("alter table $DB_table add index (retries,id)");
78         print "done.\n";
79         exit 0;
80 }
81
82 sub store
83 {
84         my $message;
85         {
86                 local $/;
87                 $message=<STDIN>;
88                 }
89         close STDIN or cluck "close STDIN: $!";
90         my %row=(
91                         "message"=>$message,
92                         # assume "retries"=>0,
93                         );
94         my $prep=DBI()->prepare("insert into $DB_table (".join(",",keys(%row)).")"
95                         ." values (".join(",",map("?",keys(%row))).")");
96         $prep->execute(values(%row));
97         print $prep->{"mysql_insertid"}."\n";
98 }
99
100 sub forkoff
101 {
102         my $pid=fork();
103         confess if !defined $pid;
104         $DBI=undef();   # Prevent: Server has gone away
105         exit 0 if $pid; # parent
106         # child
107 }
108
109 my $submitonce_run=0;
110 sub submitonce
111 {
112         $submitonce_run++;
113         local *LOCK;
114         open LOCK,">>$Lock_pathname" or die "open-append \"$Lock_pathname\": $!";
115         if (!flock LOCK,LOCK_EX|LOCK_NB) {
116                 # NEVER unlink here, we are not the lock owning process!
117                 print "LOCKED\n";
118                 exit 0;
119                 }
120         my $sth=DBI()->prepare("select id,message from $DB_table where retries is not null"
121                         # process only non-problematic mails during rerun
122                         .($submitonce_run==1 ? "" : " and retries=0")
123                         ." order by retries asc,id asc");
124         $sth->execute();
125         my $progresschar="";
126         autoflush STDOUT 1;
127         my $sock;
128         while (my $row=$sth->fetchrow_hashref()) {
129                 DBI()->do("update $DB_table set retries=retries+1 where id=".$row->{"id"});
130                 if (!$sock) {
131                         $sock=IO::Socket::INET->new(
132                                         "PeerAddr"=>$PeerAddr,
133                                         "Proto"   =>"tcp",
134                                         ) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!";
135                         $sock->connected() or confess "socket not connected";
136                         }
137                 $sock->printflush(length($row->{"message"})."\n".$row->{"message"});
138                 alarm $Socket_timeout and $sock->timeout($Socket_timeout) if $Socket_timeout;
139                 my $got;
140                 my $gotlen=$sock->sysread($got,1);
141                 confess $row->{"id"}.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $!"
142                                 if !defined($gotlen) || $gotlen!=1;
143                 alarm 0;
144                 if ($got ne "1") {
145                         # Prevent mailing errors from cron invoking us etc.
146                         #print STDERR "FAIL:".$row->{"id"}."\n";
147                         undef $sock;
148                         }
149                 else {
150                         DBI()->do("update $DB_table set retries=null where id=".$row->{"id"});
151                         }
152                 print $progresschar.$row->{"id"}.($got eq "1" ? "" : "=FAIL");
153                 $progresschar=",";
154                 }
155         if ($sock) {
156                 $sock->shutdown(0);     # stopped reading
157                 $sock->printflush("BYE\n");
158                 $sock->shutdown(2);     # stopped using
159                 undef $sock;
160                 }
161         print "\n" if $progresschar;
162         unlink $Lock_pathname;
163         close LOCK;
164         return $progresschar;
165 }
166
167 sub submit
168 {
169         1 while submitonce();
170 }
171
172 sub print_messages
173 {
174 my($cond)=@_;
175
176         my $sth=DBI()->prepare("select message from $DB_table $cond order by id");
177         $sth->execute();
178         while (my $row=$sth->fetchrow_hashref()) {
179                 print $row->{"message"},"\n";
180                 }
181 }
182
183 sub pending
184 {
185         print_messages("where retries is not null");
186 }
187
188 sub dump
189 {
190         print_messages("");
191 }
192
193 sub clean
194 {
195 my($keyword,$interval)=@_;
196
197         # FIXME: SQL "now()" is raced against the block above
198         my $sth=DBI()->prepare("select id,time,retries from $DB_table where time>now()");
199         $sth->execute();
200         while (my $row=$sth->fetchrow_hashref()) {
201                 warn "Message time in future: ".join(",",map(
202                                 "$_=".(!defined $row->{$_} ? "NULL" : $row->{$_})
203                                 ,keys(%$row)));
204                 }
205
206         return if $interval eq "";
207         local $_=$interval;
208         my $print=s/^print://;
209         s/(\d+)y/($1*12)."m"/ge;
210         s/(\d+)m/($1*30)."d"/ge;
211         s/(\d+)d/($1*24)."h"/ge;
212         s/(\d+)h/($1*60)."M"/ge;
213         s/(\d+)M/($1*60)."s"/ge;
214         my $sec=0;
215         $sec+=$1 while s/(\d+)s//g;
216         die "Interval parse error; left \"$_\", parsed: $interval" if $_ ne "";
217         $sth=DBI()->prepare(($print ? "select id" : "delete")
218                         ." from $DB_table where retries is null and time<from_unixtime(unix_timestamp()-$sec)");
219         $sth->execute();
220         if (!$print) {
221                 print $sth->rows()."\n";
222                 }
223         else {
224                 while (my $row=$sth->fetchrow_hashref()) {
225                         print $row->{"id"},"\n";
226                         }
227                 }
228 }
229
230
231 $Getopt::Long::ignorecase=0;
232 die if !GetOptions(
233                   "initdb" ,\&initdb,
234                   "store"  ,\&store,
235                   "forkoff",\&forkoff,
236                   "submit" ,\&submit,
237                   "pending",\&pending,
238                   "dump"   ,\&dump,
239                   "clean:s",\&clean,
240                 "V|version",sub { print "perlmail-submit: $VERSION\n"; exit 0; },
241                 );
242 exit 0;