rename
[PerlMail.git] / perlmail-submit
1 #! /usr/bin/perl
2 #
3 #       $Id$
4
5 use vars qw($VERSION);
6 $VERSION=do { my @r=(q$Revision$=~/\d+/g); sprintf "%d.".("%03d"x$#r),@r; };
7 use strict;
8 use warnings;
9
10 use Getopt::Long;
11 use DBI;
12 use Carp qw(cluck confess);
13 require IO::Socket::INET;
14 use IO::Handle;
15 use POSIX qw(mktime);
16 use Fcntl qw(:flock);
17
18
19 my $Lock_pathname="/tmp/PerlMail.lock";
20 #my $PeerAddr="dejhome.dyn.jankratochvil.net.:852";
21 my $PeerAddr="127.0.0.1:2852";
22 my $Socket_timeout=7600;       # 15sec is NOT enough!
23 my $DB_table="PerlMail_folder";
24 my $DBI_database="short";
25 my $DBI_user="short";
26 my $DBI_pwd=$ENV{"HOME"}."/priv/mysql.".$DBI_user.".pwd";
27 open DBI_PWD,$DBI_pwd or die "open \"$DBI_pwd\": $!";
28 $DBI_pwd=<DBI_PWD>;
29 close DBI_PWD or warn "close DBI_pwd: $!";
30 chomp $DBI_pwd;
31
32 my $DBI=DBI->connect_cached("DBI:mysql:database=$DBI_database;host=","$DBI_user",$DBI_pwd,{
33                 "PrintError"=>0,        # handled by "RaiseError" below
34                 "RaiseError"=>1,
35                 "ShowErrorStatement"=>1,
36                 "AutoCommit"=>1,
37                 }) or confess "Failed DBI->connect(): $!";
38
39 # $name,@$cols
40 sub create_table
41 {
42 my($name,$cols)=@_;
43
44         eval { $DBI->do("drop table $name"); };
45         $DBI->do("create table $name (".join(",",@$cols).")");
46 }
47
48 sub initdb
49 {
50         create_table($DB_table,[
51                                         "id int not null auto_increment primary key",
52                                         "time timestamp not null",      # assume ." default now()"
53                                         "message longtext not null",
54                                         "retries int null default 0",   # null=>done, 0=not yet tried to submit
55                                         ],
56                         );
57         $DBI->do("alter table $DB_table add index (retries,id)");
58         print "done.\n";
59         exit 0;
60 }
61
62 sub store
63 {
64         my $message;
65         {
66                 local $/;
67                 $message=<STDIN>;
68                 }
69         close STDIN or cluck "close STDIN: $!";
70         my %row=(
71                         "message"=>$message,
72                         # assume "retries"=>0,
73                         );
74         my $prep=$DBI->prepare_cached("insert into $DB_table (".join(",",keys(%row)).")"
75                         ." values (".join(",",map("?",keys(%row))).")");
76         $prep->execute(values(%row));
77         print $prep->{"mysql_insertid"}."\n";
78 }
79
80 sub forkoff
81 {
82         my $pid=fork();
83         confess if !defined $pid;
84         exit 0 if $pid; # parent
85         # child
86 }
87
88 my $submitonce_run=0;
89 sub submitonce
90 {
91         $submitonce_run++;
92         local *LOCK;
93         open LOCK,">>$Lock_pathname" or die "open-append \"$Lock_pathname\": $!";
94         if (!flock LOCK,LOCK_EX|LOCK_NB) {
95                 # NEVER unlink here, we are not the lock owning process!
96                 print "LOCKED\n";
97                 exit 0;
98                 }
99         my $sth=$DBI->prepare("select id,message from $DB_table where retries is not null"
100                         # process only non-problematic mails during rerun
101                         .($submitonce_run==1 ? "" : " and retries=0")
102                         ." order by retries asc,id asc");
103         $sth->execute();
104         my $progresschar="";
105         autoflush STDOUT 1;
106         my $sock;
107         while (my $row=$sth->fetchrow_hashref()) {
108                 $DBI->do("update $DB_table set retries=retries+1 where id=".$row->{"id"});
109                 if (!$sock) {
110                         $sock=IO::Socket::INET->new(
111                                         "PeerAddr"=>$PeerAddr,
112                                         "Proto"   =>"tcp",
113                                         ) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!";
114                         $sock->connected() or confess "socket not connected";
115                         }
116                 $sock->printflush(length($row->{"message"})."\n".$row->{"message"});
117                 alarm $Socket_timeout and $sock->timeout($Socket_timeout) if $Socket_timeout;
118                 my $got;
119                 my $gotlen=$sock->sysread($got,1);
120                 confess $row->{"id"}.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $!"
121                                 if !defined($gotlen) || $gotlen!=1;
122                 alarm 0;
123                 if ($got ne "1") {
124                         # Prevent mailing errors from cron invoking us etc.
125                         #print STDERR "FAIL:".$row->{"id"}."\n";
126                         undef $sock;
127                         }
128                 else {
129                         $DBI->do("update $DB_table set retries=null where id=".$row->{"id"});
130                         }
131                 print $progresschar.$row->{"id"}.($got eq "1" ? "" : "=FAIL");
132                 $progresschar=",";
133                 }
134         if ($sock) {
135                 $sock->shutdown(0);     # stopped reading
136                 $sock->printflush("BYE\n");
137                 $sock->shutdown(2);     # stopped using
138                 undef $sock;
139                 }
140         print "\n" if $progresschar;
141         unlink $Lock_pathname;
142         close LOCK;
143         return $progresschar;
144 }
145
146 sub submit
147 {
148         1 while submitonce();
149 }
150
151 sub pending
152 {
153         my $sth=$DBI->prepare("select message from $DB_table where retries is not null order by id");
154         $sth->execute();
155         while (my $row=$sth->fetchrow_hashref()) {
156                 print $row->{"message"},"\n";
157                 }
158 }
159
160 sub clean
161 {
162 my($keyword,$interval)=@_;
163
164         # FIXME: SQL "now()" is raced against the block above
165         my $sth=$DBI->prepare("select id,time,retries from $DB_table where time>now()");
166         $sth->execute();
167         while (my $row=$sth->fetchrow_hashref()) {
168                 warn "Message time in future: ".join(",",map(
169                                 "$_=".(!defined $row->{$_} ? "NULL" : $row->{$_})
170                                 ,keys(%$row)));
171                 }
172
173         return if $interval eq "";
174         local $_=$interval;
175         my $print=s/^print://;
176         s/(\d+)y/($1*12)."m"/ge;
177         s/(\d+)m/($1*30)."d"/ge;
178         s/(\d+)d/($1*24)."h"/ge;
179         s/(\d+)h/($1*60)."M"/ge;
180         s/(\d+)M/($1*60)."s"/ge;
181         my $sec=0;
182         $sec+=$1 while s/(\d+)s//g;
183         die "Interval parse error; left \"$_\", parsed: $interval" if $_ ne "";
184         $sth=$DBI->prepare(($print ? "select id" : "delete")
185                         ." from $DB_table where retries is null and time<from_unixtime(unix_timestamp()-$sec)");
186         $sth->execute();
187         if (!$print) {
188                 print $sth->rows()."\n";
189                 }
190         else {
191                 while (my $row=$sth->fetchrow_hashref()) {
192                         print $row->{"id"},"\n";
193                         }
194                 }
195 }
196
197
198 $Getopt::Long::ignorecase=0;
199 die if !GetOptions(
200                   "initdb" ,\&initdb,
201                   "store"  ,\&store,
202                   "forkoff",\&forkoff,
203                   "submit" ,\&submit,
204                   "pending",\&pending,
205                   "clean:s",\&clean,
206                 "V|version",sub { print "perlmail-submit: $VERSION\n"; exit 0; },
207                 );
208 exit 0;