+.config/yt-dlp.conf
[nethome.git] / src / streamfer-server.C
1 #include "safeio.h"
2 #include "socket.h"
3 #include "stringf.h"
4 #include <fcntl.h>
5 #include <sys/stat.h>
6 #include <dirent.h>
7 #include <poll.h>
8 #include <climits>
9 #include <cstdlib>
10 #include <csignal>
11
12 // https://stackoverflow.com/a/8615450/2995591
13 #include <glob.h> // glob(),globfree()
14 #include <cstring> // memset()
15 #include <vector>
16 #include <string>
17
18 static std::vector<std::string> cxxglob(const std::string pattern) {
19   glob_t glob_result;
20   memset(&glob_result,0,sizeof(glob_result));
21   int return_value=glob(pattern.c_str(),GLOB_TILDE,NULL,&glob_result);
22   if (return_value)
23     fatal("glob() failed with return_value %s",return_value);
24   vector<string> filenames;
25   filenames.reserve(glob_result.gl_pathc);
26   for (size_t i = 0; i < glob_result.gl_pathc; ++i)
27      filenames.push_back(string(glob_result.gl_pathv[i]));
28   globfree(&glob_result);
29   return filenames;
30 }
31
32 // FIXME: Use C++17
33 static bool fd_is_open(const char *execname,const char *fn) {
34   const char slashproc[]("/proc");
35   DIR *dir(opendir(slashproc));
36   if (!dir)
37     fatal("Cannot opendir %s: %m",slashproc);
38   bool retval(false);
39   for (;;) {
40     errno=0;
41     const struct dirent *de=readdir(dir);
42     if (!de) {
43       if (errno)
44         fatal("Cannot readdir %s: %m",slashproc);
45       break;
46     }
47     if (!isdigit(de->d_name[0]))
48       continue;
49
50     char buf[PATH_MAX];
51     ssize_t got(readlinkat(dirfd(dir),stringf("%s/exe",de->d_name).c_str(),buf,sizeof(buf)));
52     if (got==-1||got==sizeof(buf))
53       continue;
54     buf[got]=0;
55     char *s=strrchr(buf,'/');
56     if (!s)
57       continue;
58     if (strcmp(s+1,execname)!=0)
59       continue;
60
61     string procpidfd(stringf("/proc/%s/fd",de->d_name));
62     DIR *fddir(opendir(procpidfd.c_str()));
63     if (!fddir)
64       fatal("Cannot opendir %s: %m",procpidfd.c_str());
65     for (;;) {
66       errno=0;
67       const struct dirent *de=readdir(fddir);
68       if (!de) {
69         if (errno)
70           fatal("Cannot readdir %s: %m",procpidfd.c_str());
71         break;
72       }
73       if (!isdigit(de->d_name[0]))
74         continue;
75       char buf[PATH_MAX];
76       ssize_t got(readlinkat(dirfd(fddir),de->d_name,buf,sizeof(buf)));
77       if (got==-1||got==sizeof(buf))
78         continue;
79       buf[got]=0;
80       if (strcmp(buf,fn)==0) {
81         retval=true;
82         break;
83       }
84     }
85     if (closedir(fddir))
86       fatal("Cannot closedir %s: %m",procpidfd.c_str());
87     if (retval)
88       break;
89   }
90   if (closedir(dir))
91     fatal("Cannot closedir %s: %m",slashproc);
92   return retval;
93 }
94
95 int main(int argc,char **argv) {
96   static struct sigaction sigchld;
97   sigchld.sa_handler=SIG_DFL;
98   sigchld.sa_flags=SA_NOCLDWAIT;
99   int err(sigaction(SIGCHLD,&sigchld,nullptr));
100   assert(!err);
101
102   if (argc!=1+2&&argc!=1+3)
103     fatal("streamfer-server [<listen-host>:]<listen-port> <prefix> [follow-fd-of-executable-basename]");
104   string prefix;
105   if (argc>=1+2&&*argv[2])
106     prefix=argv[2];
107   const char *execname(nullptr);
108   if (argc>=1+3)
109     execname=argv[3];
110   int listen_fd(socket_bind(argv[1]));
111   int client_fd;
112   for (;;) {
113     client_fd=socket_accept(listen_fd,[&](int client_fd,string addr) {
114       warning("%d:%s",client_fd,addr.c_str());
115     });
116     int child(fork());
117     assert(child!=-1);
118     if (!child)
119       break;
120     int err(close(client_fd));
121     assert(!err);
122   }
123   err=close(listen_fd);
124   assert(!err);
125
126   string pattern(read_safe_string(client_fd));
127   std::vector<std::string> matched(cxxglob(pattern));
128   for (size_t ix=0;ix<matched.size()-1;++ix) {
129     const std::string &a(matched[ix  ]);
130     const std::string &b(matched[ix+1]);
131     int err(strcmp(a.c_str(),b.c_str()));
132     if (err>=0)
133       fatal("glob: strcmp(\"%s\",\"%s\")=%d",a.c_str(),b.c_str(),err);
134   }
135   string last(read_safe_string(client_fd));
136   size_t lastix(SIZE_MAX);
137   for (size_t ix=0;ix<matched.size();++ix) {
138     const std::string &member(matched[ix]);
139     if (strcmp(last.c_str(),member.c_str())>0)
140       assert(lastix==SIZE_MAX);
141     else if (lastix==SIZE_MAX)
142       lastix=ix;
143   }
144   if (lastix==SIZE_MAX)
145     fatal("Requested too new file");
146   uint64_t offset;
147   read_safe(client_fd,offset);
148   struct timespec mtim;
149   read_safe(client_fd,mtim);
150   const string *fnp;
151   int file_fd=-1;
152   struct stat statbuf;
153   for (;lastix<matched.size();file_fd=-1,++lastix) {
154     fnp=&matched[lastix];
155     const string &fn(*fnp);
156     file_fd=open(fn.c_str(),O_RDONLY);
157     if (file_fd==-1) {
158       if (errno!=ENOENT)
159         fatal("Cannot open %s: %m",fn.c_str());
160       continue;
161     }
162     int err(fstat(file_fd,&statbuf));
163     assert(!err);
164     if (offset<(uint64_t)statbuf.st_size)
165       break;
166     static const struct timespec mtim_zero{};
167     if (memcmp(&mtim,&mtim_zero,sizeof(mtim))!=0&&memcmp(&mtim,&statbuf.st_mtim,sizeof(mtim))!=0)
168       break;
169     if (offset>(uint64_t)statbuf.st_size)
170       warning("File %s has transferred %zu < %zu which is its size",fn.c_str(),(size_t)offset,(size_t)statbuf.st_size);
171     if (lastix==matched.size()-1&&execname)
172       break;
173     err=close(file_fd);
174     assert(!err);
175     offset=0;
176   }
177   if (file_fd==-1) {
178     string empty("");
179     write_safe(client_fd,empty);
180     fatal("No more files to transfer");
181   }
182   const string &fn(*fnp);
183   const char *fn_canon(nullptr);
184   if (!prefix.empty()||execname) {
185     fn_canon=realpath(fn.c_str(),nullptr);
186     if (!fn_canon)
187       fatal("realpath %s: %m",fn.c_str());
188   }
189   if (fn!=last)
190     offset=0;
191   if (!prefix.empty()&&strncmp(prefix.c_str(),fn_canon,prefix.length())!=0)
192     fatal("prefix=\"%s\" realpath=\"%s\"",prefix.c_str(),fn_canon);
193   warning("%s @%zu",fn.c_str(),(size_t)offset);
194   write_safe(client_fd,fn);
195   write_safe(client_fd,statbuf.st_mtim);
196   off_t got(lseek(file_fd,offset,SEEK_SET));
197   assert((uint64_t)got==offset);
198   struct pollfd fds;
199   fds.fd=client_fd;
200   fds.events=POLLIN|POLLPRI|POLLRDHUP;
201   for (;;) {
202     transfer(file_fd,fn.c_str(),client_fd,"client fd");
203     if (!fn_canon||!fd_is_open(execname,fn_canon))
204       break;
205     int err(poll(&fds,1,1000/*ms*/));
206     if (err==-1)
207       fatal("poll client fd: %m");
208     if (err==1) 
209       fatal("poll client fd: revents=0x%x",fds.revents);
210     assert(err==0);
211   }
212 }