$Socket_timeout: 15->600
[PerlMail.git] / perlmail-submit
1 #! /usr/bin/perl
2 #
3 #       $Id$
4
# Script version, derived from the RCS/CVS "Revision" keyword: the first
# number is printed as-is, every following one zero-padded to three digits.
our $VERSION=do {
        my @numbers=(q$Revision$=~/\d+/g);
        my $format="%d.".("%03d" x $#numbers);
        sprintf $format,@numbers;
};
7 use strict;
8 use warnings;
9
10 use Getopt::Long;
11 use DBI;
12 use Carp qw(cluck confess);
13 require IO::Socket::INET;
14 use IO::Handle;
15 use POSIX qw(mktime);
16 use Fcntl qw(:flock);
17
18
# Lock file that keeps two submitting processes from running at the same time.
my $Lock_pathname="/tmp/LaceMail.lock";
# "host:port" of the peer the queued messages are submitted to.
my $PeerAddr="dejhome.dyn.jankratochvil.net.:852";
my $Socket_timeout=600; # 15sec is NOT enough!
my $DB_table="LaceMail_folder";
my $DBI_database="short";
my $DBI_user="short";

# The database password lives in a per-user file, not in this script;
# only the first line of that file is used.
my $DBI_pwd_pathname=$ENV{"HOME"}."/priv/mysql.".$DBI_user.".pwd";
# 3-arg open with a lexical handle: the pathname can never be taken for
# a mode or a pipe, and no global bareword handle is left behind.
open my $DBI_pwd_fh,"<",$DBI_pwd_pathname or die "open \"$DBI_pwd_pathname\": $!";
my $DBI_pwd=<$DBI_pwd_fh>;
close $DBI_pwd_fh or warn "close DBI_pwd: $!";
# An empty file yields undef; skip chomp then instead of warning about it.
chomp $DBI_pwd if defined $DBI_pwd;

# One shared handle for the whole script.  "RaiseError" makes every failed
# DBI call die, so callers below do not check return values.
my $DBI=DBI->connect_cached("DBI:mysql:database=$DBI_database;host=",$DBI_user,$DBI_pwd,{
                "PrintError"=>0,        # handled by "RaiseError" below
                "RaiseError"=>1,
                "ShowErrorStatement"=>1,
                "AutoCommit"=>1,
                })
        # DBI reports connect failures in $DBI::errstr; $! is unrelated here.
        or confess "Failed DBI->connect(): ".(defined $DBI::errstr ? $DBI::errstr : "unknown error");
37
# create_table($name,\@cols)
#   $name - name of the table; interpolated into the SQL as-is
#   $cols - array ref of column definition strings, joined with ","
# Drops any existing table called $name (losing its rows) and creates it
# anew from the given column definitions.  Dies on any database error,
# as the connection runs with "RaiseError".
sub create_table
{
        my($name,$cols)=@_;

        # "if exists" tolerates a missing table while still letting every
        # other failure of the drop raise instead of being swallowed.
        $DBI->do("drop table if exists $name");
        $DBI->do("create table $name (".join(",",@$cols).")");
}
46
# Handler for --initdb: recreate the mail queue table from scratch, add the
# (retries,id) index, print "done." and terminate the script with status 0.
sub initdb
{
        my @columns=(
                "id int not null auto_increment primary key",
                "time timestamp not null",      # assume ." default now()"
                "message longtext not null",
                "retries int null default 0",   # null=>done, 0=not yet tried to submit
        );
        create_table($DB_table,\@columns);
        $DBI->do("alter table $DB_table add index (retries,id)");
        print "done.\n";
        exit 0;
}
60
# Handler for --store: read one complete message from STDIN, queue it as a
# new row of $DB_table and print the id assigned to that row.
sub store
{
        # Slurp mode: everything up to EOF is a single message.
        my $message=do { local $/; <STDIN> };

        # Column => value of the new row; "retries" is left to its default.
        my %row=("message"=>$message);
        my @columns=keys %row;
        my $column_list=join(",",@columns);
        my $placeholders=join(",",("?") x @columns);

        my $insert=$DBI->prepare_cached("insert into $DB_table ($column_list) values ($placeholders)");
        $insert->execute(@row{@columns});
        print $insert->{"mysql_insertid"}."\n";
}
77
# Number of submitonce() calls made so far by this process.
my $submitonce_run=0;

# submitonce()
# One pass over the queue: every row of $DB_table whose "retries" is not
# null is sent to $PeerAddr.  The first pass of a process takes all such
# rows, later passes only those with retries=0.
#
# Wire format as used below: "<length>\n<message>" per mail, answered by
# the peer with a single byte where "1" means accepted; the session is
# ended with "BYE\n".
#
# Prints the processed ids as "id,id=FAIL,...".  Returns "" when no row was
# processed and "," otherwise, so the caller can loop while it is true.
# Prints "LOCKED" and exits 0 if another process holds the lock file.
# Dies (confess) when connecting fails or the reply byte cannot be read.
sub submitonce
{
        $submitonce_run++;
        # 3-arg open with a lexical handle instead of a localized bareword glob.
        open my $lock,">>",$Lock_pathname or die "open-append \"$Lock_pathname\": $!";
        if (!flock($lock,LOCK_EX|LOCK_NB)) {
                # NEVER unlink here, we are not the lock owning process!
                print "LOCKED\n";
                exit 0;
        }
        my $sth=$DBI->prepare("select id,message from $DB_table where retries is not null"
                        # process only non-problematic mails during rerun
                        .($submitonce_run==1 ? "" : " and retries=0")
                        ." order by retries asc,id asc");
        $sth->execute();
        my $progresschar="";
        STDOUT->autoflush(1);
        my $sock;
        while (my $row=$sth->fetchrow_hashref()) {
                my $id=$row->{"id"};
                # Count the attempt before talking to the peer, so a mail that
                # kills this process still ends up with retries>0.
                $DBI->do("update $DB_table set retries=retries+1 where id=".$id);
                if (!$sock) {
                        # Connect lazily; also reconnects after a failed mail.
                        $sock=IO::Socket::INET->new(
                                        "PeerAddr"=>$PeerAddr,
                                        "Proto"   =>"tcp",
                                        ) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!";
                        $sock->connected() or confess "socket not connected";
                }
                $sock->printflush(length($row->{"message"})."\n".$row->{"message"});
                if ($Socket_timeout) {
                        # Two separate statements on purpose: alarm() returns the
                        # seconds left on the previous timer (normally 0), so it
                        # must not be used as the condition for calling timeout().
                        $sock->timeout($Socket_timeout);
                        # No SIGALRM handler is visible in this file, so presumably
                        # an expired alarm takes the default action - TODO confirm.
                        alarm $Socket_timeout;
                }
                my $got;
                my $gotlen=$sock->sysread($got,1);
                # Save the error text first, then always disarm the timer, so a
                # die caught further up cannot leave the alarm pending.
                my $sysread_error="$!";
                alarm 0;
                confess $id.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $sysread_error"
                                if !defined($gotlen) || $gotlen!=1;
                my $accepted=($got eq "1");
                if ($accepted) {
                        # retries=null marks the mail as done.
                        $DBI->do("update $DB_table set retries=null where id=".$id);
                }
                else {
                        # Prevent mailing errors from cron invoking us etc.
                        #print STDERR "FAIL:".$id."\n";
                        # Drop the connection; the next mail gets a fresh one.
                        undef $sock;
                }
                print $progresschar.$id.($accepted ? "" : "=FAIL");
                $progresschar=",";
        }
        if ($sock) {
                $sock->shutdown(0);     # stopped reading
                $sock->printflush("BYE\n");
                $sock->shutdown(2);     # stopped using
                undef $sock;
        }
        print "\n" if $progresschar;
        # NOTE(review): unlinking a lock file that another process may already
        # have opened lets that process lock the stale inode while a third one
        # locks a freshly created file.  Left as is because removing the unlink
        # would change who can reopen the file; confirm before changing.
        unlink $Lock_pathname;
        close $lock;
        return $progresschar;
}
135
# Handler for --submit: run submitonce() again and again until a pass
# reports that it processed no mail at all.
sub submit
{
        while (submitonce()) {
                # Nothing to do here; each pass does all of its own work.
        }
}
140
# Handler for --pending: print every message whose "retries" is not null,
# in id order, each followed by a newline.
sub pending
{
        my $query=$DBI->prepare("select message from $DB_table where retries is not null order by id");
        $query->execute();
        # "message" is the only selected column, so one scalar per row suffices.
        while (my($message)=$query->fetchrow_array()) {
                print $message,"\n";
        }
}
149
150
# Command line dispatch.  Option names are matched case sensitively and
# each action runs as soon as its option is seen on the command line.
$Getopt::Long::ignorecase=0;
my @option_spec=(
        "initdb"   =>\&initdb,
        "store"    =>\&store,
        "submit"   =>\&submit,
        "pending"  =>\&pending,
        "V|version"=>sub { print "lacemail-submit: $VERSION\n"; exit 0; },
);
GetOptions(@option_spec) or die;
exit 0;