Sunday, January 25, 2009

Erlang style binary matching in Scala

Binary Patterns


Erlang programs typically read binary data into binaries, which are broadly equivalent to byte arrays in C family languages. These binary objects can then be matched by patterns in function definitions and case expressions using the bit syntax. A simple example which matches two 3 bit values and one 10 bit value is shown below:
case Header of <<Version:3, Type:3, Len:10>> -> ...

The bit syntax can also pack values into binaries and unpack them into variables. There are lots of options which control the size and type of the extracted variables: signed, little-endian etc.

By comparison, working with byte arrays in Java is much more cumbersome. You would typically wrap your byte array in a ByteArrayInputStream and a DataInputStream for reading the built-in types, and for reading unusual numbers of bits you need to write your own bit fiddling code. If we had a BitStream class the previous example could be written like the following in Java:

if (in.remaining() == 16) {
int version = in.getBits(3);
int type = in.getBits(3);
int len = in.getBits(10);
...
}
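The BitStream class used above does not exist in the JDK. As a rough sketch of what it might look like, assuming bits are consumed most significant bit first (as Erlang's default big-endian bit syntax does), something like the following would do. The class and the method names remaining and getBits are my own illustration, not an existing API:

```java
/** Hypothetical MSB-first bit reader over a byte array (illustrative only). */
class BitStream {
    private final byte[] data;
    private int bitPos; // index of the next unread bit, counted from the top bit of data[0]

    BitStream(byte[] data) {
        this.data = data;
    }

    /** Number of bits not yet consumed. */
    int remaining() {
        return data.length * 8 - bitPos;
    }

    /** Reads the next count bits (0 to 32), first bit read ending up most significant. */
    int getBits(int count) {
        if (count < 0 || count > 32 || count > remaining()) {
            throw new IllegalArgumentException("cannot read " + count + " bits");
        }
        int value = 0;
        for (int i = 0; i < count; i++) {
            int currentByte = data[bitPos >> 3] & 0xFF;        // byte holding the bit
            int bit = (currentByte >> (7 - (bitPos & 7))) & 1; // most significant bit first
            value = (value << 1) | bit;
            bitPos++;
        }
        return value;
    }
}
```

Reading one bit at a time is slow but easy to get right; a real implementation would probably shift and mask whole bytes.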

The situation in Scala is pretty much the same as it doesn't have much of an IO library yet.
Since it is possible to implement Erlang's party trick (actor concurrency) as a Scala library I thought it would be interesting to see if something approximating the bit syntax could be implemented in Scala.

Scala Pattern Matching


Scala has quite a rich pattern matching ability. The three most common uses of pattern matching in Scala are with constants/literals, with types, and with case classes. Matching literals or constants is pretty much the classic switch statement. Matching on types nicely replaces the if-instance-then-cast idiom from Java:

x match {
case s:String => ...
case i:Int => ...
}

Case classes are used to model algebraic data types as are common in functional languages:

sealed abstract class Tree
case object Empty extends Tree
case class LeafNode(value: Int) extends Tree
case class InternalNode(left: Tree, right: Tree, value: Int) extends Tree

// A recursive method needs an explicit result type
def sum(t: Tree): Int = t match {
case Empty => 0
case LeafNode(v) => v
case InternalNode(l, r, v) => v + sum(l) + sum(r)
}

Obviously none of these types of pattern matching will let us do bit syntax style matching. Luckily Scala has another mechanism for pattern matching: extractors. Extractors are objects with an unapply method which returns Some[A] if it matches or None if it doesn't. A simple example (from Programming In Scala) is an email address matcher:

object EmailAddress {
def unapply(s: String): Option[(String,String)] = {
val parts = s split "@"
if (parts.length == 2) Some(parts(0), parts(1)) else None
}
}

"foo@bar.com" match {
case EmailAddress(user, domain) => println("Hello " + user)
case _ => println("Unmatched")
}

What immediately comes to mind is something like:

case BitPattern(Bits(3), Bits(3), Bits(10)) =>

But of course this won't work. All extractors know about is the value passed to the unapply method. They know nothing about the context in which it was called, or any nested patterns.
So any chance of exactly copying the Erlang syntax is scuppered. But if we are willing to allow the definition of the pattern in a different place to its use then we can still achieve something.

Underlying implementation


To start with I'll assume a BitStream class that wraps a byte array. I won't go into the details as it is not very exciting (and my bit fiddling expertise isn't all that). We then need some classes to represent the various data types we want to pull out.

abstract class BitPart[T] {
def bitLength : Int
def take(b: BitStream) : T
}

class Bits(c : Int) extends BitPart[Int] {
def bitLength = c
def take(b: BitStream) = b.getBits(c)
}

object SInt extends BitPart[Int] {
def bitLength = 32
def take(b: BitStream) = b.getBits(bitLength)
}

object Remaining extends BitPart[Array[Byte]] {
def bitLength = -1
def take(b: BitStream) = b.remainingBytes
}

Remaining is a special case that matches however many bytes are left, which lets us match arbitrary length byte arrays. Obviously we would need a bunch more types for this to be a useful API.
So, on to matching with these classes. It is not too difficult to take a sequence of BitPart instances, apply them to a BitStream, and return None if they don't match or Some[Array[Any]] if they do.

def matchPattern(bytes: Array[Byte], parts : BitPart[_]* ) : Option[Array[Any]] = {
...
}

I'm omitting the implementation again, because it is a bit long and not overly interesting.
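For readers who want a feel for what such a method has to do, here is one possible shape of it, sketched in Java rather than Scala. Everything in it (Part, FixedBits, RemainingBytes, PatternMatcher) is a hypothetical stand-in for the Scala classes above, it returns null instead of None, and it assumes that a pattern must consume every bit of the input and that a "remaining" part is only allowed last and on a byte boundary:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical analogue of BitPart: a negative bitLength means "the rest of the input". */
interface Part {
    int bitLength();
}

class FixedBits implements Part {
    private final int count;
    FixedBits(int count) { this.count = count; }
    public int bitLength() { return count; }
}

class RemainingBytes implements Part {
    public int bitLength() { return -1; }
}

class PatternMatcher {
    /** Returns the extracted values, or null when the pattern does not fit the input. */
    static List<Object> matchPattern(byte[] bytes, Part... parts) {
        List<Object> values = new ArrayList<Object>();
        int totalBits = bytes.length * 8;
        int bitPos = 0;
        for (int p = 0; p < parts.length; p++) {
            int length = parts[p].bitLength();
            if (length < 0) {
                // "Remaining" is only accepted as the last part and on a byte boundary.
                if (p != parts.length - 1 || bitPos % 8 != 0) {
                    return null;
                }
                values.add(Arrays.copyOfRange(bytes, bitPos / 8, bytes.length));
                bitPos = totalBits;
            } else {
                if (length > 32 || length > totalBits - bitPos) {
                    return null; // too wide for an int, or not enough input left
                }
                int value = 0;
                for (int i = 0; i < length; i++, bitPos++) {
                    int bit = (bytes[bitPos >> 3] >> (7 - (bitPos & 7))) & 1;
                    value = (value << 1) | bit;
                }
                values.add(value);
            }
        }
        // Like the Erlang pattern, a match must account for every bit of the input.
        return bitPos == totalBits ? values : null;
    }
}
```

Note that the result is an untyped list, which is exactly the loss of type safety that the typed extractors are meant to hide.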
It is possible to write extractors that return a variable number of items, but that is not really what we want to do here as we could make a mistake between the pattern definition and its use. Also we've lost the type safety and so would need to stick type annotations on all the pattern variables: annoying and error prone.

The Extractors


We will wrap this matchPattern method up in a type safe extractor.

class BitMatch3[T1,T2,T3] (p1: BitPart[T1], p2: BitPart[T2], p3: BitPart[T3]) {
def unapply(bytes : Array[Byte]) : Option[(T1,T2,T3)] = {
for (a <- matchPattern(bytes, p1, p2, p3))
yield (a(0).asInstanceOf[T1], a(1).asInstanceOf[T2], a(2).asInstanceOf[T3])
}
}

Let's try this out on our original example:

val Header = new BitMatch3(new Bits(3), new Bits(3), new Bits(10))
bArray match {
case Header(version, tpe, len) => ... // "type" is a reserved word in Scala, hence tpe
case _ =>
}

A little clunky maybe, but getting there. As you might have guessed from the BitMatch3 name we need a different extractor for each number of elements. This is the same as Tuples in Scala which are defined with up to 22 elements. It is a common limitation in statically typed languages which could possibly be overcome with Lisp style macros or C++ templates.
Along with all these classes I created an object with a bunch of overloaded methods to hide them, so that the end user doesn't have to use the ugly names with numbers in.

object Patterns {
def bitPattern[T](p: BitPart[T]) = new BitMatch1(p)
def bitPattern[T1,T2](p1: BitPart[T1],p2: BitPart[T2]) = new BitMatch2(p1,p2)
def bitPattern[T1,T2,T3](p1: BitPart[T1],p2: BitPart[T2],p3: BitPart[T3]) = new BitMatch3(p1,p2,p3)
def bitPattern[T1,T2,T3,T4](p1: BitPart[T1],p2: BitPart[T2],p3: BitPart[T3],p4: BitPart[T4]) = new BitMatch4(p1,p2,p3,p4)
...
}

The direct use of the BitPart constructors and objects is a bit ugly so the usual Scala DSL stuff comes into play:

def int = SInt
def byte = SByte
def remaining = Remaining

class PatternInt(x :Int) {
def bits = new Bits(x)
def bytes = new Bytes(x)
}

implicit def intExtras(x : Int) = new PatternInt(x)

This makes the simple example a lot nicer:

val Header = bitPattern(3 bits, 3 bits, 10 bits)

A more complicated example could include lots of types and lots of DSL magic:

val Packet = bitPattern(2 bits, 6 bits, byte, unsigned int, float, remaining)

If you don't like the long bitPattern name you could change it to the Erlang << and stick a def >> = this method on the BitMatch classes for symmetry, which would give you:
val Header = <<(3 bits, 3 bits, 10 bits)>>

Let's look at a more complicated example from Programming Erlang which parses an IPv4 datagram:

-define(IP_VERSION, 4).

DgramSize = size(Dgram),
case Dgram of
<<?IP_VERSION:4, HLen:4, SrvcType:8, TotLen:16, ID:16, Flgs:3, FragOff:13,
TTL:8, Proto:8, HdrChkSum:16, SrcIP:32, DstIP:32, RestDgram/binary>>
when HLen >= 5, 4 * HLen =< DgramSize ->
...

In Scala:

val IpVersion = 4
val IPv4Dgram = bitPattern(4 bits, 4 bits, byte, short, short, 3 bits, 13 bits,
byte, byte, short, int, int, remaining)

datagram match {
case IPv4Dgram(IpVersion, hLen, srvcType, totLen, id, flags, fragOff,
ttl, proto, hdrChkSum, srcIP, destIP, restDgram) if hLen >= 5 &&
(4 * hLen) <= datagram.length => ...
}

Patterns can be nested so, for example, in the case above we could have (assuming an IPAddress extractor):

ttl, proto, hdrChkSum, IPAddress(127,0,0,1), destIP, restDgram)

Which would only match datagrams from localhost.

Wrap-up


Erlang bit syntax is compiled, and so is supposedly quite fast, whereas this Scala implementation involves a fair bit of boxing and allocation, so I couldn't recommend it for the core of your high performance server, but for occasional network or file tasks it is a pretty neat solution.
This implementation works on byte arrays, because that is pretty much what Erlang does, and going by Programming Erlang it is common practice to read a whole file into one big binary. I wouldn't normally want to do that, so pattern matching that worked on a lazy list (scala.Stream) wrapper around an InputStream would be nice to have. The tricky part in that is managing when earlier parts of the Stream are garbage collected, as you could end up holding on to the whole file anyway.
Extractors are a very cool part of the Scala language and it is nice to see how easy it is to imitate features of other languages.

Friday, October 5, 2007

Disk caching and file splitting

I realized I had been a little careless in my Wide Finder testing. I had not taken into account the effects of disk caching. All of the times in previous posts are with the cache warmed up. There really is an enormous difference: 30 seconds to 10 seconds for 5 million lines! I think nothing in the cache is the more realistic benchmark, as you are unlikely to be reading through the file again and again.

I implemented reading a single file in parallel. I didn't use anything fancy like memory mapped files, just regular FileInputStreams. This has the advantage that I can easily swap in the split streams where ordinary streams are used.
The steps I take to do this are fairly straightforward:

  1. Divide the file size by the desired number of sections. This is your section size

  2. Open a stream for the file. Don't do anything with it

  3. Open another stream. Skip this forward by the section size

  4. Step forward until you find '\n'

  5. Wrap the first stream in a limited length stream decorator with the current position as the limit. This will make the stream end where the next one begins

  6. Stick your initial stream in a list and make the second stream your new first one

  7. Repeat from step 3 for the number of sections - 1 but skipping forward by section size * the number of the section we are on

  8. Stick the last stream on the list



One thing I needed for this was a limited length stream wrapper. It is something you would think is in commons-io but no joy there. It is easy enough to write though.


public class LimitedLengthInputStream extends InputStream {
private InputStream stream;
private long available;
private long availableAtMark;

public LimitedLengthInputStream(InputStream stream, long available) {
this.stream = stream;
this.available = available;
}

public int read() throws IOException {
if (available > 0) {
available--;
return stream.read();
}
return -1;
}

public int read(byte b[], int off, int len) throws IOException {
if (available > 0) {
if (len > available) {
len = (int) available;
}
int read = stream.read(b, off, len);
if (read == -1) {
available = 0;
} else {
available -= read;
}
return read;
}
return -1;
}

public long skip(long n) throws IOException {
if (n <= 0 || available <= 0) {
return 0;
}
// Never skip past the end of this section.
long skip = stream.skip(Math.min(n, available));
available -= skip;
return skip;
}

public void mark(int readlimit) {
stream.mark(readlimit);
availableAtMark = available;
}

public void reset() throws IOException {
stream.reset();
available = availableAtMark;
}

public boolean markSupported() {
// Only claim mark/reset support when the wrapped stream has it.
return stream.markSupported();
}
}


One thing to watch out for when doing this is InputStream does not guarantee that a skip will actually skip the length you ask for. You have to wrap the call in a loop to force it to skip the full distance, which is a bit annoying.


public class SplitFileInputStreams
{
public static List<InputStream> split(String filename, int sections) throws IOException {
File f = new File(filename);
long l = f.length();
long sectionSize = l / sections;
List<InputStream> streams = new ArrayList<InputStream>();

InputStream previous = new BufferedInputStream(new FileInputStream(filename));
InputStream current = new BufferedInputStream(new FileInputStream(filename));


int i = 0;
long lastPoint = 0;
for (int j = 1; j < sections; j++) {
long skipToPos = Math.max(j * sectionSize, lastPoint + 1);

reallySkip(current, skipToPos);
do {
i = current.read();
skipToPos++;
} while (i != '\n' && i != -1);

if (i != -1) {
streams.add(new LimitedLengthInputStream(previous, skipToPos - lastPoint));
if (j < sections - 1) {
lastPoint = skipToPos;
previous = current;
current = new BufferedInputStream(new FileInputStream(filename));
}
else {
streams.add(current);
}
}
else {
streams.add(previous);
current.close();
break;
}
}

return streams;
}

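private static void reallySkip(InputStream in, long skip) throws IOException {
long skipped = 0;
while (skipped < skip) {
long n = in.skip(skip - skipped);
if (n <= 0) {
// No progress: read a byte to tell a short skip from end of file.
if (in.read() == -1) break;
n = 1;
}
skipped += n;
}
}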
}


There are just a few modifications to the Wide Finder main method from my last post:


int batchSize = Integer.parseInt(args[1]);
int numThreads = Integer.parseInt(args[2]);
int split = 1;
if (args.length > 3) {
split = Integer.parseInt(args[3]);
}
final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(100);
ExecutorService exec = Executors.newCachedThreadPool();

List<Future<Map<String, Integer>>> counters = new ArrayList<Future<Map<String, Integer>>>();
Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

String [] files = args[0].split(File.pathSeparator);
CountDownLatch latch = new CountDownLatch(files.length * split);

for (int i = 0; i < numThreads; i++) {
counters.add(exec.submit(new LineCounter(p, queue)));
}

for (String filename : files) {
if (split <= 1) {
exec.execute(new LineReader(queue,
new BufferedReader(new InputStreamReader(
new FileInputStream(filename), "US-ASCII")),
latch, batchSize));
}
else {
List<InputStream> streams = SplitFileInputStreams.split(filename, split);
for (InputStream stream : streams) {
exec.execute(new LineReader(queue,
new BufferedReader(new InputStreamReader(stream, "US-ASCII")),
latch, batchSize));
}
}
}


With a 200MB/1m line file there is no advantage. With 1GB/5m lines there was a nice 25% speedup when splitting the file in 3 with a few processing threads.

Wednesday, October 3, 2007

More Threads FTW

So the two thread version of the WideFinder in my last post is pretty practical, but it doesn't solve the theoretical problem of scaling to many cores/CPUs. It doesn't take too much to rejig it so it uses multiple worker threads. I decided to change it to use Futures instead of manually joining on the thread, and that is a bit nicer. I also introduced multiple file reading threads, so you can pass in a list of files separated by your system's path separator. The only slightly tricky thing is the use of a countdown latch to let us know when all the reading threads are finished. The maps returned by the workers must be combined to give us the final result. With the amount of work done for each line multiple worker threads don't help, but that could easily change if you were doing something more complex.


public class WideFinderManyThreads {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
int batchSize = Integer.parseInt(args[1]);
int numThreads = Integer.parseInt(args[2]);
final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(100);
ExecutorService exec = Executors.newCachedThreadPool();

List<Future<Map<String, Integer>>> counters = new ArrayList<Future<Map<String, Integer>>>();
Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

String [] files = args[0].split(File.pathSeparator);
CountDownLatch latch = new CountDownLatch(files.length);

for (int i = 0; i < numThreads; i++) {
counters.add(exec.submit(new LineCounter(p, queue)));
}

for (String filename : files) {
exec.execute(new LineReader(queue,
new BufferedReader(new InputStreamReader(new FileInputStream(filename), "US-ASCII")),
latch, batchSize));
}

latch.await();
for (int i = 0; i < numThreads; i++) {
queue.put(LineCounter.FINISHED);
}

Map<String, Integer> combined = new HashMap<String, Integer>();
for (Future<Map<String, Integer>> future : counters) {
combine(combined, future.get());
}
exec.shutdown();

List<Entry<String, Integer>> results = new ArrayList<Map.Entry<String, Integer>>(combined.entrySet());
Collections.sort(results, new Comparator<Entry<String, Integer>>() {
public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
return o2.getValue().compareTo(o1.getValue());
}
});

for (int i = 0; i < Math.min(10, results.size()); i++) {
System.out.println(results.get(i));
}
}

private static void combine(Map<String, Integer> combined, Map<String, Integer> section)
{
Set<Entry<String, Integer>> name = section.entrySet();
for (Entry<String,Integer> entry : name) {
Integer currentCount = combined.get(entry.getKey());
combined.put(entry.getKey(), (currentCount == null ? entry.getValue() : (currentCount + entry.getValue())));
}
}

static class LineReader implements Runnable {
private final BlockingQueue<List<String>> queue;
private final BufferedReader reader;
private final CountDownLatch latch;
private final int batchSize;

public LineReader(final BlockingQueue<List<String>> queue, final BufferedReader reader, final CountDownLatch latch, int batchSize)
{
this.queue = queue;
this.reader = reader;
this.latch = latch;
this.batchSize = batchSize;
}

public void run()
{
String line = null;
List<String> batch = new ArrayList<String>(batchSize);
try {
while ((line = reader.readLine()) != null) {
batch.add(line);
if (batch.size() == batchSize) {
queue.put(batch);
batch = new ArrayList<String>(batchSize);
}
}
queue.put(batch);
}
catch (Exception e) {
e.printStackTrace();
}
finally {
try {
reader.close();
}
catch (IOException e) {
e.printStackTrace();
}
latch.countDown();
}
}
}

static class LineCounter implements Callable<Map<String, Integer>> {
public static final List<String> FINISHED = new ArrayList<String>();
private final BlockingQueue<List<String>> queue;
private final Pattern pattern;
private final Map<String, Integer> counts = new HashMap<String, Integer>();

LineCounter(Pattern pattern, BlockingQueue<List<String>> queue) {
this.pattern = pattern;
this.queue = queue;
}

public Map<String, Integer> call() {
List<String> lines = null;
try {
while ((lines = queue.take()) != FINISHED) {
for (String line : lines) {
Matcher m = pattern.matcher(line);
if (m.find()) {
String key = m.group();
Integer currentCount = counts.get(key);
counts.put(key, (currentCount == null ? 1 : (currentCount + 1)));
}
}
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

return counts;
}
}
}


One thing I found was passing multiple files in, even when they were on the same disk, did result in a speedup. For example 1 file with 1m lines took about 2.5 seconds, and 2 files with 1m lines each and 3 processing threads took about 3.4 seconds. That is not bad at all. I had read about what some of the Erlang guys were doing with reading one file in parallel in multiple processes. I thought it was unlikely to give much of an advantage, but I am rethinking that now. I may have a look at implementing that in Java.

Tuesday, October 2, 2007

WideFinder and Java

Update: I realized there is a stupid bug in the concurrent version that I posted. ConcurrentHashMap.putIfAbsent returns null if there was no previous value - not the new value - so I was missing one increment and the Executor seemed to be eating the NullPointerException. I've updated the code below.
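To make the putIfAbsent behaviour concrete, here is a small self-contained sketch of the get-or-create idiom. The class name PutIfAbsentCounter and its methods are made up for illustration; only ConcurrentHashMap.putIfAbsent and AtomicInteger are real JDK APIs:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative thread-safe counter map (hypothetical helper, not part of the posts' code). */
class PutIfAbsentCounter {
    private final ConcurrentHashMap<String, AtomicInteger> counts =
            new ConcurrentHashMap<String, AtomicInteger>();

    /** Increments the counter for key and returns the new count. */
    int increment(String key) {
        AtomicInteger counter = counts.get(key);
        if (counter == null) {
            AtomicInteger fresh = new AtomicInteger(0);
            // putIfAbsent returns the value that was already mapped, or null if
            // our fresh counter was the one that got stored.
            AtomicInteger existing = counts.putIfAbsent(key, fresh);
            counter = (existing == null) ? fresh : existing;
        }
        return counter.incrementAndGet();
    }

    /** Current count for key, or 0 if the key has never been incremented. */
    int get(String key) {
        AtomicInteger counter = counts.get(key);
        return counter == null ? 0 : counter.get();
    }
}
```

Using the return value of putIfAbsent directly as the counter is the mistake described above: on the first insert it is null, so the first increment throws and is lost.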

Everyone seems to be getting in on Tim Bray's WideFinder challenge; there is even a C++ implementation.
I think it was originally supposed to be a competition between all those new and exciting dynamic languages, but they have not been overly impressive thus far. So I've decided to post my various attempts at a solution in Java. Here is the basic WideFinder in Java (all import statements are omitted for brevity):


public class WideFinder {
public static void main(String[] args) throws IOException {
Map<String, Integer> counts = new HashMap<String, Integer>();
Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");
BufferedReader in = new BufferedReader(new InputStreamReader(
new FileInputStream(args[0]), "US-ASCII"));

String line = null;
while ((line = in.readLine()) != null) {
Matcher m = p.matcher(line);
if (m.find()) {
String key = m.group();
Integer currentCount = counts.get(key);
counts.put(key, (currentCount == null ? 1 : (currentCount + 1)));
}
}
in.close();

List<Entry<String, Integer>> results = new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
Collections.sort(results, new Comparator<Entry<String, Integer>>() {

public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2)
{
return o2.getValue().compareTo(o1.getValue());
}

});

for (int i = 0; i < Math.min(10, results.size()); i++) {
System.out.println(results.get(i));
}
}
}



The basic version is very similar to the Ruby one, if a little more verbose. Like all of the others, it is primarily IO bound. One important thing I found was that specifying the encoding as ASCII made about a 30% improvement in the time taken. This is because Java uses Unicode strings internally, and if you read text without specifying an encoding it will use the platform default. ASCII is a good bit quicker to convert than ISO-8859-1.

For a file with 1 million entries I got the following times:

real 0m3.208s
user 0m3.460s
sys 0m0.215s

Taking the regexp out:

real 0m2.267s
user 0m2.329s
sys 0m0.210s

So we can potentially save up to 1 second, almost 1/3 of the time. My first solution was to try to keep things as close to the original as possible. For each line, instead of matching it directly, I submit a new task to an ExecutorService. To make the map safe to access from multiple threads I use a ConcurrentHashMap, with AtomicIntegers as values.


public class WideFinderConcurrent {
public static void main(String[] args) throws IOException, InterruptedException {
int threads = Integer.parseInt(args[1]);
ExecutorService executorService = Executors.newFixedThreadPool(threads);

final ConcurrentHashMap<String, AtomicInteger> counts = new ConcurrentHashMap<String, AtomicInteger>();
final Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(args[0]), "US-ASCII"));

String line = null;
while ((line = in.readLine()) != null) {
final String lineForTask = line;
executorService.execute(new Runnable() {
public void run() {
Matcher m = p.matcher(lineForTask);
if (m.find()) {
String key = m.group();

AtomicInteger currentCount = counts.get(key);
if (currentCount == null) {
currentCount = new AtomicInteger(0);
AtomicInteger old = counts.putIfAbsent(key, currentCount);
if (old != null) {
currentCount = old;
}
}
currentCount.incrementAndGet();
}
}
});
}

in.close();
executorService.shutdown();
executorService.awaitTermination(100000, TimeUnit.SECONDS);

List<Entry<String, AtomicInteger>> results = new ArrayList<Map.Entry<String, AtomicInteger>>(counts.entrySet());
Collections.sort(results, new Comparator<Entry<String, AtomicInteger>>() {
public int compare(Entry<String, AtomicInteger> o1, Entry<String, AtomicInteger> o2) {
int anotherVal = o1.getValue().get();
int thisVal = o2.getValue().get();
return (thisVal<anotherVal ? -1 : (thisVal==anotherVal ? 0 : 1));
}
});

for (int i = 0; i < Math.min(10, results.size()); i++) {
System.out.println(results.get(i));
}
}
}


The performance for this version is pretty poor:
real 0m5.407s
user 0m11.921s
sys 0m4.023s

Most of the locking is probably uncontended but there is still significant overhead. It also is probably not a great idea to use lots and lots of AtomicIntegers. With more work being done in the tasks it might be more reasonable.

The next step was to get my hands dirty with low level threading. Many bloggers would have you believe this is impossible to get right, but somehow I've managed to write a fair bit of multithreaded code with manual synchronization without causing horrible errors. For this version I just used two threads: the main thread doing the reading, and another thread doing the regexp match and map update. I didn't bother trying to parallelize the sort. The main thread passes lines on a queue to the regexp thread. Once the file is finished a special object is put on the queue to indicate that. We then wait for the regexp thread to finish and get the map back. Obviously locking overhead will be a problem, so instead of putting lines on the queue one by one, batches of lines are used.


public class WideFinder2ThreadsBulk {
public static void main(String[] args) throws IOException, InterruptedException {
int batchSize = Integer.parseInt(args[1]);
Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");

LineCounter counter = new LineCounter(p);
Thread t = new Thread(counter);
t.start();

BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(args[0]), "US-ASCII"));

String line = null;
List<String> batch = new ArrayList<String>(batchSize);
while ((line = in.readLine()) != null) {
batch.add(line);
if (batch.size() == batchSize) {
counter.addLines(batch);
batch = new ArrayList<String>(batchSize);
}
}
counter.addLines(batch);
counter.finished();

t.join();
in.close();

List<Entry<String, Integer>> results = new ArrayList<Map.Entry<String, Integer>>(counter.getCounts().entrySet());
Collections.sort(results, new Comparator<Entry<String, Integer>>() {
public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2)
{
return o2.getValue().compareTo(o1.getValue());
}
});

for (int i = 0; i < Math.min(10, results.size()); i++) {
System.out.println(results.get(i));
}
}

static class LineCounter implements Runnable {
private static final List<String> FINISHED = new ArrayList<String>();
private final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(100);
private final Pattern pattern;
private final Map<String, Integer> counts = new HashMap<String, Integer>();

LineCounter(Pattern pattern) {
this.pattern = pattern;
}

public void run() {
List<String> lines = null;
try {
while ((lines = queue.take()) != FINISHED) {
for (String line : lines) {
Matcher m = pattern.matcher(line);
if (m.find()) {
String key = m.group();
Integer currentCount = counts.get(key);
counts.put(key, (currentCount == null ? 1 : (currentCount + 1)));
}
}
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

void addLines(List<String> batch) throws InterruptedException {
queue.put(batch);
}

void finished() throws InterruptedException {
queue.put(FINISHED);
}

Map<String, Integer> getCounts() {
return counts;
}
}
}



Finally we get some speedup (for batches of 500 lines):
real 0m2.597s
user 0m4.137s
sys 0m0.294s

You could generalize this for multiple threads, but it likely would not be worth the effort. Really, this whole exercise probably isn't worth the effort. In fact, I feel like we have all failed one of Steve Yegge's interview questions. A commenter on Tim Bray's blog shows that Unix command line utilities are the way to go, while we have been writing big programs, and trying to optimize file IO. Another - probably easy - way to do this would be to suck the data into your preferred DB, and run a single (simple) SQL query. But that wouldn't be fun, would it?