/**
 * Multi-threaded "Wide Finder": counts how often each article under
 * {@code /ongoing/When/} is fetched in one or more web-server access logs and
 * prints the most frequently fetched ones.
 *
 * <p>One {@link LineReader} task per input file reads lines and hands them, in
 * batches, to a shared bounded queue. A fixed number of {@link LineCounter}
 * tasks drain that queue and keep private tallies, which {@link #main} merges
 * once every reader has finished.
 */
public class WideFinderManyThreads {

    /** Capacity, in batches, of the queue between readers and counters. */
    private static final int QUEUE_CAPACITY = 100;

    /** Maximum number of entries printed by {@link #main}. */
    private static final int TOP_N = 10;

    /**
     * Entry point.
     *
     * @param args {@code args[0]}: input files joined with {@link File#pathSeparator};
     *             {@code args[1]}: number of lines per batch (at least 1);
     *             {@code args[2]}: number of counter threads (at least 1)
     * @throws IllegalArgumentException if an argument is missing or out of range
     * @throws NumberFormatException if a numeric argument is not an integer
     * @throws IOException if an input file cannot be opened
     * @throws InterruptedException if the main thread is interrupted while waiting
     * @throws ExecutionException if a counter task terminated with an exception
     */
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        if (args.length < 3) {
            throw new IllegalArgumentException("usage: WideFinderManyThreads <file1"
                    + File.pathSeparator + "file2...> <batchSize> <numThreads>");
        }
        int batchSize = Integer.parseInt(args[1]);
        int numThreads = Integer.parseInt(args[2]);
        // With no counter thread the readers would block forever on a full queue.
        if (batchSize < 1 || numThreads < 1) {
            throw new IllegalArgumentException("batchSize and numThreads must be at least 1, got batchSize="
                    + batchSize + ", numThreads=" + numThreads);
        }

        final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(QUEUE_CAPACITY);
        Pattern p = Pattern.compile("GET /ongoing/When/\\d\\d\\dx/(\\d\\d\\d\\d/\\d\\d/\\d\\d/[^ .]+) ");
        String[] files = args[0].split(File.pathSeparator);
        CountDownLatch latch = new CountDownLatch(files.length);
        List<Future<Map<String, Integer>>> counters = new ArrayList<Future<Map<String, Integer>>>();
        Map<String, Integer> combined = new HashMap<String, Integer>();

        ExecutorService exec = Executors.newCachedThreadPool();
        boolean completed = false;
        try {
            for (int i = 0; i < numThreads; i++) {
                counters.add(exec.submit(new LineCounter(p, queue)));
            }
            for (String filename : files) {
                exec.execute(new LineReader(queue,
                        new BufferedReader(new InputStreamReader(new FileInputStream(filename), "US-ASCII")),
                        latch, batchSize));
            }
            // Every reader counts the latch down when it is done with its file.
            latch.await();
            // One sentinel per counter so that each of them leaves its take() loop.
            for (int i = 0; i < numThreads; i++) {
                queue.put(LineCounter.FINISHED);
            }
            for (Future<Map<String, Integer>> future : counters) {
                combine(combined, future.get());
            }
            completed = true;
        } finally {
            if (completed) {
                exec.shutdown();
            } else {
                // On failure (e.g. a file that cannot be opened) the tasks are still
                // blocked on the queue; interrupt them so the JVM is able to exit.
                exec.shutdownNow();
            }
        }

        List<Entry<String, Integer>> results = sortByCountDescending(combined);
        // There may be fewer than TOP_N distinct keys.
        int limit = Math.min(TOP_N, results.size());
        for (int i = 0; i < limit; i++) {
            System.out.println(results.get(i));
        }
    }

    /**
     * Returns the entries of {@code counts} ordered by descending count; entries
     * with equal counts are ordered by key so that the result is deterministic.
     *
     * @param counts tallies to sort; not modified
     * @return a new list holding the entries in sorted order
     */
    private static List<Entry<String, Integer>> sortByCountDescending(Map<String, Integer> counts) {
        List<Entry<String, Integer>> sorted = new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());
        Collections.sort(sorted, new Comparator<Entry<String, Integer>>() {
            public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                int byCount = o2.getValue().compareTo(o1.getValue());
                return byCount != 0 ? byCount : o1.getKey().compareTo(o2.getKey());
            }
        });
        return sorted;
    }

    /**
     * Adds every tally of {@code section} to the matching tally in {@code combined}.
     *
     * @param combined accumulator, updated in place
     * @param section tallies to add; not modified
     */
    private static void combine(Map<String, Integer> combined, Map<String, Integer> section) {
        for (Entry<String, Integer> entry : section.entrySet()) {
            Integer currentCount = combined.get(entry.getKey());
            combined.put(entry.getKey(),
                    currentCount == null ? entry.getValue() : (currentCount + entry.getValue()));
        }
    }

    /**
     * Reads all lines from one reader and puts them on the queue in batches.
     * The reader is closed and the latch counted down when the task ends,
     * whether it ends normally or not.
     */
    static class LineReader implements Runnable {
        private final BlockingQueue<List<String>> queue;
        private final BufferedReader reader;
        private final CountDownLatch latch;
        private final int batchSize;

        /**
         * @param queue destination for the batches
         * @param reader source of the lines; closed by {@link #run()}
         * @param latch counted down once when {@link #run()} ends
         * @param batchSize number of lines per batch
         */
        public LineReader(final BlockingQueue<List<String>> queue, final BufferedReader reader,
                final CountDownLatch latch, int batchSize) {
            this.queue = queue;
            this.reader = reader;
            this.latch = latch;
            this.batchSize = batchSize;
        }

        /**
         * Reads until end of input, queueing a batch every {@code batchSize}
         * lines plus a final partial batch if one remains. A read failure is
         * reported on standard error and ends the task early.
         */
        public void run() {
            try {
                // Allocated inside the try so that a bad batchSize still reaches the finally block.
                List<String> batch = new ArrayList<String>(batchSize);
                String line;
                while ((line = reader.readLine()) != null) {
                    batch.add(line);
                    if (batch.size() >= batchSize) {
                        queue.put(batch);
                        batch = new ArrayList<String>(batchSize);
                    }
                }
                if (!batch.isEmpty()) {
                    queue.put(batch);
                }
            } catch (InterruptedException e) {
                // Asked to stop: keep the interrupt flag set for whoever runs this task.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                latch.countDown();
            }
        }
    }

    /**
     * Takes batches of lines from the queue and tallies the lines that match
     * the pattern, until the {@link #FINISHED} sentinel is taken.
     */
    static class LineCounter implements Callable<Map<String, Integer>> {
        /** Sentinel batch; recognised by identity, so it must be this very instance. */
        public static final List<String> FINISHED = new ArrayList<String>();

        private final BlockingQueue<List<String>> queue;
        private final Pattern pattern;
        private final Map<String, Integer> counts = new HashMap<String, Integer>();

        /**
         * @param pattern expression searched for in every line
         * @param queue source of the batches
         */
        LineCounter(Pattern pattern, BlockingQueue<List<String>> queue) {
            this.pattern = pattern;
            this.queue = queue;
        }

        /**
         * Processes batches until {@link #FINISHED} is taken or the thread is
         * interrupted.
         *
         * @return the tallies gathered by this counter, keyed as described in
         *         {@link #keyOf(Matcher)}
         */
        public Map<String, Integer> call() {
            try {
                List<String> lines;
                while ((lines = queue.take()) != FINISHED) {
                    for (String line : lines) {
                        Matcher m = pattern.matcher(line);
                        if (m.find()) {
                            String key = keyOf(m);
                            Integer currentCount = counts.get(key);
                            counts.put(key, currentCount == null ? 1 : (currentCount + 1));
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return counts;
        }

        /**
         * Returns the tally key for a successful match: the text of the first
         * capturing group, or the whole match when the pattern has no group or
         * the group did not take part in the match.
         */
        private static String keyOf(Matcher m) {
            String captured = m.groupCount() >= 1 ? m.group(1) : null;
            return captured != null ? captured : m.group();
        }
    }
}
One thing I found was that passing in multiple files, even when they were on the same disk, did result in a speedup. For example, one file with 1 million lines took about 2.5 seconds, while two files with 1 million lines each and three processing threads took about 3.4 seconds: twice the data in roughly 1.4 times the time. That is not bad at all. I had read about what some of the Erlang guys were doing with reading one file in parallel from multiple processes. I thought it was unlikely to give much of an advantage, but I am rethinking that now. I may have a look at implementing that in Java.
0 comments:
Post a Comment