Skip to content

Commit

Permalink
add EOF marker functionality
Browse files Browse the repository at this point in the history
since it's hard or impossible to close fifo pipes in shell scripts,
it's handy to have a way to signal jobflow when the end of input
is reached so it can terminate once everything is processed.
the shell script can now run jobflow with -eof=XXX to specify a
magic eof marker.
  • Loading branch information
rofl0r committed Jun 10, 2019
1 parent 1fff39d commit 6746904
Showing 1 changed file with 22 additions and 2 deletions.
24 changes: 22 additions & 2 deletions jobflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/


#define VERSION "1.2.2"
#define VERSION "1.2.3"


#undef _POSIX_C_SOURCE
Expand Down Expand Up @@ -86,6 +86,7 @@ typedef struct {
unsigned numthreads;
unsigned threads_running;
char* statefile;
char* eof_marker;
unsigned long long skip;
sblist* job_infos;
sblist* subst_entries;
Expand Down Expand Up @@ -319,6 +320,7 @@ static int syntax(void) {
"available options:\n\n"
"-skip=XXX -threads=XXX -resume -statefile=/tmp/state -delayedflush\n"
"-delayedspinup=XXX -buffered -joinoutput -limits=mem=16M,cpu=10\n"
"-eof=XXX\n"
"-exec ./mycommand {}\n"
"\n"
"-skip=XXX\n"
Expand All @@ -327,6 +329,10 @@ static int syntax(void) {
" XXX=number of parallel processes to spawn\n"
"-resume\n"
" resume from last jobnumber stored in statefile\n"
"-eof=XXX\n"
" use XXX as the EOF marker on stdin\n"
" if the marker is encountered, behave as if stdin was closed\n"
" not compatible with pipe/bulk mode\n"
"-statefile=XXX\n"
" XXX=filename\n"
" saves last launched jobnumber into a file\n"
Expand Down Expand Up @@ -393,6 +399,9 @@ static int parse_args(int argc, char** argv) {
op_temp = op_get(op, SPL("statefile"));
prog_state.statefile = op_temp;

op_temp = op_get(op, SPL("eof"));
prog_state.eof_marker = op_temp;

op_temp = op_get(op, SPL("skip"));
prog_state.skip = op_temp ? strtoll(op_temp,0,10) : 0;
if(op_hasflag(op, SPL("resume"))) {
Expand Down Expand Up @@ -585,6 +594,13 @@ static size_t count_linefeeds(const char *buf, size_t len) {
}
return cnt;
}

static int match_eof(char* inbuf, size_t len) {
if(!prog_state.eof_marker) return 0;
size_t l = strlen(prog_state.eof_marker);
return l == len-1 && !memcmp(prog_state.eof_marker, inbuf, l);
}

#define MAX_SUBSTS 16
static int dispatch_line(char* inbuf, size_t len, char** argv) {
char subst_buf[MAX_SUBSTS][4096];
Expand Down Expand Up @@ -751,13 +767,17 @@ int main(int argc, char** argv) {

if(!p) break;
ptrdiff_t diff = (p - in) + 1;
if(match_eof(in, diff)) {
exitcode = 0;
goto out;
}
if(!dispatch_line(in, diff, argv))
goto out;
left -= diff;
in += diff;
}
if(!n) {
if(left) dispatch_line(in, left, argv);
if(left && !match_eof(in, left)) dispatch_line(in, left, argv);
break;
}
if(left > chunksize) {
Expand Down

0 comments on commit 6746904

Please sign in to comment.