SMS aliasing rewritten
[PerlMail.git] / perlmail-submit
1 #! /usr/bin/perl
2 #
3 #       $Id$
4
5 use vars qw($VERSION);
6 $VERSION=do { my @r=(q$Revision$=~/\d+/g); sprintf "%d.".("%03d"x$#r),@r; };
7 use strict;
8 use warnings;
9
10 use Getopt::Long;
11 use DBI;
12 use Carp qw(cluck confess);
13 require IO::Socket::INET;
14 use IO::Handle;
15 use POSIX qw(mktime);
16 use Fcntl qw(:flock);
17
# Exit status override.  While undefined the process exits with whatever
# status it would have anyway; once set (store() sets it to 0) that value
# is reported regardless of how the rest of the run ended.
my $ExitCode;
END {
        # Inside END perl is already on its way out and $? holds the status
        # about to be returned; assigning it is the documented way to change
        # the exit status from here and leaves other END blocks undisturbed.
        $?=$ExitCode if defined $ExitCode;
        }
22
# --- Configuration -----------------------------------------------------

# File flock()ed by submitonce() so that only one submitter runs at a time.
my $Lock_pathname="/tmp/LaceMail.lock";
# "host:port" of the peer the queued messages are delivered to.
my $PeerAddr="dejhome.dyn.jankratochvil.net.:852";
my $Socket_timeout=600; # 15sec is NOT enough!
# Table holding the message queue.
my $DB_table="LaceMail_folder";
my $DBI_database="short";
my $DBI_user="short";

# $DBI_pwd first names the password file and is then replaced by the
# first line of that file (without its trailing newline).
my $DBI_pwd=$ENV{"HOME"}."/priv/mysql.".$DBI_user.".pwd";
{
        open my $pwd_fh,"<",$DBI_pwd or die "open \"$DBI_pwd\": $!";
        my $pwd_line=<$pwd_fh>;
        close $pwd_fh or warn "close DBI_pwd: $!";
        # An empty file yields undef; keep it undef but do not chomp it.
        chomp $pwd_line if defined $pwd_line;
        $DBI_pwd=$pwd_line;
        }

# Database handle shared by all the subs below.
my $DBI=DBI->connect_cached("DBI:mysql:database=$DBI_database;host=","$DBI_user",$DBI_pwd,{
                "PrintError"=>0,        # handled by "RaiseError" below
                "RaiseError"=>1,
                "ShowErrorStatement"=>1,
                "AutoCommit"=>1,
                })
        # connect errors are reported through $DBI::errstr, not through $!
        or confess "Failed DBI->connect(): ".(defined $DBI::errstr ? $DBI::errstr : "unknown error");
41
# create_table($name,\@cols)
#
# (Re)create table $name: any existing table of that name is dropped
# first, then the table is created from the column definitions in @$cols
# (each element is one SQL column clause; they are joined with ",").
#
# Errors are raised by DBI ("RaiseError" is set on the handle).
sub create_table
{
my($name,$cols)=@_;

        # "if exists" tolerates only a missing table; any other failure of the
        # drop is now reported instead of being silently swallowed.
        $DBI->do("drop table if exists $name");
        $DBI->do("create table $name (".join(",",@$cols).")");
}
50
# Handler for --initdb: (re)create the queue table with its index, print
# "done." and exit(0).  Goes through create_table(), so an existing table
# is replaced.
sub initdb
{
        my @columns=(
                "id int not null auto_increment primary key",
                "time timestamp not null",      # assume ." default now()"
                "message longtext not null",
                "retries int null default 0",   # null=>done, 0=not yet tried to submit
                );

        create_table($DB_table,\@columns);
        # Index matching the selection and ordering done by submitonce().
        $DBI->do("alter table $DB_table add index (retries,id)");

        print "done.\n";
        exit 0;
}
64
# Handler for --store: read one whole message from STDIN, insert it into
# the queue table and print the id of the new row.  Also sets $ExitCode to
# 0 so that the process reports success from here on.
sub store
{
        # Slurp all of STDIN in one read.
        my $message=do { local $/; <STDIN> };

        # Column names and their values, kept in step with each other.
        # "retries" is not listed: assume its default 0.
        my @columns=("message");
        my @values=($message);

        my $column_list=join(",",@columns);
        my $placeholders=join(",",("?") x @columns);
        my $prep=$DBI->prepare_cached(
                        "insert into $DB_table ($column_list) values ($placeholders)");
        $prep->execute(@values);

        print $prep->{"mysql_insertid"}."\n";
        $ExitCode=0;    # we will succeed even if --submit fails
}
82
# Number of submitonce() passes started by this process.  The first pass
# also retries messages that failed before; later passes take only
# messages that were never tried (retries=0).
my $submitonce_run=0;

# Make one pass over the queue and deliver the pending messages to
# $PeerAddr.
#
# Wire format as spoken by this code: for each message the length, a
# newline and the message itself are sent, then one byte is read back;
# "1" means the message was accepted and it is marked done (retries=null).
# Any other byte is reported as "=FAIL" and the connection is dropped.
# The session is ended by sending "BYE\n".
#
# Only one submitter may run at a time: an exclusive non-blocking flock()
# is taken on $Lock_pathname; if it is held by somebody else "LOCKED" is
# printed and the process exits with status 0.
#
# Progress is printed to STDOUT as a comma separated list of ids.
# Returns "," if at least one message was attempted, "" otherwise.
# Dies (confess) on connect failure or when the reply byte cannot be read.
sub submitonce
{
        $submitonce_run++;

        open my $lock,">>",$Lock_pathname or die "open-append \"$Lock_pathname\": $!";
        if (!flock $lock,LOCK_EX|LOCK_NB) {
                # NEVER unlink here, we are not the lock owning process!
                print "LOCKED\n";
                exit 0;
                }

        my $sth=$DBI->prepare("select id,message from $DB_table where retries is not null"
                        # process only non-problematic mails during rerun
                        .($submitonce_run==1 ? "" : " and retries=0")
                        ." order by retries asc,id asc");
        $sth->execute();

        my $progresschar="";
        STDOUT->autoflush(1);
        my $sock;
        while (my $row=$sth->fetchrow_hashref()) {
                my $id=$row->{"id"};

                # Count the attempt before making it.
                $DBI->do("update $DB_table set retries=retries+1 where id=?",undef,$id);

                # Connect lazily, and again after a failed message dropped the socket.
                if (!$sock) {
                        $sock=IO::Socket::INET->new(
                                        "PeerAddr"=>$PeerAddr,
                                        "Proto"   =>"tcp",
                                        ) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!";
                        $sock->connected() or confess "socket not connected";
                        }

                $sock->printflush(length($row->{"message"})."\n".$row->{"message"});

                # Two separate statements on purpose: alarm() returns the seconds
                # left of the PREVIOUS timer (0 here), so chaining the calls with
                # "and" would never reach timeout().
                if ($Socket_timeout) {
                        $sock->timeout($Socket_timeout);
                        alarm $Socket_timeout;
                        }
                my $got;
                my $gotlen=$sock->sysread($got,1);
                confess $id.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $!"
                                if !defined($gotlen) || $gotlen!=1;
                alarm 0;

                if ($got ne "1") {
                        # Prevent mailing errors from cron invoking us etc.
                        #print STDERR "FAIL:".$id."\n";
                        undef $sock;
                        }
                else {
                        $DBI->do("update $DB_table set retries=null where id=?",undef,$id);
                        }
                print $progresschar.$id.($got eq "1" ? "" : "=FAIL");
                $progresschar=",";
                }

        if ($sock) {
                $sock->shutdown(0);     # stopped reading
                $sock->printflush("BYE\n");
                $sock->shutdown(2);     # stopped using
                undef $sock;
                }
        print "\n" if $progresschar;

        # The lock file is deliberately left in place.  Unlinking it while
        # other processes may already have it open lets one of them lock the
        # old (unlinked) inode while another one creates and locks a new
        # file, so two submitters would run at once.
        close $lock;
        return $progresschar;
}
140
# Handler for --submit: keep running submitonce() passes until a pass
# reports that it attempted no message at all.
sub submit
{
        while (submitonce()) {
                # nothing to do between passes; submitonce() does all the work
                }
}
145
# Handler for --pending: print every message not yet marked done
# (retries is not null), ordered by id, each followed by a newline.
sub pending
{
        my $query="select message from $DB_table where retries is not null order by id";
        my $sth=$DBI->prepare($query);
        $sth->execute();

        # One column per row, so a one-element list per fetch.
        while (my($message)=$sth->fetchrow_array()) {
                print $message."\n";
                }
}
154
# Handler for --clean[=[print:]INTERVAL].
#
# Always warns about rows whose time lies in the future.  With a non-empty
# $interval, messages already marked done (retries is null) and older than
# the interval are deleted and the number of deleted rows is printed; with
# the "print:" prefix their ids are only listed and nothing is deleted.
#
# INTERVAL is a concatenation of <number><unit> items which are summed.
# Units: y (12 months), m (30 days), d (24 hours), h, M (minutes),
# s (seconds); e.g. "1m" or "2d12h".
#
# $keyword  - option name as passed in by Getopt::Long (unused)
# $interval - option value, "" when the option was given without a value
#
# Dies if anything is left of the interval after parsing.
sub clean
{
my($keyword,$interval)=@_;

        # FIXME: SQL "now()" is raced against the block above
        my $sth=$DBI->prepare("select id,time,retries from $DB_table where time>now()");
        $sth->execute();
        while (my $row=$sth->fetchrow_hashref()) {
                warn "Message time in future: ".join(",",map(
                                "$_=".(!defined $row->{$_} ? "NULL" : $row->{$_})
                                ,keys(%$row)));
                }

        return if $interval eq "";

        my $rest=$interval;
        my $print=($rest=~s/^print://);

        # Rewrite every unit into the next smaller one until only seconds
        # remain.  The order matters: months ("m") must be gone before
        # minutes ("M") are turned into seconds.
        $rest=~s/(\d+)y/($1*12)."m"/ge;
        $rest=~s/(\d+)m/($1*30)."d"/ge;
        $rest=~s/(\d+)d/($1*24)."h"/ge;
        $rest=~s/(\d+)h/($1*60)."M"/ge;
        $rest=~s/(\d+)M/($1*60)."s"/ge;

        # Strip one "<n>s" item per iteration and add it up.  No /g here: with
        # /g all items vanish in a single pass and only the last capture would
        # be counted, so "1d2h" would come out as two hours.
        my $sec=0;
        $sec+=$1 while $rest=~s/(\d+)s//;
        die "Interval parse error; left \"$rest\", parsed: $interval" if $rest ne "";

        $sth=$DBI->prepare(($print ? "select id" : "delete")
                        ." from $DB_table where retries is null and time<from_unixtime(unix_timestamp()-$sec)");
        $sth->execute();
        if (!$print) {
                # number of rows deleted
                print $sth->rows()."\n";
                }
        else {
                while (my $row=$sth->fetchrow_hashref()) {
                        print $row->{"id"},"\n";
                        }
                }
}
191
192
# Command line dispatch.  Every option is an action that is carried out by
# its handler as soon as the option is parsed, so several actions can be
# combined in one invocation.  Option names are matched case sensitively.
$Getopt::Long::ignorecase=0;
GetOptions(
        "initdb"    => \&initdb,
        "store"     => \&store,
        "submit"    => \&submit,
        "pending"   => \&pending,
        "clean:s"   => \&clean,         # value is optional
        "V|version" => sub { print "lacemail-submit: $VERSION\n"; exit 0; },
        ) or die;
exit 0;