Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Makes Tailer use ScheduledExecutorService from Java 5 #2

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 28 additions & 170 deletions src/main/java/org/apache/commons/io/input/Tailer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@
*/
package org.apache.commons.io.input;

import static org.apache.commons.io.IOUtils.EOF;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.Charset;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* Simple implementation of the unix "tail -f" functionality.
Expand Down Expand Up @@ -120,54 +117,43 @@
*/
public class Tailer implements Runnable {


private static final int DEFAULT_DELAY_MILLIS = 1000;

private static final String RAF_MODE = "r";

private static final int DEFAULT_BUFSIZE = 4096;

// The default charset used for reading files
private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();

/**
* Buffer on top of RandomAccessFile.
* This will run the {@link #scheduled} repeatedly until {@link #runTrigger} reaches zero.
*/
private final byte inbuf[];
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

// The default charset used for reading files
private static final Charset DEFAULT_CHARSET = Charset.defaultCharset();

/**
* The file which will be tailed.
*/
private final File file;

/**
* The character set that will be used to read the file.
*/
private final Charset cset;

/**
* The amount of time to wait for the file to be updated.
*/
private final long delayMillis;

/**
* Whether to tail from the end or start of file
*/
private final boolean end;

/**
* The listener to notify of events when tailing.
*/
private final TailerListener listener;

/**
* Whether to close and reopen the file whilst waiting for more input.
* This will hold the {@link Runnable} that will be repeatedly executed.
*/
private final boolean reOpen;
private final TailerRun scheduled;

/**
* The tailer will run as long as this value is true.
* When this reaches zero, {@link #run()} will be terminated.
*/
private volatile boolean run = true;
private CountDownLatch runTrigger = new CountDownLatch(1);

/**
* Creates a Tailer for the given file, starting from the beginning, with the default delay of 1.0s.
Expand Down Expand Up @@ -250,15 +236,9 @@ public Tailer(final File file, final Charset cset, final TailerListener listener
, final int bufSize) {
this.file = file;
this.delayMillis = delayMillis;
this.end = end;

this.inbuf = new byte[bufSize];

// Save and prepare the listener
this.listener = listener;
listener.init(this);
this.reOpen = reOpen;
this.cset = cset;
this.scheduled = new TailerRun(file, cset, listener, end, reOpen, bufSize);
}

/**
Expand Down Expand Up @@ -379,7 +359,7 @@ public File getFile() {
* @since 2.5
*/
protected boolean getRun() {
return run;
return this.runTrigger.getCount() > 0;
}

/**
Expand All @@ -392,151 +372,29 @@ public long getDelay() {
}

/**
* Follows changes in the file, calling the TailerListener's handle method for each new line.
* Follows changes in the file, calling the TailerListener's handle method
* for each new line.
*/
@Override
public void run() {
RandomAccessFile reader = null;
final ScheduledFuture<?> future = this.executor.scheduleWithFixedDelay(this.scheduled, 0, this.delayMillis,
TimeUnit.MILLISECONDS);
try {
long last = 0; // The last time the file was checked for changes
long position = 0; // position within the file
// Open the file
while (getRun() && reader == null) {
try {
reader = new RandomAccessFile(file, RAF_MODE);
} catch (final FileNotFoundException e) {
listener.fileNotFound();
}
if (reader == null) {
Thread.sleep(delayMillis);
} else {
// The current position in the file
position = end ? file.length() : 0;
last = file.lastModified();
reader.seek(position);
}
}
while (getRun()) {
final boolean newer = FileUtils.isFileNewer(file, last); // IO-279, must be done first
// Check the file length to see if it was rotated
final long length = file.length();
if (length < position) {
// File was rotated
listener.fileRotated();
// Reopen the reader after rotation
try {
// Ensure that the old file is closed iff we re-open it successfully
final RandomAccessFile save = reader;
reader = new RandomAccessFile(file, RAF_MODE);
// At this point, we're sure that the old file is rotated
// Finish scanning the old file and then we'll start with the new one
try {
readLines(save);
} catch (IOException ioe) {
listener.handle(ioe);
}
position = 0;
// close old file explicitly rather than relying on GC picking up previous RAF
IOUtils.closeQuietly(save);
} catch (final FileNotFoundException e) {
// in this case we continue to use the previous reader and position values
listener.fileNotFound();
}
continue;
} else {
// File was not rotated
// See if the file needs to be read again
if (length > position) {
// The file has more content than it did last time
position = readLines(reader);
last = file.lastModified();
} else if (newer) {
/*
* This can happen if the file is truncated or overwritten with the exact same length of
* information. In cases like this, the file position needs to be reset
*/
position = 0;
reader.seek(position); // cannot be null here

// Now we can read new lines
position = readLines(reader);
last = file.lastModified();
}
}
if (reOpen) {
IOUtils.closeQuietly(reader);
}
Thread.sleep(delayMillis);
if (getRun() && reOpen) {
reader = new RandomAccessFile(file, RAF_MODE);
reader.seek(position);
}
}
this.runTrigger.await();
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
stop(e);
} catch (final Exception e) {
stop(e);
this.listener.handle(e);
} finally {
IOUtils.closeQuietly(reader);
future.cancel(true); // stop the periodic reading
this.scheduled.cleanup();
this.executor.shutdownNow();
}
}

private void stop(final Exception e) {
listener.handle(e);
stop();
}

/**
* Allows the tailer to complete its current loop and return.
*/
public void stop() {
this.run = false;
}

/**
* Read new lines.
*
* @param reader The file to read
* @return The new position after the lines have been read
* @throws java.io.IOException if an I/O error occurs.
*/
private long readLines(final RandomAccessFile reader) throws IOException {
ByteArrayOutputStream lineBuf = new ByteArrayOutputStream(64);
long pos = reader.getFilePointer();
long rePos = pos; // position to re-read
int num;
boolean seenCR = false;
while (getRun() && ((num = reader.read(inbuf)) != EOF)) {
for (int i = 0; i < num; i++) {
final byte ch = inbuf[i];
switch (ch) {
case '\n':
seenCR = false; // swallow CR before LF
listener.handle(new String(lineBuf.toByteArray(), cset));
lineBuf.reset();
rePos = pos + i + 1;
break;
case '\r':
if (seenCR) {
lineBuf.write('\r');
}
seenCR = true;
break;
default:
if (seenCR) {
seenCR = false; // swallow final CR
listener.handle(new String(lineBuf.toByteArray(), cset));
lineBuf.reset();
rePos = pos + i + 1;
}
lineBuf.write(ch);
}
}
pos = reader.getFilePointer();
}
IOUtils.closeQuietly(lineBuf); // not strictly necessary
reader.seek(rePos); // Ensure we can re-read if necessary
return rePos;
this.runTrigger.countDown();
}

}
Loading