aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBang NGUYEN <lelybang.nguyen@gmail.com>2024-01-12 09:35:35 +0100
committerGitHub <noreply@github.com>2024-01-12 09:35:35 +0100
commitaf1946fcb5468e85142a4eff7e954e7a6d980530 (patch)
tree0101779adfb90ce842d40fd78b6b31454ce50387
parent3c465cecf966c562b5e063420642ef3cf0d0d413 (diff)
[Attempt 1] Memory mapping + split by linebreak + multithreads + fast double parser (#330)
* Create clones * Cleanup code and add memory mapping to read file * Fix chunks reading logic to fit linebreak * Remove unused * Sequential * Multi thread process chunks * Add new line in output * Remove unnecessary operation with map & reducer memory * Reduce mem usage by using only 1 map * formatting * Remove unnecessary length check * Remove trycatch * Optimize double parsing
-rwxr-xr-xcalculate_average_gnabyl.sh23
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java283
2 files changed, 306 insertions, 0 deletions
diff --git a/calculate_average_gnabyl.sh b/calculate_average_gnabyl.sh
new file mode 100755
index 0000000..14c449a
--- /dev/null
+++ b/calculate_average_gnabyl.sh
@@ -0,0 +1,23 @@
#!/bin/sh
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Uncomment below to use sdk
# source "$HOME/.sdkman/bin/sdkman-init.sh"
# sdk use java 21.0.1-graal 1>&2

# JAVA_OPTS stays unquoted on the java line on purpose: it may carry
# several whitespace-separated JVM flags that must be split into words.
JAVA_OPTS=""
CLASS_PATH="target/average-1.0.0-SNAPSHOT.jar"
MAIN_CLASS="dev.morling.onebrc.CalculateAverage_gnabyl"

time java $JAVA_OPTS --class-path "$CLASS_PATH" "$MAIN_CLASS"
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java
new file mode 100644
index 0000000..7c9769e
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
public class CalculateAverage_gnabyl {

    private static final String FILE = "./measurements.txt";

    /** Number of chunks the input file is split into; one async task per chunk. */
    private static final int NB_CHUNKS = 8;

    /**
     * A slice of the input file: absolute start offset, length in bytes, and the
     * read-only memory mapping covering exactly that slice.
     * NOTE: bytesCount is an int, so one chunk is limited to 2 GiB; with
     * NB_CHUNKS = 8 the largest supported file is roughly 16 GiB.
     */
    private static record Chunk(long start, int bytesCount, MappedByteBuffer mappedByteBuffer) {
    }

    /**
     * Shrinks {@code startSize} so that the range starting at {@code startPosition}
     * ends exactly on a '\n' byte (the newline itself stays inside the range).
     * Returns 0 when the range contains no newline at all.
     *
     * The candidate region is mapped ONCE and scanned backwards in memory; the
     * previous version issued one map() syscall per byte inspected, and its
     * end-of-file clamp silently dropped the file's final byte.
     *
     * @param channel       channel over the input file
     * @param startPosition absolute offset where the chunk begins
     * @param startSize     desired chunk size before adjustment
     * @return the adjusted size, ending on a newline (or 0 if none found)
     */
    private static int reduceSizeToFitLineBreak(FileChannel channel, long startPosition, int startSize)
            throws IOException {
        int realSize = startSize;

        // Clamp to the end of the file, keeping the last byte.
        if (startPosition + realSize > channel.size()) {
            realSize = (int) (channel.size() - startPosition);
        }

        MappedByteBuffer region = channel.map(FileChannel.MapMode.READ_ONLY, startPosition, realSize);
        while (realSize > 0 && region.get(realSize - 1) != '\n') {
            realSize--;
        }
        return realSize;
    }

    /**
     * Memory-maps the whole file as {@code nbChunks} read-only chunks; every
     * chunk except possibly the last ends on a line break, so each chunk holds
     * only complete lines.
     */
    private static List<Chunk> readChunks(long nbChunks) throws IOException {
        List<Chunk> res = new ArrayList<>();

        // "r" instead of "rw": we only ever read, and read-only mode also works
        // on files we are not allowed to write to. try-with-resources closes
        // the channel/file even on error; the mappings stay valid afterwards.
        try (RandomAccessFile file = new RandomAccessFile(FILE, "r");
                FileChannel channel = file.getChannel()) {
            long bytesCount = channel.size();
            long bytesPerChunk = bytesCount / nbChunks;
            long currentPosition = 0;

            for (int i = 0; i < nbChunks; i++) {
                int realSize;
                if (i == nbChunks - 1) {
                    // The last chunk simply takes everything that is left.
                    realSize = (int) (bytesCount - currentPosition);
                }
                else {
                    // Shrink the chunk so it ends on a newline.
                    realSize = reduceSizeToFitLineBreak(channel, currentPosition, (int) bytesPerChunk);
                }

                res.add(new Chunk(currentPosition, realSize,
                        channel.map(FileChannel.MapMode.READ_ONLY, currentPosition, realSize)));
                currentPosition += realSize;
            }
        }

        return res;
    }

    /** Running min/max/sum/count aggregate for one weather station. */
    private static class StationData {
        private double sum, min, max;
        private long count;

        public StationData(double value) {
            this.count = 1;
            this.sum = value;
            this.min = value;
            this.max = value;
        }

        /** Folds one more measurement into the aggregate. */
        public void update(double value) {
            this.count++;
            this.sum += value;
            this.min = Math.min(this.min, value);
            this.max = Math.max(this.max, value);
        }

        public double getMean() {
            return sum / count;
        }

        public double getMin() {
            return min;
        }

        public double getMax() {
            return max;
        }

        /** Merges another partial aggregate (e.g. from another chunk) into this one. */
        public void mergeWith(StationData other) {
            this.sum += other.sum;
            this.count += other.count;
            this.min = Math.min(this.min, other.min);
            this.max = Math.max(this.max, other.max);
        }

    }

    /** Rounds to one decimal place, matching the expected output format. */
    static double round(double value) {
        return Math.round(value * 10.0) / 10.0;
    }

    /** Per-chunk aggregation result: station name -> running aggregate. */
    private static class ChunkResult {
        private final Map<String, StationData> data;

        public ChunkResult() {
            data = new HashMap<>();
        }

        public StationData getData(String name) {
            return data.get(name);
        }

        public void addStation(String name, double value) {
            this.data.put(name, new StationData(value));
        }

        /**
         * Prints "{name=min/mean/max, ...}" with stations sorted alphabetically.
         * Uses Locale.ROOT so the decimal separator is always '.' regardless of
         * the default locale, and prints "{}" for empty input (the previous
         * version threw IndexOutOfBoundsException on an empty file).
         */
        public void print() {
            var stationNames = new ArrayList<String>(this.data.keySet());
            Collections.sort(stationNames);

            var sb = new StringBuilder("{");
            for (int i = 0; i < stationNames.size(); i++) {
                var name = stationNames.get(i);
                var stationData = data.get(name);
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(String.format(java.util.Locale.ROOT, "%s=%.1f/%.1f/%.1f", name,
                        round(stationData.getMin()),
                        round(stationData.getMean()),
                        round(stationData.getMax())));
            }
            System.out.println(sb.append('}'));
        }

        /** Merges another chunk's aggregates into this result. */
        public void mergeWith(ChunkResult other) {
            for (Map.Entry<String, StationData> entry : other.data.entrySet()) {
                StationData existing = this.data.get(entry.getKey());
                if (existing == null) {
                    // Adopt the other side's aggregate directly; this aliasing is
                    // safe because `other` is discarded after the merge.
                    this.data.put(entry.getKey(), entry.getValue());
                }
                else {
                    existing.mergeWith(entry.getValue());
                }
            }
        }
    }

    /**
     * Parses every "station;value\n" line of one chunk and aggregates per
     * station. Assumes the chunk starts at a line start and ends right after a
     * newline, and that every value has exactly one fractional digit (the 1BRC
     * input format "-?\d+\.\d").
     */
    private static ChunkResult processChunk(Chunk chunk) {
        ChunkResult result = new ChunkResult();

        // Copy the mapped slice onto the heap once; parsing then uses plain
        // array accesses instead of buffer calls.
        byte[] data = new byte[chunk.bytesCount()];
        chunk.mappedByteBuffer().get(data);

        String stationName;
        double value;
        int iSplit, iEol;
        StationData stationData;
        int sign;
        for (int offset = 0; offset < data.length; offset++) {
            // Station name runs up to the ';' separator.
            for (iSplit = offset; data[iSplit] != ';'; iSplit++) {
            }
            stationName = new String(data, offset, iSplit - offset, StandardCharsets.UTF_8);

            // Hand-rolled value parser: optional '-', integer digits, '.', one digit.
            iSplit++;
            sign = 1;
            value = 0;
            for (iEol = iSplit; data[iEol] != '\n'; iEol++) {
                if (data[iEol] == '-') {
                    sign = -1;
                    continue;
                }
                if (data[iEol] == '.') {
                    value = value + (data[iEol + 1] - '0') / 10.0;
                    iEol += 2; // skip the fractional digit; iEol now sits on '\n'
                    break;
                }
                value = value * 10 + data[iEol] - '0';
            }
            value *= sign;

            // Create the station entry on first sight, update it afterwards.
            stationData = result.getData(stationName);
            if (stationData == null) {
                result.addStation(stationName, value);
            }
            else {
                stationData.update(value);
            }

            offset = iEol; // the loop increment then moves past the newline
        }

        return result;
    }

    /**
     * Processes all chunks concurrently (one CompletableFuture per chunk on the
     * common pool) and merges the partial results in order.
     *
     * @return the merged result, or null when the chunk list is empty
     */
    private static ChunkResult processAllChunks(List<Chunk> chunks) throws InterruptedException, ExecutionException {
        List<CompletableFuture<ChunkResult>> computeTasks = new ArrayList<>();

        for (Chunk chunk : chunks) {
            computeTasks.add(CompletableFuture.supplyAsync(() -> processChunk(chunk)));
        }

        ChunkResult globalRes = null;

        for (CompletableFuture<ChunkResult> completedTask : computeTasks) {
            ChunkResult chunkRes = completedTask.get();
            if (globalRes == null) {
                // Seed the global result with the first chunk's result (the old
                // code called completedTask.get() a redundant second time here).
                globalRes = chunkRes;
            }
            else {
                globalRes.mergeWith(chunkRes);
            }
        }

        return globalRes;
    }

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        var chunks = readChunks(NB_CHUNKS);
        var result = processAllChunks(chunks);
        result.print();
    }
}