Fixed &User::Utmp::USER_PROCESS FC5 incompatibility.
[PerlMail.git] / perlmail-submit
1 #! /usr/bin/perl
2
3 #       $Id$
4 # Copyright (C) 2002-2003 Jan Kratochvil <project-PerlMail@jankratochvil.net>
5
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20
# Script version derived from the version-control "Revision" keyword: every
# run of digits in the keyword is extracted, the first is printed as-is and
# each following one is zero-padded to three digits (e.g. "1.12" => "1.012").
# Declared with "our" instead of the obsolete "use vars" pragma.
our $VERSION=do { my @r=(q$Revision$=~/\d+/g); sprintf "%d.".("%03d"x$#r),@r; };
23 use strict;
24 use warnings;
25
26 use File::Basename;
27 BEGIN {
28         use lib $ENV{"PERLMAIL_BASEDIR"} || File::Basename::dirname($0);
29         use PerlMail::Config;
30         }
31
32 use Getopt::Long;
33 use DBI;
34 use Carp qw(cluck confess);
35 require IO::Socket::INET;
36 use IO::Handle;
37 use POSIX qw(mktime);
38 use Fcntl qw(:flock);
39
40
my $DBI_CACHE=0;	# Cache DBI requests - may cause: MySQL server has gone away


# On entry $DBI_pwd holds the pathname of the password file (it is not
# declared in this file; presumably it comes from PerlMail::Config - verify).
# It is replaced in place by the first line of that file.
{
	my $pwd_pathname=$DBI_pwd;
	# Three-argument open with a lexical handle: the pathname can never be
	# misread as an open mode or a pipe command.
	open my $pwd_fh,"<",$pwd_pathname or die "open \"$pwd_pathname\": $!";
	$DBI_pwd=<$pwd_fh>;
	close $pwd_fh or warn "close DBI_pwd: $!";
	# An empty file yields undef; keep it as undef but do not chomp it.
	chomp $DBI_pwd if defined $DBI_pwd;
}
48
# Most recently opened database handle; reused only when $DBI_CACHE is true.
my $DBI;

# Return a connected MySQL database handle.
# A fresh connection is opened on every call unless $DBI_CACHE is set and
# a handle already exists.  Failures are raised as exceptions ("RaiseError").
sub DBI
{
	return $DBI if $DBI_CACHE && $DBI;
	# The class name is quoted because this file also defines a sub named
	# DBI(); a bare DBI->connect could be parsed as DBI()->connect.
	$DBI="DBI"->connect("DBI:mysql:database=$DBI_database;host=","$DBI_user",$DBI_pwd,{
		"PrintError"=>0,	# handled by "RaiseError" below
		"RaiseError"=>1,
		"ShowErrorStatement"=>1,
		"AutoCommit"=>1,
		}) or confess "Failed DBI->connect(): ".(defined $DBI::errstr ? $DBI::errstr : $!);
	return $DBI;
}
61
# create_table($name,\@cols)
# (Re)create table $name; each element of @$cols is one column definition.
sub create_table
{
my($name,$cols)=@_;

	# Best effort: any failure of the drop (such as a missing table) is ignored.
	eval { DBI()->do("drop table $name"); };
	my $column_defs=join(",",@$cols);
	DBI()->do("create table $name ($column_defs)");
}
70
# --initdb: (re)create the message table with its indexes, print "done."
# and terminate the program successfully.
sub initdb
{
	my @columns=(
			"id int not null auto_increment primary key",
			"time timestamp not null",	# assume ." default now()"
			"message longtext not null",
			"retries int null default 0",	# null=>done, 0=not yet tried to submit
			);
	create_table($DB_table,\@columns);
	for my $index_cols ("retries,id","time,retries") {
		DBI()->do("alter table $DB_table add index ($index_cols)");
		}
	print "done.\n";
	exit 0;
}
85
# --store: read one whole message from STDIN, insert it as a new row and
# print the id assigned to that row.
sub store
{
	# Slurp everything available on standard input.
	my $message=do { local $/; <STDIN> };
	close STDIN or cluck "close STDIN: $!";
	# "retries" is not listed; it is assumed to take its column default of 0.
	my @columns=("message");
	my @values=($message);
	my $sql="insert into $DB_table (".join(",",@columns).")"
			." values (".join(",",("?") x @columns).")";
	my $insert=DBI()->prepare($sql);
	$insert->execute(@values);
	print $insert->{"mysql_insertid"}."\n";
}
103
# --forkoff: continue in a forked child; the parent exits successfully
# right away.  Raises an exception naming the system error if fork() fails.
sub forkoff
{
	my $pid=fork();
	confess "fork: $!" if !defined $pid;
	$DBI=undef();	# Prevent: Server has gone away
	exit 0 if $pid;	# parent
	# child
}
112
# Number of submitonce() invocations so far in this process.
my $submitonce_run=0;

# Make one pass over the unfinished messages and try to deliver each one.
#
# For every selected row the retry counter is incremented first; the message
# is then written to the peer as "<length>\n<message>" and a one-byte reply
# is read.  A reply of "1" marks the row as done (retries=null); any other
# reply drops the connection so that the next message reconnects.
# The first pass takes every unfinished row, later passes only rows that
# have never been tried (retries=0).
#
# Prints "LOCKED" and exits 0 if another process holds the lock file.
# Returns a true value ("," ) if at least one row was processed, "" otherwise.
sub submitonce
{
	$submitonce_run++;
	# Three-argument open with a lexical handle instead of a bareword glob.
	open my $lock,">>",$Lock_pathname or die "open-append \"$Lock_pathname\": $!";
	if (!flock $lock,LOCK_EX|LOCK_NB) {
		# NEVER unlink here, we are not the lock owning process!
		print "LOCKED\n";
		exit 0;
		}
	my $sth=DBI()->prepare("select id,message from $DB_table where retries is not null"
			# process only non-problematic mails during rerun
			.($submitonce_run==1 ? "" : " and retries=0")
			." order by retries asc,id asc");
	$sth->execute();
	my $progresschar="";
	STDOUT->autoflush(1);
	my $sock;
	while (my $row=$sth->fetchrow_hashref()) {
		DBI()->do("update $DB_table set retries=retries+1 where id=".$row->{"id"});
		if (!$sock) {
			$sock=IO::Socket::INET->new(
					"PeerAddr"=>$PeerAddr,
					"Proto"   =>"tcp",
					) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!";
			$sock->connected() or confess "socket not connected";
			}
		$sock->printflush(length($row->{"message"})."\n".$row->{"message"});
		if ($Socket_timeout) {
			# These must be two separate statements: alarm() returns the
			# seconds left of the previous alarm (normally 0), so chaining
			# them with "and" skipped the timeout() call.
			alarm $Socket_timeout;
			$sock->timeout($Socket_timeout);
			}
		my $got;
		my $gotlen=$sock->sysread($got,1);
		confess $row->{"id"}.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $!"
				if !defined($gotlen) || $gotlen!=1;
		alarm 0;
		if ($got ne "1") {
			# Prevent mailing errors from cron invoking us etc.
			#print STDERR "FAIL:".$row->{"id"}."\n";
			undef $sock;
			}
		else {
			DBI()->do("update $DB_table set retries=null where id=".$row->{"id"});
			}
		print $progresschar.$row->{"id"}.($got eq "1" ? "" : "=FAIL");
		$progresschar=",";
		}
	if ($sock) {
		$sock->shutdown(0);	# stopped reading
		$sock->printflush("BYE\n");
		$sock->shutdown(2);	# stopped using
		undef $sock;
		}
	print "\n" if $progresschar;
	# NOTE(review): the lock file is removed while the lock is still held; a
	# process that opened the old file just before this point could lock it
	# while a later process locks a newly created file.  Left unchanged.
	unlink $Lock_pathname;
	close $lock;
	return $progresschar;
}
170
# --submit: repeat delivery passes until a pass processes no row at all.
sub submit
{
	while (submitonce()) {
		# submitonce() reported progress; run another pass
		}
}
175
# print_messages($cond)
# Print the stored messages in id order, each followed by a newline.
# $cond is SQL text placed right after the table name (a "where ..." clause
# or an empty string).
sub print_messages
{
my($cond)=@_;

	my $query="select message from $DB_table $cond order by id";
#			." limit 4001,999999"		# FIXME
	my $sth=DBI()->prepare($query);
	$sth->execute();
	while (my($message)=$sth->fetchrow_array()) {
		print $message,"\n";
		}
}
188
# --pending: print every message whose "retries" column is not null,
# i.e. those not yet marked as done.
sub pending
{
	my $unfinished_only="where retries is not null";
	print_messages($unfinished_only);
}
193
# --dump: print every stored message regardless of its state.
sub dump
{
	my $no_condition="";
	print_messages($no_condition);
}
198
# _parse_clean_interval($interval)
# Split an optional "print:" prefix off $interval and convert the remaining
# sequence of <number><unit> items to seconds.  Units: y (=12m), m (=30d),
# d (=24h), h (=60M), M (=60s), s.  Dies if anything is left unparsed.
# Returns ($print,$sec) where $print is true iff the prefix was present.
sub _parse_clean_interval
{
my($interval)=@_;

	my $rest=$interval;
	my $print=($rest=~s/^print://);
	# Rewrite each unit into the next smaller one until only seconds remain.
	$rest=~s/(\d+)y/($1*12)."m"/ge;
	$rest=~s/(\d+)m/($1*30)."d"/ge;
	$rest=~s/(\d+)d/($1*24)."h"/ge;
	$rest=~s/(\d+)h/($1*60)."M"/ge;
	$rest=~s/(\d+)M/($1*60)."s"/ge;
	my $sec=0;
	# Remove one item per iteration; with /g all items would be removed in a
	# single pass while $1 only kept the last one, losing the others.
	$sec+=$1 while $rest=~s/(\d+)s//;
	die "Interval parse error; left \"$rest\", parsed: $interval" if $rest ne "";
	return($print,$sec);
}

# --clean[=[print:]INTERVAL]
# Always warns about rows whose timestamp lies in the future.  When INTERVAL
# is given, finished rows (retries is null) older than INTERVAL are deleted
# and the number of deleted rows is printed; with the "print:" prefix their
# ids are printed instead and nothing is deleted.
# $keyword is the option name passed by the option parser and is unused.
sub clean
{
my($keyword,$interval)=@_;

	# FIXME: SQL "now()" is raced against the block above
	my $sth=DBI()->prepare("select id,time,retries from $DB_table where time>now()");
	$sth->execute();
	while (my $row=$sth->fetchrow_hashref()) {
		warn "Message time in future: ".join(",",map(
				"$_=".(!defined $row->{$_} ? "NULL" : $row->{$_})
				,keys(%$row)));
		}

	return if !defined $interval || $interval eq "";
	my($print,$sec)=_parse_clean_interval($interval);
	$sth=DBI()->prepare(($print ? "select id" : "delete")
			." from $DB_table where retries is null and time<from_unixtime(unix_timestamp()-$sec)");
	$sth->execute();
	if (!$print) {
		print $sth->rows()."\n";
		}
	else {
		while (my $row=$sth->fetchrow_hashref()) {
			print $row->{"id"},"\n";
			}
		}
}
235
# First error raised by any option handler; checked after option parsing.
my $optwrap_err;

# optwrap($func,@args)
# Call $func->(@args).  If the call dies, remember the first such error in
# $optwrap_err and die with "!FINISH" so that no later option is run.
sub optwrap
{
my($func,@args)=@_;

	# Prevent successful return due to --forkoff in the case of failed --store when using:
	# perlmail-submit --store --forkoff --submit
	my $succeeded=eval { $func->(@args); 1; };
	return if $succeeded;
	$optwrap_err||=$@||$!;
	die "!FINISH";
	die "NOTREACHED";
}
249
# Option names are case sensitive.
$Getopt::Long::ignorecase=0;
# Each action option runs its handler through optwrap() in command-line order.
# A handler failure stops parsing via die "!FINISH", which GetOptions does
# not count as an error; such failures are reported through $optwrap_err.
# GetOptions itself still returns false for malformed or unknown options,
# so its result is checked as well instead of being discarded.
my $getopt_ok=GetOptions(
		"initdb"   =>sub { optwrap(\&initdb,@_); },
		"store"    =>sub { optwrap(\&store,@_); },
		"forkoff"  =>sub { optwrap(\&forkoff,@_); },
		"submit"   =>sub { optwrap(\&submit,@_); },
		"pending"  =>sub { optwrap(\&pending,@_); },
		"dump"     =>sub { optwrap(\&dump,@_); },
		"clean:s"  =>sub { optwrap(\&clean,@_); },
		"V|version"=>sub { print "perlmail-submit: $VERSION\n"; exit 0; },
		);
die $optwrap_err if defined $optwrap_err;
# Getopt::Long has already printed its own diagnostic for the bad option.
exit 1 if !$getopt_ok;
exit 0;