diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java | 136 |
1 files changed, 61 insertions, 75 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java index 7c9769e..3c27119 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java @@ -16,25 +16,30 @@ package dev.morling.onebrc; import java.io.IOException; +import java.io.PrintWriter; import java.io.RandomAccessFile; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; public class CalculateAverage_gnabyl { private static final String FILE = "./measurements.txt"; - private static final int NB_CHUNKS = 8; + private static final int NB_CHUNKS = Runtime.getRuntime().availableProcessors(); - private static record Chunk(long start, int bytesCount, MappedByteBuffer mappedByteBuffer) { + private static Map<Integer, String> stationNameMap = new ConcurrentHashMap<>(10000, 0.9f, NB_CHUNKS); + + private static record Chunk(int bytesCount, MappedByteBuffer mappedByteBuffer) { } private static int reduceSizeToFitLineBreak(FileChannel channel, long startPosition, int startSize) @@ -61,9 +66,9 @@ public class CalculateAverage_gnabyl { return realSize; } - private static List<Chunk> readChunks(long nbChunks) throws IOException { + private static List<Chunk> readChunks(int nbChunks) throws IOException { RandomAccessFile file = new RandomAccessFile(FILE, "rw"); - List<Chunk> res = new ArrayList<>(); + List<Chunk> res = new ArrayList<>(nbChunks); FileChannel channel = file.getChannel(); long bytesCount = channel.size(); long bytesPerChunk = bytesCount / nbChunks; @@ -71,16 +76,18 @@ public class CalculateAverage_gnabyl { // Memory map the file in read-only mode // TODO: Optimize using threads long currentPosition = 0; + int startSize; + int realSize; for (int i = 0; i < nbChunks; i++) { - int startSize = (int) bytesPerChunk; - int realSize = startSize; + startSize = (int) bytesPerChunk; + realSize = startSize; if (i == nbChunks - 1) { realSize = (int) (bytesCount - currentPosition); MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, currentPosition, realSize); - res.add(new Chunk(currentPosition, realSize, mappedByteBuffer)); + res.add(new Chunk(realSize, mappedByteBuffer)); break; } @@ -90,7 +97,7 @@ public class CalculateAverage_gnabyl { MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, currentPosition, realSize); - res.add(new Chunk(currentPosition, realSize, mappedByteBuffer)); + res.add(new Chunk(realSize, mappedByteBuffer)); currentPosition += realSize; } @@ -101,32 +108,32 @@ public class CalculateAverage_gnabyl { } private static class StationData { - private double sum, min, max; - private long count; + private float sum, min, max; + private int count; - public StationData(double value) { + public StationData(float value) { this.count = 1; this.sum = value; this.min = value; this.max = value; } - public void update(double value) { + public void update(float value) { this.count++; this.sum += value; this.min = Math.min(this.min, value); this.max = Math.max(this.max, value); } - public double getMean() { + public float getMean() { return sum / count; } - public double getMin() { + public float getMin() { return min; } - public double getMax() { + public float getMax() { return max; } @@ -139,47 +146,44 @@ public class CalculateAverage_gnabyl { } - static double round(double value) { - return Math.round(value * 10.0) / 10.0; + static float round(float value) { + return Math.round(value * 10.0f) * 0.1f; } private static class ChunkResult { - private Map<String, StationData> data; + private Map<Integer, StationData> data; public ChunkResult() { data = new HashMap<>(); } - public StationData getData(String name) { - return data.get(name); + public StationData getData(int hash) { + return data.get(hash); } - public void addStation(String name, double value) { - this.data.put(name, new StationData(value)); + public void addStation(int hash, float value) { + this.data.put(hash, new StationData(value)); } public void print() { - var stationNames = new ArrayList<String>(this.data.keySet()); - Collections.sort(stationNames); - System.out.print("{"); - for (int i = 0; i < stationNames.size() - 1; i++) { - var name = stationNames.get(i); - var stationData = data.get(name); - System.out.printf("%s=%.1f/%.1f/%.1f, ", name, round(stationData.getMin()), - round(stationData.getMean()), - round(stationData.getMax())); - } - var name = stationNames.get(stationNames.size() - 1); - var stationData = data.get(name); - System.out.printf("%s=%.1f/%.1f/%.1f", name, round(stationData.getMin()), - round(stationData.getMean()), - round(stationData.getMax())); - System.out.println("}"); + PrintWriter out = new PrintWriter(System.out); + out.println( + this.data.keySet().parallelStream() + .map(hash -> { + var stationData = data.get(hash); + var name = stationNameMap.get(hash); + return String.format("%s=%.1f/%.1f/%.1f", name, round(stationData.getMin()), + round(stationData.getMean()), + round(stationData.getMax())); + }) + .sorted((a, b) -> a.split("=")[0].compareTo(b.split("=")[0])) + .collect(Collectors.joining(", ", "{", "}"))); + out.flush(); } public void mergeWith(ChunkResult other) { - for (Map.Entry<String, StationData> entry : other.data.entrySet()) { - String stationName = entry.getKey(); + for (Map.Entry<Integer, StationData> entry : other.data.entrySet()) { + int stationName = entry.getKey(); StationData otherStationData = entry.getValue(); StationData thisStationData = this.data.get(stationName); @@ -201,16 +205,22 @@ public class CalculateAverage_gnabyl { chunk.mappedByteBuffer().get(data); // Process each line - String stationName; - double value; + float value; int iSplit, iEol; StationData stationData; - long negative; + int negative; + int hash, prime = 31; + Set<Integer> seenHashes = new HashSet<>(10000, 0.9f); for (int offset = 0; offset < data.length; offset++) { // Find station name + hash = 0; for (iSplit = offset; data[iSplit] != ';'; iSplit++) { + hash = (hash << 5) - hash + (data[iSplit] & 0xFF); + } + if (!seenHashes.contains(hash)) { + seenHashes.add(hash); + stationNameMap.put(hash, new String(data, offset, iSplit - offset, StandardCharsets.UTF_8)); } - stationName = new String(data, offset, iSplit - offset, StandardCharsets.UTF_8); // Find value iSplit++; @@ -222,7 +232,7 @@ public class CalculateAverage_gnabyl { continue; } if (data[iEol] == '.') { - value = value + (data[iEol + 1] - 48) / 10.0; + value = value + (data[iEol + 1] - 48) * 0.1f; iEol += 2; break; } @@ -231,10 +241,10 @@ public class CalculateAverage_gnabyl { value *= negative; // Init & count - stationData = result.getData(stationName); + stationData = result.getData(hash); if (stationData == null) { - result.addStation(stationName, value); + result.addStation(hash, value); } else { stationData.update(value); @@ -247,32 +257,8 @@ public class CalculateAverage_gnabyl { } private static ChunkResult processAllChunks(List<Chunk> chunks) throws InterruptedException, ExecutionException { - // var globalRes = new ChunkResult(); - // for (var chunk : chunks) { - // var chunkRes = processChunk(chunk); - // globalRes.mergeWith(chunkRes); - // } - // return globalRes; - - List<CompletableFuture<ChunkResult>> computeTasks = new ArrayList<>(); - - for (Chunk chunk : chunks) { - computeTasks.add(CompletableFuture.supplyAsync(() -> processChunk(chunk))); - } - - ChunkResult globalRes = null; - - for (CompletableFuture<ChunkResult> completedTask : computeTasks) { - ChunkResult chunkRes = completedTask.get(); - if (globalRes == null) { - globalRes = completedTask.get(); - } - else { - globalRes.mergeWith(chunkRes); - } - } - - return globalRes; + return chunks.parallelStream().map(CalculateAverage_gnabyl::processChunk).collect(ChunkResult::new, + ChunkResult::mergeWith, ChunkResult::mergeWith); } public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { |
