First production release
[PerlMail.git] / perlmail-submit
diff --git a/perlmail-submit b/perlmail-submit
new file mode 100755 (executable)
index 0000000..2901153
--- /dev/null
@@ -0,0 +1,159 @@
+#! /usr/bin/perl
+#
+#      $Id$
+
+use vars qw($VERSION);
+$VERSION=do { my @r=(q$Revision$=~/\d+/g); sprintf "%d.".("%03d"x$#r),@r; };
+use strict;
+use warnings;
+
+use Getopt::Long;
+use DBI;
+use Carp qw(cluck confess);
+require IO::Socket::INET;
+use IO::Handle;
+use POSIX qw(mktime);
+use Fcntl qw(:flock);
+
+
+my $Lock_pathname="/tmp/LaceMail.lock";
+my $PeerAddr="dejhome.dyn.jankratochvil.net.:852";
+my $Socket_timeout=15;
+my $DB_table="LaceMail_folder";
+my $DBI_database="short";
+my $DBI_user="short";
+my $DBI_pwd=$ENV{"HOME"}."/priv/mysql.".$DBI_user.".pwd";
+open DBI_PWD,$DBI_pwd or die "open \"$DBI_pwd\": $!";
+$DBI_pwd=<DBI_PWD>;
+close DBI_PWD or warn "close DBI_pwd: $!";
+chomp $DBI_pwd;
+
+my $DBI=DBI->connect_cached("DBI:mysql:database=$DBI_database;host=","$DBI_user",$DBI_pwd,{
+               "PrintError"=>0,        # handled by "RaiseError" below
+               "RaiseError"=>1,
+               "ShowErrorStatement"=>1,
+               "AutoCommit"=>1,
+               }) or confess "Failed DBI->connect(): $!";
+
+# $name,@$cols
+sub create_table
+{
+my($name,$cols)=@_;
+
+       eval { $DBI->do("drop table $name"); };
+       $DBI->do("create table $name (".join(",",@$cols).")");
+}
+
+sub initdb
+{
+       create_table($DB_table,[
+                                       "id int not null auto_increment primary key",
+                                       "time timestamp not null",      # assume ." default now()"
+                                       "message longtext not null",
+                                       "retries int null default 0",   # null=>done, 0=not yet tried to submit
+                                       ],
+                       );
+       $DBI->do("alter table $DB_table add index (retries,id)");
+       print "done.\n";
+       exit 0;
+}
+
+sub store
+{
+       my $message;
+       {
+               local $/;
+               $message=<STDIN>;
+               }
+       my %row=(
+                       "message"=>$message,
+                       # assume "retries"=>0,
+                       );
+       my $prep=$DBI->prepare_cached("insert into $DB_table (".join(",",keys(%row)).")"
+                       ." values (".join(",",map("?",keys(%row))).")");
+       $prep->execute(values(%row));
+       print $prep->{"mysql_insertid"}."\n";
+}
+
+my $submitonce_run=0;
+sub submitonce
+{
+       $submitonce_run++;
+       local *LOCK;
+       open LOCK,">>$Lock_pathname" or die "open-append \"$Lock_pathname\": $!";
+       if (!flock LOCK,LOCK_EX|LOCK_NB) {
+               # NEVER unlink here, we are not the lock owning process!
+               print "LOCKED\n";
+               exit 0;
+               }
+       my $sth=$DBI->prepare("select id,message from $DB_table where retries is not null"
+                       # process only non-problematic mails during rerun
+                       .($submitonce_run==1 ? "" : " and retries=0")
+                       ." order by retries asc,id asc");
+       $sth->execute();
+       my $progresschar="";
+       autoflush STDOUT 1;
+       my $sock;
+       while (my $row=$sth->fetchrow_hashref()) {
+               $DBI->do("update $DB_table set retries=retries+1 where id=".$row->{"id"});
+               if (!$sock) {
+                       $sock=IO::Socket::INET->new(
+                                       "PeerAddr"=>$PeerAddr,
+                                       "Proto"   =>"tcp",
+                                       ) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!";
+                       $sock->connected() or confess "socket not connected";
+                       }
+               $sock->printflush(length($row->{"message"})."\n".$row->{"message"});
+               alarm $Socket_timeout and $sock->timeout($Socket_timeout) if $Socket_timeout;
+               my $got;
+               my $gotlen=$sock->sysread($got,1);
+               confess $row->{"id"}.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $!"
+                               if !defined($gotlen) || $gotlen!=1;
+               alarm 0;
+               if ($got ne "1") {
+                       # Prevent mailing errors from cron invoking us etc.
+                       #print STDERR "FAIL:".$row->{"id"}."\n";
+                       undef $sock;
+                       }
+               else {
+                       $DBI->do("update $DB_table set retries=null where id=".$row->{"id"});
+                       }
+               print $progresschar.$row->{"id"}.($got eq "1" ? "" : "=FAIL");
+               $progresschar=",";
+               }
+       if ($sock) {
+               $sock->shutdown(0);     # stopped reading
+               $sock->printflush("BYE\n");
+               $sock->shutdown(2);     # stopped using
+               undef $sock;
+               }
+       print "\n" if $progresschar;
+       unlink $Lock_pathname;
+       close LOCK;
+       return $progresschar;
+}
+
+sub submit
+{
+       1 while submitonce();
+}
+
+sub pending
+{
+       my $sth=$DBI->prepare("select message from $DB_table where state='pending' order by id");
+       $sth->execute();
+       while (my $row=$sth->fetchrow_hashref()) {
+               print $row->{"message"},"\n";
+               }
+}
+
+
+$Getopt::Long::ignorecase=0;
+die if !GetOptions(
+                 "initdb" ,\&initdb,
+                 "store"  ,\&store,
+                 "submit" ,\&submit,
+                 "pending",\&pending,
+               "V|version",sub { print "lacemail-submit: $VERSION\n"; exit 0; },
+               );
+exit 0;