From: lace <> Date: Mon, 26 Mar 2007 22:49:00 +0000 (+0000) Subject: Initial import. X-Git-Url: http://git.jankratochvil.net/?p=tcpoverudp.git;a=commitdiff_plain;h=c2d4e159ca2346685d5e3f729125dc9e9c5d31ef Initial import. --- c2d4e159ca2346685d5e3f729125dc9e9c5d31ef diff --git a/tcpoverudp b/tcpoverudp new file mode 100755 index 0000000..8e66659 --- /dev/null +++ b/tcpoverudp @@ -0,0 +1,419 @@ +#! /usr/bin/perl +# +# $Id$ +# Copyright (C) 2004-2007 Jan Kratochvil +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; exactly version 2 of June 1991 is required +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + + +use strict; +use warnings; +use Getopt::Long; +require IO::Socket::INET; +use Fcntl; +use Carp qw(cluck confess); +use Socket; +use Time::HiRes qw(time); + + +my $READ_SIZE=256; # 96kbit = 47 x 256B +my $MAX_UNACKED=8; + +my $V=1; +$|=1; +for (qw(PIPE)) { + $SIG{$_}=eval "sub { cluck 'INFO: Got signal SIG$_'; };"; +} + +my $D; +my $opt_udp_listen_port; +my $opt_udp_server_addr; +my $opt_udp_server_port; +my @opt_tcp_listen_port; +my @opt_tcp_forward_addr; +my @opt_tcp_forward_port; +my $opt_timeout=0.1; +my $opt_recvloss=0; +die if !GetOptions( + "udp-listen-port=s",\$opt_udp_listen_port, + "udp-server-addr=s",\$opt_udp_server_addr, + "udp-server-port=s",\$opt_udp_server_port, + "tcp-listen-port=s{,}",\@opt_tcp_listen_port, + "tcp-forward-addr=s{,}",\@opt_tcp_forward_addr, + "tcp-forward-port=s{,}",\@opt_tcp_forward_port, + "t|timeout=s",\$opt_timeout, + "recvloss=s",\$opt_recvloss, + "d|debug+",\$D, + ); + +die "udp-server- addr/port inconsistency" if !$opt_udp_server_addr != !$opt_udp_server_port; +die "udp- listen/sever port inconsistency" if !$opt_udp_listen_port == !$opt_udp_server_port; +die "tcp-forward- addr/port inconsistency" if !@opt_tcp_forward_addr != !@opt_tcp_forward_port; +die "tcp- listen/forward port inconsistency" if !@opt_tcp_listen_port == !@opt_tcp_forward_port; +die "udp vs. tcp inconsistency" if !$opt_udp_listen_port == !@opt_tcp_listen_port; + +my @sock_tcp; +for my $tcp_listen_port (@opt_tcp_listen_port) { + my $sock_tcp=IO::Socket::INET->new( + LocalPort=>$tcp_listen_port, + Proto=>"tcp", + Listen=>5, + ReuseAddr=>1, + ) or die "socket(): $!"; + push @sock_tcp,$sock_tcp; +} + +my $sock_udp; +if ($opt_udp_listen_port) { + $sock_udp=IO::Socket::INET->new( + Proto=>"udp", + LocalPort=>$opt_udp_listen_port, + ) or die "socket(): $!"; +} else { + $sock_udp=IO::Socket::INET->new( + Proto=>"udp", + PeerAddr=>$opt_udp_server_addr, + PeerPort=>$opt_udp_server_port, + ) or die "socket(): $!"; +} + +sub id_new() +{ + our $id; + $id||=0; + return $id++; +} + +my %stats; +sub stats($) +{ + my($name)=@_; + + $stats{$name}++; + our $last; + $last||=time(); + my $now=time(); + return if $now<$last+1 && !$D; + $last=$now; + print join(" ","stats:",map(("$_=".$stats{$_}),sort keys(%stats))).($D ? "\r" : "\r"); +} + +my $peer_addr; +my $MAGIC=0x56319EA6; + +sub sendpkt($;$) +{ + my($data,$stats)=@_; + + if (!$peer_addr) { + cluck "Still no peer to send"; + stats("sentearly"); + return; + } + $data=pack "Na*",$MAGIC,$data; + if (!send $sock_udp,$data,0,$peer_addr) { + cluck "Error sending packet: $!"; + $stats="senterr"; + } + stats($stats||"sentok"); +} + +sub printable($) +{ + local $_=$_[0]; + s/\W/./gs; + return $_; +} + +sub seq_new($) +{ + my($data)=@_; + + return { + "data"=>$data, + "timeout"=>time()+$opt_timeout, + }; +} + +my %sock; +my %active; + +sub sock_new($$$) +{ + my($id,$which,$stream)=@_; + + confess if $sock{$id}; + $active{$id}=$sock{$id}={ + "id"=>$id, + "stream"=>$stream, + "which"=>$which, # for OPEN retransmits + "sent_to_udp"=>0, + "sent_queue"=>{ + 0=>seq_new(undef()), + }, + "acked_to_udp"=>0, + "incoming"=>{ + # 5=>$udp_data, + }, + }; +} + +my $TYPE_OPEN=0; # new_id,which +my $TYPE_SEND=1; # id,seq,data +my $TYPE_ACK=2; # id,seq +my $TYPE_CLOSE=3; # id,seq + + +$V and print localtime()." START\n"; +if ($opt_udp_server_port) { + my $host=gethostbyname($opt_udp_server_addr) or die "resolving $opt_udp_server_addr: $!"; + $peer_addr=sockaddr_in($opt_udp_server_port,$host) or die "assembling $opt_udp_server_addr:$opt_udp_server_port"; + my($back_port,$back_host)=sockaddr_in $peer_addr; + $back_host=inet_ntoa $back_host; + warn "Peer server: $back_host:$back_port"; +} +my $earliest; +for (;;) { + my $rfds=""; + for my $sock_tcp (@sock_tcp) { + vec($rfds,fileno($sock_tcp),1)=1; + } + vec($rfds,fileno($sock_udp),1)=1; + for my $hashref (values(%active)) { + next if !$hashref->{"stream"}; + next if keys(%{$hashref->{"sent_queue"}})>=$MAX_UNACKED; + vec($rfds,fileno($hashref->{"stream"}),1)=1; + } + ###warn "select(2)..." if $D; + my $periodic_remaining; + my $now=time(); + $periodic_remaining=($earliest>$now ? $earliest-$now : 0) if $earliest; + my $got=select $rfds,undef(),undef(),$periodic_remaining; + ###warn "got from select." if $D; + die "Invalid select(2): ".Dumper($got) if !defined $got || $got<0; + + for my $which (0..$#sock_tcp) { + my $sock_tcp=$sock_tcp[$which]; + next if !vec($rfds,fileno($sock_tcp),1); + my $sock_tcp_new; + accept $sock_tcp_new,$sock_tcp or confess "Error accepting new TCP socket: $!"; + my $id=id_new(); + warn "Accepted new TCP (id=$id)" if $D; + my $old=select $sock_tcp_new; + $|=1; + select $old; + sock_new $id,$which,$sock_tcp_new; + sendpkt pack("CNN",$TYPE_OPEN,$id,$which); + warn "Sent OPEN (id=$id)" if $D; + } + for my $hashref (values(%active)) { + next if !$hashref->{"stream"}; + my $id=$hashref->{"id"}; + next if !vec($rfds,fileno($hashref->{"stream"}),1); + my $buf; + fcntl($hashref->{"stream"},F_SETFL,O_NONBLOCK) or die "fnctl(,F_SETFL,O_NONBLOCK)"; + my $got=sysread $hashref->{"stream"},$buf,$READ_SIZE; + fcntl($hashref->{"stream"},F_SETFL,0) or die "fnctl(,F_SETFL,0)"; + #defined($got) or confess "Error reading TCP socket: $!"; + if (!$got) { + warn "Got TCP EOF/error (id=$id)" if $D; + my $seq=++$hashref->{"sent_to_udp"}; + $hashref->{"sent_queue"}{$seq}=seq_new(undef()); + sendpkt pack("CNN",$TYPE_CLOSE,$id,$seq); + close $hashref->{"stream"} or confess "Error closing local socket: $!"; + delete $hashref->{"stream"}; + warn "Sent CLOSE (id=$id,seq=$seq)" if $D; + } elsif ($got==length $buf) { + warn "Got TCP data (id=$id,got=$got)" if $D; + my $seq=++$hashref->{"sent_to_udp"}; + $hashref->{"sent_queue"}{$seq}=seq_new($buf); + sendpkt pack("CNNa*",$TYPE_SEND,$id,$seq,$buf); + warn "Sent SEND (id=$id,seq=$seq,data=".printable($buf).")" if $D; + } else { + confess "Invalid socket read return value: $got"; + } + } + if (vec($rfds,fileno($sock_udp),1)) {{ + my $udp_data; + my $got_addr=recv $sock_udp,$udp_data,0x10000,0; + if (!$got_addr) { + cluck "Error receiving UDP data: $!"; + stats("recverr"); + last; + } + $peer_addr||=$got_addr; + if ($got_addr ne $peer_addr) { + my($port,$host)=sockaddr_in $got_addr; + $host=inet_ntoa $host; + cluck "Ignoring packet as from unidentified address: $host:$port"; + stats("ufoaddr"); + last; + } + my $try_retry; + retry: + if ($try_retry) { + $udp_data=$try_retry; + $try_retry=undef(); + } + my $udp_data_orig=$udp_data; + my($magic,$type,$id); + ($magic,$type,$id,$udp_data)=unpack "NCNa*",$udp_data; + if (!$magic || $magic!=$MAGIC) { + stats("badcrc"); + } elsif (rand() < $opt_recvloss) { + warn "Got type=$type (id=$id) but it got lost" if $D; + } elsif ($type==$TYPE_OPEN) { + my($which); + ($which,$udp_data)=unpack "Na*",$udp_data; + warn "Got OPEN (id=$id,which=$which)" if $D; + die if $udp_data; + if (!$sock{$id}) { + my $sock_tcp_new=IO::Socket::INET->new( + PeerAddr=>$opt_tcp_forward_addr[$which], + PeerPort=>$opt_tcp_forward_port[$which], + Proto=>"tcp", + ); + if (!$sock_tcp_new) { + sendpkt pack("CNN",$TYPE_CLOSE,$id,1); + warn "Refused back OPEN by CLOSE (id=$id,seq=1)" if $D; + } else { + my $old=select $sock_tcp_new; + $|=1; + select $old; + sock_new $id,$which,$sock_tcp_new; + stats("openok"); + } + } + sendpkt pack("CNN",$TYPE_ACK,$id,0); + } elsif ($type==$TYPE_SEND) { + my($seq); + ($seq,$udp_data)=unpack "Na*",$udp_data; + my $hashref=$sock{$id}; + if (!$hashref) { + cluck "Got SEND but for nonexisting sock $id"; + stats("ufosock"); + } else { + warn "Got SEND(id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}."),data=".printable($udp_data).")" if $D; + if ($hashref->{"acked_to_udp"}+1>$seq) { + stats("recvdup"); + } + if ($hashref->{"acked_to_udp"}+1==$seq) { + if ($hashref->{"stream"}) { + if (length($udp_data)==((syswrite $hashref->{"stream"},$udp_data,length($udp_data)) || 0)) { + warn "Wrote TCP data (id=$id,acked_to_udp=seq=$seq,data=".printable($udp_data).")" if $D; + } else { + my $seqclose=++$hashref->{"sent_to_udp"}; + $hashref->{"sent_queue"}{$seqclose}=seq_new(undef()); + warn "Refusing back OPEN by CLOSE (id=$id,seqclose=$seqclose)" if $D; + sendpkt pack("CNN",$TYPE_CLOSE,$id,$seqclose); + } + } + $hashref->{"acked_to_udp"}=$seq; + stats("recvok"); + warn "In order - got SEND (id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}.")" if $D && $D>=2; + if (($try_retry=$hashref->{"incoming"}{$seq+1})) { + delete $hashref->{"incoming"}{$seq+1}; + warn "Reinserted, retrying" if $D && $D>=2; + } + } + if ($hashref->{"acked_to_udp"}+1<$seq) { + warn "Out of order - got SEND (id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}.")" if $D && $D>=2; + $hashref->{"incoming"}{$seq}=$udp_data_orig; + } + } + if (!$hashref || $hashref->{"acked_to_udp"}+1>=$seq) { + sendpkt pack("CNN",$TYPE_ACK,$id,$seq); + warn "Sent ACK (id=$id,seq=$seq)" if $D; + } + goto retry if $try_retry; + } elsif ($type==$TYPE_ACK) {{ + my $hashref=$sock{$id}; + if (!$hashref) { + cluck "Got ACK but for nonexisting sock $id"; + stats("ufosock"); + last; + } + my($seq); + ($seq,$udp_data)=unpack "Na*",$udp_data; + warn "Got ACK (id=$id,seq=$seq)" if $D; + die if $udp_data; + ###exists $hashref->{"sent_queue"}{$seq} or confess "Nonexisting queue of $id: $seq"; + if (exists $hashref->{"sent_queue"}{$seq}) { + my $data=$hashref->{"sent_queue"}{$seq}{"data"}; + die if !$seq && defined $data; + die if $seq && defined $data && $data eq ""; + delete $hashref->{"sent_queue"}{$seq}; + if ($seq && !defined $data) { + delete $active{$id}; + warn "Deleted active id $id (processed ACK on close)" if $D; + } + warn "Processed ACK (id=$id,seq=$seq); remaining:".scalar(keys(%{$hashref->{"sent_queue"}})) if $D; + } + }} elsif ($type==$TYPE_CLOSE) { + my($seq); + ($seq,$udp_data)=unpack "Na*",$udp_data; + my $hashref=$sock{$id}; + if (!$hashref) { + cluck "Got CLOSE but for nonexisting sock $id"; + stats("ufosock"); + } else { + warn "Got CLOSE (id=$id,seq=$seq)" if $D; + die if $udp_data; + if ($hashref->{"acked_to_udp"}+1>$seq) { + stats("recvdup"); + } + if ($hashref->{"acked_to_udp"}+1==$seq && $hashref->{"stream"}) { + close $hashref->{"stream"} or confess "Cannot close socket of $id"; + delete $hashref->{"stream"}; + $hashref->{"acked_to_udp"}=$seq; + confess if !$active{$id}; + delete $active{$id}; + warn "Closed the local stream, deleted it from active (id=$id,seq=$seq)" if $D; + } + } + if (!$hashref || $hashref->{"acked_to_udp"}+1>=$seq) { + sendpkt pack("CNN",$TYPE_ACK,$id,$seq); + warn "Sent ACK of close (id=$id,seq=$seq)" if $D; + } + } else { + confess "Invalid packet type $type"; + } + }} + $earliest=undef(); + for my $hashref (values(%active)) { + my $id=$hashref->{"id"}; + for my $seq (sort {$a <=> $b} keys(%{$hashref->{"sent_queue"}})) { + my $seqhashref=$hashref->{"sent_queue"}{$seq}; + my $data=$seqhashref->{"data"}; + my $when=$seqhashref->{"timeout"}; + if (time()>=$when) { + if ($seq==0) { + die if defined $data; + warn "Resent OPEN (id=$id)" if $D; + sendpkt pack("CNN",$TYPE_OPEN,$id,$hashref->{"which"}),"sentdup"; + } elsif (defined $data) { + die if $data eq ""; + warn "Resent SEND (id=$id,seq=$seq)" if $D; + sendpkt pack("CNNa*",$TYPE_SEND,$id,$seq,$data),"sentdup"; + } else { # pending CLOSE + warn "Resent CLOSE (id=$id,seq=$seq)" if $D; + sendpkt pack("CNN",$TYPE_CLOSE,$id,$seq),"sentdup"; + } + $when=$seqhashref->{"timeout"}=time()+$opt_timeout; + } + $earliest=$when if !$earliest || $when<$earliest; + last if time()<$seqhashref->{"timeout"}; + } + } +}