#! /usr/bin/perl # # $Id$ # Copyright (C) 2002-2003 Jan Kratochvil # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA use vars qw($VERSION); $VERSION=do { my @r=(q$Revision$=~/\d+/g); sprintf "%d.".("%03d"x$#r),@r; }; use strict; use warnings; use File::Basename; BEGIN { use lib $ENV{"PERLMAIL_BASEDIR"} || File::Basename::dirname($0); use PerlMail::Config; } use Getopt::Long; use DBI; use Carp qw(cluck confess); require IO::Socket::INET; use IO::Handle; use POSIX qw(mktime); use Fcntl qw(:flock); open DBI_PWD,$DBI_pwd or die "open \"$DBI_pwd\": $!"; $DBI_pwd=; close DBI_PWD or warn "close DBI_pwd: $!"; chomp $DBI_pwd; my $DBI; sub DBI { return $DBI if $DBI; $DBI=DBI->connect("DBI:mysql:database=$DBI_database;host=","$DBI_user",$DBI_pwd,{ "PrintError"=>0, # handled by "RaiseError" below "RaiseError"=>1, "ShowErrorStatement"=>1, "AutoCommit"=>1, }) or confess "Failed DBI->connect(): $!"; return $DBI; } # $name,@$cols sub create_table { my($name,$cols)=@_; eval { DBI()->do("drop table $name"); }; DBI()->do("create table $name (".join(",",@$cols).")"); } sub initdb { create_table($DB_table,[ "id int not null auto_increment primary key", "time timestamp not null", # assume ." default now()" "message longtext not null", "retries int null default 0", # null=>done, 0=not yet tried to submit ], ); DBI()->do("alter table $DB_table add index (retries,id)"); print "done.\n"; exit 0; } sub store { my $message; { local $/; $message=; } close STDIN or cluck "close STDIN: $!"; my %row=( "message"=>$message, # assume "retries"=>0, ); my $prep=DBI()->prepare("insert into $DB_table (".join(",",keys(%row)).")" ." values (".join(",",map("?",keys(%row))).")"); $prep->execute(values(%row)); print $prep->{"mysql_insertid"}."\n"; } sub forkoff { my $pid=fork(); confess if !defined $pid; $DBI=undef(); # Prevent: Server has gone away exit 0 if $pid; # parent # child } my $submitonce_run=0; sub submitonce { $submitonce_run++; local *LOCK; open LOCK,">>$Lock_pathname" or die "open-append \"$Lock_pathname\": $!"; if (!flock LOCK,LOCK_EX|LOCK_NB) { # NEVER unlink here, we are not the lock owning process! print "LOCKED\n"; exit 0; } my $sth=DBI()->prepare("select id,message from $DB_table where retries is not null" # process only non-problematic mails during rerun .($submitonce_run==1 ? "" : " and retries=0") ." order by retries asc,id asc"); $sth->execute(); my $progresschar=""; autoflush STDOUT 1; my $sock; while (my $row=$sth->fetchrow_hashref()) { DBI()->do("update $DB_table set retries=retries+1 where id=".$row->{"id"}); if (!$sock) { $sock=IO::Socket::INET->new( "PeerAddr"=>$PeerAddr, "Proto" =>"tcp", ) or confess "IO::Socket::INET->new(\"$PeerAddr\"): $!"; $sock->connected() or confess "socket not connected"; } $sock->printflush(length($row->{"message"})."\n".$row->{"message"}); alarm $Socket_timeout and $sock->timeout($Socket_timeout) if $Socket_timeout; my $got; my $gotlen=$sock->sysread($got,1); confess $row->{"id"}.": sysread(1)=".(!defined $gotlen ? "undef" : $gotlen).": $!" if !defined($gotlen) || $gotlen!=1; alarm 0; if ($got ne "1") { # Prevent mailing errors from cron invoking us etc. #print STDERR "FAIL:".$row->{"id"}."\n"; undef $sock; } else { DBI()->do("update $DB_table set retries=null where id=".$row->{"id"}); } print $progresschar.$row->{"id"}.($got eq "1" ? "" : "=FAIL"); $progresschar=","; } if ($sock) { $sock->shutdown(0); # stopped reading $sock->printflush("BYE\n"); $sock->shutdown(2); # stopped using undef $sock; } print "\n" if $progresschar; unlink $Lock_pathname; close LOCK; return $progresschar; } sub submit { 1 while submitonce(); } sub print_messages { my($cond)=@_; my $sth=DBI()->prepare("select message from $DB_table $cond order by id"); $sth->execute(); while (my $row=$sth->fetchrow_hashref()) { print $row->{"message"},"\n"; } } sub pending { print_messages("where retries is not null"); } sub dump { print_messages(""); } sub clean { my($keyword,$interval)=@_; # FIXME: SQL "now()" is raced against the block above my $sth=DBI()->prepare("select id,time,retries from $DB_table where time>now()"); $sth->execute(); while (my $row=$sth->fetchrow_hashref()) { warn "Message time in future: ".join(",",map( "$_=".(!defined $row->{$_} ? "NULL" : $row->{$_}) ,keys(%$row))); } return if $interval eq ""; local $_=$interval; my $print=s/^print://; s/(\d+)y/($1*12)."m"/ge; s/(\d+)m/($1*30)."d"/ge; s/(\d+)d/($1*24)."h"/ge; s/(\d+)h/($1*60)."M"/ge; s/(\d+)M/($1*60)."s"/ge; my $sec=0; $sec+=$1 while s/(\d+)s//g; die "Interval parse error; left \"$_\", parsed: $interval" if $_ ne ""; $sth=DBI()->prepare(($print ? "select id" : "delete") ." from $DB_table where retries is null and timeexecute(); if (!$print) { print $sth->rows()."\n"; } else { while (my $row=$sth->fetchrow_hashref()) { print $row->{"id"},"\n"; } } } $Getopt::Long::ignorecase=0; die if !GetOptions( "initdb" ,\&initdb, "store" ,\&store, "forkoff",\&forkoff, "submit" ,\&submit, "pending",\&pending, "dump" ,\&dump, "clean:s",\&clean, "V|version",sub { print "perlmail-submit: $VERSION\n"; exit 0; }, ); exit 0;