pipebuf included as it is used by .bashrc/cvs*() functions
[nethome.git] / src / pipebuf.c
1 #/*
2 # PipeBuf version 1.2
3 # To compile run:           sh pipebuf.c
4 # Help is then available:   ./pipebuf -h
5
6 # Copyright (C) 1998 Jan Kratochvil <short@ucw.cz>
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; you must use exactly version 2.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You may download a copy of the GNU General Public License from URL
18 #   http://www.opensource.org/gpl-license.html
19 # If not, write to the Free Software Foundation,
20 # Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
21
22 echo "Compiling PipeBuf..."
23 a="cc"
24 b="-s -Wall -O6 -fexpensive-optimizations -fomit-frame-pointer -D_GNU_SOURCE=1"
25 c="-o `basename "$0" .c` $0"
26 echo "$a $b $c"
27 if $a $b $c;then echo -n
28 else echo "$a $c"
29   if $a $c;then echo -n
30   else echo "Failed - please check the output.";exit
31   fi
32 fi
33 echo "done."
34 exit
35 */
36
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <sys/time.h>
40 #include <sys/types.h>
41 #include <sys/wait.h>
42 #include <sys/mman.h>
43 #include <signal.h>
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <string.h>
47 #include <limits.h>
48 #include <sys/ipc.h>
49 #include <sys/shm.h>
50 #include <sys/msg.h>
51 #include <getopt.h>
52 #include <errno.h>
53 #include <time.h>
54
/*
 * Compile-time configuration and small helper macros.
 */
#define BUFSZ (12<<10) /* default buffer size, in KB */
#define BUFWARN (80) /* default low-fill warning threshold, in percents */
#define WARNTIM (5) /* minimum time between two low-fill warnings, in seconds */
#define MAX_XFER (PIPE_BUF) /* upper bound for one read()/write() request */
#undef DEBUG /* define DEBUG here instead to compile the dbg() traces in */

/* Fallback when no included header supplied SHMMAX; it bounds the -b option. */
#ifndef SHMMAX
#define SHMMAX 0x2000000
#endif

/* Function attribute for routines that never return (no-op without GCC >= 2). */
#ifndef __NORETURN
#if __GNUC__ >= 2
#define __NORETURN __attribute__((__noreturn__))
#else
#define __NORETURN
#endif
#endif

/* Values of `myself' and indices into comm->p[]: read side and write side. */
#define RDID (0)
#define WRID (1)

/* dbg(stmt) expands to stmt only in DEBUG builds, otherwise to nothing. */
#ifndef DEBUG
#define dbg(cmd)
#else
#define dbg(cmd) cmd
#endif
#define PNAME_LEN (16)
/* Wrap a buffer position: a position equal to bufsz becomes 0. */
#define bufw(n) ((n)==bufsz?0:(n))
/*
 * Report the failure of library call `name' (a string literal) as
 * "<pname>: <name>(): <error text>" and terminate with EXIT_FAILURE.
 * errno is saved and restored around the fprintf() because a stdio call
 * may modify it, which would make perror() describe the wrong error.
 */
#define failf(name) do { \
        int failf_errno=errno; \
        fprintf(stderr,"%s: ",pname); \
        errno=failf_errno; \
        perror(name"()"); \
        exit(EXIT_FAILURE); \
        } while (0)
/* NOTE: max()/min() evaluate their arguments twice - no side effects please. */
#ifndef max
#define max(a,b) ((a)>=(b)?(a):(b))
#endif
#ifndef min
#define min(a,b) ((a)<=(b)?(a):(b))
#endif
90
91 const char version[]="This is PipeBuf, version 1.2\n";
92
93 long bufsz=BUFSZ<<10,bufwarn=BUFWARN;
94 int prefill,quiet,verbose;
95
96 char *pname;
97 int shmid=-1,dis_cleanup=0,myself;
98 pid_t other=-1;
99 volatile struct {
100         long rp,wp;
101         int eof;
102         time_t up;
103         struct {
104                 char sent;
105                 int msqid;
106                 } p[2];
107         } *comm;
108 struct msgbuf smsgbuf={1,{0}};
109 struct msgbuf rmsgbuf;
110
111 static void cleanup(void)
112 {
113         dbg(fprintf(stderr,"%s: cleanup()\n",pname));
114         if (dis_cleanup) return;
115         if (other!=-1) kill(other,SIGTERM);
116         shmctl(shmid,IPC_RMID,NULL);
117         if (comm) {
118                 dbg(fprintf(stderr,"%s: msgctl(%d,IPC_RMID), myself=%d\n",pname,comm->p[ myself].msqid, myself));
119                 msgctl(comm->p[ myself].msqid,IPC_RMID,NULL);
120                 dbg(fprintf(stderr,"%s: msgctl(%d,IPC_RMID),!myself=%d\n",pname,comm->p[!myself].msqid,!myself));
121                 msgctl(comm->p[!myself].msqid,IPC_RMID,NULL);
122                 }
123         dis_cleanup=1;
124         exit(EXIT_FAILURE);
125 }
126
127 static void wake(void)
128 {
129         dbg(fprintf(stderr,"%s: waking (sent=%d)...\n",pname,comm->p[myself].sent));
130         if (comm->p[myself].sent) return;
131         dbg(fprintf(stderr,"%s: msgsnd(msqid=%d), myself=%d\n",pname,comm->p[!myself].msqid,myself));
132 /*      dbg(fprintf(stderr,"%s: sleeping 5 sec...\n",pname));
133         sleep(5);
134         dbg(fprintf(stderr,"%s: sleeping 5 sec done\n",pname));*/
135         if (msgsnd(comm->p[!myself].msqid,(struct msgbuf *)&smsgbuf,0,IPC_NOWAIT)) if (errno!=EAGAIN) failf("msgsnd");
136         comm->p[myself].sent++;
137         dbg(fprintf(stderr,"%s: waked (sent=%d)\n",pname,comm->p[myself].sent));
138 }
139
140 static void shake(void)
141 {
142         wake();
143         dbg(fprintf(stderr,"%s: waiting (sent=%d)...\n",pname,comm->p[myself].sent));
144         if (comm->eof) dbg(fprintf(stderr,"%s: EOF => breaking out of shake()!\n",pname));
145         else {
146                 dbg(fprintf(stderr,"%s: msgrcv(msqid=%d),myself=%d\n",pname,comm->p[myself].msqid,myself));
147 /*              dbg(fprintf(stderr,"%s: sleeping 5 sec...\n",pname));
148                 sleep(5);
149                 dbg(fprintf(stderr,"%s: sleeping 5 sec done\n",pname));*/
150                 if (msgrcv(comm->p[myself].msqid,&rmsgbuf,0,0,0)) failf("msgrcv");
151                 }
152         comm->p[!myself].sent--;
153         dbg(if (comm->p[!myself].sent<0) {
154                 fprintf(stderr,"%s: FATAL - .sent=%d (<0)!\n",pname,comm->p[!myself].sent);
155                 exit(EXIT_FAILURE);
156                 });
157         dbg(fprintf(stderr,"%s: wait returned (sent=%d)\n",pname,comm->p[myself].sent));
158 }
159
160 static void warnbuf(void)
161 {
162 long crp,cwp,bufused,cup;
163 float percused;
164 char *tim;
165         if (!comm->up||quiet||comm->eof||time(NULL)-comm->up<WARNTIM) return;
166         comm->up=cup=time(NULL);
167         crp=comm->rp; cwp=comm->wp;
168         bufused=crp-cwp+bufsz*!(crp>cwp);
169         if ((percused=(float)bufused*100/bufsz)>=bufwarn) return;
170         tim=ctime(&cup);
171         *strchr(tim,'\n')='\0';
172         fprintf(stderr,"%s: %s: WARNING - Low buffer fill-up: %8ld of %8ld (%2.1f%%)\n",
173                 pname,tim,bufused,bufsz,percused);
174 }
175
176 static __NORETURN void usage(void)
177 {
178         fprintf(stderr,"\
179 %s\
180 This command offers the pipe buffering:\n\
181 \n\
182 Usage: pipebuf [-b|--buffer <size in KB>] [-p|--prefill] [-w|--warning <percent>]\n\
183                [-q|--quiet] [-v|--verbose] [-h|--help] [-V|--version]\n\
184 \n\
185   -b, --buffer <size in KB>\tSpecify buffer size (1-%dKB, def=%dKB)\n\
186   -p, --prefill\t\t\tFill the buffer before first write\n\
187   -w, --warning <percent>\tNo-buffer-data warnings threshold (0-100%%, def=%d%%)\n\
188   -q, --quiet\t\t\tDon't print warnings\n\
189   -v, --verbose\t\t\tInform about phases of transfer\n\
190   -h, --help\t\t\tPrint a summary of the options\n\
191   -V, --version\t\t\tPrint the version number\n\
192 ",version,(SHMMAX>>10)-1,BUFSZ,BUFWARN);
193         exit(EXIT_FAILURE);
194 }
195
196 const struct option longopts[]={
197 {"buffer" ,1,0,'b'},
198 {"prefill",0,0,'p'},
199 {"warning",1,0,'w'},
200 {"quiet"  ,0,0,'q'},
201 {"verbose",0,0,'v'},
202 {"help"   ,0,0,'h'},
203 {"version",0,0,'V'}};
204
205 int main(int argc,char **argv)
206 {
207 long cfp;
208 int r,optc;
209 caddr_t buf;
210 char *s;
211
212         pname=*argv;
213         atexit(cleanup);
214         signal(SIGTERM,(void (*)(int))cleanup);
215         signal(SIGQUIT,(void (*)(int))cleanup);
216         signal(SIGINT ,(void (*)(int))cleanup);
217         signal(SIGHUP ,(void (*)(int))cleanup);
218         while ((optc=getopt_long(argc,argv,"b:pw:qvhV",longopts,NULL))!=EOF) switch (optc) {
219                 case 'b':
220                         errno=EINVAL;
221                         bufsz=strtol(optarg,&s,0);
222                         if (*s!='\0'||bufsz<1||bufsz<<10>=SHMMAX) { perror(optarg); usage(); }
223                         bufsz<<=10;
224                         break;
225                 case 'p':
226                         prefill=1;
227                         break;
228                 case 'w':
229                         errno=EINVAL;
230                         bufwarn=strtol(optarg,&s,0);
231                         if (*s!='\0'||bufwarn<0||bufwarn>100) { perror(optarg); usage(); }
232                         break;
233                 case 'q':
234                         quiet=1;
235                         verbose=0;
236                         break;
237                 case 'v':
238                         verbose=1;
239                         quiet=0;
240                         break;
241                 case 'V':
242                         fprintf(stderr,version);
243                         exit(EXIT_FAILURE);
244                 default: /* also 'h' */
245                         usage();
246                         break;
247                 }
248         if ((shmid=shmget(IPC_PRIVATE,bufsz+sizeof(*comm),0600|IPC_CREAT|IPC_EXCL))==-1) failf("shmget");
249         if ((int)(buf=shmat(shmid,0,0))==-1) failf("shmat");
250         comm=(void *)buf+bufsz;
251         bzero((void *)comm,sizeof(*comm));
252         if ((comm->p[RDID].msqid=msgget(IPC_PRIVATE,0777|IPC_CREAT|IPC_EXCL))==-1) failf("msgget");
253         if ((comm->p[WRID].msqid=msgget(IPC_PRIVATE,0777|IPC_CREAT|IPC_EXCL))==-1) failf("msgget");
254         if (!prefill) comm->up=time(NULL);
255         other=fork();
256         if (other) {
257         /* Read process */
258                 dbg(fprintf(stderr,"%s: started rd\n",pname));
259                 myself=RDID;
260                 strncat(pname,"-rd",max(PNAME_LEN-strlen(pname)-1,0));
261                 if (close(STDOUT_FILENO)) failf("close");
262                 if (verbose) fprintf(stderr,"%s: Using buffer %ldKB%s...\n",pname,bufsz>>10,(prefill?", filling":""));
263                 dbg(fprintf(stderr,"%s: pname check rd\n",pname));
264                 for (;;) {
265                         if (bufw(comm->rp+1)==comm->wp) {
266                                 if (!comm->up) {
267                                         comm->up=time(NULL);
268                                         if (verbose) fprintf(stderr,"%s: Buffer filled-up, starting transfer...\n",pname);
269                                         }
270                                 shake();
271                                 continue;
272                                 }
273                         warnbuf();
274                         cfp=comm->wp;
275                         dbg(fprintf(stderr,"%s: rp=%ld, wp=%ld",pname,comm->rp,cfp));
276                         cfp=(cfp<=comm->rp?bufsz-comm->rp-!cfp:cfp-1-comm->rp);
277                         dbg(fprintf(stderr,", read(%d,%08lx,%ld)\n",STDIN_FILENO,(long)&buf[comm->rp],min(cfp,MAX_XFER)));
278                         if ((r=read(STDIN_FILENO,&buf[comm->rp],min(cfp,MAX_XFER)))==-1) failf("read");
279                         dbg(fprintf(stderr,"%s: rp=%ld, wp=%ld, read=%d\n",pname,bufw(comm->rp+r),comm->wp,r));
280                         if (r) comm->rp=bufw(comm->rp+r);
281                         else {
282                                 comm->eof=1;
283                                 if (!comm->up) {
284                                         comm->up=time(NULL);
285                                         if (verbose) fprintf(stderr,"%s: Reached EOF before buffer fill-up, starting transfer...\n",pname);
286                                         }
287                                 wake();
288                                 break;
289                                 }
290                         if (comm->up) wake();
291                         }
292                 if (verbose) fprintf(stderr,"%s: All input data read, waiting for write completion...\n",pname);
293                 if (waitpid(other,NULL,0)!=other) failf("waitpid");
294                 }
295         else {
296         /* Write process */
297                 dbg(fprintf(stderr,"%s: started wr\n",pname));
298                 myself=WRID;
299                 strncat(pname,"-wr",max(PNAME_LEN-strlen(pname)-1,0));
300                 other=getppid();
301                 if (close(STDIN_FILENO)) failf("close");
302                 dbg(fprintf(stderr,"%s: pname check wr\n",pname));
303                 for (;;) {
304                         if (comm->eof&&comm->rp==comm->wp) break;
305                         while (!comm->eof&&(!comm->up||comm->rp==comm->wp)) shake();
306                         if (comm->eof&&comm->rp==comm->wp) break;
307                         cfp=comm->rp;
308                         dbg(fprintf(stderr,"%s: rp=%ld, wp=%ld",pname,cfp,comm->wp));
309                         cfp=(cfp<=comm->wp?bufsz-comm->wp:cfp-comm->wp);
310                         dbg(fprintf(stderr,", write(%d,%08lx,%ld)\n",STDOUT_FILENO,(long)&buf[comm->wp],min(cfp,MAX_XFER)));
311                         if ((r=write(STDOUT_FILENO,&buf[comm->wp],min(cfp,MAX_XFER)))==-1) failf("write");
312                         dbg(fprintf(stderr,"%s: rp=%ld, wp=%ld, write=%d\n",pname,comm->rp,bufw(comm->wp+r),r));
313                         if (!(comm->wp=bufw(comm->wp+r))&&comm->rp) continue;
314                         wake();
315                         warnbuf();
316                         }
317                 }
318         if (verbose) fprintf(stderr,"%s: Ending operation (sent=%d).\n",pname,comm->p[myself].sent);
319         return(EXIT_SUCCESS);
320 }