Friday, October 5, 2007

Disk caching and file splitting

I realized I had been a little careless in my Wide Finder testing: I had not taken into account the effects of disk caching. All of the times in previous posts are with the cache warmed up. There really is an enormous difference: roughly 30 seconds with a cold cache versus 10 seconds with a warm one for 5 million lines! I think nothing in the cache is the more realistic benchmark, as you are unlikely to be reading through the file again and again.

I implemented reading a single file in parallel. I didn't use anything fancy like memory-mapped files, just regular FileInputStreams. This has the advantage that I can easily swap the split streams in wherever ordinary streams are used.
The steps I take to do this are fairly straightforward:

  1. Divide the file size by the desired number of sections. This is your section size

  2. Open a stream for the file. Don't do anything with it

  3. Open another stream. Skip this forward by the section size

  4. Step forward until you find '\n'

  5. Wrap the first stream in a limited length stream decorator, with the limit being the distance from where that stream starts to the current position. This will make the stream end where the next one begins

  6. Stick your initial stream in a list and make the second stream your new first one

  7. Repeat from step 3, sections - 1 times in total, each time skipping forward by the section size * the number of the section we are on

  8. Stick the last stream on the list



One thing I needed for this was a limited-length stream wrapper. It is something you would think would be in commons-io, but no joy there. It is easy enough to write, though.


public class LimitedLengthInputStream extends InputStream {
    private InputStream stream;
    private long available;
    private long availableAtMark;

    public LimitedLengthInputStream(InputStream stream, long available) {
        this.stream = stream;
        this.available = available;
    }

    public int read() throws IOException {
        if (available > 0) {
            available--;
            return stream.read();
        }
        return -1;
    }

    public int read(byte b[], int off, int len) throws IOException {
        if (available > 0) {
            if (len > available) {
                len = (int) available;
            }
            int read = stream.read(b, off, len);
            if (read == -1) {
                available = 0;
            } else {
                available -= read;
            }
            return read;
        }
        return -1;
    }

    public long skip(long n) throws IOException {
        long skip = stream.skip(n);
        if (available > 0) {
            available -= skip;
        }
        return skip;
    }

    public void mark(int readlimit) {
        stream.mark(readlimit);
        availableAtMark = available;
    }

    public void reset() throws IOException {
        stream.reset();
        available = availableAtMark;
    }

    public boolean markSupported() {
        return true;
    }
}
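
Just to illustrate the behaviour, here is a minimal usage sketch (the class name is only for the example): the wrapper exposes the first 11 bytes of the underlying stream, so the second readLine() sees end-of-stream.


public class LimitedLengthInputStreamExample {
    public static void main(String[] args) throws IOException {
        byte[] data = "first line\nsecond line\n".getBytes("US-ASCII");
        // Expose only the first 11 bytes ("first line\n") of the wrapped stream
        InputStream limited = new LimitedLengthInputStream(new ByteArrayInputStream(data), 11);
        BufferedReader reader = new BufferedReader(new InputStreamReader(limited, "US-ASCII"));
        System.out.println(reader.readLine()); // prints "first line"
        System.out.println(reader.readLine()); // prints "null": the limit acts as end-of-stream
        reader.close();
    }
}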


One thing to watch out for when doing this is that InputStream does not guarantee that skip() will actually skip the full length you ask for. You have to wrap the call in a loop to force it to skip the whole distance, which is a bit annoying.


public class SplitFileInputStreams {
    public static List<InputStream> split(String filename, int sections) throws IOException {
        File f = new File(filename);
        long l = f.length();
        long sectionSize = l / sections;
        List<InputStream> streams = new ArrayList<InputStream>();

        InputStream previous = new BufferedInputStream(new FileInputStream(filename));
        InputStream current = new BufferedInputStream(new FileInputStream(filename));

        int i = 0;
        long lastPoint = 0;
        for (int j = 1; j < sections; j++) {
            long skipToPos = Math.max(j * sectionSize, lastPoint + 1);

            // Skip to the section boundary, then read on until the next newline
            reallySkip(current, skipToPos);
            do {
                i = current.read();
                skipToPos++;
            } while (i != '\n' && i != -1);

            if (i != -1) {
                // The previous stream ends where this one begins
                streams.add(new LimitedLengthInputStream(previous, skipToPos - lastPoint));
                if (j < sections - 1) {
                    lastPoint = skipToPos;
                    previous = current;
                    current = new BufferedInputStream(new FileInputStream(filename));
                }
                else {
                    // Last section: read unlimited to the end of the file
                    streams.add(current);
                }
            }
            else {
                // Hit end of file early: the previous stream covers the rest
                streams.add(previous);
                current.close();
                break;
            }
        }

        return streams;
    }

    private static void reallySkip(InputStream in, long skip) throws IOException {
        // skip() may skip fewer bytes than requested, so loop until the full distance is covered
        long skipped = 0;
        do {
            skipped += in.skip(skip - skipped);
        } while (skipped != skip);
    }
}
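
A simple sanity check is to read each section back line by line and make sure the total matches a straight read of the file. A minimal sketch (the file name here is just a placeholder):


public class SplitCheck {
    public static void main(String[] args) throws IOException {
        // "access.log" is a placeholder; point this at a real log file
        List<InputStream> streams = SplitFileInputStreams.split("access.log", 3);
        long total = 0;
        for (InputStream stream : streams) {
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream, "US-ASCII"));
            while (reader.readLine() != null) {
                total++;
            }
            reader.close();
        }
        // Should equal the line count from reading the whole file in one go
        System.out.println("Lines across all sections: " + total);
    }
}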


There are just a few modifications to the Wide Finder main method from my last post:


int batchSize = Integer.parseInt(args[1]);
int numThreads = Integer.parseInt(args[2]);
int split = 1;
if (args.length > 3) {
    split = Integer.parseInt(args[3]);
}
final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(100);
ExecutorService exec = Executors.newCachedThreadPool();

List<Future<Map<String, Integer>>> counters = new ArrayList<Future<Map<String, Integer>>>();
Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

String[] files = args[0].split(File.pathSeparator);
CountDownLatch latch = new CountDownLatch(files.length * split);

for (int i = 0; i < numThreads; i++) {
    counters.add(exec.submit(new LineCounter(p, queue)));
}

for (String filename : files) {
    if (split <= 1) {
        exec.execute(new LineReader(queue,
                new BufferedReader(new InputStreamReader(
                        new FileInputStream(filename), "US-ASCII")),
                latch, batchSize));
    }
    else {
        List<InputStream> streams = SplitFileInputStreams.split(filename, split);
        for (InputStream stream : streams) {
            exec.execute(new LineReader(queue,
                    new BufferedReader(new InputStreamReader(stream, "US-ASCII")),
                    latch, batchSize));
        }
    }
}


With a 200MB/1m line file there is no advantage. With 1GB/5m lines there was a nice 25% speedup when splitting the file in 3 with a few processing threads.

Wednesday, October 3, 2007

More Threads FTW

So the two-thread version of the WideFinder in my last post is pretty practical, but it doesn't solve the theoretical problem of scaling to many cores/CPUs. It doesn't take too much to rejig it so it uses multiple worker threads. I decided to change it to use Futures instead of manually joining on the thread, which is a bit nicer. I also introduced multiple file-reading threads, so you can pass in a list of files separated by your system's path separator. The only slightly tricky thing is the use of a countdown latch to let us know when all the reading threads are finished. The maps returned by the workers must be combined to give us the final result. With the small amount of work done for each line, multiple worker threads don't help, but that could easily change if you were doing something more complex.


public class WideFinderManyThreads {
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        int batchSize = Integer.parseInt(args[1]);
        int numThreads = Integer.parseInt(args[2]);
        final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(100);
        ExecutorService exec = Executors.newCachedThreadPool();

        List<Future<Map<String, Integer>>> counters = new ArrayList<Future<Map<String, Integer>>>();
        Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

        String[] files = args[0].split(File.pathSeparator);
        CountDownLatch latch = new CountDownLatch(files.length);

        for (int i = 0; i < numThreads; i++) {
            counters.add(exec.submit(new LineCounter(p, queue)));
        }

        for (String filename : files) {
            exec.execute(new LineReader(queue,
                    new BufferedReader(new InputStreamReader(new FileInputStream(filename), "US-ASCII")),
                    latch, batchSize));
        }

        latch.await();
        for (int i = 0; i < numThreads; i++) {
            queue.put(LineCounter.FINISHED);
        }

        Map<String, Integer> combined = new HashMap<String, Integer>();
        for (Future<Map<String, Integer>> future : counters) {
            combine(combined, future.get());
        }
        exec.shutdown();

        List<Entry<String, Integer>> results = new ArrayList<Map.Entry<String, Integer>>(combined.entrySet());
        Collections.sort(results, new Comparator<Entry<String, Integer>>() {
            public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        for (int i = 0; i < 10; i++) {
            System.out.println(results.get(i));
        }
    }

    private static void combine(Map<String, Integer> combined, Map<String, Integer> section) {
        Set<Entry<String, Integer>> name = section.entrySet();
        for (Entry<String, Integer> entry : name) {
            Integer currentCount = combined.get(entry.getKey());
            combined.put(entry.getKey(), (currentCount == null ? entry.getValue() : (currentCount + entry.getValue())));
        }
    }

    static class LineReader implements Runnable {
        private final BlockingQueue<List<String>> queue;
        private final BufferedReader reader;
        private final CountDownLatch latch;
        private final int batchSize;

        public LineReader(final BlockingQueue<List<String>> queue, final BufferedReader reader, final CountDownLatch latch, int batchSize) {
            this.queue = queue;
            this.reader = reader;
            this.latch = latch;
            this.batchSize = batchSize;
        }

        public void run() {
            String line = null;
            List<String> batch = new ArrayList<String>(batchSize);
            try {
                while ((line = reader.readLine()) != null) {
                    batch.add(line);
                    if (batch.size() == batchSize) {
                        queue.put(batch);
                        batch = new ArrayList<String>(batchSize);
                    }
                }
                queue.put(batch);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            finally {
                try {
                    reader.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            }
        }
    }

    static class LineCounter implements Callable<Map<String, Integer>> {
        public static final List<String> FINISHED = new ArrayList<String>();
        private final BlockingQueue<List<String>> queue;
        private final Pattern pattern;
        private final Map<String, Integer> counts = new HashMap<String, Integer>();

        LineCounter(Pattern pattern, BlockingQueue<List<String>> queue) {
            this.pattern = pattern;
            this.queue = queue;
        }

        public Map<String, Integer> call() {
            List<String> lines = null;
            try {
                while ((lines = queue.take()) != FINISHED) {
                    for (String line : lines) {
                        Matcher m = pattern.matcher(line);
                        if (m.find()) {
                            String key = m.group();
                            Integer currentCount = counts.get(key);
                            counts.put(key, (currentCount == null ? 1 : (currentCount + 1)));
                        }
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            return counts;
        }
    }
}


One thing I found was that passing multiple files in, even when they were on the same disk, did result in a speedup over processing them one after the other. For example, 1 file with 1m lines took about 2.5 seconds, while 2 files with 1m lines each and 3 processing threads took about 3.4 seconds. That is not bad at all. I had read about what some of the Erlang guys were doing with reading one file in parallel in multiple processes. I thought it was unlikely to give much of an advantage, but I am rethinking that now. I may have a look at implementing that in Java.

Tuesday, October 2, 2007

WideFinder and Java

Update: I realized there is a stupid bug in the concurrent version that I posted. ConcurrentHashMap.putIfAbsent returns null if there was no previous value - not the new value - so I was missing one increment and the Executor seemed to be eating the NullPointerException. I've updated the code below.
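
In other words, putIfAbsent hands back the previous mapping (or null if the new value went in), so the increment has to use whichever counter actually ended up in the map. A minimal sketch of the corrected idiom (counts and key as in the code below):


AtomicInteger fresh = new AtomicInteger(0);
AtomicInteger previous = counts.putIfAbsent(key, fresh);
// null means our fresh counter was stored; otherwise another thread got there first
AtomicInteger counter = (previous != null) ? previous : fresh;
counter.incrementAndGet();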

Everyone seems to be getting in on Tim Bray's WideFinder challenge; there is even a C++ implementation.
I think it was originally supposed to be a competition between all those new and exciting dynamic languages, but they have not been overly impressive thus far. So I've decided to post up my various attempts at a solution in Java. Here is the basic WideFinder in Java (all import statements are omitted for brevity):


public class WideFinder {
    public static void main(String[] args) throws IOException {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");
        BufferedReader in = new BufferedReader(new InputStreamReader(
                new FileInputStream(args[0]), "US-ASCII"));

        String line = null;
        while ((line = in.readLine()) != null) {
            Matcher m = p.matcher(line);
            if (m.find()) {
                String key = m.group();
                Integer currentCount = counts.get(key);
                counts.put(key, (currentCount == null ? 1 : (currentCount + 1)));
            }
        }
        in.close();

        List<Entry<String, Integer>> results = new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
        Collections.sort(results, new Comparator<Entry<String, Integer>>() {
            public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        for (int i = 0; i < 10; i++) {
            System.out.println(results.get(i));
        }
    }
}



The basic version is very similar to the Ruby one, if a little more verbose. Like all of the others, it is primarily IO bound. One important thing I found was that specifying the encoding as US-ASCII made about a 30% improvement in the time taken. This is because Java uses Unicode strings internally, and if you read text without specifying an encoding it will use the platform default. ASCII is a good bit quicker to decode than ISO-8859-1.
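
If you want to see the decoding cost on its own, something along these lines will do it (a rough sketch, not the measurements below; run it a few times to let the JIT warm up, and expect the numbers to vary by JVM and platform):


public class CharsetDecodeBench {
    public static void main(String[] args) throws Exception {
        // 64MB of ASCII bytes, decoded with a few different charsets
        byte[] data = new byte[64 * 1024 * 1024];
        Arrays.fill(data, (byte) 'a');
        String[] charsets = { "US-ASCII", "ISO-8859-1", Charset.defaultCharset().name() };
        for (String name : charsets) {
            long start = System.nanoTime();
            String s = new String(data, name); // full decode into Java's internal UTF-16 chars
            long millis = (System.nanoTime() - start) / 1000000;
            System.out.println(name + ": " + millis + " ms (" + s.length() + " chars)");
        }
    }
}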

For a file with 1 million entries I got the following times:

real 0m3.208s
user 0m3.460s
sys 0m0.215s

Taking the regexp out:

real 0m2.267s
user 0m2.329s
sys 0m0.210s

So we can potentially save up to 1 second, almost 1/3 of the time. My first solution was to try to keep things as close to the original as possible. For each line, instead of matching it directly, I submit a new task to an ExecutorService. To make the map safe to access from multiple threads I use a ConcurrentHashMap, with AtomicIntegers as values.


public class WideFinderConcurrent {
    public static void main(String[] args) throws IOException, InterruptedException {
        int threads = Integer.parseInt(args[1]);
        ExecutorService executorService = Executors.newFixedThreadPool(threads);

        final ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<String, AtomicInteger>();
        final Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

        BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(args[0]), "US-ASCII"));

        String line = null;
        while ((line = in.readLine()) != null) {
            final String lineForTask = line;
            executorService.execute(new Runnable() {
                public void run() {
                    Matcher m = p.matcher(lineForTask);
                    if (m.find()) {
                        String key = m.group();

                        AtomicInteger currentCount = counts.get(key);
                        if (currentCount == null) {
                            currentCount = new AtomicInteger(0);
                            AtomicInteger old = counts.putIfAbsent(key, currentCount);
                            if (old != null) {
                                currentCount = old;
                            }
                        }
                        currentCount.incrementAndGet();
                    }
                }
            });
        }

        in.close();
        executorService.shutdown();
        executorService.awaitTermination(100000, TimeUnit.SECONDS);

        List<Entry<String, AtomicInteger>> results = new ArrayList<Map.Entry<String, AtomicInteger>>(counts.entrySet());
        Collections.sort(results, new Comparator<Entry<String, AtomicInteger>>() {
            public int compare(Entry<String, AtomicInteger> o1, Entry<String, AtomicInteger> o2) {
                int anotherVal = o1.getValue().get();
                int thisVal = o2.getValue().get();
                return (thisVal < anotherVal ? -1 : (thisVal == anotherVal ? 0 : 1));
            }
        });

        for (int i = 0; i < 10; i++) {
            System.out.println(results.get(i));
        }
    }
}


The performance for this version is pretty poor:
real 0m5.407s
user 0m11.921s
sys 0m4.023s

Most of the locking is probably uncontended, but there is still significant overhead. It is also probably not a great idea to create lots and lots of AtomicIntegers. With more work being done in each task it might be more reasonable. The next step was to get my hands dirty with low-level threading. Many bloggers will have you believe this is impossible to get right, but somehow I've managed to write a fair bit of multithreaded code with manual synchronization without causing horrible errors. For this version I just used two threads: the main thread doing the reading, and another thread doing the regexp match and map update. I didn't bother trying to parallelize the sort. The main thread passes lines on a queue to the regexp thread. Once the file is finished, a special sentinel object is put on the queue to indicate that, and we then wait for the regexp thread to finish and get the map back. Obviously locking overhead would be a problem, so instead of putting lines on the queue one by one, batches of lines are used.


public class WideFinder2ThreadsBulk {
    public static void main(String[] args) throws IOException, InterruptedException {
        int batchSize = Integer.parseInt(args[1]);
        Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

        LineCounter counter = new LineCounter(p);
        Thread t = new Thread(counter);
        t.start();

        BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(args[0]), "US-ASCII"));

        String line = null;
        List<String> batch = new ArrayList<String>(batchSize);
        while ((line = in.readLine()) != null) {
            batch.add(line);
            if (batch.size() == batchSize) {
                counter.addLines(batch);
                batch = new ArrayList<String>(batchSize);
            }
        }
        counter.addLines(batch);
        counter.finished();

        t.join();
        in.close();

        List<Entry<String, Integer>> results = new ArrayList<Map.Entry<String, Integer>>(counter.getCounts().entrySet());
        Collections.sort(results, new Comparator<Entry<String, Integer>>() {
            public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                return o2.getValue().compareTo(o1.getValue());
            }
        });

        for (int i = 0; i < 10; i++) {
            System.out.println(results.get(i));
        }
    }

    static class LineCounter implements Runnable {
        private static final List<String> FINISHED = new ArrayList<String>();
        private final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(100);
        private final Pattern pattern;
        private final Map<String, Integer> counts = new HashMap<String, Integer>();

        LineCounter(Pattern pattern) {
            this.pattern = pattern;
        }

        public void run() {
            List<String> lines = null;
            try {
                while ((lines = queue.take()) != FINISHED) {
                    for (String line : lines) {
                        Matcher m = pattern.matcher(line);
                        if (m.find()) {
                            String key = m.group();
                            Integer currentCount = counts.get(key);
                            counts.put(key, (currentCount == null ? 1 : (currentCount + 1)));
                        }
                    }
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void addLines(List<String> batch) throws InterruptedException {
            queue.put(batch);
        }

        void finished() throws InterruptedException {
            queue.put(FINISHED);
        }

        Map<String, Integer> getCounts() {
            return counts;
        }
    }
}



Finally we get some speedup (for batches of 500 lines):
real 0m2.597s
user 0m4.137s
sys 0m0.294s

You could generalize this for multiple threads, but it likely would not be worth the effort. Really, this whole exercise probably isn't worth the effort. In fact, I feel like we have all failed one of Steve Yegge's interview questions. A commenter on Tim Bray's blog shows that Unix command-line utilities are the way to go, while we have been writing big programs and trying to optimize file IO. Another - probably easy - way to do this would be to suck the data into your preferred DB and run a single (simple) SQL query. But that wouldn't be fun, would it?