aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/dev/morling/onebrc
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/dev/morling/onebrc')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java283
1 files changed, 283 insertions, 0 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java
new file mode 100644
index 0000000..7c9769e
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+
/**
 * 1BRC entry: memory-maps the measurements file in {@value #NB_CHUNKS} chunks,
 * parses each chunk in its own async task, then merges the per-chunk
 * min/mean/max statistics and prints them sorted by station name.
 */
public class CalculateAverage_gnabyl {

    private static final String FILE = "./measurements.txt";

    // Number of memory-mapped segments, each parsed by one async task.
    // NOTE(review): a fixed 8 may under-use bigger machines; consider
    // Runtime.getRuntime().availableProcessors().
    private static final int NB_CHUNKS = 8;

    /**
     * One memory-mapped slice of the input file. {@code bytesCount} is the slice
     * length; every slice except possibly the last ends right after a '\n'.
     */
    private static record Chunk(long start, int bytesCount, MappedByteBuffer mappedByteBuffer) {
    }

    /**
     * Shrinks {@code startSize} so the region starting at {@code startPosition}
     * ends just after a line break.
     *
     * @return the adjusted size, including the trailing '\n'; 0 if the region
     *         contains no newline at all
     */
    private static int reduceSizeToFitLineBreak(FileChannel channel, long startPosition, int startSize)
            throws IOException {
        // Clamp to the end of the file. (The previous clamp computed
        // currentPosition - startPosition, silently dropping the final byte.)
        int size = (int) Math.min(startSize, channel.size() - startPosition);

        // Map the candidate region ONCE and scan backwards in memory. The
        // previous version called channel.map() for every single byte examined,
        // which is extremely slow.
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, startPosition, size);
        for (int i = size - 1; i >= 0; i--) {
            if (buffer.get(i) == '\n') {
                return i + 1; // keep the newline so the next chunk starts on a fresh line
            }
        }
        return 0; // no line break found in the region
    }

    /**
     * Splits the input file into {@code nbChunks} memory-mapped chunks; each
     * chunk except the last ends on a line break so no line straddles two chunks.
     */
    private static List<Chunk> readChunks(long nbChunks) throws IOException {
        List<Chunk> res = new ArrayList<>();
        // "r" instead of "rw": we only read, and read-only mode also works on
        // files we have no permission to write. try-with-resources guarantees
        // the channel and file are closed even on exceptions.
        try (RandomAccessFile file = new RandomAccessFile(FILE, "r");
                FileChannel channel = file.getChannel()) {
            long bytesCount = channel.size();
            long bytesPerChunk = bytesCount / nbChunks;

            long currentPosition = 0;
            for (int i = 0; i < nbChunks; i++) {
                int realSize;
                if (i == nbChunks - 1) {
                    // Last chunk takes whatever remains (handles rounding of
                    // bytesCount / nbChunks).
                    realSize = (int) (bytesCount - currentPosition);
                }
                else {
                    // Shrink so the chunk ends just after a newline.
                    realSize = reduceSizeToFitLineBreak(channel, currentPosition, (int) bytesPerChunk);
                }

                MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, currentPosition,
                        realSize);
                res.add(new Chunk(currentPosition, realSize, mappedByteBuffer));
                currentPosition += realSize;
            }
        }
        return res;
    }

    /** Running sum/min/max/count for one station. Not thread-safe; each chunk
     * task owns its own instances until they are merged on the main thread. */
    private static class StationData {
        private double sum, min, max;
        private long count;

        /** Initializes the statistics from the first observed value. */
        public StationData(double value) {
            this.count = 1;
            this.sum = value;
            this.min = value;
            this.max = value;
        }

        /** Folds one more measurement into the statistics. */
        public void update(double value) {
            this.count++;
            this.sum += value;
            this.min = Math.min(this.min, value);
            this.max = Math.max(this.max, value);
        }

        public double getMean() {
            return sum / count;
        }

        public double getMin() {
            return min;
        }

        public double getMax() {
            return max;
        }

        /** Combines the statistics of another chunk's data for the same station. */
        public void mergeWith(StationData other) {
            this.sum += other.sum;
            this.count += other.count;
            this.min = Math.min(this.min, other.min);
            this.max = Math.max(this.max, other.max);
        }

    }

    /** Rounds to one decimal place (half-up, via Math.round). */
    static double round(double value) {
        return Math.round(value * 10.0) / 10.0;
    }

    /** Per-chunk (and, after merging, global) station name -> statistics map. */
    private static class ChunkResult {
        private Map<String, StationData> data;

        public ChunkResult() {
            data = new HashMap<>();
        }

        public StationData getData(String name) {
            return data.get(name);
        }

        public void addStation(String name, double value) {
            this.data.put(name, new StationData(value));
        }

        /**
         * Prints "{name=min/mean/max, ...}" with stations sorted alphabetically.
         * Uses Locale.ROOT so the decimal separator is always '.' — the default
         * locale's printf would emit ',' on many systems. Also safe on an empty
         * result (prints "{}"), where the previous version threw.
         */
        public void print() {
            var stationNames = new ArrayList<String>(this.data.keySet());
            Collections.sort(stationNames);
            StringBuilder sb = new StringBuilder("{");
            for (int i = 0; i < stationNames.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                var name = stationNames.get(i);
                var stationData = data.get(name);
                sb.append(String.format(Locale.ROOT, "%s=%.1f/%.1f/%.1f", name,
                        round(stationData.getMin()),
                        round(stationData.getMean()),
                        round(stationData.getMax())));
            }
            sb.append("}");
            System.out.println(sb);
        }

        /**
         * Merges another chunk's result into this one. The other result must not
         * be used afterwards: its StationData objects are adopted directly.
         */
        public void mergeWith(ChunkResult other) {
            for (Map.Entry<String, StationData> entry : other.data.entrySet()) {
                String stationName = entry.getKey();
                StationData otherStationData = entry.getValue();
                StationData thisStationData = this.data.get(stationName);

                if (thisStationData == null) {
                    this.data.put(stationName, otherStationData);
                }
                else {
                    thisStationData.mergeWith(otherStationData);
                }
            }
        }
    }

    /**
     * Parses one chunk of "station;value\n" lines into per-station statistics.
     * Assumes the 1BRC format: the value has exactly one fractional digit, so
     * the '.' branch below reads a single digit and stops — TODO confirm if the
     * input format ever changes.
     */
    private static ChunkResult processChunk(Chunk chunk) {
        ChunkResult result = new ChunkResult();

        // Copy the mapped region into a heap array for fast indexed access.
        byte[] data = new byte[chunk.bytesCount()];
        chunk.mappedByteBuffer().get(data);

        String stationName;
        double value;
        int iSplit, iEol;
        StationData stationData;
        long negative;
        for (int offset = 0; offset < data.length; offset++) {
            // Station name runs from offset up to the ';' separator.
            for (iSplit = offset; data[iSplit] != ';'; iSplit++) {
            }
            stationName = new String(data, offset, iSplit - offset, StandardCharsets.UTF_8);

            // Parse the signed decimal value without allocating a String.
            iSplit++;
            negative = 1;
            value = 0;
            for (iEol = iSplit; data[iEol] != '\n'; iEol++) {
                if (data[iEol] == '-') {
                    negative = -1;
                    continue;
                }
                if (data[iEol] == '.') {
                    // One fractional digit, then we are at the end of the number.
                    value = value + (data[iEol + 1] - 48) / 10.0;
                    iEol += 2;
                    break;
                }
                value = value * 10 + data[iEol] - 48; // accumulate integer digits
            }
            value *= negative;

            // Create or update the station's running statistics.
            stationData = result.getData(stationName);

            if (stationData == null) {
                result.addStation(stationName, value);
            }
            else {
                stationData.update(value);
            }

            // Resume after this line's '\n' (loop increment skips it).
            offset = iEol;
        }

        return result;
    }

    /**
     * Processes all chunks concurrently (on the common ForkJoinPool — fine for
     * this CPU-bound parsing) and merges their results in submission order.
     */
    private static ChunkResult processAllChunks(List<Chunk> chunks) throws InterruptedException, ExecutionException {
        List<CompletableFuture<ChunkResult>> computeTasks = new ArrayList<>(chunks.size());

        for (Chunk chunk : chunks) {
            computeTasks.add(CompletableFuture.supplyAsync(() -> processChunk(chunk)));
        }

        // Start from an empty result and fold each chunk in. (The previous
        // version called get() twice on the first completed task.)
        ChunkResult globalRes = new ChunkResult();
        for (CompletableFuture<ChunkResult> completedTask : computeTasks) {
            globalRes.mergeWith(completedTask.get()); // blocks until that chunk is done
        }

        return globalRes;
    }

    /** Entry point: map the file, process the chunks, print the merged result. */
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        var chunks = readChunks(NB_CHUNKS);
        var result = processAllChunks(chunks);
        result.print();
    }
}