aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCool_Mineman <62723322+CoolMineman@users.noreply.github.com>2024-01-07 10:39:45 +0000
committerGitHub <noreply@github.com>2024-01-07 11:39:45 +0100
commitd89d1b488d17fb66c712cf6d36463d54a426117f (patch)
treecc1a99f9f014652df2c769ae11324791fe5e118c
parentfc675c4614fa7f4ad52344ee24b8003a38fb12f8 (diff)
Initial Implementation - coolmineman (#196)
* start * slower * still bad * finally faster than baseline :) * starting to go fast * improve * we ball * fix race condition an newline * change threadpool * ~18sec on my machine
-rwxr-xr-xcalculate_average_coolmineman.sh30
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_coolmineman.java280
2 files changed, 310 insertions, 0 deletions
diff --git a/calculate_average_coolmineman.sh b/calculate_average_coolmineman.sh
new file mode 100755
index 0000000..cabd0c4
--- /dev/null
+++ b/calculate_average_coolmineman.sh
@@ -0,0 +1,30 @@
+#!/bin/sh
+#
+# Copyright 2023 The original authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Uncomment below to use sdk
+# source "$HOME/.sdkman/bin/sdkman-init.sh"
+# sdk use java 21.0.1-graal 1>&2
+
+source "$HOME/.sdkman/bin/sdkman-init.sh"
+sdk use java 21.0.1-graal 1>&2
+if [ -f "target/calculate_average_coolmineman.jsa" ]; then
+ JAVA_OPTS="-XX:SharedArchiveFile=target/calculate_average_coolmineman.jsa -Xshare:on"
+else
+ # First run, create the archive:
+ JAVA_OPTS="-XX:ArchiveClassesAtExit=target/calculate_average_coolmineman.jsa"
+fi
+time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_coolmineman
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_coolmineman.java b/src/main/java/dev/morling/onebrc/CalculateAverage_coolmineman.java
new file mode 100644
index 0000000..f2fb471
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_coolmineman.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class CalculateAverage_coolmineman {
+
+ private static final String FILE = "./measurements.txt";
+
+ private static class MeasurementAggregator {
+ private double min = Double.POSITIVE_INFINITY;
+ private double max = Double.NEGATIVE_INFINITY;
+ private double sum;
+ private long count;
+
+ void add(double value) {
+ min = Math.min(min, value);
+ max = Math.max(max, value);
+ sum += value;
+ count++;
+ }
+
+ void merge(MeasurementAggregator o) {
+ min = Math.min(min, o.min);
+ max = Math.max(max, o.max);
+ sum += o.sum;
+ count += o.count;
+ }
+
+ public String toString() {
+ return round(min) + "/" + round(sum / count) + "/" + round(max);
+ }
+
+ private double round(double value) {
+ return Math.round(value * 10.0) / 10.0;
+ }
+ }
+
+ static class BytesKey implements Comparable<BytesKey> {
+ byte[] value;
+ int hashcode;
+
+ BytesKey(byte[] value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return new String(value, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(value);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof BytesKey bk) {
+ return Arrays.equals(value, bk.value);
+ }
+ return false;
+ }
+
+ @Override
+ public int compareTo(BytesKey o) {
+ return Arrays.compare(value, o.value);
+ }
+
+ }
+
+ static void parse(ByteBuffer a, int as, ByteBuffer b, int bs, boolean isFirst, HashMap<BytesKey, MeasurementAggregator> map) {
+ var aa = a.array();
+ var ba = b.array();
+ int pos = 0;
+ int nameEnd = 0;
+ boolean parseDouble = false;
+ int doubleStart = 0;
+ int doubleEnd = 0;
+ if (!isFirst) {
+ while (aa[pos] != (byte) '\n') {
+ pos++;
+ }
+ pos++;
+ }
+ int nameStart = pos;
+ while (pos < as) {
+ if (parseDouble) {
+ if (aa[pos] == '\n') {
+ doubleEnd = pos;
+
+ BytesKey station = new BytesKey(Arrays.copyOfRange(aa, nameStart, nameEnd));
+ double value = parseDouble(aa, doubleStart, doubleEnd);
+ MeasurementAggregator ma = map.get(station);
+ if (ma == null)
+ map.put(station, ma = new MeasurementAggregator());
+ ma.add(value);
+
+ parseDouble = false;
+ nameStart = pos + 1;
+ nameEnd = 0;
+ doubleStart = 0;
+ doubleEnd = 0;
+ }
+ }
+ else {
+ if (aa[pos] == ';') {
+ nameEnd = pos;
+ parseDouble = true;
+ doubleStart = pos + 1;
+ }
+ }
+
+ pos++;
+ }
+ if (bs <= 0)
+ return;
+ for (;;) {
+ if (parseDouble) {
+ if (ba[pos - as] == '\n') {
+ doubleEnd = pos;
+
+ BytesKey station = new BytesKey(ofRange(a, as, b, bs, nameStart, nameEnd));
+ double value = parseDouble(ofRange(a, as, b, bs, doubleStart, doubleEnd), 0, doubleEnd - doubleStart);
+ map.computeIfAbsent(station, k -> new MeasurementAggregator()).add(value);
+
+ return;
+ }
+ }
+ else {
+ if (ba[pos - as] == ';') {
+ nameEnd = pos;
+ parseDouble = true;
+ doubleStart = pos + 1;
+ }
+ }
+
+ pos++;
+ }
+ }
+
+ static byte[] ofRange(ByteBuffer a, int as, ByteBuffer b, int bs, int start, int end) {
+ var aa = a.array();
+ var ba = b.array();
+ byte[] r = new byte[end - start];
+ for (int i = 0; i < r.length; i++) {
+ int pos = start + i;
+ if (pos < as) {
+ r[i] = aa[pos];
+ }
+ else {
+ r[i] = ba[pos - as];
+ }
+ }
+ return r;
+ }
+
+ static double parseDouble(byte[] b, int start, int end) {
+ boolean negative = false;
+ if (b[start] == (byte) '-') {
+ negative = true;
+ start += 1;
+ }
+ double result = 0;
+ for (int i = start; i < end; i++) {
+ if (b[i] != (byte) '.') {
+ result *= 10;
+ result += (b[i] & 0xFF) - '0';
+ }
+ }
+ if (negative)
+ result *= -1;
+ return result * .1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int pageSize = 1600000;
+ long pos = 0;
+ try (AsynchronousFileChannel fc = AsynchronousFileChannel.open(Paths.get(FILE), Set.of(StandardOpenOption.READ), Executors.newCachedThreadPool())) {
+ var cp = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
+
+ var bbs = new ByteBuffer[Runtime.getRuntime().availableProcessors() * 8];
+ for (int i = 0; i < bbs.length; i++) {
+ bbs[i] = ByteBuffer.allocate(pageSize);
+ }
+
+ Future<Integer>[] futures = new Future[bbs.length];
+ HashMap<BytesKey, MeasurementAggregator>[] maps = new HashMap[bbs.length];
+ Future[] tasks = new Future[bbs.length];
+
+ for (int i = 0; i < futures.length; i++) {
+ futures[i] = fc.read(bbs[i], pos);
+ maps[i] = new HashMap<>();
+ pos += pageSize;
+ }
+
+ boolean first = true;
+ l: for (;;) {
+ for (int i = 0; i < bbs.length; i++) {
+ int nextIndex = (i + 1) % bbs.length;
+ if (tasks[nextIndex] != null) {
+ tasks[nextIndex].get();
+ bbs[nextIndex].position(0);
+ futures[nextIndex] = fc.read(bbs[nextIndex], pos);
+ pos += pageSize;
+ }
+ var fa = futures[i];
+ var fb = futures[(i + 1) % futures.length];
+ var isFirst = first;
+ int ra = fa.get();
+ if (ra < 0)
+ break l;
+ int rb = Math.max(0, fb.get());
+ var iLol = i;
+ tasks[i] = cp.submit(() -> {
+ parse(bbs[iLol], ra, bbs[nextIndex], rb, isFirst, maps[iLol]);
+ });
+
+ first = false;
+ }
+ }
+
+ for (Future t : tasks) {
+ if (t != null)
+ t.get();
+ }
+
+ for (int i = 1; i < maps.length; i++) {
+ merge(maps[0], maps[i]);
+ }
+
+ System.out.print('{');
+ boolean[] firstE = new boolean[]{ true };
+ maps[0].entrySet().stream().sorted((a, b) -> a.getKey().toString().compareTo(b.getKey().toString())).forEach(e -> {
+ if (!firstE[0]) {
+ System.out.print(',');
+ System.out.print(' ');
+ }
+ firstE[0] = false;
+ System.out.print(e.toString());
+ });
+ System.out.println('}');
+ System.exit(0);
+ }
+ }
+
+ static void merge(HashMap<BytesKey, MeasurementAggregator> a, HashMap<BytesKey, MeasurementAggregator> b) {
+ for (var e : b.entrySet()) {
+ if (a.containsKey(e.getKey())) {
+ a.get(e.getKey()).merge(e.getValue());
+ }
+ else {
+ a.put(e.getKey(), e.getValue());
+ }
+ }
+ }
+}