aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorNick Palmer <nick@palmr.co.uk>2024-01-03 16:21:56 +0000
committerGitHub <noreply@github.com>2024-01-03 17:21:56 +0100
commit8e6298cd2a3f99f984f8256a9df6ab0f7b29b6d2 (patch)
tree794d5b58981ba1b18c2c22c433ca8015e09a7f7a /src/main
parenteceaf1868d5d541a03b65bf3a04861b6e1837960 (diff)
Adding Nick Palmer's submission;
* Memory mapped file, single-pass parsing, custom hash map, fixed thread pool The threading was a hasty addition and needs work * Used arraylist instead of treemap to reduce a little overhead We only need it sorted for output, so only construct a treemap for output * Attempt to speed up double conversion * Cap core count for low-core systems * Fix wrong exponent * Accumulate measurement value in double, seems marginally faster Benchmark Mode Cnt Score Error Units DoubleParsingBenchmark.ourToDouble thrpt 10 569.771 ± 7.065 ops/us DoubleParsingBenchmark.ourToDoubleAccumulateInToDouble thrpt 10 648.026 ± 7.741 ops/us DoubleParsingBenchmark.ourToDoubleDivideInsteadOfMultiply thrpt 10 570.412 ± 9.329 ops/us DoubleParsingBenchmark.ourToDoubleNegative thrpt 10 512.618 ± 8.580 ops/us DoubleParsingBenchmark.ourToDoubleNegativeAccumulateInToDouble thrpt 10 565.043 ± 18.137 ops/us DoubleParsingBenchmark.ourToDoubleNegativeDivideInsteadOfMultiply thrpt 10 511.228 ± 13.967 ops/us DoubleParsingBenchmark.stringToDouble thrpt 10 52.310 ± 1.351 ops/us DoubleParsingBenchmark.stringToDoubleNegative thrpt 10 50.785 ± 1.252 ops/us
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_palmr.java250
1 file changed, 250 insertions, 0 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_palmr.java b/src/main/java/dev/morling/onebrc/CalculateAverage_palmr.java
new file mode 100644
index 0000000..bb57d33
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_palmr.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
/**
 * One Billion Row Challenge entry: memory-mapped file, single-pass byte parsing,
 * a custom open-addressing hash map, and a fixed set of plain threads, one per
 * slice of the input file.
 */
public class CalculateAverage_palmr {

    private static final String FILE = "./measurements.txt";
    public static final int CHUNK_SIZE = 1024 * 1024 * 10; // Trial and error showed ~10MB to be a good size on our machine
    public static final int LITTLE_CHUNK_SIZE = 128; // Enough bytes to cover a station name and measurement value :fingers-crossed:
    // Upper bound (in bytes) on a station name. NOTE(review): assumes no name exceeds
    // 50 UTF-8 bytes — TODO confirm against the data-set spec; overflow is unchecked.
    public static final int STATION_NAME_BUFFER_SIZE = 50;
    // Cap the worker count for low-core systems (and at 8 on bigger ones).
    public static final int THREAD_COUNT = Math.min(8, Runtime.getRuntime().availableProcessors());
+
+ public static void main(String[] args) throws IOException {
+
+ @SuppressWarnings("resource") // It's faster to leak the file than be well-behaved
+ RandomAccessFile file = new RandomAccessFile(FILE, "r");
+ FileChannel channel = file.getChannel();
+ long fileSize = channel.size();
+
+ long threadChunk = fileSize / THREAD_COUNT;
+
+ Thread[] threads = new Thread[THREAD_COUNT];
+ ByteArrayKeyedMap[] results = new ByteArrayKeyedMap[THREAD_COUNT];
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ final int j = i;
+ long startPoint = j * threadChunk;
+ long endPoint = startPoint + threadChunk;
+ Thread thread = new Thread(() -> {
+ try {
+ results[j] = readAndParse(channel, startPoint, endPoint, fileSize);
+ }
+ catch (Throwable t) {
+ System.err.println("It's broken :(");
+ // noinspection CallToPrintStackTrace
+ t.printStackTrace();
+ }
+ });
+ threads[i] = thread;
+ thread.start();
+ }
+
+ final Map<String, MeasurementAggregator> finalAggregator = new TreeMap<>();
+
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ try {
+ threads[i].join();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ results[i].getAsUnorderedList().forEach(v -> {
+ String stationName = new String(v.stationNameBytes, StandardCharsets.UTF_8);
+ finalAggregator.compute(stationName, (_, x) -> {
+ if (x == null) {
+ return v;
+ }
+ else {
+ x.count += v.count;
+ x.min = Math.min(x.min, v.min);
+ x.max = Math.max(x.max, v.max);
+ x.sum += v.sum;
+ return x;
+ }
+ });
+ });
+ }
+ System.out.println(finalAggregator);
+ }
+
+ private static ByteArrayKeyedMap readAndParse(final FileChannel channel,
+ final long startPoint,
+ final long endPoint,
+ final long fileSize) {
+ final State state = new State();
+
+ boolean skipFirstEntry = startPoint != 0;
+
+ long offset = startPoint;
+ while (offset < endPoint) {
+ parseData(channel, state, offset, Math.min(CHUNK_SIZE, fileSize - offset), false, skipFirstEntry);
+ skipFirstEntry = false;
+ offset += CHUNK_SIZE;
+ }
+
+ if (offset < fileSize) {
+ // Make sure we finish reading any partially read entry by going a little in to the next chunk, stopping at the first newline
+ parseData(channel, state, offset, Math.min(LITTLE_CHUNK_SIZE, fileSize - offset), true, false);
+ }
+
+ return state.aggregators;
+ }
+
    /**
     * Memory-maps [offset, offset + bufferSize) of the file read-only and runs the
     * byte-at-a-time record parser over it, folding each completed record into
     * {@code state.aggregators}. All parsing state lives in {@code state} and carries
     * over between calls, so a record split across two chunk mappings still parses
     * correctly.
     *
     * @param offset         file offset of the mapping
     * @param bufferSize     number of bytes to map and scan
     * @param stopAtNewline  when true, return as soon as one record is completed (used
     *                       to finish a partial record just past a thread's range)
     * @param skipFirstEntry when true, discard bytes up to and including the first
     *                       newline (a range that starts mid-record)
     */
    private static void parseData(final FileChannel channel,
                                  final State state,
                                  final long offset,
                                  final long bufferSize,
                                  final boolean stopAtNewline,
                                  final boolean skipFirstEntry) {
        ByteBuffer byteBuffer;
        try {
            byteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, offset, bufferSize);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }

        boolean isSkippingToFirstCleanEntry = skipFirstEntry;

        while (byteBuffer.hasRemaining()) {
            byte currentChar = byteBuffer.get();

            if (isSkippingToFirstCleanEntry) {
                // Throw bytes away until the first newline; the record they belong to
                // is completed by the previous range's tail read instead.
                if (currentChar == '\n') {
                    isSkippingToFirstCleanEntry = false;
                }

                continue;
            }

            if (currentChar == ';') {
                // Separator between the station name and the measurement value.
                state.parsingValue = true;
            }
            else if (currentChar == '\n') {
                // End of record. stationPointerEnd == 0 means no name bytes were seen
                // (nothing accumulated since the last reset), so there is nothing to
                // record for this newline.
                if (state.stationPointerEnd != 0) {
                    // Digits were accumulated as an integer scaled by 10; exponent is
                    // ±0.1, restoring magnitude and sign in a single multiply.
                    double value = state.measurementValue * state.exponent;

                    MeasurementAggregator aggregator = state.aggregators.computeIfAbsent(state.stationBuffer, state.stationPointerEnd, state.signedHashCode);
                    aggregator.count++;
                    aggregator.min = Math.min(aggregator.min, value);
                    aggregator.max = Math.max(aggregator.max, value);
                    aggregator.sum += value;
                }

                if (stopAtNewline) {
                    return;
                }

                // reset
                state.reset();
            }
            else {
                if (!state.parsingValue) {
                    // Still inside the station name: buffer the raw byte and grow the
                    // 31-based running hash that ByteArrayKeyedMap expects.
                    state.stationBuffer[state.stationPointerEnd++] = currentChar;
                    state.signedHashCode = 31 * state.signedHashCode + (currentChar & 0xff);
                }
                else {
                    if (currentChar == '-') {
                        // Negative measurement: flip the sign of the final scaling factor.
                        state.exponent = -0.1;
                    }
                    else if (currentChar != '.') {
                        // Accumulate decimal digits; the '.' itself is skipped (the fixed
                        // ±0.1 exponent assumes exactly one fractional digit).
                        state.measurementValue = state.measurementValue * 10 + (currentChar - '0');
                    }
                }
            }
        }
    }
+
    /**
     * Mutable per-thread parsing state, reused across chunk mappings so a record that
     * straddles a chunk boundary is parsed seamlessly.
     */
    static final class State {
        // Per-thread result map; deliberately NOT cleared by reset(), it accumulates output.
        ByteArrayKeyedMap aggregators = new ByteArrayKeyedMap();
        // False while reading the station name, true once the ';' separator is seen.
        boolean parsingValue = false;
        // Raw UTF-8 bytes of the station name currently being read.
        byte[] stationBuffer = new byte[STATION_NAME_BUFFER_SIZE];
        // Running 31-based hash of the name bytes (may go negative, hence "signed").
        int signedHashCode = 0;
        // Number of valid bytes in stationBuffer.
        int stationPointerEnd = 0;
        // Measurement digits accumulated as an unscaled value (e.g. "12.3" -> 123).
        double measurementValue = 0;
        // Final scaling factor applied at end of record: 0.1, or -0.1 after a '-'.
        double exponent = 0.1;

        /** Clears the per-record fields ready for the next line; keeps the aggregators map. */
        public void reset() {
            parsingValue = false;
            signedHashCode = 0;
            stationPointerEnd = 0;
            measurementValue = 0;
            exponent = 0.1;
        }
    }
+
+ private static class MeasurementAggregator {
+ final byte[] stationNameBytes;
+ final int stationNameHashCode;
+ private double min = Double.POSITIVE_INFINITY;
+ private double max = Double.NEGATIVE_INFINITY;
+ private double sum;
+ private long count;
+
+ public MeasurementAggregator(final byte[] stationNameBytes, final int stationNameHashCode) {
+ this.stationNameBytes = stationNameBytes;
+ this.stationNameHashCode = stationNameHashCode;
+ }
+
+ public String toString() {
+ return round(min) + "/" + round(sum / count) + "/" + round(max);
+ }
+
+ private double round(double value) {
+ return Math.round(value * 10.0) / 10.0;
+ }
+ }
+
+ private static class ByteArrayKeyedMap {
+ private final int BUCKET_COUNT = 0xFFF; // 413 unique stations in the data set, & 0xFFF ~= 399 (only 14 collisions (given our hashcode implementation))
+ private final MeasurementAggregator[] buckets = new MeasurementAggregator[BUCKET_COUNT + 1];
+ private final List<MeasurementAggregator> compactUnorderedBuckets = new ArrayList<>(413);
+
+ public MeasurementAggregator computeIfAbsent(final byte[] key, final int keyLength, final int keyHashCode) {
+ int index = keyHashCode & BUCKET_COUNT;
+
+ while (true) {
+ MeasurementAggregator maybe = buckets[index];
+ if (maybe == null) {
+ final byte[] copiedKey = Arrays.copyOf(key, keyLength);
+ MeasurementAggregator measurementAggregator = new MeasurementAggregator(copiedKey, keyHashCode);
+ buckets[index] = measurementAggregator;
+ compactUnorderedBuckets.add(measurementAggregator);
+ return measurementAggregator;
+ }
+ else {
+ if (Arrays.equals(key, 0, keyLength, maybe.stationNameBytes, 0, maybe.stationNameBytes.length)) {
+ return maybe;
+ }
+ index++;
+ index &= BUCKET_COUNT;
+ }
+ }
+ }
+
+ public List<MeasurementAggregator> getAsUnorderedList() {
+ return compactUnorderedBuckets;
+ }
+ }
+}