I implemented reading a single file in parallel. I didn't use anything fancy like memory-mapped files, just regular FileInputStreams. This has the advantage that I can easily swap in the split streams wherever ordinary streams are used.
The steps I take to do this are fairly straightforward:
1. Divide the file size by the desired number of sections. This is your section size.
2. Open a stream for the file. Don't do anything with it yet.
3. Open another stream and skip it forward by the section size.
4. Step forward until you find '\n' (steps 3 to 5 are sketched in code just after this list).
5. Wrap the first stream in a limited-length stream decorator, with the current position as the limit. This makes the stream end where the next one begins.
6. Stick your initial stream in a list and make the second stream your new first one.
7. Repeat from step 3, sections - 1 times in total, each time skipping forward by the section size times the number of the section you are on.
8. Stick the last stream on the list.
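To make steps 3 to 5 a bit more concrete, here is a rough sketch of finding a single boundary in isolation. The class and method names are just for illustration; the real code is in the SplitFileInputStreams class further down.

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch only: skip to the nominal boundary, then scan to the next '\n'
// so the section ends on a complete line.
public class BoundarySketch {
    // Returns the position just after the first '\n' at or beyond nominalBoundary,
    // i.e. where the next section should begin.
    static long findBoundary(String filename, long nominalBoundary) throws IOException {
        InputStream in = new BufferedInputStream(new FileInputStream(filename));
        try {
            long skipped = 0;
            while (skipped < nominalBoundary) {
                skipped += in.skip(nominalBoundary - skipped); // skip() may stop short
            }
            long pos = nominalBoundary;
            int c;
            do {
                c = in.read();
                pos++;
            } while (c != '\n' && c != -1); // end-of-file handling is left to the real code
            return pos;
        } finally {
            in.close();
        }
    }
}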
One thing I needed for this was a limited-length stream wrapper. You would think it would be in commons-io, but no joy there. It is easy enough to write, though.
import java.io.IOException;
import java.io.InputStream;

// Decorator that makes a stream report end-of-stream after a fixed number of bytes,
// so each section ends where the next one begins.
public class LimitedLengthInputStream extends InputStream {
    private InputStream stream;
    private long available;
    private long availableAtMark;

    public LimitedLengthInputStream(InputStream stream, long available) {
        this.stream = stream;
        this.available = available;
    }

    public int read() throws IOException {
        if (available > 0) {
            available--;
            return stream.read();
        }
        return -1;
    }

    public int read(byte b[], int off, int len) throws IOException {
        if (available > 0) {
            // Never read past the limit
            if (len > available) {
                len = (int) available;
            }
            int read = stream.read(b, off, len);
            if (read == -1) {
                available = 0;
            } else {
                available -= read;
            }
            return read;
        }
        return -1;
    }

    public long skip(long n) throws IOException {
        // Don't allow skipping past the limit
        if (n > available) {
            n = available;
        }
        long skipped = stream.skip(n);
        if (skipped > 0) {
            available -= skipped;
        }
        return skipped;
    }

    public void mark(int readlimit) {
        stream.mark(readlimit);
        availableAtMark = available;
    }

    public void reset() throws IOException {
        stream.reset();
        available = availableAtMark;
    }

    public boolean markSupported() {
        return true;
    }
}
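Using it is straightforward. Here is a minimal usage sketch that limits a stream to the first 1024 bytes of a file; the file name and the limit are just examples.

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

// Minimal usage sketch: read lines from only the first 1024 bytes of a file.
// "access.log" and the 1024-byte limit are arbitrary examples.
public class LimitedLengthExample {
    public static void main(String[] args) throws IOException {
        InputStream raw = new BufferedInputStream(new FileInputStream("access.log"));
        InputStream limited = new LimitedLengthInputStream(raw, 1024);
        BufferedReader reader = new BufferedReader(new InputStreamReader(limited, "US-ASCII"));
        String line;
        while ((line = reader.readLine()) != null) {  // stops once the limit is hit; the final line may be cut short
            System.out.println(line);
        }
        reader.close();
    }
}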
One thing to watch out for when doing this is that InputStream does not guarantee that skip() will actually skip the length you ask for. You have to wrap the call in a loop to force it to skip the full distance, which is a bit annoying.
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class SplitFileInputStreams {

    // Splits a file into the given number of sections, each ending on a line boundary.
    // Assumes sections >= 2; the caller falls back to a plain stream for a single section.
    public static List<InputStream> split(String filename, int sections) throws IOException {
        File f = new File(filename);
        long l = f.length();
        long sectionSize = l / sections;
        List<InputStream> streams = new ArrayList<InputStream>();
        InputStream previous = new BufferedInputStream(new FileInputStream(filename));
        InputStream current = new BufferedInputStream(new FileInputStream(filename));
        int i = 0;
        long lastPoint = 0;
        for (int j = 1; j < sections; j++) {
            // Skip to the nominal boundary, then read on to the next '\n'
            long skipToPos = Math.max(j * sectionSize, lastPoint + 1);
            reallySkip(current, skipToPos);
            do {
                i = current.read();
                skipToPos++;
            } while (i != '\n' && i != -1);
            if (i != -1) {
                // End the previous section where this one begins
                streams.add(new LimitedLengthInputStream(previous, skipToPos - lastPoint));
                if (j < sections - 1) {
                    lastPoint = skipToPos;
                    previous = current;
                    current = new BufferedInputStream(new FileInputStream(filename));
                } else {
                    // Last section: read from here to the end of the file
                    streams.add(current);
                }
            } else {
                // Hit end of file before the boundary; the previous stream takes the rest
                streams.add(previous);
                current.close();
                break;
            }
        }
        return streams;
    }

    // InputStream.skip() may skip less than requested, so loop until the full distance is covered
    private static void reallySkip(InputStream in, long skip) throws IOException {
        long skipped = 0;
        do {
            skipped += in.skip(skip - skipped);
        } while (skipped != skip);
    }
}
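A quick way to check the splitter does what it should is to read every section line by line and make sure the totals add up to the whole file. A rough sanity-check sketch; the file name and section count are arbitrary examples.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;

// Sanity-check sketch: the total line count across all sections should equal
// the line count of the whole file read normally.
public class SplitCheck {
    public static void main(String[] args) throws IOException {
        List<InputStream> streams = SplitFileInputStreams.split("access.log", 3);
        long total = 0;
        for (InputStream stream : streams) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream, "US-ASCII"));
            while (reader.readLine() != null) {
                total++;
            }
            reader.close();
        }
        System.out.println("Lines across sections: " + total);
    }
}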
There are just a few modifications to the Wide Finder main method from my last post:
int batchSize = Integer.parseInt(args[1]);
int numThreads = Integer.parseInt(args[2]);
// Optional fourth argument: how many sections to split each file into
int split = 1;
if (args.length > 3) {
    split = Integer.parseInt(args[3]);
}
final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(100);
ExecutorService exec = Executors.newCachedThreadPool();
List<Future<Map<String, Integer>>> counters = new ArrayList<Future<Map<String, Integer>>>();
Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");
String[] files = args[0].split(File.pathSeparator);
// One latch count per reader, so each section counts down individually
CountDownLatch latch = new CountDownLatch(files.length * split);
for (int i = 0; i < numThreads; i++) {
    counters.add(exec.submit(new LineCounter(p, queue)));
}
for (String filename : files) {
    if (split <= 1) {
        // Unsplit: one reader for the whole file, as before
        exec.execute(new LineReader(queue,
                new BufferedReader(new InputStreamReader(
                        new FileInputStream(filename), "US-ASCII")),
                latch, batchSize));
    } else {
        // Split: one reader per section of the file
        List<InputStream> streams = SplitFileInputStreams.split(filename, split);
        for (InputStream stream : streams) {
            exec.execute(new LineReader(queue,
                    new BufferedReader(new InputStreamReader(stream, "US-ASCII")),
                    latch, batchSize));
        }
    }
}
With a 200 MB, 1 million line file there is no advantage. With 1 GB and 5 million lines there was a nice 25% speedup when splitting the file into three sections with a few processing threads.