+Getopt::Long $VERSION requirement.
[tcpoverudp.git] / tcpoverudp
1 #! /usr/bin/perl
2 #
3 # $Id$
4 # Copyright (C) 2004-2007 Jan Kratochvil <project-tcpoverudp@jankratochvil.net>
5
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; exactly version 2 of June 1991 is required
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
19
20 use strict;
21 use warnings;
22 use Getopt::Long 2.35;  # >=2.35 for: {,}
23 require IO::Socket::INET;
24 use Fcntl;
25 use Carp qw(cluck confess);
26 use Socket;
27 use Time::HiRes qw(time);
28
29
# Tunables for the TCP side of the tunnel.
my $READ_SIZE=256;      # most bytes taken from a TCP stream per sysread; 96kbit = 47 x 256B
my $MAX_UNACKED=8;      # a TCP stream is not polled while this many packets await an ACK

my $V=1;                # verbose: print the START banner
$|=1;                   # unbuffered STDOUT
# Report the listed signals with a backtrace through cluck.
# The handler is an ordinary closure rather than a string eval, so it cannot
# silently end up undefined when the generated code fails to compile.
for my $signame (qw(PIPE)) {
        $SIG{$signame}=sub { cluck "INFO: Got signal SIG$signame"; };
}
38
# Command line options.  Exactly one of two roles has to be selected:
#   UDP listener: --udp-listen-port together with --tcp-forward-addr/--tcp-forward-port
#   UDP client:   --udp-server-addr/--udp-server-port together with --tcp-listen-port
# The checks below reject every other combination.
my $D;                          # debug level, raised by each -d/--debug
my $opt_udp_listen_port;
my $opt_udp_server_addr;
my $opt_udp_server_port;
my @opt_tcp_listen_port;        # the {,} options collect several values each
my @opt_tcp_forward_addr;
my @opt_tcp_forward_port;
my $opt_timeout=0.1;            # seconds until an unacknowledged packet is resent
my $opt_recvloss=0;             # received packets are dropped when rand() is below this
die if !GetOptions(
        "udp-listen-port=s",\$opt_udp_listen_port,
        "udp-server-addr=s",\$opt_udp_server_addr,
        "udp-server-port=s",\$opt_udp_server_port,
        "tcp-listen-port=s{,}",\@opt_tcp_listen_port,
        "tcp-forward-addr=s{,}",\@opt_tcp_forward_addr,
        "tcp-forward-port=s{,}",\@opt_tcp_forward_port,
        "t|timeout=s",\$opt_timeout,
        "recvloss=s",\$opt_recvloss,
        "d|debug+",\$D,
);

die "udp-server- addr/port inconsistency" if !$opt_udp_server_addr != !$opt_udp_server_port;
die "udp- listen/server port inconsistency" if !$opt_udp_listen_port == !$opt_udp_server_port;
# Forward addresses and ports are paired by index, so the two lists must have
# the same length (this also covers "one given, the other missing").
die "tcp-forward- addr/port inconsistency" if @opt_tcp_forward_addr != @opt_tcp_forward_port;
die "tcp- listen/forward port inconsistency" if !@opt_tcp_listen_port == !@opt_tcp_forward_port;
die "udp vs. tcp inconsistency" if !$opt_udp_listen_port == !@opt_tcp_listen_port;
65
# One listening TCP socket per --tcp-listen-port, kept in command line order;
# the index into @sock_tcp is what gets sent to the peer as "which".
my @sock_tcp;
foreach my $listen_port (@opt_tcp_listen_port) {
        my $listener=IO::Socket::INET->new(
                Proto=>"tcp",
                ReuseAddr=>1,
                Listen=>5,
                LocalPort=>$listen_port,
        );
        die "socket(): $!" if !$listener;
        push @sock_tcp,$listener;
}

# The single UDP socket: bound to a local port when listening, otherwise
# created towards the configured server address and port.
my $sock_udp=IO::Socket::INET->new(
        Proto=>"udp",
        ($opt_udp_listen_port
                ? (LocalPort=>$opt_udp_listen_port)
                : (PeerAddr=>$opt_udp_server_addr,PeerPort=>$opt_udp_server_port)),
);
die "socket(): $!" if !$sock_udp;
90
# Hand out connection ids: 0 on the first call, then 1, 2, ...
# The counter is kept in the package variable $id.
sub id_new()
{
        our $id;
        $id=0 if !$id;
        my $fresh=$id;
        $id=$fresh+1;
        return $fresh;
}
97
# Event counters, keyed by event name.
my %stats;
# Count one occurrence of event $name and print the "stats:" line with all
# counters sorted by name, terminated by "\r".  Unless debugging ($D) is on,
# the line is printed at most once per second; the time of the last print is
# kept in the package variable $last.
sub stats($)
{
        my($event)=@_;

        ++$stats{$event};
        our $last;
        $last=time() if !$last;
        my $current=time();
        if (!$D && $current<$last+1) {
                return;
        }
        $last=$current;
        my @fields=map { $_."=".$stats{$_} } sort keys %stats;
        print join(" ","stats:",@fields)."\r";
}
111
# Packed socket address of the remote UDP endpoint; it stays undefined until
# it is either configured at startup or learnt from a received packet.
my $peer_addr;
# Every packet starts with this 32-bit value; received packets carrying
# anything else are counted as "badcrc".
my $MAGIC=0x56319EA6;

# Prefix $data with the magic number and send it to the peer over UDP.
#   $data  - packet body (type byte and the fields following it)
#   $stats - optional counter name to bump instead of "sentok"
# Without a known peer the packet is dropped and counted as "sentearly";
# a failed send is counted as "senterr".
sub sendpkt($;$)
{
        my($payload,$counter)=@_;

        unless ($peer_addr) {
                cluck "Still no peer to send";
                stats("sentearly");
                return;
        }
        my $frame=pack("Na*",$MAGIC,$payload);
        my $sent=send($sock_udp,$frame,0,$peer_addr);
        if (!$sent) {
                cluck "Error sending packet: $!";
                $counter="senterr";
        }
        stats($counter||"sentok");
}
131
# Return a copy of the argument with every non-word character (\W)
# replaced by a dot; used to make packet data safe for debug messages.
sub printable($)
{
        my($text)=@_;

        $text=~s/\W/./gs;
        return $text;
}
138
# Build a send-queue entry for one outgoing packet.
#   $data - payload to resend; callers pass undef for OPEN and CLOSE entries
# Returns a hash reference holding the payload and the time ("timeout") at
# which the packet becomes due for retransmission.
sub seq_new($)
{
        my($payload)=@_;

        my %entry=(
                "timeout"=>time()+$opt_timeout,
                "data"=>$payload,
        );
        return \%entry;
}
148
# Every connection ever registered, by id.
my %sock;
# The subset of %sock that is still polled and retransmitted.
my %active;

# Register connection $id and return its state record.
#   $id     - connection id shared with the peer
#   $which  - index of the listening/forward port, needed for OPEN retransmits
#   $stream - the local TCP socket
# Dies with a backtrace when $id is already registered.
sub sock_new($$$)
{
        my($id,$which,$stream)=@_;

        confess if $sock{$id};
        my $state={
                "id"=>$id,
                "which"=>$which,
                "stream"=>$stream,
                # sequence number of the last packet queued for the peer
                "sent_to_udp"=>0,
                # unacknowledged packets by sequence number; 0 is the OPEN
                "sent_queue"=>{
                                0=>seq_new(undef()),
                        },
                # last in-order sequence number accepted from the peer
                "acked_to_udp"=>0,
                # out-of-order packets waiting for their turn, e.g. 5=>$udp_data
                "incoming"=>{},
        };
        $sock{$id}=$state;
        $active{$id}=$state;
        return $state;
}
171
# Packet types and the fields that follow the type byte:
#   OPEN:  new_id,which
#   SEND:  id,seq,data
#   ACK:   id,seq
#   CLOSE: id,seq
my($TYPE_OPEN,$TYPE_SEND,$TYPE_ACK,$TYPE_CLOSE)=(0,1,2,3);


print localtime()." START\n" if $V;
# With a configured server the peer address is known up front; otherwise it
# is learnt from received packets.
if ($opt_udp_server_port) {
        my $server_ip=gethostbyname($opt_udp_server_addr);
        die "resolving $opt_udp_server_addr: $!" if !$server_ip;
        $peer_addr=sockaddr_in($opt_udp_server_port,$server_ip);
        die "assembling $opt_udp_server_addr:$opt_udp_server_port" if !$peer_addr;
        # Unpack the address again so the message shows what will really be used.
        my($shown_port,$shown_ip)=sockaddr_in($peer_addr);
        my $shown_host=inet_ntoa($shown_ip);
        warn "Peer server: $shown_host:$shown_port";
}
# Main event loop.  Each iteration:
#   1. waits in select(2) for a new TCP client, TCP data, a UDP packet or the
#      nearest retransmit deadline;
#   2. accepts new TCP clients and announces them to the peer with OPEN;
#   3. forwards readable TCP data as SEND packets (CLOSE on EOF/error);
#   4. handles one incoming UDP packet, plus any buffered out-of-order SENDs
#      that it makes deliverable;
#   5. resends timed-out packets of active connections and works out the
#      next deadline.
# UDP packets are untrusted input: short, malformed or unknown ones are
# counted ("badcrc"/"badpkt") and dropped, they never terminate the process.
my $earliest;   # time of the nearest pending retransmit; undef when there is none
for (;;) {
        # Read set: the listeners, the UDP socket and every TCP stream whose
        # queue of unacknowledged packets still has room.
        my $rfds="";
        for my $sock_tcp (@sock_tcp) {
                vec($rfds,fileno($sock_tcp),1)=1;
        }
        vec($rfds,fileno($sock_udp),1)=1;
        for my $hashref (values(%active)) {
                next if !$hashref->{"stream"};
                next if keys(%{$hashref->{"sent_queue"}})>=$MAX_UNACKED;
                vec($rfds,fileno($hashref->{"stream"}),1)=1;
        }
        ###warn "select(2)..." if $D;
        my $periodic_remaining;         # stays undef (wait forever) without a deadline
        my $now=time();
        $periodic_remaining=($earliest>$now ? $earliest-$now : 0) if $earliest;
        my $got=select $rfds,undef(),undef(),$periodic_remaining;
        ###warn "got from select." if $D;
        if (!defined $got || $got<0) {
                next if $!{EINTR};      # interrupted by a handled signal: start over
                die "Invalid select(2): $!";
        }

        # New TCP clients: allocate an id and tell the peer which port was hit.
        for my $which (0..$#sock_tcp) {
                my $sock_tcp=$sock_tcp[$which];
                next if !vec($rfds,fileno($sock_tcp),1);
                my $sock_tcp_new;
                accept $sock_tcp_new,$sock_tcp or confess "Error accepting new TCP socket: $!";
                my $id=id_new();
                warn "Accepted new TCP (id=$id)" if $D;
                # Turn on autoflush for the new handle.
                my $old=select $sock_tcp_new;
                $|=1;
                select $old;
                sock_new $id,$which,$sock_tcp_new;
                sendpkt pack("CNN",$TYPE_OPEN,$id,$which);
                warn "Sent OPEN (id=$id)" if $D;
        }
        # Local TCP data: each chunk gets the next sequence number and is kept
        # in sent_queue until the peer acknowledges it.
        for my $hashref (values(%active)) {
                next if !$hashref->{"stream"};
                my $id=$hashref->{"id"};
                next if !vec($rfds,fileno($hashref->{"stream"}),1);
                my $buf;
                # Read in non-blocking mode, then reset the file status flags to 0.
                fcntl($hashref->{"stream"},F_SETFL,O_NONBLOCK) or die "fnctl(,F_SETFL,O_NONBLOCK)";
                my $got=sysread $hashref->{"stream"},$buf,$READ_SIZE;
                fcntl($hashref->{"stream"},F_SETFL,0)          or die "fnctl(,F_SETFL,0)";
                #defined($got) or confess "Error reading TCP socket: $!";
                if (!$got) {
                        # sysread gave 0 (EOF) or undef (error): queue a CLOSE
                        # (an entry without data) and drop the local stream.
                        warn "Got TCP EOF/error (id=$id)" if $D;
                        my $seq=++$hashref->{"sent_to_udp"};
                        $hashref->{"sent_queue"}{$seq}=seq_new(undef());
                        sendpkt pack("CNN",$TYPE_CLOSE,$id,$seq);
                        close $hashref->{"stream"} or confess "Error closing local socket: $!";
                        delete $hashref->{"stream"};
                        warn "Sent CLOSE (id=$id,seq=$seq)" if $D;
                } elsif ($got==length $buf) {
                        warn "Got TCP data (id=$id,got=$got)" if $D;
                        my $seq=++$hashref->{"sent_to_udp"};
                        $hashref->{"sent_queue"}{$seq}=seq_new($buf);
                        sendpkt pack("CNNa*",$TYPE_SEND,$id,$seq,$buf);
                        warn "Sent SEND (id=$id,seq=$seq,data=".printable($buf).")" if $D;
                } else {
                        confess "Invalid socket read return value: $got";
                }
        }
        # Incoming UDP packet.  The doubled braces create a bare block so that
        # "last" abandons the packet and falls through to the retransmit stage.
        if (vec($rfds,fileno($sock_udp),1)) {{
                my $udp_data;
                my $got_addr=recv $sock_udp,$udp_data,0x10000,0;
                if (!$got_addr) {
                        cluck "Error receiving UDP data: $!";
                        stats("recverr");
                        last;
                }
                # The first sender seen becomes the peer when none was configured.
                $peer_addr||=$got_addr;
                if ($got_addr ne $peer_addr) {
                        my($port,$host)=sockaddr_in $got_addr;
                        $host=inet_ntoa $host;
                        cluck "Ignoring packet as from unidentified address: $host:$port";
                        stats("ufoaddr");
                        last;
                }
                # A SEND that fills a gap may make a buffered packet deliverable;
                # that packet is then fed through the same code via "goto retry".
                my $try_retry;
                retry:
                if ($try_retry) {
                        $udp_data=$try_retry;
                        $try_retry=undef();
                }
                my $udp_data_orig=$udp_data;
                my($magic,$type,$id);
                # Header: magic (N), type (C), id (N) = 9 bytes.  Anything shorter
                # is left unparsed and rejected by the magic check below.
                ($magic,$type,$id,$udp_data)=unpack "NCNa*",$udp_data if length($udp_data)>=9;
                if (!$magic || $magic!=$MAGIC) {
                        stats("badcrc");
                } elsif (rand() < $opt_recvloss) {
                        warn "Got type=$type (id=$id) but it got lost" if $D;
                } elsif ($type==$TYPE_OPEN) {
                        # Body must be exactly one 32-bit "which" field.
                        if (length($udp_data)!=4) {
                                cluck "Ignoring malformed OPEN (id=$id)";
                                stats("badpkt");
                                last;
                        }
                        my($which)=unpack "N",$udp_data;
                        warn "Got OPEN (id=$id,which=$which)" if $D;
                        if (!$sock{$id}) {
                                # Connect only when that forward target is configured;
                                # an unknown index is refused like a failed connect.
                                my $sock_tcp_new;
                                $sock_tcp_new=IO::Socket::INET->new(
                                        PeerAddr=>$opt_tcp_forward_addr[$which],
                                        PeerPort=>$opt_tcp_forward_port[$which],
                                        Proto=>"tcp",
                                ) if defined $opt_tcp_forward_addr[$which] && defined $opt_tcp_forward_port[$which];
                                if (!$sock_tcp_new) {
                                        sendpkt pack("CNN",$TYPE_CLOSE,$id,1);
                                        warn "Refused back OPEN by CLOSE (id=$id,seq=1)" if $D;
                                } else {
                                        my $old=select $sock_tcp_new;
                                        $|=1;
                                        select $old;
                                        sock_new $id,$which,$sock_tcp_new;
                                        stats("openok");
                                }
                        }
                        # OPEN is acknowledged as sequence number 0, duplicates included.
                        sendpkt pack("CNN",$TYPE_ACK,$id,0);
                } elsif ($type==$TYPE_SEND) {
                        # Body: 32-bit sequence number followed by the data.
                        if (length($udp_data)<4) {
                                cluck "Ignoring malformed SEND (id=$id)";
                                stats("badpkt");
                                last;
                        }
                        my($seq);
                        ($seq,$udp_data)=unpack "Na*",$udp_data;
                        my $hashref=$sock{$id};
                        if (!$hashref) {
                                cluck "Got SEND but for nonexisting sock $id";
                                stats("ufosock");
                        } else {
                                warn "Got SEND(id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}."),data=".printable($udp_data).")" if $D;
                                if ($hashref->{"acked_to_udp"}+1>$seq) {
                                        stats("recvdup");
                                }
                                if ($hashref->{"acked_to_udp"}+1==$seq) {
                                        # The next expected packet: deliver it locally.
                                        if ($hashref->{"stream"}) {
                                                if (length($udp_data)==((syswrite $hashref->{"stream"},$udp_data,length($udp_data)) || 0)) {
                                                        warn "Wrote TCP data (id=$id,acked_to_udp=seq=$seq,data=".printable($udp_data).")" if $D;
                                                } else {
                                                        # Failed or short write: tell the peer to close.
                                                        my $seqclose=++$hashref->{"sent_to_udp"};
                                                        $hashref->{"sent_queue"}{$seqclose}=seq_new(undef());
                                                        warn "Write to TCP failed, sending CLOSE (id=$id,seqclose=$seqclose)" if $D;
                                                        sendpkt pack("CNN",$TYPE_CLOSE,$id,$seqclose);
                                                }
                                        }
                                        $hashref->{"acked_to_udp"}=$seq;
                                        stats("recvok");
                                        warn "In     order - got SEND (id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}.")" if $D && $D>=2;
                                        # Pull the follow-up packet out of the buffer, if present.
                                        if (($try_retry=$hashref->{"incoming"}{$seq+1})) {
                                                delete $hashref->{"incoming"}{$seq+1};
                                                warn "Reinserted, retrying" if $D && $D>=2;
                                        }
                                }
                                if ($hashref->{"acked_to_udp"}+1<$seq) {
                                        # From the future: keep the raw packet, do not ACK yet.
                                        warn "Out of order - got SEND (id=$id,seq=$seq (acked_to_udp=".$hashref->{"acked_to_udp"}.")" if $D && $D>=2;
                                        $hashref->{"incoming"}{$seq}=$udp_data_orig;
                                }
                        }
                        if (!$hashref || $hashref->{"acked_to_udp"}+1>=$seq) {
                                sendpkt pack("CNN",$TYPE_ACK,$id,$seq);
                                warn "Sent ACK (id=$id,seq=$seq)" if $D;
                        }
                        goto retry if $try_retry;
                } elsif ($type==$TYPE_ACK) {{
                        my $hashref=$sock{$id};
                        if (!$hashref) {
                                cluck "Got ACK but for nonexisting sock $id";
                                stats("ufosock");
                                last;
                        }
                        # Body must be exactly one 32-bit sequence number.
                        if (length($udp_data)!=4) {
                                cluck "Ignoring malformed ACK (id=$id)";
                                stats("badpkt");
                                last;
                        }
                        my($seq)=unpack "N",$udp_data;
                        warn "Got ACK (id=$id,seq=$seq)" if $D;
                        ###exists $hashref->{"sent_queue"}{$seq} or confess "Nonexisting queue of $id: $seq";
                        if (exists $hashref->{"sent_queue"}{$seq}) {
                                my $data=$hashref->{"sent_queue"}{$seq}{"data"};
                                # Internal invariants: entry 0 (OPEN) has no data and
                                # queued data is never empty.
                                die if !$seq && defined $data;
                                die if $seq && defined $data && $data eq "";
                                delete $hashref->{"sent_queue"}{$seq};
                                # An entry without data past seq 0 was a CLOSE: the
                                # connection is finished once that is acknowledged.
                                if ($seq && !defined $data) {
                                        delete $active{$id};
                                        warn "Deleted active id $id (processed ACK on close)" if $D;
                                }
                                warn "Processed ACK (id=$id,seq=$seq); remaining:".scalar(keys(%{$hashref->{"sent_queue"}})) if $D;
                        }
                }} elsif ($type==$TYPE_CLOSE) {
                        # Body must be exactly one 32-bit sequence number.
                        if (length($udp_data)!=4) {
                                cluck "Ignoring malformed CLOSE (id=$id)";
                                stats("badpkt");
                                last;
                        }
                        my($seq)=unpack "N",$udp_data;
                        my $hashref=$sock{$id};
                        if (!$hashref) {
                                cluck "Got CLOSE but for nonexisting sock $id";
                                stats("ufosock");
                        } else {
                                warn "Got CLOSE (id=$id,seq=$seq)" if $D;
                                if ($hashref->{"acked_to_udp"}+1>$seq) {
                                        stats("recvdup");
                                }
                                if ($hashref->{"acked_to_udp"}+1==$seq && $hashref->{"stream"}) {
                                        close $hashref->{"stream"} or confess "Cannot close socket of $id";
                                        delete $hashref->{"stream"};
                                        $hashref->{"acked_to_udp"}=$seq;
                                        confess if !$active{$id};
                                        delete $active{$id};
                                        warn "Closed the local stream, deleted it from active (id=$id,seq=$seq)" if $D;
                                }
                        }
                        if (!$hashref || $hashref->{"acked_to_udp"}+1>=$seq) {
                                sendpkt pack("CNN",$TYPE_ACK,$id,$seq);
                                warn "Sent ACK of close (id=$id,seq=$seq)" if $D;
                        }
                } else {
                        cluck "Ignoring packet of invalid type $type";
                        stats("badpkt");
                }
        }}
        # Retransmit stage: walk each active connection's queue in sequence
        # order, resend what is due and remember the nearest deadline.
        $earliest=undef();
        for my $hashref (values(%active)) {
                my $id=$hashref->{"id"};
                for my $seq (sort {$a <=> $b} keys(%{$hashref->{"sent_queue"}})) {
                        my $seqhashref=$hashref->{"sent_queue"}{$seq};
                        my $data=$seqhashref->{"data"};
                        my $when=$seqhashref->{"timeout"};
                        if (time()>=$when) {
                                if ($seq==0) {
                                        die if defined $data;
                                        warn "Resent OPEN (id=$id)" if $D;
                                        sendpkt pack("CNN",$TYPE_OPEN,$id,$hashref->{"which"}),"sentdup";
                                } elsif (defined $data) {
                                        die if $data eq "";
                                        warn "Resent SEND (id=$id,seq=$seq)" if $D;
                                        sendpkt pack("CNNa*",$TYPE_SEND,$id,$seq,$data),"sentdup";
                                } else {        # pending CLOSE
                                        warn "Resent CLOSE (id=$id,seq=$seq)" if $D;
                                        sendpkt pack("CNN",$TYPE_CLOSE,$id,$seq),"sentdup";
                                }
                                $when=$seqhashref->{"timeout"}=time()+$opt_timeout;
                        }
                        $earliest=$when if !$earliest || $when<$earliest;
                        # Stop at the first entry whose (possibly just renewed)
                        # deadline lies in the future.
                        last if time()<$seqhashref->{"timeout"};
                }
        }
}