Initial import.
authorlace <>
Mon, 26 Mar 2007 22:49:00 +0000 (22:49 +0000)
committerlace <>
Mon, 26 Mar 2007 22:49:00 +0000 (22:49 +0000)
tcpoverudp [new file with mode: 0755]

diff --git a/tcpoverudp b/tcpoverudp
new file mode 100755 (executable)
index 0000000..8e66659
--- /dev/null
@@ -0,0 +1,419 @@
+#! /usr/bin/perl
+#
+# $Id$
+# Copyright (C) 2004-2007 Jan Kratochvil <project-tcpoverudp@jankratochvil.net>
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; exactly version 2 of June 1991 is required
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+
+use strict;
+use warnings;
+use Getopt::Long;
+require IO::Socket::INET;
+use Fcntl;
+use Carp qw(cluck confess);
+use Socket;
+use Time::HiRes qw(time);
+
+
+my $READ_SIZE=256;     # 96kbit = 47 x 256B
+my $MAX_UNACKED=8;
+
+my $V=1;
+$|=1;
+for (qw(PIPE)) {
+       $SIG{$_}=eval "sub { cluck 'INFO: Got signal SIG$_'; };";
+}
+
+my $D;
+my $opt_udp_listen_port;
+my $opt_udp_server_addr;
+my $opt_udp_server_port;
+my @opt_tcp_listen_port;
+my @opt_tcp_forward_addr;
+my @opt_tcp_forward_port;
+my $opt_timeout=0.1;
+my $opt_recvloss=0;
+die if !GetOptions(
+                 "udp-listen-port=s",\$opt_udp_listen_port,
+                 "udp-server-addr=s",\$opt_udp_server_addr,
+                 "udp-server-port=s",\$opt_udp_server_port,
+                 "tcp-listen-port=s{,}",\@opt_tcp_listen_port,
+                 "tcp-forward-addr=s{,}",\@opt_tcp_forward_addr,
+                 "tcp-forward-port=s{,}",\@opt_tcp_forward_port,
+               "t|timeout=s",\$opt_timeout,
+                 "recvloss=s",\$opt_recvloss,
+               "d|debug+",\$D,
+               );
+
+die "udp-server- addr/port inconsistency" if !$opt_udp_server_addr != !$opt_udp_server_port;
+die "udp- listen/sever port inconsistency" if !$opt_udp_listen_port == !$opt_udp_server_port;
+die "tcp-forward- addr/port inconsistency" if !@opt_tcp_forward_addr != !@opt_tcp_forward_port;
+die "tcp- listen/forward port inconsistency" if !@opt_tcp_listen_port == !@opt_tcp_forward_port;
+die "udp vs. tcp inconsistency" if !$opt_udp_listen_port == !@opt_tcp_listen_port;
+
+my @sock_tcp;
+for my $tcp_listen_port (@opt_tcp_listen_port) {
+       my $sock_tcp=IO::Socket::INET->new(
+               LocalPort=>$tcp_listen_port,
+               Proto=>"tcp",
+               Listen=>5,
+               ReuseAddr=>1,
+       ) or die "socket(): $!";
+       push @sock_tcp,$sock_tcp;
+}
+
+my $sock_udp;
+if ($opt_udp_listen_port) {
+       $sock_udp=IO::Socket::INET->new(
+               Proto=>"udp",
+               LocalPort=>$opt_udp_listen_port,
+       ) or die "socket(): $!";
+} else {
+       $sock_udp=IO::Socket::INET->new(
+               Proto=>"udp",
+               PeerAddr=>$opt_udp_server_addr,
+               PeerPort=>$opt_udp_server_port,
+       ) or die "socket(): $!";
+}
+
+sub id_new()
+{
+       our $id;
+       $id||=0;
+       return $id++;
+}
+
+my %stats;
+sub stats($)
+{
+       my($name)=@_;
+
+       $stats{$name}++;
+       our $last;
+       $last||=time();
+       my $now=time();
+       return if $now<$last+1 && !$D;
+       $last=$now;
+       print join(" ","stats:",map(("$_=".$stats{$_}),sort keys(%stats))).($D ? "\r" : "\r");
+}
+
+my $peer_addr;
+my $MAGIC=0x56319EA6;
+
+sub sendpkt($;$)
+{
+       my($data,$stats)=@_;
+
+       if (!$peer_addr) {
+               cluck "Still no peer to send";
+               stats("sentearly");
+               return;
+       }
+       $data=pack "Na*",$MAGIC,$data;
+       if (!send $sock_udp,$data,0,$peer_addr) {
+               cluck "Error sending packet: $!";
+               $stats="senterr";
+       }
+       stats($stats||"sentok");
+}
+
+sub printable($)
+{
+       local $_=$_[0];
+       s/\W/./gs;
+       return $_;
+}
+
+sub seq_new($)
+{
+       my($data)=@_;
+
+       return {
+               "data"=>$data,
+               "timeout"=>time()+$opt_timeout,
+               };
+}
+
+my %sock;
+my %active;
+
+sub sock_new($$$)
+{
+       my($id,$which,$stream)=@_;
+
+       confess if $sock{$id};
+       $active{$id}=$sock{$id}={
+               "id"=>$id,
+               "stream"=>$stream,
+               "which"=>$which,        # for OPEN retransmits
+               "sent_to_udp"=>0,
+               "sent_queue"=>{
+                               0=>seq_new(undef()),
+                       },
+               "acked_to_udp"=>0,
+               "incoming"=>{
+                               # 5=>$udp_data,
+                       },
+       };
+}
+
+my $TYPE_OPEN=0;       # new_id,which
+my $TYPE_SEND=1;       # id,seq,data
+my $TYPE_ACK=2;                # id,seq
+my $TYPE_CLOSE=3;      # id,seq
+
+
+$V and print localtime()." START\n";
+if ($opt_udp_server_port) {
+       my $host=gethostbyname($opt_udp_server_addr) or die "resolving $opt_udp_server_addr: $!";
+       $peer_addr=sockaddr_in($opt_udp_server_port,$host) or die "assembling $opt_udp_server_addr:$opt_udp_server_port";
+       my($back_port,$back_host)=sockaddr_in $peer_addr;
+       $back_host=inet_ntoa $back_host;
+       warn "Peer server: $back_host:$back_port";
+}
+my $earliest;
+for (;;) {
+       my $rfds="";
+       for my $sock_tcp (@sock_tcp) {
+               vec($rfds,fileno($sock_tcp),1)=1;
+       }
+       vec($rfds,fileno($sock_udp),1)=1;
+       for my $hashref (values(%active)) {
+               next if !$hashref->{"stream"};
+               next if keys(%{$hashref->{"sent_queue"}})>=$MAX_UNACKED;
+               vec($rfds,fileno($hashref->{"stream"}),1)=1;
+       }
+       ###warn "select(2)..." if $D;
+       my $periodic_remaining;
+       my $now=time();
+       $periodic_remaining=($earliest>$now ? $earliest-$now : 0) if $earliest;
+       my $got=select $rfds,undef(),undef(),$periodic_remaining;
+       ###warn "got from select." if $D;
+       die "Invalid select(2): ".Dumper($got) if !defined $got || $got<0;
+
+       for my $which (0..$#sock_tcp) {
+               my $sock_tcp=$sock_tcp[$which];
+               next if !vec($rfds,fileno($sock_tcp),1);
+               my $sock_tcp_new;
+               accept $sock_tcp_new,$sock_tcp or confess "Error accepting new TCP socket: $!";
+               my $id=id_new();
+               warn "Accepted new TCP (id=$id)" if $D;
+               my $old=select $sock_tcp_new;
+               $|=1;
+               select $old;
+               sock_new $id,$which,$sock_tcp_new;
+               sendpkt pack("CNN",$TYPE_OPEN,$id,$which);
+               warn "Sent OPEN (id=$id)" if $D;
+       }
+       for my $hashref (values(%active)) {
+               next if !$hashref->{"stream"};
+               my $id=$hashref->{"id"};
+               next if !vec($rfds,fileno($hashref->{"stream"}),1);
+               my $buf;
+               fcntl($hashref->{"stream"},F_SETFL,O_NONBLOCK) or die "fnctl(,F_SETFL,O_NONBLOCK)";
+               my $got=sysread $hashref->{"stream"},$buf,$READ_SIZE;
+               fcntl($hashref->{"stream"},F_SETFL,0)          or die "fnctl(,F_SETFL,0)";
+               #defined($got) or confess "Error reading TCP socket: $!";
+               if (!$got) {
+                       warn "Got TCP EOF/error (id=$id)" if $D;
+                       my $seq=++$hashref->{"sent_to_udp"};
+                       $hashref->{"sent_queue"}{$seq}=seq_new(undef());
+                       sendpkt pack("CNN",$TYPE_CLOSE,$id,$seq);
+                       close $hashref->{"stream"} or confess "Error closing local socket: $!";
+                       delete $hashref->{"stream"};
+                       warn "Sent CLOSE (id=$id,seq=$seq)" if $D;
+               } elsif ($got==length $buf) {
+                       warn "Got TCP data (id=$id,got=$got)" if $D;
+                       my $seq=++$hashref->{"sent_to_udp"};
+                       $hashref->{"sent_queue"}{$seq}=seq_new($buf);
+                       sendpkt pack("CNNa*",$TYPE_SEND,$id,$seq,$buf);
+                       warn "Sent SEND (id=$id,seq=$seq,data=".printable($buf).")" if $D;
+               } else {
+                       confess "Invalid socket read return value: $got";
+               }
+       }
+       if (vec($rfds,fileno($sock_udp),1)) {{
+               my $udp_data;
+               my $got_addr=recv $sock_udp,$udp_data,0x10000,0;
+               if (!$got_addr) {
+                       cluck "Error receiving UDP data: $!";
+                       stats("recverr");
+                       last;
+               }
+               $peer_addr||=$got_addr;
+               if ($got_addr ne $peer_addr) {
+                       my($port,$host)=sockaddr_in $got_addr;
+                       $host=inet_ntoa $host;
+                       cluck "Ignoring packet as from unidentified address: $host:$port";
+                       stats("ufoaddr");
+                       last;
+               }
+               my $try_retry;
+               retry:
+               if ($try_retry) {
+                       $udp_data=$try_retry;
+                       $try_retry=undef();
+               }
+               my $udp_data_orig=$udp_data;
+               my($magic,$type,$id);
+               ($magic,$type,$id,$udp_data)=unpack "NCNa*",$udp_data;
+               if (!$magic || $magic!=$MAGIC) {
+                       stats("badcrc");
+               } elsif (rand() < $opt_recvloss) {
+                       warn "Got type=$type (id=$id) but it got lost" if $D;
+               } elsif ($type==$TYPE_OPEN) {
+                       my($which);
+                       ($which,$udp_data)=unpack "Na*",$udp_data;
+                       warn "Got OPEN (id=$id,which=$which)" if $D;
+                       die if $udp_data;
+                       if (!$sock{$id}) {
+                               my $sock_tcp_new=IO::Socket::INET->new(
+                                       PeerAddr=>$opt_tcp_forward_addr[$which],
+                                       PeerPort=>$opt_tcp_forward_port[$which],
+                                       Proto=>"tcp",
+                               );
+                               if (!$sock_tcp_new) {
+                                       sendpkt pack("CNN",$TYPE_CLOSE,$id,1);
+                                       warn "Refused back OPEN by CLOSE (id=$id,seq=1)" if $D;
+                               } else {
+                                       my $old=select $sock_tcp_new;
+                                       $|=1;
+                                       select $old;
+                                       sock_new $id,$which,$sock_tcp_new;
+                                       stats("openok");
+                               }
+                       }
+                       sendpkt pack("CNN",$TYPE_ACK,$id,0);
+               } elsif ($type==$TYPE_SEND) {
+                       my($seq);
+                       ($seq,$udp_data)=unpack "Na*",$udp_data;
+                       my $hashref=$sock{$id};
+                       if (!$hashref) {
+                               cluck "Got SEND but for nonexisting sock $id";
+                               stats("ufosock");
+                       } else {
+                               warn "Got SEND(id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}."),data=".printable($udp_data).")" if $D;
+                               if ($hashref->{"acked_to_udp"}+1>$seq) {
+                                       stats("recvdup");
+                               }
+                               if ($hashref->{"acked_to_udp"}+1==$seq) {
+                                       if ($hashref->{"stream"}) {
+                                               if (length($udp_data)==((syswrite $hashref->{"stream"},$udp_data,length($udp_data)) || 0)) {
+                                                       warn "Wrote TCP data (id=$id,acked_to_udp=seq=$seq,data=".printable($udp_data).")" if $D;
+                                               } else {
+                                                       my $seqclose=++$hashref->{"sent_to_udp"};
+                                                       $hashref->{"sent_queue"}{$seqclose}=seq_new(undef());
+                                                       warn "Refusing back OPEN by CLOSE (id=$id,seqclose=$seqclose)" if $D;
+                                                       sendpkt pack("CNN",$TYPE_CLOSE,$id,$seqclose);
+                                               }
+                                       }
+                                       $hashref->{"acked_to_udp"}=$seq;
+                                       stats("recvok");
+                                       warn "In     order - got SEND (id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}.")" if $D && $D>=2;
+                                       if (($try_retry=$hashref->{"incoming"}{$seq+1})) {
+                                               delete $hashref->{"incoming"}{$seq+1};
+                                               warn "Reinserted, retrying" if $D && $D>=2;
+                                       }
+                               }
+                               if ($hashref->{"acked_to_udp"}+1<$seq) {
+                                       warn "Out of order - got SEND (id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}.")" if $D && $D>=2;
+                                       $hashref->{"incoming"}{$seq}=$udp_data_orig;
+                               }
+                       }
+                       if (!$hashref || $hashref->{"acked_to_udp"}+1>=$seq) {
+                               sendpkt pack("CNN",$TYPE_ACK,$id,$seq);
+                               warn "Sent ACK (id=$id,seq=$seq)" if $D;
+                       }
+                       goto retry if $try_retry;
+               } elsif ($type==$TYPE_ACK) {{
+                       my $hashref=$sock{$id};
+                       if (!$hashref) {
+                               cluck "Got ACK but for nonexisting sock $id";
+                               stats("ufosock");
+                               last;
+                       }
+                       my($seq);
+                       ($seq,$udp_data)=unpack "Na*",$udp_data;
+                       warn "Got ACK (id=$id,seq=$seq)" if $D;
+                       die if $udp_data;
+                       ###exists $hashref->{"sent_queue"}{$seq} or confess "Nonexisting queue of $id: $seq";
+                       if (exists $hashref->{"sent_queue"}{$seq}) {
+                               my $data=$hashref->{"sent_queue"}{$seq}{"data"};
+                               die if !$seq && defined $data;
+                               die if $seq && defined $data && $data eq "";
+                               delete $hashref->{"sent_queue"}{$seq};
+                               if ($seq && !defined $data) {
+                                       delete $active{$id};
+                                       warn "Deleted active id $id (processed ACK on close)" if $D;
+                               }
+                               warn "Processed ACK (id=$id,seq=$seq); remaining:".scalar(keys(%{$hashref->{"sent_queue"}})) if $D;
+                       }
+               }} elsif ($type==$TYPE_CLOSE) {
+                       my($seq);
+                       ($seq,$udp_data)=unpack "Na*",$udp_data;
+                       my $hashref=$sock{$id};
+                       if (!$hashref) {
+                               cluck "Got CLOSE but for nonexisting sock $id";
+                               stats("ufosock");
+                       } else {
+                               warn "Got CLOSE (id=$id,seq=$seq)" if $D;
+                               die if $udp_data;
+                               if ($hashref->{"acked_to_udp"}+1>$seq) {
+                                       stats("recvdup");
+                               }
+                               if ($hashref->{"acked_to_udp"}+1==$seq && $hashref->{"stream"}) {
+                                       close $hashref->{"stream"} or confess "Cannot close socket of $id";
+                                       delete $hashref->{"stream"};
+                                       $hashref->{"acked_to_udp"}=$seq;
+                                       confess if !$active{$id};
+                                       delete $active{$id};
+                                       warn "Closed the local stream, deleted it from active (id=$id,seq=$seq)" if $D;
+                               }
+                       }
+                       if (!$hashref || $hashref->{"acked_to_udp"}+1>=$seq) {
+                               sendpkt pack("CNN",$TYPE_ACK,$id,$seq);
+                               warn "Sent ACK of close (id=$id,seq=$seq)" if $D;
+                       }
+               } else {
+                       confess "Invalid packet type $type";
+               }
+       }}
+       $earliest=undef();
+       for my $hashref (values(%active)) {
+               my $id=$hashref->{"id"};
+               for my $seq (sort {$a <=> $b} keys(%{$hashref->{"sent_queue"}})) {
+                       my $seqhashref=$hashref->{"sent_queue"}{$seq};
+                       my $data=$seqhashref->{"data"};
+                       my $when=$seqhashref->{"timeout"};
+                       if (time()>=$when) {
+                               if ($seq==0) {
+                                       die if defined $data;
+                                       warn "Resent OPEN (id=$id)" if $D;
+                                       sendpkt pack("CNN",$TYPE_OPEN,$id,$hashref->{"which"}),"sentdup";
+                               } elsif (defined $data) {
+                                       die if $data eq "";
+                                       warn "Resent SEND (id=$id,seq=$seq)" if $D;
+                                       sendpkt pack("CNNa*",$TYPE_SEND,$id,$seq,$data),"sentdup";
+                               } else {        # pending CLOSE
+                                       warn "Resent CLOSE (id=$id,seq=$seq)" if $D;
+                                       sendpkt pack("CNN",$TYPE_CLOSE,$id,$seq),"sentdup";
+                               }
+                               $when=$seqhashref->{"timeout"}=time()+$opt_timeout;
+                       }
+                       $earliest=$when if !$earliest || $when<$earliest;
+                       last if time()<$seqhashref->{"timeout"};
+               }
+       }
+}