3 # To compile run: sh pipebuf.c
4 # Help is then available: ./pipebuf -h
6 # Copyright (C) 1998 Jan Kratochvil <short@ucw.cz>
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; you must use exactly version 2.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You may download a copy of the GNU General Public License from URL
18 # http://www.opensource.org/gpl-license.html
19 # If not, write to the Free Software Foundation,
20 # Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22 echo "Compiling PipeBuf..."
24 b="-s -Wall -O6 -fexpensive-optimizations -fomit-frame-pointer -D_GNU_SOURCE=1"
25 c="-o `basename "$0" .c` $0"
27 if $a $b $c;then echo -n
30 else echo "Failed - please check the output.";exit
40 #include <sys/types.h>
55 #define BUFSZ (12<<10) /* in KB */
56 #define BUFWARN (80) /* in percents */
57 #define WARNTIM (5) /* in seconds */
58 #define MAX_XFER (PIPE_BUF)
62 #define SHMMAX 0x2000000
67 #define __NORETURN __attribute__((__noreturn__))
81 #define PNAME_LEN (16)
82 #define bufw(n) ((n)==bufsz?0:(n))
83 #define failf(name) do { fprintf(stderr,"%s: ",pname); perror(name"()"); exit(EXIT_FAILURE); } while (0)
85 #define max(a,b) ((a)>=(b)?(a):(b))
88 #define min(a,b) ((a)<=(b)?(a):(b))
91 const char version[]="This is PipeBuf, version 1.2\n";
93 long bufsz=BUFSZ<<10,bufwarn=BUFWARN;
94 int prefill,quiet,verbose;
97 int shmid=-1,dis_cleanup=0,myself;
108 struct msgbuf smsgbuf={1,{0}};
109 struct msgbuf rmsgbuf;
111 static void cleanup(void)
113 dbg(fprintf(stderr,"%s: cleanup()\n",pname));
114 if (dis_cleanup) return;
115 if (other!=-1) kill(other,SIGTERM);
116 shmctl(shmid,IPC_RMID,NULL);
118 dbg(fprintf(stderr,"%s: msgctl(%d,IPC_RMID), myself=%d\n",pname,comm->p[ myself].msqid, myself));
119 msgctl(comm->p[ myself].msqid,IPC_RMID,NULL);
120 dbg(fprintf(stderr,"%s: msgctl(%d,IPC_RMID),!myself=%d\n",pname,comm->p[!myself].msqid,!myself));
121 msgctl(comm->p[!myself].msqid,IPC_RMID,NULL);
127 static void wake(void)
129 dbg(fprintf(stderr,"%s: waking (sent=%d)...\n",pname,comm->p[myself].sent));
130 if (comm->p[myself].sent) return;
131 dbg(fprintf(stderr,"%s: msgsnd(msqid=%d), myself=%d\n",pname,comm->p[!myself].msqid,myself));
132 /* dbg(fprintf(stderr,"%s: sleeping 5 sec...\n",pname));
134 dbg(fprintf(stderr,"%s: sleeping 5 sec done\n",pname));*/
135 if (msgsnd(comm->p[!myself].msqid,(struct msgbuf *)&smsgbuf,0,IPC_NOWAIT)) if (errno!=EAGAIN) failf("msgsnd");
136 comm->p[myself].sent++;
137 dbg(fprintf(stderr,"%s: waked (sent=%d)\n",pname,comm->p[myself].sent));
140 static void shake(void)
143 dbg(fprintf(stderr,"%s: waiting (sent=%d)...\n",pname,comm->p[myself].sent));
144 if (comm->eof) dbg(fprintf(stderr,"%s: EOF => breaking out of shake()!\n",pname));
146 dbg(fprintf(stderr,"%s: msgrcv(msqid=%d),myself=%d\n",pname,comm->p[myself].msqid,myself));
147 /* dbg(fprintf(stderr,"%s: sleeping 5 sec...\n",pname));
149 dbg(fprintf(stderr,"%s: sleeping 5 sec done\n",pname));*/
150 if (msgrcv(comm->p[myself].msqid,&rmsgbuf,0,0,0)) failf("msgrcv");
152 comm->p[!myself].sent--;
153 dbg(if (comm->p[!myself].sent<0) {
154 fprintf(stderr,"%s: FATAL - .sent=%d (<0)!\n",pname,comm->p[!myself].sent);
157 dbg(fprintf(stderr,"%s: wait returned (sent=%d)\n",pname,comm->p[myself].sent));
160 static void warnbuf(void)
162 long crp,cwp,bufused,cup;
165 if (!comm->up||quiet||comm->eof||time(NULL)-comm->up<WARNTIM) return;
166 comm->up=cup=time(NULL);
167 crp=comm->rp; cwp=comm->wp;
168 bufused=crp-cwp+bufsz*!(crp>cwp);
169 if ((percused=(float)bufused*100/bufsz)>=bufwarn) return;
171 *strchr(tim,'\n')='\0';
172 fprintf(stderr,"%s: %s: WARNING - Low buffer fill-up: %8ld of %8ld (%2.1f%%)\n",
173 pname,tim,bufused,bufsz,percused);
176 static __NORETURN void usage(void)
180 This command offers the pipe buffering:\n\
182 Usage: pipebuf [-b|--buffer <size in KB>] [-p|--prefill] [-w|--warning <percent>]\n\
183 [-q|--quiet] [-v|--verbose] [-h|--help] [-V|--version]\n\
185 -b, --buffer <size in KB>\tSpecify buffer size (1-%dKB, def=%dKB)\n\
186 -p, --prefill\t\t\tFill the buffer before first write\n\
187 -w, --warning <percent>\tNo-buffer-data warnings threshold (0-100%%, def=%d%%)\n\
188 -q, --quiet\t\t\tDon't print warnings\n\
189 -v, --verbose\t\t\tInform about phases of transfer\n\
190 -h, --help\t\t\tPrint a summary of the options\n\
191 -V, --version\t\t\tPrint the version number\n\
192 ",version,(SHMMAX>>10)-1,BUFSZ,BUFWARN);
196 const struct option longopts[]={
203 {"version",0,0,'V'}};
205 int main(int argc,char **argv)
214 signal(SIGTERM,(void (*)(int))cleanup);
215 signal(SIGQUIT,(void (*)(int))cleanup);
216 signal(SIGINT ,(void (*)(int))cleanup);
217 signal(SIGHUP ,(void (*)(int))cleanup);
218 while ((optc=getopt_long(argc,argv,"b:pw:qvhV",longopts,NULL))!=EOF) switch (optc) {
221 bufsz=strtol(optarg,&s,0);
222 if (*s!='\0'||bufsz<1||bufsz<<10>=SHMMAX) { perror(optarg); usage(); }
230 bufwarn=strtol(optarg,&s,0);
231 if (*s!='\0'||bufwarn<0||bufwarn>100) { perror(optarg); usage(); }
242 fprintf(stderr,version);
244 default: /* also 'h' */
248 if ((shmid=shmget(IPC_PRIVATE,bufsz+sizeof(*comm),0600|IPC_CREAT|IPC_EXCL))==-1) failf("shmget");
249 if ((int)(buf=shmat(shmid,0,0))==-1) failf("shmat");
250 comm=(void *)buf+bufsz;
251 bzero((void *)comm,sizeof(*comm));
252 if ((comm->p[RDID].msqid=msgget(IPC_PRIVATE,0777|IPC_CREAT|IPC_EXCL))==-1) failf("msgget");
253 if ((comm->p[WRID].msqid=msgget(IPC_PRIVATE,0777|IPC_CREAT|IPC_EXCL))==-1) failf("msgget");
254 if (!prefill) comm->up=time(NULL);
258 dbg(fprintf(stderr,"%s: started rd\n",pname));
260 strncat(pname,"-rd",max(PNAME_LEN-strlen(pname)-1,0));
261 if (close(STDOUT_FILENO)) failf("close");
262 if (verbose) fprintf(stderr,"%s: Using buffer %ldKB%s...\n",pname,bufsz>>10,(prefill?", filling":""));
263 dbg(fprintf(stderr,"%s: pname check rd\n",pname));
265 if (bufw(comm->rp+1)==comm->wp) {
268 if (verbose) fprintf(stderr,"%s: Buffer filled-up, starting transfer...\n",pname);
275 dbg(fprintf(stderr,"%s: rp=%ld, wp=%ld",pname,comm->rp,cfp));
276 cfp=(cfp<=comm->rp?bufsz-comm->rp-!cfp:cfp-1-comm->rp);
277 dbg(fprintf(stderr,", read(%d,%08lx,%ld)\n",STDIN_FILENO,(long)&buf[comm->rp],min(cfp,MAX_XFER)));
278 if ((r=read(STDIN_FILENO,&buf[comm->rp],min(cfp,MAX_XFER)))==-1) failf("read");
279 dbg(fprintf(stderr,"%s: rp=%ld, wp=%ld, read=%d\n",pname,bufw(comm->rp+r),comm->wp,r));
280 if (r) comm->rp=bufw(comm->rp+r);
285 if (verbose) fprintf(stderr,"%s: Reached EOF before buffer fill-up, starting transfer...\n",pname);
290 if (comm->up) wake();
292 if (verbose) fprintf(stderr,"%s: All input data read, waiting for write completion...\n",pname);
293 if (waitpid(other,NULL,0)!=other) failf("waitpid");
297 dbg(fprintf(stderr,"%s: started wr\n",pname));
299 strncat(pname,"-wr",max(PNAME_LEN-strlen(pname)-1,0));
301 if (close(STDIN_FILENO)) failf("close");
302 dbg(fprintf(stderr,"%s: pname check wr\n",pname));
304 if (comm->eof&&comm->rp==comm->wp) break;
305 while (!comm->eof&&(!comm->up||comm->rp==comm->wp)) shake();
306 if (comm->eof&&comm->rp==comm->wp) break;
308 dbg(fprintf(stderr,"%s: rp=%ld, wp=%ld",pname,cfp,comm->wp));
309 cfp=(cfp<=comm->wp?bufsz-comm->wp:cfp-comm->wp);
310 dbg(fprintf(stderr,", write(%d,%08lx,%ld)\n",STDOUT_FILENO,(long)&buf[comm->wp],min(cfp,MAX_XFER)));
311 if ((r=write(STDOUT_FILENO,&buf[comm->wp],min(cfp,MAX_XFER)))==-1) failf("write");
312 dbg(fprintf(stderr,"%s: rp=%ld, wp=%ld, write=%d\n",pname,comm->rp,bufw(comm->wp+r),r));
313 if (!(comm->wp=bufw(comm->wp+r))&&comm->rp) continue;
318 if (verbose) fprintf(stderr,"%s: Ending operation (sent=%d).\n",pname,comm->p[myself].sent);
319 return(EXIT_SUCCESS);