aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGergely Kiss <kiss.gergely@gmail.com>2024-01-07 09:45:30 +0100
committerGitHub <noreply@github.com>2024-01-07 09:45:30 +0100
commitc72a8e97693ee60723f24245b1efabb94bd20550 (patch)
treebfcfaebc215691ea04aea2b80f9ae8a2e4134550
parent15511d1af2bb79a5ae38a81c2c4b24ebc6041e41 (diff)
Adding kgeri's solution (#137)
* Adding kgeri's solution * parallelizing CalculatorAverage_kgeri * fixing aggregation bugs, chunk size calc for small files * removed GC logging Co-authored-by: Gunnar Morling <gunnar.morling@googlemail.com> * fix for when there's no newline at end of input * fix for when the final record ends on the chunk boundary --------- Co-authored-by: Gunnar Morling <gunnar.morling@googlemail.com>
-rwxr-xr-xcalculate_average_kgeri.sh20
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_kgeri.java229
2 files changed, 249 insertions, 0 deletions
diff --git a/calculate_average_kgeri.sh b/calculate_average_kgeri.sh
new file mode 100755
index 0000000..a89bce9
--- /dev/null
+++ b/calculate_average_kgeri.sh
@@ -0,0 +1,20 @@
+#!/bin/sh
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+JAVA_OPTS="--enable-preview"
+time java $JAVA_OPTS -Xmx99M --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_kgeri
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_kgeri.java b/src/main/java/dev/morling/onebrc/CalculateAverage_kgeri.java
new file mode 100644
index 0000000..a4fd736
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_kgeri.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UncheckedIOException;
+import java.lang.foreign.Arena;
+import java.lang.foreign.MemorySegment;
+import java.nio.channels.FileChannel;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ForkJoinPool;
+import java.util.stream.LongStream;
+
+import static java.lang.foreign.ValueLayout.JAVA_BYTE;
+import static java.nio.channels.FileChannel.MapMode.READ_ONLY;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class CalculateAverage_kgeri {
+
+ private static final String FILE = "./measurements.txt";
+
+ private static class MeasurementAggregate {
+ private double min = Double.POSITIVE_INFINITY;
+ private double sum = 0d;
+ private double max = Double.NEGATIVE_INFINITY;
+ private long count;
+
+ public void append(double measurement) {
+ min = Math.min(min, measurement);
+ max = Math.max(max, measurement);
+ sum += measurement;
+ count++;
+ }
+
+ public void merge(MeasurementAggregate other) {
+ min = Math.min(min, other.min);
+ max = Math.max(max, other.max);
+ sum += other.sum;
+ count += other.count;
+ }
+
+ @Override
+ public String toString() {
+ return STR."\{round(min)}/\{round(sum / count)}/\{round(max)}";
+ }
+
+ private double round(double value) {
+ return Math.round(value * 10.0) / 10.0;
+ }
+ }
+
+ /**
+ * This is to avoid instantiating `String`s during processing as much as possible.
+ */
+ private static class StringSlice {
+ protected final byte[] buf;
+ protected int len;
+ protected int hash;
+
+ public StringSlice(StringSlice other) {
+ buf = Arrays.copyOfRange(other.buf, 0, other.len);
+ len = other.len;
+ hash = other.hash;
+ }
+
+ public StringSlice(byte[] buf, int len) {
+ this.buf = buf;
+ this.len = len;
+ calculateHash();
+ }
+
+ public int length() {
+ return len;
+ }
+
+ @Override
+ public int hashCode() {
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof StringSlice other && Arrays.equals(buf, 0, len, other.buf, 0, other.len);
+ }
+
+ @Override
+ public String toString() {
+ return new String(buf, 0, len, UTF_8);
+ }
+
+ protected void calculateHash() {
+ hash = 1;
+ for (int i = 0; i < len; i++) {
+ hash = 31 * hash + buf[i];
+ }
+ }
+ }
+
+ /**
+ * A flyweight of {@link StringSlice}, to avoid instantiating that as well.
+ */
+ private static class MutableStringSlice extends StringSlice {
+
+ public MutableStringSlice(byte[] buf) {
+ super(buf, 0);
+ }
+
+ public void updateLength(int length) {
+ len = length;
+ calculateHash();
+ }
+
+ public void clear() {
+ len = 0;
+ hash = 0;
+ }
+ }
+
+ private static Map<StringSlice, MeasurementAggregate> readChunk(FileChannel channel, long from, long chunkSize) {
+ MemorySegment data;
+ long remaining;
+ try {
+ long offset = Math.max(0, from - 1);
+ remaining = channel.size() - offset;
+ data = channel.map(READ_ONLY, offset, Math.min(remaining, chunkSize + 256), Arena.ofConfined()); // +256 to allow for reading records that are split by the chunk
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ long start = 0;
+ if (from > 0) {
+ while (data.get(JAVA_BYTE, start++) != '\n') {
+ }
+ }
+
+ Map<StringSlice, MeasurementAggregate> results = new HashMap<>(1000);
+ byte[] buf = new byte[256];
+ MutableStringSlice name = new MutableStringSlice(buf);
+
+ long until = Math.min(remaining, chunkSize); // Records may end (slightly) after the chunk boundary, but must not start after it
+ for (long pos = start; pos < until;) {
+ name.clear();
+ int i = 0;
+ for (;; i++) {
+ if (pos + i >= data.byteSize()) { // Guard against malformed data (typically a missing newline at the end of the input)
+ pos = chunkSize;
+ break;
+ }
+ byte b = data.get(JAVA_BYTE, pos + i);
+ buf[i] = b;
+ if (b == ';') {
+ name.updateLength(i);
+ }
+ else if (b == '\n') {
+ pos = pos + i + 1;
+ break;
+ }
+ }
+
+ double measurement = parseDoubleFrom(buf, name.length() + 1, i - name.length() - 1);
+ MeasurementAggregate aggr = results.get(name);
+ if (aggr == null) {
+ aggr = new MeasurementAggregate();
+ results.put(new StringSlice(name), aggr);
+ }
+ aggr.append(measurement);
+ }
+
+ return results;
+ }
+
+ // Note: based on java.lang.Integer.parseInt, surely missing some edge cases but avoids allocation
+ private static double parseDoubleFrom(byte[] buf, int offset, int length) {
+ boolean negative = false;
+ int integer = 0;
+ for (int i = 0; i < length; i++) {
+ byte c = buf[offset + i];
+ if (c == '-') {
+ negative = true;
+ }
+ else if (c != '.') {
+ integer *= 10;
+ integer -= c - '0';
+ }
+ }
+ return (negative ? integer : -integer) / 10.0;
+ }
+
+ public static void main(String[] args) throws IOException {
+ Map<String, MeasurementAggregate> measurements = new TreeMap<>();
+
+ try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) {
+ long size = raf.length();
+ long threads = Math.min(ForkJoinPool.getCommonPoolParallelism(), Math.max(size / 1000000, 1));
+ long chunkSize = Math.ceilDiv(size, threads);
+ System.err.printf("Processing size=%d, threads=%d, chunkSize=%d%n", size, threads, chunkSize);
+
+ List<Map<StringSlice, MeasurementAggregate>> chunks = LongStream.range(0, threads)
+ .parallel()
+ .mapToObj(i -> readChunk(raf.getChannel(), i * chunkSize, chunkSize))
+ .toList();
+
+ for (Map<StringSlice, MeasurementAggregate> chunk : chunks) {
+ chunk.forEach((n, m) -> measurements.computeIfAbsent(n.toString(), x -> new MeasurementAggregate()).merge(m));
+ }
+ }
+
+ System.out.println(measurements);
+ }
+}