4 # Copyright (C) 2004-2007 Jan Kratochvil <project-tcpoverudp@jankratochvil.net>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; exactly version 2 of June 1991 is required
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 use Getopt::Long 2.35; # >=2.35 for: {,}
23 require IO::Socket::INET;
25 use Carp qw(cluck confess);
27 use Time::HiRes qw(time);
# Maximum number of bytes requested from a local TCP stream per sysread call.
30 my $READ_SIZE=256; # 96kbit = 47 x 256B
# Install a handler for the signal named by $_ that only logs a stack trace (cluck).
# The handler is built with a string eval so the current value of $_ is baked into
# the message text.
# NOTE(review): the loop that supplies $_ is not visible in this excerpt.
36 $SIG{$_}=eval "sub { cluck 'INFO: Got signal SIG$_'; };";
# Storage for the command-line options.
# Scalars hold the single UDP endpoint settings; arrays hold one entry per tunnelled TCP port.
40 my $opt_udp_listen_port;
41 my $opt_udp_server_addr;
42 my $opt_udp_server_port;
43 my @opt_tcp_listen_port;
44 my @opt_tcp_forward_addr;
45 my @opt_tcp_forward_port;
# Option specification / destination pairs.
# NOTE(review): the enclosing call is not visible in this excerpt; presumably
# Getopt::Long's GetOptions. The "{,}" repeat specifier lets one option take
# several values, which are collected into the referenced array.
# NOTE(review): the declarations of $opt_timeout and $opt_recvloss are not visible here.
49 "udp-listen-port=s",\$opt_udp_listen_port,
50 "udp-server-addr=s",\$opt_udp_server_addr,
51 "udp-server-port=s",\$opt_udp_server_port,
52 "tcp-listen-port=s{,}",\@opt_tcp_listen_port,
53 "tcp-forward-addr=s{,}",\@opt_tcp_forward_addr,
54 "tcp-forward-port=s{,}",\@opt_tcp_forward_port,
55 "t|timeout=s",\$opt_timeout,
56 "recvloss=s",\$opt_recvloss,
# Validate the option combination before any socket is created.
# Each test compares the truthiness of two options by negating both sides:
# "!a != !b" fires when exactly one is set, "!a == !b" fires when both or neither are set.
# The UDP server address and port must be given together or not at all.
60 die "udp-server- addr/port inconsistency" if !$opt_udp_server_addr != !$opt_udp_server_port;
# Exactly one of UDP listen port / UDP server port must be given.
61 die "udp- listen/server port inconsistency" if !$opt_udp_listen_port == !$opt_udp_server_port;
# The TCP forward addresses and ports must be given together or not at all.
62 die "tcp-forward- addr/port inconsistency" if !@opt_tcp_forward_addr != !@opt_tcp_forward_port;
# Exactly one of TCP listen ports / TCP forward ports must be given.
63 die "tcp- listen/forward port inconsistency" if !@opt_tcp_listen_port == !@opt_tcp_forward_port;
# The side that listens on UDP must not listen on TCP, and the side that
# does not listen on UDP must listen on TCP.
64 die "udp vs. tcp inconsistency" if !$opt_udp_listen_port == !@opt_tcp_listen_port;
# Create one TCP socket per configured TCP listen port and collect them in @sock_tcp
# (accept is later called on these sockets).
# NOTE(review): some constructor arguments and the closing brace of this loop
# are not visible in this excerpt.
67 for my $tcp_listen_port (@opt_tcp_listen_port) {
68 my $sock_tcp=IO::Socket::INET->new(
69 LocalPort=>$tcp_listen_port,
73 ) or die "socket(): $!";
74 push @sock_tcp,$sock_tcp;
# Create the single UDP socket. With a UDP listen port it is bound to that local port.
78 if ($opt_udp_listen_port) {
79 $sock_udp=IO::Socket::INET->new(
81 LocalPort=>$opt_udp_listen_port,
82 ) or die "socket(): $!";
# Second construction, using the configured server address and port as the peer.
# NOTE(review): presumably this is the else arm of the test above; the line
# introducing it is not visible here.
84 $sock_udp=IO::Socket::INET->new(
86 PeerAddr=>$opt_udp_server_addr,
87 PeerPort=>$opt_udp_server_port,
88 ) or die "socket(): $!";
# Fragment of the statistics printer.
# NOTE(review): the enclosing sub header is not visible; presumably this is `stats`.
# Skip printing while $now is still below $last+1, unless debugging ($D) is on.
107 return if $now<$last+1 && !$D;
# Print every counter in %stats as "key=value", sorted by key, on one line.
# NOTE(review): both arms of the ternary are "\r", so $D has no effect on the
# line terminator. Confirm whether one arm was meant to be "\n".
109 print join(" ","stats:",map(("$_=".$stats{$_}),sort keys(%stats))).($D ? "\r" : "\r");
# Magic number prefixed to every outgoing UDP packet as an unsigned 32-bit
# network-order integer; incoming packets are compared against it.
113 my $MAGIC=0x56319EA6;
# Fragment of the packet sender.
# NOTE(review): the enclosing sub header is not visible; presumably this is `sendpkt`.
# Logged when there is no peer to send to; the guarding condition is not visible here.
120 cluck "Still no peer to send";
# Prepend the magic to the payload.
124 $data=pack "Na*",$MAGIC,$data;
# A failed send is reported with a stack trace.
125 if (!send $sock_udp,$data,0,$peer_addr) {
126 cluck "Error sending packet: $!";
# Count the packet under the statistics key in $stats, defaulting to "sentok".
129 stats($stats||"sentok");
# Fragment of a hash constructor.
# NOTE(review): presumably part of `seq_new`, which builds sent-queue entries; the rest is not visible.
# The entry records an absolute time: now plus the configured timeout.
145 "timeout"=>time()+$opt_timeout,
# Fragment of the connection-record constructor.
# NOTE(review): presumably `sock_new`; the sub header and the remaining fields are not visible.
# Arguments: connection id, index into the TCP listen/forward lists, local TCP stream.
154 my($id,$which,$stream)=@_;
# A record for this id must not exist yet.
156 confess if $sock{$id};
# The same hash reference is registered in both %sock and %active.
157 $active{$id}=$sock{$id}={
160 "which"=>$which, # for OPEN retransmits
# Packet type codes, packed as a single unsigned byte ("C") in each packet.
# The trailing comment on each line lists the fields that follow the type.
172 my $TYPE_OPEN=0; # new_id,which
173 my $TYPE_SEND=1; # id,seq,data
174 my $TYPE_ACK=2; # id,seq
175 my $TYPE_CLOSE=3; # id,seq
# Print a timestamped startup line when $V is set.
178 $V and print localtime()." START\n";
# When a UDP server was configured, resolve it up front and use it as the peer address.
179 if ($opt_udp_server_port) {
180 my $host=gethostbyname($opt_udp_server_addr) or die "resolving $opt_udp_server_addr: $!";
181 $peer_addr=sockaddr_in($opt_udp_server_port,$host) or die "assembling $opt_udp_server_addr:$opt_udp_server_port";
# Unpack the address again only to log it in dotted-quad form.
182 my($back_port,$back_host)=sockaddr_in $peer_addr;
183 $back_host=inet_ntoa $back_host;
184 warn "Peer server: $back_host:$back_port";
# Build the read set for select(2): every TCP listener, the UDP socket,
# and each active connection's local stream.
189 for my $sock_tcp (@sock_tcp) {
190 vec($rfds,fileno($sock_tcp),1)=1;
192 vec($rfds,fileno($sock_udp),1)=1;
193 for my $hashref (values(%active)) {
# Records without a local stream have nothing to read.
194 next if !$hashref->{"stream"};
# Flow control: do not read from a stream while $MAX_UNACKED or more of its
# packets are still waiting in the sent queue.
195 next if keys(%{$hashref->{"sent_queue"}})>=$MAX_UNACKED;
196 vec($rfds,fileno($hashref->{"stream"}),1)=1;
198 ###warn "select(2)..." if $D;
199 my $periodic_remaining;
# Wait at most until $earliest; the timeout stays undef (wait indefinitely) when $earliest is unset.
201 $periodic_remaining=($earliest>$now ? $earliest-$now : 0) if $earliest;
202 my $got=select $rfds,undef(),undef(),$periodic_remaining;
203 ###warn "got from select." if $D;
# Any select failure is fatal.
# NOTE(review): a select interrupted by a signal also reports failure and would
# die here. Confirm that is intended given the signal handlers installed earlier.
# NOTE(review): Dumper needs Data::Dumper; its import is not visible in this excerpt.
204 die "Invalid select(2): ".Dumper($got) if !defined $got || $got<0;
# Accept one new connection on every TCP listener that select reported readable.
# $which is the listener's index and is sent to the peer in the OPEN packet.
206 for my $which (0..$#sock_tcp) {
207 my $sock_tcp=$sock_tcp[$which];
208 next if !vec($rfds,fileno($sock_tcp),1);
210 accept $sock_tcp_new,$sock_tcp or confess "Error accepting new TCP socket: $!";
212 warn "Accepted new TCP (id=$id)" if $D;
# One-argument select makes the new socket the default output handle and returns the previous one.
# NOTE(review): the lines that use and restore $old are not visible in this excerpt.
213 my $old=select $sock_tcp_new;
# Register the connection locally, then announce it to the peer.
216 sock_new $id,$which,$sock_tcp_new;
217 sendpkt pack("CNN",$TYPE_OPEN,$id,$which);
218 warn "Sent OPEN (id=$id)" if $D;
# Forward data from every readable local TCP stream to the UDP peer.
# NOTE(review): several structural lines (the EOF test, the else arm, closing braces)
# are not visible in this excerpt; only the two fcntl error messages are changed.
220 for my $hashref (values(%active)) {
221 next if !$hashref->{"stream"};
222 my $id=$hashref->{"id"};
223 next if !vec($rfds,fileno($hashref->{"stream"}),1);
# Switch the stream to non-blocking mode only for the duration of the sysread,
# then reset the file status flags to 0.
225 fcntl($hashref->{"stream"},F_SETFL,O_NONBLOCK) or die "fcntl(,F_SETFL,O_NONBLOCK): $!";
226 my $got=sysread $hashref->{"stream"},$buf,$READ_SIZE;
227 fcntl($hashref->{"stream"},F_SETFL,0) or die "fcntl(,F_SETFL,0): $!";
228 #defined($got) or confess "Error reading TCP socket: $!";
# EOF/error path (the guarding condition is not visible): queue an entry with
# undef data, which marks a CLOSE, send the CLOSE and drop the local stream.
230 warn "Got TCP EOF/error (id=$id)" if $D;
231 my $seq=++$hashref->{"sent_to_udp"};
232 $hashref->{"sent_queue"}{$seq}=seq_new(undef());
233 sendpkt pack("CNN",$TYPE_CLOSE,$id,$seq);
234 close $hashref->{"stream"} or confess "Error closing local socket: $!";
235 delete $hashref->{"stream"};
236 warn "Sent CLOSE (id=$id,seq=$seq)" if $D;
# Data path: queue the chunk for retransmission under the next sequence number and send it.
237 } elsif ($got==length $buf) {
238 warn "Got TCP data (id=$id,got=$got)" if $D;
239 my $seq=++$hashref->{"sent_to_udp"};
240 $hashref->{"sent_queue"}{$seq}=seq_new($buf);
241 sendpkt pack("CNNa*",$TYPE_SEND,$id,$seq,$buf);
242 warn "Sent SEND (id=$id,seq=$seq,data=".printable($buf).")" if $D;
# A byte count that does not match the buffer length is treated as a fatal inconsistency.
244 confess "Invalid socket read return value: $got";
# Handle one datagram when select reported the UDP socket readable.
# NOTE(review): the doubled brace opens an inner bare block, presumably so that
# `last` can leave the handler early; the lines doing so are not visible here.
247 if (vec($rfds,fileno($sock_udp),1)) {{
# Receive up to 0x10000 bytes; recv returns the sender's packed address.
249 my $got_addr=recv $sock_udp,$udp_data,0x10000,0;
251 cluck "Error receiving UDP data: $!";
# When no peer address is set yet, the first sender becomes the peer.
# Packets from any other address are reported as ignored.
255 $peer_addr||=$got_addr;
256 if ($got_addr ne $peer_addr) {
257 my($port,$host)=sockaddr_in $got_addr;
258 $host=inet_ntoa $host;
259 cluck "Ignoring packet as from unidentified address: $host:$port";
# Replace the packet under examination with a previously buffered one.
# NOTE(review): presumably the target of `goto retry`; the label is not visible here.
266 $udp_data=$try_retry;
# Keep the raw packet so it can be stored unchanged if it turns out to be out of order.
269 my $udp_data_orig=$udp_data;
270 my($magic,$type,$id);
# Header layout: 32-bit magic, 8-bit type, 32-bit id, then the type-specific remainder.
271 ($magic,$type,$id,$udp_data)=unpack "NCNa*",$udp_data;
272 if (!$magic || $magic!=$MAGIC) {
# Simulated receive loss: drop the packet with probability $opt_recvloss.
274 } elsif (rand() < $opt_recvloss) {
275 warn "Got type=$type (id=$id) but it got lost" if $D;
# OPEN: the peer announces a new connection $id for list index $which.
276 } elsif ($type==$TYPE_OPEN) {
278 ($which,$udp_data)=unpack "Na*",$udp_data;
279 warn "Got OPEN (id=$id,which=$which)" if $D;
# Connect to the forward target selected by the index the peer sent.
282 my $sock_tcp_new=IO::Socket::INET->new(
283 PeerAddr=>$opt_tcp_forward_addr[$which],
284 PeerPort=>$opt_tcp_forward_port[$which],
# Connection failed: answer with a CLOSE carrying seq 1.
287 if (!$sock_tcp_new) {
288 sendpkt pack("CNN",$TYPE_CLOSE,$id,1);
289 warn "Refused back OPEN by CLOSE (id=$id,seq=1)" if $D;
# Connection succeeded: register the local record.
# NOTE(review): presumably the else arm of the test above; that line is not visible here.
291 my $old=select $sock_tcp_new;
294 sock_new $id,$which,$sock_tcp_new;
# Acknowledge the OPEN with seq 0.
298 sendpkt pack("CNN",$TYPE_ACK,$id,0);
# SEND: payload for connection $id with sequence number $seq.
# "acked_to_udp" holds the highest sequence number delivered so far on this side.
299 } elsif ($type==$TYPE_SEND) {
301 ($seq,$udp_data)=unpack "Na*",$udp_data;
302 my $hashref=$sock{$id};
304 cluck "Got SEND but for nonexisting sock $id";
307 warn "Got SEND(id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}."),data=".printable($udp_data).")" if $D;
# $seq was already delivered (a duplicate); the body of this branch is not visible here.
308 if ($hashref->{"acked_to_udp"}+1>$seq) {
# $seq is exactly the next expected packet: deliver it to the local stream.
311 if ($hashref->{"acked_to_udp"}+1==$seq) {
312 if ($hashref->{"stream"}) {
# Success means the whole payload was written in one syswrite; a short or failed write does not count.
313 if (length($udp_data)==((syswrite $hashref->{"stream"},$udp_data,length($udp_data)) || 0)) {
314 warn "Wrote TCP data (id=$id,acked_to_udp=seq=$seq,data=".printable($udp_data).")" if $D;
# Queue and send a CLOSE back to the peer.
# NOTE(review): presumably the write-failure arm; the line introducing it is not visible here.
316 my $seqclose=++$hashref->{"sent_to_udp"};
317 $hashref->{"sent_queue"}{$seqclose}=seq_new(undef());
318 warn "Refusing back OPEN by CLOSE (id=$id,seqclose=$seqclose)" if $D;
319 sendpkt pack("CNN",$TYPE_CLOSE,$id,$seqclose);
322 $hashref->{"acked_to_udp"}=$seq;
324 warn "In order - got SEND (id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}.")" if $D && $D>=2;
# If the following packet is already buffered, take it out so it can be processed next.
325 if (($try_retry=$hashref->{"incoming"}{$seq+1})) {
326 delete $hashref->{"incoming"}{$seq+1};
327 warn "Reinserted, retrying" if $D && $D>=2;
# $seq is ahead of the next expected packet: buffer the raw packet until the gap is filled.
330 if ($hashref->{"acked_to_udp"}+1<$seq) {
331 warn "Out of order - got SEND (id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}.")" if $D && $D>=2;
332 $hashref->{"incoming"}{$seq}=$udp_data_orig;
# ACK when the connection is unknown, or when $seq is at or below the delivered
# counter plus one; packets buffered for later do not meet this test.
335 if (!$hashref || $hashref->{"acked_to_udp"}+1>=$seq) {
336 sendpkt pack("CNN",$TYPE_ACK,$id,$seq);
337 warn "Sent ACK (id=$id,seq=$seq)" if $D;
# Process the buffered follow-up packet taken out above, if any.
339 goto retry if $try_retry;
# ACK: the peer confirmed packet $seq of connection $id; remove it from the sent queue.
340 } elsif ($type==$TYPE_ACK) {{
341 my $hashref=$sock{$id};
343 cluck "Got ACK but for nonexisting sock $id";
348 ($seq,$udp_data)=unpack "Na*",$udp_data;
349 warn "Got ACK (id=$id,seq=$seq)" if $D;
351 ###exists $hashref->{"sent_queue"}{$seq} or confess "Nonexisting queue of $id: $seq";
# An ACK for an entry that is no longer queued is silently skipped.
352 if (exists $hashref->{"sent_queue"}{$seq}) {
353 my $data=$hashref->{"sent_queue"}{$seq}{"data"};
# Sanity checks on the queue entry: seq 0 must carry no data, and queued data must not be empty.
354 die if !$seq && defined $data;
355 die if $seq && defined $data && $data eq "";
356 delete $hashref->{"sent_queue"}{$seq};
# A non-zero seq with undef data is an acknowledged CLOSE.
# NOTE(review): the statement that removes the id is not visible in this excerpt.
357 if ($seq && !defined $data) {
359 warn "Deleted active id $id (processed ACK on close)" if $D;
361 warn "Processed ACK (id=$id,seq=$seq); remaining:".scalar(keys(%{$hashref->{"sent_queue"}})) if $D;
# CLOSE: the peer ends connection $id at sequence number $seq.
363 }} elsif ($type==$TYPE_CLOSE) {
365 ($seq,$udp_data)=unpack "Na*",$udp_data;
366 my $hashref=$sock{$id};
368 cluck "Got CLOSE but for nonexisting sock $id";
371 warn "Got CLOSE (id=$id,seq=$seq)" if $D;
# $seq was already processed (a duplicate); the body of this branch is not visible here.
373 if ($hashref->{"acked_to_udp"}+1>$seq) {
# In-order CLOSE with a local stream still open: close it and advance the delivered counter.
376 if ($hashref->{"acked_to_udp"}+1==$seq && $hashref->{"stream"}) {
377 close $hashref->{"stream"} or confess "Cannot close socket of $id";
378 delete $hashref->{"stream"};
379 $hashref->{"acked_to_udp"}=$seq;
# The connection must still be registered in %active at this point.
380 confess if !$active{$id};
382 warn "Closed the local stream, deleted it from active (id=$id,seq=$seq)" if $D;
# ACK when the connection is unknown, or when $seq is at or below the delivered counter plus one.
385 if (!$hashref || $hashref->{"acked_to_udp"}+1>=$seq) {
386 sendpkt pack("CNN",$TYPE_ACK,$id,$seq);
387 warn "Sent ACK of close (id=$id,seq=$seq)" if $D;
# A type value that matches none of the handled types is fatal.
390 confess "Invalid packet type $type";
# Retransmit pass: walk each active connection's sent queue in ascending sequence order.
394 for my $hashref (values(%active)) {
395 my $id=$hashref->{"id"};
396 for my $seq (sort {$a <=> $b} keys(%{$hashref->{"sent_queue"}})) {
397 my $seqhashref=$hashref->{"sent_queue"}{$seq};
398 my $data=$seqhashref->{"data"};
399 my $when=$seqhashref->{"timeout"};
# NOTE(review): the conditions guarding the resend branches are not visible in this
# excerpt; presumably only entries whose timeout has passed are resent.
# OPEN entries must carry no data; the packet is rebuilt from the record's "which" field.
402 die if defined $data;
403 warn "Resent OPEN (id=$id)" if $D;
# "sentdup" is the statistics key passed to sendpkt for every resend.
404 sendpkt pack("CNN",$TYPE_OPEN,$id,$hashref->{"which"}),"sentdup";
# Entries with data are resent as SEND packets.
405 } elsif (defined $data) {
407 warn "Resent SEND (id=$id,seq=$seq)" if $D;
408 sendpkt pack("CNNa*",$TYPE_SEND,$id,$seq,$data),"sentdup";
409 } else { # pending CLOSE
410 warn "Resent CLOSE (id=$id,seq=$seq)" if $D;
411 sendpkt pack("CNN",$TYPE_CLOSE,$id,$seq),"sentdup";
# Set the entry's timeout to now plus the configured timeout.
413 $when=$seqhashref->{"timeout"}=time()+$opt_timeout;
# Track the soonest deadline seen; it bounds the next select timeout.
415 $earliest=$when if !$earliest || $when<$earliest;
# Stop scanning this connection at the first entry whose timeout lies in the future.
416 last if time()<$seqhashref->{"timeout"};