src/: +streamfer*
authorJan Kratochvil <jan@jankratochvil.net>
Mon, 23 Oct 2023 13:12:31 +0000 (21:12 +0800)
committerJan Kratochvil <jan@jankratochvil.net>
Mon, 23 Oct 2023 13:12:31 +0000 (21:12 +0800)
src/Makefile
src/safeio.C [new file with mode: 0644]
src/safeio.h [new file with mode: 0644]
src/socket.C [new file with mode: 0644]
src/socket.h [new file with mode: 0644]
src/streamfer-client.C [new file with mode: 0644]
src/streamfer-server.C [new file with mode: 0644]
src/streamfer.C [new file with mode: 0644]
src/streamfer.h [new file with mode: 0644]
src/stringf.h [new file with mode: 0644]

index d23df8b..5625e19 100644 (file)
@@ -4,8 +4,8 @@ CFLAGS+=-Wall -Wstrict-prototypes -ggdb3
 #CFLAGS+=-O9 -fexpensive-optimizations -finline-functions
 #CFLAGS+=-pg
 
-all: yasm2gas
-# unmime flock pipebuf
+all: streamfer-server streamfer-client
+# yasm2gas unmime flock pipebuf
 
 pipebuf: pipebuf.c
        sh $<
@@ -13,3 +13,8 @@ pipebuf: pipebuf.c
 yasm2gas: yasm2gas.c
        gcc -o $@ -Wall -g $<
 
+streamfer-server: streamfer-server.C streamfer.h streamfer.C safeio.C safeio.h socket.C socket.h stringf.h
+       clang++ -o $@ -Wall -g streamfer-server.C streamfer.C safeio.C socket.C
+
+streamfer-client: streamfer-client.C streamfer.h streamfer.C safeio.C safeio.h socket.C socket.h stringf.h
+       clang++ -o $@ -Wall -g streamfer-client.C streamfer.C safeio.C socket.C
diff --git a/src/safeio.C b/src/safeio.C
new file mode 100644 (file)
index 0000000..fb76743
--- /dev/null
@@ -0,0 +1,87 @@
+#include "safeio.h"
+#include <sys/uio.h>
+
+const SafeIOError exception_SafeIOError;
+const SafeIOEOF   exception_SafeIOEOF  ;
+
+void read_safe(int fd,void *buf,size_t size) {
+  assert(size>0);
+  while (size>0) {
+    const ssize_t got(read(fd,buf,size));
+    assert(got>=-1);
+    if (got<=0) {
+      warning("read_safe: fd=%d size=%zu: %s",fd,size,(got==0?"EOF":strerror(errno)));
+      // Do not throw ?exc1:exc2 as it would get upcasted.
+      if (got==0||(got==-1&&errno==ECONNRESET))
+       throw exception_SafeIOEOF;
+      throw exception_SafeIOError;
+    }
+    assert(size_t(got)<=size);
+    buf=static_cast<uint8_t *>(buf)+got;
+    size-=got;
+  }
+}
+
+void read_safe(FILE *f,void *buf,size_t size) {
+  assert(size>0);
+  assert(!ferror(f));
+  assert(!feof(f));
+  const size_t got(fread(buf,1/*size*/,size/*nmemb*/,f));
+  if (ferror(f)||feof(f)) {
+    warning("read_safe: fd=%d size=%zu: %s",fileno(f),size,(ferror(f)?"error":"EOF"));
+    // Do not throw ?exc1:exc2 as it would get upcasted.
+    if (ferror(f))
+      throw exception_SafeIOError;
+    throw exception_SafeIOEOF;
+  }
+  // return [...] which is less than nitems only if a read error or end-of-file is encountered.
+  if (size_t(got)!=size) {
+    warning("read_safe: fd=%d size=%zu got=%zu",fileno(f),size,got);
+    throw exception_SafeIOError;
+  }
+}
+
+void write_safe(int fd,const void *buf,size_t size) {
+  assert(size>0);
+  const ssize_t got(write(fd,buf,size));
+  if (got==-1) {
+    warning("Error writing %zu to fd %d: %m",size,fd);
+    throw exception_SafeIOError;
+  }
+  assert(got>0);
+  if (size_t(got)!=size) {
+    warning("Wrote only %zd of %zu to fd %d",got,size,fd);
+    throw exception_SafeIOEOF;
+  }
+}
+
+void write_safe(FILE *f,const void *buf,size_t size) {
+  assert(size>0);
+  assert(!ferror(f));
+  const size_t got(fwrite(buf,1/*size*/,size/*nmemb*/,f));
+  if (ferror(f)) {
+    warning("Error writing %zu to FILE * of fd %d",size,fileno(f));
+    throw exception_SafeIOError;
+  }
+  if (got!=size) {
+    warning("Wrote only %zu of %zu to FILE * of fd %d",got,size,fileno(f));
+    throw exception_SafeIOEOF;
+  }
+}
+
+void writev_safe_(int fd,const struct iovec *iov, int iovcnt) {
+  size_t size=0;
+  for (int ix=0;ix<iovcnt;++ix)
+    size+=iov[ix].iov_len;
+  assert(size>0);
+  const ssize_t got(writev(fd,iov,iovcnt));
+  if (got==-1) {
+    warning("Error writing %zu iovec to fd %d: %m",size,fd);
+    throw exception_SafeIOError;
+  }
+  assert(got>0);
+  if (size_t(got)!=size) {
+    warning("Wrote only %zd of %zu iovec to fd %d",got,size,fd);
+    throw exception_SafeIOEOF;
+  }
+}
diff --git a/src/safeio.h b/src/safeio.h
new file mode 100644 (file)
index 0000000..84a6706
--- /dev/null
@@ -0,0 +1,89 @@
+#ifndef LIB_SAFEIO_H
+#define LIB_SAFEIO_H
+
+#include "streamfer.h"
+#include <sys/uio.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <exception>
+#include <vector>
+#include <string>
+#include <array>
+#include <deque>
+#include <cassert>
+#include <cstdint>
+#include <cstring>
+
+extern const class SafeIOError:public exception {
+  virtual const char *what() const noexcept override { return "SafeIOError"; }
+} exception_SafeIOError;
+extern const class SafeIOEOF:public SafeIOError {
+  virtual const char *what() const noexcept override { return "SafeIOEOF"; }
+} exception_SafeIOEOF;
+
+void read_safe(int fd ,void *buf,size_t len);
+void read_safe(FILE *f,void *buf,size_t len);
+
+template<class F,class T> void read_safe(F f,       T  &obj) { read_safe(f,&obj,sizeof(obj)); }
+
+template<class F,class T> void read_safe(F f,vector<T> &vec) {
+  size_t size;
+  read_safe(f,size);
+  vec.resize(size);
+  if (size)
+    read_safe(f,vec.data(),vec.size()*sizeof(vec[0]));
+}
+
+template<class F,class T> void read_safe(F f,deque<T> &vec) {
+  size_t size;
+  read_safe(f,size);
+  vec.resize(size);
+  for (auto &elem:vec)
+    read_safe(f,elem);
+}
+
+template<class F> void read_safe(F f,string &str) {
+  uint8_t len8=0; // false GCC warning
+  read_safe(f,len8);
+  str.resize(len8);
+  if (len8)
+    read_safe(f,&str[0],str.length());
+}
+
+template<class F> string read_safe_string(F f) { string str; read_safe(f,str); return str; }
+
+void write_safe(int fd ,const void *buf,size_t count);
+void write_safe(FILE *f,const void *buf,size_t count);
+void writev_safe_(int fd,const struct iovec *iov, int iovcnt);
+constexpr iovec writev_iovec(const void *base,size_t len) { return iovec{const_cast<void *>(base),len}; }
+static inline const/*FIXME:constexpr c_str()*/ iovec iovec_for_string(const string &str) { return iovec{const_cast<char *>(str.c_str()),str.length()}; }
+template<class T> constexpr iovec iovec_for_object(T &object) { return iovec{reinterpret_cast<void *>(const_cast<typename std::remove_const<T>::type *>(&object)),sizeof(object)}; }
+template<size_t iovcnt> void writev_safe(int fd,std::array<iovec,iovcnt> iov) {
+  writev_safe_(fd,iov.data(),iovcnt);
+}
+
+template<class F,class T> void write_safe(F f,const        T  &obj) { write_safe(f,&obj,sizeof(obj)); }
+
+template<class F,class T> void write_safe(F f,const vector<T> &vec) {
+  const size_t size(vec.size());
+  write_safe(f,size);
+  if (size)
+    write_safe(f,vec.data(),size*sizeof(vec[0]));
+}
+
+template<class F,class T> void write_safe(F f,const deque<T> &vec) {
+  const size_t size(vec.size());
+  write_safe(f,size);
+  for (const auto &elem:vec)
+    write_safe(f,elem);
+}
+
+template<class F> void write_safe(F f,const string &str) {
+  const uint8_t len8(str.length());
+  assert(len8==str.length());
+  write_safe(f,len8);
+  if (!str.empty())
+    write_safe(f,str.c_str(),len8);
+}
+
+#endif /* LIB_SAFEIO_H */
diff --git a/src/socket.C b/src/socket.C
new file mode 100644 (file)
index 0000000..2e0e4b8
--- /dev/null
@@ -0,0 +1,120 @@
+#include "socket.h"
+#include "stringf.h"
+#include <netdb.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+
+int socket_bind(string host_port_str) {
+  const char *cs(strrchr(host_port_str.c_str(),':'));
+  const string node(!cs?"":host_port_str.substr(0,cs-host_port_str.c_str()));
+  const string service(!cs?host_port_str:cs+1);
+  struct addrinfo hints={}; // designated initializer: error: missing initializer for member ... [-Werror=missing-field-initializers]
+  hints.ai_family=AF_UNSPEC;    /* Allow IPv4 or IPv6 */
+  hints.ai_socktype=SOCK_STREAM;
+  hints.ai_flags=AI_PASSIVE|AI_ADDRCONFIG;    /* For wildcard IP address */
+  struct addrinfo *result;
+  int err(getaddrinfo(node.empty()?NULL:node.c_str(),service.c_str(),&hints,&result));
+  if (err)
+    fatal("<%s>:<%s>: %s",node.c_str(),service.c_str(),gai_strerror(err));
+  int fd=-1;
+  struct addrinfo *rp;
+  for (rp=result;rp;rp=rp->ai_next) {
+    fd=socket(rp->ai_family,rp->ai_socktype,rp->ai_protocol);
+    if (fd==-1)
+      continue;
+    static const int int1(1);
+    err=setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&int1,sizeof(int1));
+    assert(!err);
+    if (bind(fd,rp->ai_addr,rp->ai_addrlen)==0)
+      break;
+    err=close(fd);
+    assert(!err);
+  }
+  if (rp==NULL)
+    fatal("Cannot bind(): <%s>:<%s>: %m",node.c_str(),service.c_str());
+  assert(fd!=-1);
+  freeaddrinfo(result);
+  err=listen(fd,SOMAXCONN);
+  assert(!err);
+  return fd;
+}
+
+string sockaddr_string(const struct sockaddr *sockaddrp,socklen_t socklen) {
+  char hostname[NI_MAXHOST];
+  char servname[NI_MAXSERV];
+  const int err=getnameinfo(sockaddrp,socklen,hostname,sizeof(hostname),servname,sizeof(servname),NI_NUMERICSERV/*flags*/);
+  assert(!err);
+  return stringf("%s:%s",hostname,servname);
+}
+
+string socket_name(int socket_fd) {
+  struct sockaddr sockaddr;
+  socklen_t socklen(sizeof(sockaddr));
+  const int err(getsockname(socket_fd,&sockaddr,&socklen));
+  assert(!err);
+  return sockaddr_string(&sockaddr,socklen);
+}
+
+static void socket_setopt(int fd) {
+  static const int int1(1);
+  int err;
+  err=setsockopt(fd,SOL_SOCKET,SO_KEEPALIVE,&int1,sizeof(int1));
+  assert(!err);
+  static const uint8_t tos(IPTOS_LOWDELAY);
+  err=setsockopt(fd,IPPROTO_IP,IP_TOS,&tos,sizeof(tos));
+  assert(!err);
+  err=setsockopt(fd,IPPROTO_IP,TCP_NODELAY,&int1,sizeof(int1));
+  assert(!err);
+}
+
+int socket_accept(int listen_fd,function<void(int client_fd,string addr)> msgfunc) {
+  struct sockaddr sockaddr;
+  socklen_t socklen(sizeof(sockaddr));
+  const int client_fd(accept(listen_fd,&sockaddr,&socklen));
+  assert(client_fd>=0);
+  msgfunc(client_fd,sockaddr_string(&sockaddr,socklen));
+  socket_setopt(client_fd);
+  return client_fd;
+}
+
+int socket_connect(const string &host_port_str,unsigned retries) {
+  const char *cs(strrchr(host_port_str.c_str(),':'));
+  if (!cs)
+    fatal("Error parsing <host>:<port>: %s",host_port_str.c_str());
+  const string node(host_port_str.substr(0,cs-host_port_str.c_str()));
+  const string service(cs+1);
+  struct addrinfo hints={}; // designated initializer: error: missing initializer for member ... [-Werror=missing-field-initializers]
+  hints.ai_family=AF_UNSPEC;    /* Allow IPv4 or IPv6 */
+  hints.ai_socktype=SOCK_STREAM;
+  struct addrinfo *result;
+  int err=getaddrinfo(node.c_str(),service.c_str(),&hints,&result);
+  if (err)
+    fatal("Error parsing node+service: <%s>:<%s>: %s",node.c_str(),service.c_str(),gai_strerror(err));
+  int fd;
+  struct addrinfo *rp;
+  for (unsigned retryno=0;retryno<1+retries;++retryno) {
+    if (retryno) {
+      warning("Sleeping 1 second for connect retry #%u/%u",retryno,retries);
+      err=sleep(1);
+      assert(!err);
+    }
+    for (rp=result;rp;rp=rp->ai_next) {
+      fd=socket(rp->ai_family,rp->ai_socktype,rp->ai_protocol);
+      if (fd==-1)
+       continue;
+      if (connect(fd,rp->ai_addr,rp->ai_addrlen)==0)
+       break;
+      err=close(fd);
+      assert(!err);
+    }
+    if (rp==NULL)
+      warning("Could not connect(): <%s>:<%s>: %m",node.c_str(),service.c_str());
+    else
+      break;
+  }
+  freeaddrinfo(result);
+  if (rp==NULL)
+    fatal("Could not connect(), giving up");
+  socket_setopt(fd);
+  return fd;
+}
diff --git a/src/socket.h b/src/socket.h
new file mode 100644 (file)
index 0000000..19570ef
--- /dev/null
@@ -0,0 +1,17 @@
+#ifndef LIB_SOCKET_H
+#define LIB_SOCKET_H
+
+#include "streamfer.h"
+#include <unistd.h>
+#include <string>
+#include <functional>
+#include <cstring>
+#include <sys/socket.h>
+
+int socket_bind(string host_port_str);
+string sockaddr_string(const struct sockaddr *sockaddrp,socklen_t socklen);
+string socket_name(int socket_fd);
+int socket_accept(int listen_fd,function<void(int client_fd,string addr)> msgfunc);
+int socket_connect(const string &host_port_str,unsigned retries);
+
+#endif /* LIB_SOCKET_H */
diff --git a/src/streamfer-client.C b/src/streamfer-client.C
new file mode 100644 (file)
index 0000000..e998ffd
--- /dev/null
@@ -0,0 +1,107 @@
+#include "safeio.h"
+#include "socket.h"
+#include "stringf.h"
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <climits>
+
+static string get_string(FILE *f,const char *fn,const char *what) {
+  char buf[PATH_MAX];
+  char *got=fgets(buf,sizeof(buf),f);
+  if (got!=buf)
+    fatal("Error reading %s from %s: %m",what,fn);
+  char *s(strchr(buf,'\n'));
+  if (!s)
+    fatal("Stored %s in %s is not newline-terminated",what,fn);
+  *s=0;
+  assert(!s[1]);
+  return buf;
+}
+
+// https://stackoverflow.com/a/8615450/2995591
+#include <glob.h> // glob(), globfree()
+#include <cstring> // memset()
+#include <vector>
+#include <string>
+
+int main(int argc,char **argv) {
+  if (argc!=1+3)
+    fatal("streamfer-client <host>:<port> <pattern> <last storage file>");
+  int server_fd(socket_connect(string(argv[1]),0/*retries*/));
+  string pattern(argv[2]);
+  write_safe(server_fd,pattern);
+  string last_found;
+  const char *last_stored_fn(argv[3]);
+  FILE *last_stored_f(fopen(last_stored_fn,"r"));
+  uint64_t offset(0);
+  if (!last_stored_f) {
+    if (errno!=ENOENT)
+      fatal("Error opening filename from %s: %m",last_stored_fn);
+  } else {
+    last_found=      get_string(last_stored_f,last_stored_fn,"filename") ;
+    string offsetstr(get_string(last_stored_f,last_stored_fn,"offset"  ));
+    char *end;
+    errno=0;
+    unsigned long ul(strtoul(offsetstr.c_str(),&end,0));
+    if (errno||(end&&*end))
+      fatal("Error converting offset from %s: %s",last_stored_fn,offsetstr.c_str());
+    offset=ul;
+    int gotint(fgetc(last_stored_f));
+    if (gotint!=EOF)
+      fatal("Stored filename in %s has excessive data after filename %s and offset %s",last_stored_fn,last_found.c_str(),offsetstr.c_str());
+    int err(fclose(last_stored_f));
+    assert(!err);
+  }
+  write_safe(server_fd,last_found);
+  write_safe(server_fd,offset);
+  string last_got;
+  read_safe(server_fd,last_got);
+  if (last_got.empty()) {
+    warning("No more files to transfer");
+    exit(EXIT_SUCCESS);
+  }
+  struct timespec mtim;
+  read_safe(server_fd,mtim);
+  int file_fd;
+  string file_name;
+  size_t slash(last_got.find_last_of('/'));
+  if (slash!=string::npos)
+    file_name=last_got.substr(slash+1);
+  else
+    file_name=last_got;
+  if (last_found==last_got) {
+    file_fd=open(file_name.c_str(),O_WRONLY);
+    if (file_fd==-1)
+      fatal("Error opening for write %s: %m",file_name.c_str());
+    off_t got(lseek(file_fd,offset,SEEK_SET));
+    if ((uint64_t)got!=offset)
+      fatal("Error seeking in %s to %zu, got: %zu",file_name.c_str(),(size_t)offset,(size_t)got);
+  } else {
+    offset=0;
+    file_fd=open(file_name.c_str(),O_WRONLY|O_CREAT,0644);
+    if (file_fd==-1)
+      fatal("Error creating %s: %m",file_name.c_str());
+  }
+  uint64_t transferred(transfer(server_fd,"server fd",file_fd,file_name.c_str()));
+  if (!transferred)
+    return EXIT_FAILURE;
+  offset+=transferred;
+  struct timespec mtim2[2];
+  mtim2[0]=mtim; // atime
+  mtim2[1]=mtim; // mtime
+  if (futimens(file_fd,mtim2))
+    fatal("Error setting timestamp of %s: %m",file_name.c_str());
+  if (close(file_fd))
+    fatal("Error closing %s: %m",file_name.c_str());
+  int last_stored_fd(open(last_stored_fn,O_WRONLY|O_CREAT|O_TRUNC,0644));
+  if (last_stored_fd==-1)
+    fatal("Error storing filename to %s: %m",last_stored_fn);
+  write_safe(last_stored_fd,last_got.c_str(),last_got.length());
+  write_safe(last_stored_fd,'\n');
+  string offsetstr(stringf("%zu",(size_t)offset));
+  write_safe(last_stored_fd,offsetstr.c_str(),offsetstr.length());
+  write_safe(last_stored_fd,'\n');
+  if (close(last_stored_fd))
+    fatal("Error closing %s: %m",last_stored_fn);
+  return EXIT_SUCCESS;
+}
diff --git a/src/streamfer-server.C b/src/streamfer-server.C
new file mode 100644 (file)
index 0000000..dedb9f3
--- /dev/null
@@ -0,0 +1,207 @@
+#include "safeio.h"
+#include "socket.h"
+#include "stringf.h"
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <poll.h>
+#include <climits>
+#include <cstdlib>
+#include <csignal>
+
+// https://stackoverflow.com/a/8615450/2995591
+#include <glob.h> // glob(),globfree()
+#include <cstring> // memset()
+#include <vector>
+#include <string>
+
+static std::vector<std::string> cxxglob(const std::string pattern) {
+  glob_t glob_result;
+  memset(&glob_result,0,sizeof(glob_result));
+  int return_value=glob(pattern.c_str(),GLOB_TILDE,NULL,&glob_result);
+  if (return_value)
+    fatal("glob() failed with return_value %s",return_value);
+  vector<string> filenames;
+  filenames.reserve(glob_result.gl_pathc);
+  for (size_t i = 0; i < glob_result.gl_pathc; ++i)
+     filenames.push_back(string(glob_result.gl_pathv[i]));
+  globfree(&glob_result);
+  return filenames;
+}
+
+// FIXME: Use C++17
+static bool fd_is_open(const char *execname,const char *fn) {
+  const char slashproc[]("/proc");
+  DIR *dir(opendir(slashproc));
+  if (!dir)
+    fatal("Cannot opendir %s: %m",slashproc);
+  bool retval(false);
+  for (;;) {
+    errno=0;
+    const struct dirent *de=readdir(dir);
+    if (!de) {
+      if (errno)
+       fatal("Cannot readdir %s: %m",slashproc);
+      break;
+    }
+    if (!isdigit(de->d_name[0]))
+      continue;
+
+    char buf[PATH_MAX];
+    ssize_t got(readlinkat(dirfd(dir),stringf("%s/exe",de->d_name).c_str(),buf,sizeof(buf)));
+    if (got==-1||got==sizeof(buf))
+      continue;
+    buf[got]=0;
+    char *s=strrchr(buf,'/');
+    if (!s)
+      continue;
+    if (strcmp(s+1,execname)!=0)
+      continue;
+
+    string procpidfd(stringf("/proc/%s/fd",de->d_name));
+    DIR *fddir(opendir(procpidfd.c_str()));
+    if (!fddir)
+      fatal("Cannot opendir %s: %m",procpidfd.c_str());
+    for (;;) {
+      errno=0;
+      const struct dirent *de=readdir(fddir);
+      if (!de) {
+       if (errno)
+         fatal("Cannot readdir %s: %m",procpidfd.c_str());
+       break;
+      }
+      if (!isdigit(de->d_name[0]))
+       continue;
+      char buf[PATH_MAX];
+      ssize_t got(readlinkat(dirfd(fddir),de->d_name,buf,sizeof(buf)));
+      if (got==-1||got==sizeof(buf))
+       continue;
+      buf[got]=0;
+      if (strcmp(buf,fn)==0) {
+       retval=true;
+       break;
+      }
+    }
+    if (closedir(fddir))
+      fatal("Cannot closedir %s: %m",procpidfd.c_str());
+    if (retval)
+      break;
+  }
+  if (closedir(dir))
+    fatal("Cannot closedir %s: %m",slashproc);
+  return retval;
+}
+
+int main(int argc,char **argv) {
+  static struct sigaction sigchld;
+  sigchld.sa_handler=SIG_DFL;
+  sigchld.sa_flags=SA_NOCLDWAIT;
+  int err(sigaction(SIGCHLD,&sigchld,nullptr));
+  assert(!err);
+
+  if (argc!=1+2&&argc!=1+3)
+    fatal("streamfer-server [<listen-host>:]<listen-port> <prefix> [follow-fd-of-executable-basename]");
+  string prefix;
+  if (argc>=1+2&&*argv[2])
+    prefix=argv[2];
+  const char *execname(nullptr);
+  if (argc>=1+3)
+    execname=argv[3];
+  int listen_fd(socket_bind(argv[1]));
+  int client_fd;
+  for (;;) {
+    client_fd=socket_accept(listen_fd,[&](int client_fd,string addr) {
+      warning("%d:%s",client_fd,addr.c_str());
+    });
+    int child(fork());
+    assert(child!=-1);
+    if (!child)
+      break;
+    int err(close(client_fd));
+    assert(!err);
+  }
+  err=close(listen_fd);
+  assert(!err);
+
+  string pattern(read_safe_string(client_fd));
+  std::vector<std::string> matched(cxxglob(pattern));
+  for (size_t ix=0;ix<matched.size()-1;++ix) {
+    const std::string &a(matched[ix  ]);
+    const std::string &b(matched[ix+1]);
+    int err(strcmp(a.c_str(),b.c_str()));
+    if (err>=0)
+      fatal("glob: strcmp(\"%s\",\"%s\")=%d",a.c_str(),b.c_str(),err);
+  }
+  string last(read_safe_string(client_fd));
+  size_t lastix(SIZE_MAX);
+  for (size_t ix=0;ix<matched.size();++ix) {
+    const std::string &member(matched[ix]);
+    if (strcmp(last.c_str(),member.c_str())>0)
+      assert(lastix==SIZE_MAX);
+    else if (lastix==SIZE_MAX)
+      lastix=ix;
+  }
+  if (lastix==SIZE_MAX)
+    fatal("Requested too new file");
+  uint64_t offset;
+  read_safe(client_fd,offset);
+  const string *fnp;
+  int file_fd=-1;
+  struct stat statbuf;
+  for (;lastix<matched.size();file_fd=-1,++lastix) {
+    fnp=&matched[lastix];
+    const string &fn(*fnp);
+    file_fd=open(fn.c_str(),O_RDONLY);
+    if (file_fd==-1) {
+      if (errno!=ENOENT)
+       fatal("Cannot open %s: %m",fn.c_str());
+      continue;
+    }
+    int err(fstat(file_fd,&statbuf));
+    assert(!err);
+    if (offset<(uint64_t)statbuf.st_size)
+      break;
+    if (offset>(uint64_t)statbuf.st_size)
+      warning("File %s has transferred %zu < %zu which is its size",fn.c_str(),(size_t)offset,(size_t)statbuf.st_size);
+    if (lastix==matched.size()-1&&execname)
+      break;
+    err=close(file_fd);
+    assert(!err);
+    offset=0;
+  }
+  if (file_fd==-1) {
+    string empty("");
+    write_safe(client_fd,empty);
+    fatal("No more files to transfer");
+  }
+  const string &fn(*fnp);
+  const char *fn_canon(nullptr);
+  if (!prefix.empty()||execname) {
+    fn_canon=realpath(fn.c_str(),nullptr);
+    if (!fn_canon)
+      fatal("realpath %s: %m",fn.c_str());
+  }
+  if (fn!=last)
+    offset=0;
+  if (!prefix.empty()&&strncmp(prefix.c_str(),fn_canon,prefix.length())!=0)
+    fatal("prefix=\"%s\" realpath=\"%s\"",prefix.c_str(),fn_canon);
+  warning("%s @%zu",fn.c_str(),(size_t)offset);
+  write_safe(client_fd,fn);
+  write_safe(client_fd,statbuf.st_mtim);
+  off_t got(lseek(file_fd,offset,SEEK_SET));
+  assert((uint64_t)got==offset);
+  struct pollfd fds;
+  fds.fd=client_fd;
+  fds.events=POLLIN|POLLPRI|POLLRDHUP;
+  for (;;) {
+    transfer(file_fd,fn.c_str(),client_fd,"client fd");
+    if (!fn_canon||!fd_is_open(execname,fn_canon))
+      break;
+    int err(poll(&fds,1,1000/*ms*/));
+    if (err==-1)
+      fatal("poll client fd: %m");
+    if (err==1) 
+      fatal("poll client fd: revents=0x%x",fds.revents);
+    assert(err==0);
+  }
+}
diff --git a/src/streamfer.C b/src/streamfer.C
new file mode 100644 (file)
index 0000000..7f2e432
--- /dev/null
@@ -0,0 +1,51 @@
+#include "streamfer.h"
+#include <unistd.h>
+
+static void vwarning(const char *msg,va_list ap) {
+  int err=vfprintf(stderr,msg,ap);
+  assert(err>0);
+  fputc('\n',stderr);
+}
+
+void warning(const char *msg,...) {
+  va_list ap;
+  va_start(ap,msg);
+  vwarning(msg,ap);
+  va_end(ap);
+}
+
+void fatal(const char *msg,...) {
+  va_list ap;
+  va_start(ap,msg);
+  vwarning(msg,ap);
+  va_end(ap);
+  exit(EXIT_FAILURE);
+}
+
+uint64_t transfer(int from_fd,const char *from_fn,int to_fd,const char *to_fn) {
+  size_t total(0);
+  uint8_t buffer[0x10000];
+  size_t buffer_filled(0);
+  for (;;) {
+    if (buffer_filled==0) {
+      ssize_t got(read(from_fd,buffer+buffer_filled,sizeof(buffer)-buffer_filled));
+      if (got==-1)
+       fatal("Error reading %s: %m",from_fn);
+      if (got==0)
+       return total;
+      buffer_filled=got;
+      assert(buffer_filled<=sizeof(buffer));
+    }
+    size_t buffer_written(0);
+    while (buffer_written<buffer_filled) {
+      ssize_t got(write(to_fd,buffer+buffer_written,buffer_filled-buffer_written));
+      if (got==-1)
+       fatal("Error writing %s: %m",to_fn);
+      assert(got>0);
+      buffer_written+=got;
+      assert(buffer_written<=buffer_filled);
+    }
+    total+=buffer_filled;
+    buffer_filled=0;
+  }
+}
diff --git a/src/streamfer.h b/src/streamfer.h
new file mode 100644 (file)
index 0000000..d692137
--- /dev/null
@@ -0,0 +1,19 @@
+#ifndef STREAMFER_H
+#define STREAMFER_H 1
+
+#include <cstdarg>
+#include <cstdio>
+#include <cassert>
+#include <cstdlib>
+#include <cstdint>
+using namespace std;
+
+#define PRINTF(f,a) __attribute__((format(printf,f,a)))
+#define UNUSED __attribute__((unused))
+
+void warning(const char *msg,...);
+void fatal(const char *msg,...);
+
+uint64_t transfer(int from_fd,const char *from_fn,int to_fd,const char *to_fn);
+
+#endif // STREAMFER_H
diff --git a/src/stringf.h b/src/stringf.h
new file mode 100644 (file)
index 0000000..2e194fc
--- /dev/null
@@ -0,0 +1,45 @@
+#ifndef LIB_STRINGF_H
+#define LIB_STRINGF_H
+
+#include "streamfer.h"
+
+// template<class... Args>: format string is not a string literal (potentially insecure) [-Werror,-Wformat-security]
+
+static inline PRINTF(2,0) string vstringf_const(size_t sizemax,const char *fmt,va_list ap) {
+  string str(sizemax+1,0);
+  const int got=vsnprintf(&str[0],str.length(),fmt,ap);
+  if (got!=ssize_t(sizemax))
+    fatal("stringf*(): expected=%zu got=%d fmt=\"%s\" str=\"%s\"",sizemax,got,fmt,str.c_str());
+  str.resize(sizemax);
+  return str;
+}
+
+static inline PRINTF(2,3) string stringf_const(size_t sizemax,const char *fmt,...) {
+  va_list ap;
+  va_start(ap,fmt);
+  const string str(vstringf_const(sizemax,fmt,ap));
+  va_end(ap);
+  return str;
+}
+
+static inline PRINTF(1,2) string stringf(const char *fmt,...) {
+  // 189: for i in activetick.data/;do find $i -name "*.xz";done|xargs xz -dc|perl -lne 'BEGIN{$m=0;}if (length($_)>$m){$m=length $_;print $m;}'
+  static const int sizetry(192-1/*+1->malloc()*/);
+  string str(sizetry,0);
+  va_list ap;
+  va_start(ap,fmt);
+  const int got(vsnprintf(&str[0],sizetry,fmt,ap));
+  va_end(ap);
+  assert(got>=0);
+  if (got>=sizetry) {
+    str.resize(got+1);
+    va_start(ap,fmt);
+    const int got2(vsnprintf(&str[0],str.length(),fmt,ap));
+    va_end(ap);
+    assert(got2==got);
+  }
+  str.resize(got);
+  return str;
+}
+
+#endif /* LIB_STRINGF_H */