Friday, October 5, 2007

Disk caching and file splitting

I realized I had been a little careless in my Wide Finder testing: I had not taken into account the effects of disk caching. All of the times in previous posts are with the cache warmed up. There really is an enormous difference: 30 seconds with a cold cache against 10 seconds with a warm one for 5 million lines! I think nothing in the cache is the more realistic benchmark, as you are unlikely to be reading through the file again and again.
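A rough way to see the effect for yourself is to time two passes over the same file in one run. This is only a sketch, not the harness used for the numbers above: the class name CacheTiming is made up for illustration, it assumes a Java 7 or later runtime, and it cannot evict the OS cache, so the first pass is only "cold" if the file has not been read recently.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the Wide Finder code in this post.
final class CacheTiming {
    /** Reads every line of the file and returns how many there were. */
    static long countLines(Path file) throws IOException {
        long lines = 0;
        // ISO-8859-1 maps every byte to a character, so odd bytes in a log never abort the read.
        try (BufferedReader in = Files.newBufferedReader(file, StandardCharsets.ISO_8859_1)) {
            while (in.readLine() != null) {
                lines++;
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path file = Paths.get(args[0]);
        // The second pass will usually be served from the OS disk cache.
        for (int pass = 1; pass <= 2; pass++) {
            long start = System.nanoTime();
            long lines = countLines(file);
            long millis = (System.nanoTime() - start) / 1_000_000;
            System.out.println("pass " + pass + ": " + lines + " lines in " + millis + " ms");
        }
    }
}
```

If the two passes take about the same time, the file was probably already cached before the first one started.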

I implemented reading a single file in parallel. I didn't use anything fancy like memory mapped files, just regular FileInputStreams. This has the advantage that I can easily swap in the split streams where ordinary streams are used.
The steps I take to do this are fairly straightforward:

  1. Divide the file size by the desired number of sections. This is your section size.

  2. Open a stream for the file. Don't do anything with it yet.

  3. Open another stream on the same file. Skip this forward by the section size.

  4. Step forward byte by byte until you have read a '\n'.

  5. Wrap the first stream in a limited length stream decorator so that it ends at the second stream's current position. This will make the stream end where the next one begins.

  6. Stick your initial stream in a list and make the second stream your new first one.

  7. Repeat from step 3 until you have made (number of sections - 1) cuts, each time skipping a fresh stream forward by section size * the number of the section we are on.

  8. Stick the last stream on the list.
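The steps above boil down to some boundary arithmetic, which is easier to see on an in-memory byte array than on streams. The following is a minimal sketch under that assumption: SectionBoundaries and sectionStarts are names invented here, and unlike the stream version it simply stops cutting when a boundary would leave an empty last section.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the boundary search only; no streams involved.
final class SectionBoundaries {
    /**
     * Returns the start offset of each section. Section k covers the bytes from
     * its own start up to the next start; the last section runs to data.length.
     */
    static List<Integer> sectionStarts(byte[] data, int sections) {
        List<Integer> starts = new ArrayList<>();
        starts.add(0);
        int sectionSize = data.length / sections;
        for (int j = 1; j < sections; j++) {
            // Jump to the nominal boundary, but always move past the previous start.
            int pos = Math.max(j * sectionSize, starts.get(starts.size() - 1) + 1);
            // Step forward to the next '\n' so that no line is cut in two.
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            if (pos + 1 >= data.length) {
                break; // nothing left after this point: the previous section takes the rest
            }
            starts.add(pos + 1);
        }
        return starts;
    }
}
```

For the 16 bytes of "aa\nbbbb\ncc\ndddd\n", asking for 2 sections gives starts [0, 11]: the nominal boundary at byte 8 falls inside "cc", so the cut moves to just after that line. Asking for 4 sections gives only [0, 8, 11], because the last cut would land at the end of the data.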



One thing I needed for this was a limited length stream wrapper. It is something you would think is in commons-io but no joy there. It is easy enough to write though.


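import java.io.IOException;
import java.io.InputStream;

/** Decorator that reports end of stream once a fixed number of bytes has been consumed. */
public class LimitedLengthInputStream extends InputStream {
    private final InputStream stream;
    private long available;       // bytes still allowed through
    private long availableAtMark; // value of 'available' when mark() was called

    public LimitedLengthInputStream(InputStream stream, long available) {
        this.stream = stream;
        this.available = available;
    }

    @Override
    public int read() throws IOException {
        if (available <= 0) {
            return -1;
        }
        int b = stream.read();
        if (b == -1) {
            available = 0; // underlying stream ended before the limit
        } else {
            available--;
        }
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (available <= 0) {
            return -1;
        }
        if (len > available) {
            len = (int) available;
        }
        int read = stream.read(b, off, len);
        if (read == -1) {
            available = 0;
        } else {
            available -= read;
        }
        return read;
    }

    @Override
    public long skip(long n) throws IOException {
        if (n <= 0 || available <= 0) {
            return 0;
        }
        // Never skip past the limit, and only count the bytes actually skipped.
        long skipped = stream.skip(Math.min(n, available));
        available -= skipped;
        return skipped;
    }

    @Override
    public void mark(int readlimit) {
        stream.mark(readlimit);
        availableAtMark = available;
    }

    @Override
    public void reset() throws IOException {
        stream.reset();
        available = availableAtMark;
    }

    @Override
    public boolean markSupported() {
        // Only claim mark support if the wrapped stream really has it.
        return stream.markSupported();
    }

    @Override
    public void close() throws IOException {
        // Release the underlying file handle when the wrapper is closed.
        stream.close();
    }
}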


One thing to watch out for when doing this is that InputStream.skip does not guarantee it will actually skip the length you ask for. It returns the number of bytes it really skipped, so you have to wrap the call in a loop to force it to skip the full distance, which is a bit annoying.
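To make the short-skip behaviour concrete, here is a small sketch of a skip loop that also copes with skip() returning 0. The names SkipFully, skipFully and stingy are invented for this example; stingy is a deliberately awkward stream that never skips more than 3 bytes per call.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper for illustration only.
final class SkipFully {
    /** Skips exactly n bytes, or throws EOFException if the stream ends first. */
    static void skipFully(InputStream in, long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
            } else if (in.read() != -1) {
                // skip() made no progress; reading one byte proves we are not at the end.
                remaining--;
            } else {
                throw new EOFException(remaining + " bytes left to skip at end of stream");
            }
        }
    }

    /** Test double: a stream whose skip() never moves more than 3 bytes at a time. */
    static InputStream stingy(byte[] data) {
        return new FilterInputStream(new ByteArrayInputStream(data)) {
            @Override
            public long skip(long n) throws IOException {
                return super.skip(Math.min(n, 3));
            }
        };
    }
}
```

Skipping 7 bytes of "0123456789" through the stingy stream takes three calls to skip(), after which the next read() returns '7'. Two caveats: FileInputStream.skip is documented as being able to skip past the end of the file without complaint, so a loop like this will not notice that case, and on Java 12 or later InputStream.skipNBytes does the same job out of the box.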


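import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class SplitFileInputStreams {
    public static List<InputStream> split(String filename, int sections) throws IOException {
        List<InputStream> streams = new ArrayList<InputStream>();

        InputStream previous = new BufferedInputStream(new FileInputStream(filename));
        if (sections <= 1) {
            // Nothing to split: hand back the whole file as a single stream.
            streams.add(previous);
            return streams;
        }
        long sectionSize = new File(filename).length() / sections;
        InputStream current = new BufferedInputStream(new FileInputStream(filename));

        long lastPoint = 0; // file offset at which 'previous' starts
        for (int j = 1; j < sections; j++) {
            // Nominal boundary for section j, but always past the start of the previous section.
            long skipToPos = Math.max(j * sectionSize, lastPoint + 1);
            reallySkip(current, skipToPos);

            // Read forward to just past the next newline so no line is cut in half.
            int i;
            do {
                i = current.read();
                skipToPos++;
            } while (i != '\n' && i != -1);

            if (i != -1) {
                // 'previous' ends exactly where 'current' now begins.
                streams.add(new LimitedLengthInputStream(previous, skipToPos - lastPoint));
                if (j < sections - 1) {
                    lastPoint = skipToPos;
                    previous = current;
                    current = new BufferedInputStream(new FileInputStream(filename));
                } else {
                    streams.add(current);
                }
            } else {
                // Hit end of file before finding a newline: 'previous' takes the rest.
                streams.add(previous);
                current.close();
                break;
            }
        }

        return streams;
    }

    /** skip() may skip fewer bytes than asked, so loop until done or end of stream. */
    private static void reallySkip(InputStream in, long skip) throws IOException {
        long skipped = 0;
        while (skipped < skip) {
            long n = in.skip(skip - skipped);
            if (n > 0) {
                skipped += n;
            } else if (in.read() != -1) {
                skipped++; // skip() made no progress, so consume a single byte instead
            } else {
                return; // end of stream reached; the caller's read() will see -1
            }
        }
    }
}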


There are just a few modifications to the Wide Finder main method from my last post:


int batchSize = Integer.parseInt(args[1]);
int numThreads = Integer.parseInt(args[2]);
int split = 1;
if (args.length > 3) {
    split = Integer.parseInt(args[3]);
}
final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(100);
ExecutorService exec = Executors.newCachedThreadPool();

List<Future<Map<String, Integer>>> counters = new ArrayList<Future<Map<String, Integer>>>();
Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

String[] files = args[0].split(File.pathSeparator);

// Open every input first so the latch matches the number of readers actually
// started: split() can return fewer streams than requested for a short file,
// and a latch sized files.length * split would then never reach zero.
List<InputStream> inputs = new ArrayList<InputStream>();
for (String filename : files) {
    if (split <= 1) {
        inputs.add(new FileInputStream(filename));
    } else {
        inputs.addAll(SplitFileInputStreams.split(filename, split));
    }
}
CountDownLatch latch = new CountDownLatch(inputs.size());

for (int i = 0; i < numThreads; i++) {
    counters.add(exec.submit(new LineCounter(p, queue)));
}

for (InputStream input : inputs) {
    exec.execute(new LineReader(queue,
            new BufferedReader(new InputStreamReader(input, "US-ASCII")),
            latch, batchSize));
}
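LineReader itself is from the earlier post and is not reproduced here. For readers who have not seen it, the following is a guess at the general shape of such a reader, not the original class: BatchingLineReader is a made-up name, and the batching and latch handling are assumptions based only on the constructor arguments used above. The point it illustrates is that each reader counts the latch down exactly once, when its stream is exhausted, which is why the latch has to match the number of readers started.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for LineReader; the real class may differ.
final class BatchingLineReader implements Runnable {
    private final BlockingQueue<List<String>> queue;
    private final BufferedReader reader;
    private final CountDownLatch latch;
    private final int batchSize;

    BatchingLineReader(BlockingQueue<List<String>> queue, BufferedReader reader,
                       CountDownLatch latch, int batchSize) {
        this.queue = queue;
        this.reader = reader;
        this.latch = latch;
        this.batchSize = batchSize;
    }

    @Override
    public void run() {
        try (BufferedReader in = reader) {
            List<String> batch = new ArrayList<>(batchSize);
            String line;
            while ((line = in.readLine()) != null) {
                batch.add(line);
                if (batch.size() >= batchSize) {
                    queue.put(batch); // blocks while the queue is full
                    batch = new ArrayList<>(batchSize);
                }
            }
            if (!batch.isEmpty()) {
                queue.put(batch); // flush the final partial batch
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            latch.countDown(); // exactly once per reader, even on failure
        }
    }
}
```

Handing lines over in batches rather than one at a time keeps contention on the shared queue down, which is presumably what the batchSize argument is for.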


With a 200MB, 1 million line file there is no advantage. With a 1GB, 5 million line file there was a nice 25% speedup when splitting the file in 3 with a few processing threads.

Comments:

Paddy3118 said...

I later found the same effect of memory mapping the file in my Wide Finder programs here:

http://paddy3118.blogspot.com/2007/10/wide-finder-on-command-line.html
http://paddy3118.blogspot.com/2007/10/multi-processing-design-pattern.html

- Paddy.

Anonymous said...

Interesting to know.
