aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/dev/morling
diff options
context:
space:
mode:
authorMeanderingProgrammer <meanderingprogrammer@gmail.com>2024-01-10 11:53:22 -0800
committerGunnar Morling <gunnar.morling@googlemail.com>2024-01-11 12:03:29 +0100
commit7ca2aa8d1d03587e47dcb962d568afdf163791fe (patch)
treebf116842ca670cf50f594889e836b1849afcaf13 /src/main/java/dev/morling
parent64a78c388054b9d61fa8508417e37ef3f22b22f8 (diff)
First attempt from MeanderingProgrammer
#### Check List: - [x] Tests pass (`./test.sh MeanderingProgrammer` shows no differences between expected and actual outputs) - [x] All formatting changes by the build are committed - [x] Your launch script is named `calculate_average_MeanderingProgrammer.sh` (make sure to match casing of your GH user name) and is executable - [x] Output matches that of `calculate_average_baseline.sh` * Execution time: `00:04.668` * Execution time of reference implementation: `02:40.597` * System: Apple M2 Max, 12 cores, 64 GB
Diffstat (limited to 'src/main/java/dev/morling')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_MeanderingProgrammer.java250
1 files changed, 250 insertions, 0 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_MeanderingProgrammer.java b/src/main/java/dev/morling/onebrc/CalculateAverage_MeanderingProgrammer.java
new file mode 100644
index 0000000..ad78d99
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_MeanderingProgrammer.java
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.*;
+import java.util.*;
+import java.util.stream.*;
+
+/*
+ * # Main Speed Drivers
+ *
+ * Changes were made in this order, each header includes the runtime before and after the change,
+ * and whose implementation (if any) was used as a reference.
+ *
+ * ## Parallel Process Chunks [160.5 -> 18] [twobiers]
+ *
+ * Rather than reading data top to bottom and attempting to parallelize processing with batches
+ * of the parsed data, we read chunks of data (about 1 MB) and parrallelize processing per chunk.
+ *
+ * Several implementations do this kind of processing using a FileChannel to map chunks to buffers,
+ * the reference above gave the idea to use an iterator.
+ *
+ * ## Share Byte Array when Deserializing [18 -> 6.5] [Various]
+ *
+ * When deserializing names after going through the effort of processing one byte at a time
+ * when processing a chunk of data we can re-use a single byte array to store the characters
+ * that make up the name. This removes the need to allocate and de-allocate memory for the buffer.
+ *
+ * We can then use the new String(byte[], 0, length) constructor to create the String without
+ * worrying about clearing the underlying byte array as we provide a length.
+ *
+ * For this one I did not use any particular implementation as a reference but have seen it in many.
+ *
+ * ## Store ints Compute Doubles at End [6.5 -> 6.2] [None]
+ *
+ * Since input has a single decimal only we can effectively ignore it, do all of our math with the
+ * numbers as integers, then only when printing out divide by 10.0 to get the correct values.
+ *
+ * The impact of this is small, maybe even nothing in this implementation, but keeping it in place.
+ *
+ * ## Use graal [6.2 -> 5.3] [None]
+ *
+ * Change from 21.0.1-tem to 21.0.1-graal.
+ *
+ * ## Process ByteBuffer for Name then Value [5.3 -> 4.7] [None]
+ *
+ * This started as a refactor and turned out to have noticeable runtime impact, which is nice.
+ *
+ * Rather than processing the ByteBuffer in a single while (current != '\n') with a condition
+ * to switch from getting the name to calculating the integer value on (current == ';') the
+ * logic was split into 2 separate loops.
+ *
+ * The first, while (current != ';') and a second, while (current != '\n').
+ *
+ * # For my Own Reference
+ *
+ * ## Constraints
+ *
+ * - Station name: non null UTF-8 string of length [1, 100] bytes
+ * - Temperature value: non null double [-99.9, 99.9] with one fractional digit
+ * - Station names: maximum of 10,000 unique names
+ *
+ * ## Run Commands
+ *
+ * ./mvnw clean verify && ./test.sh MeanderingProgrammer
+ *
+ * ./mvnw clean verify && ./calculate_average_MeanderingProgrammer.sh
+ *
+ * ## Runtimes
+ *
+ * Baseline: 2:40.597
+ * Current: 0:04.668
+ */
+public class CalculateAverage_MeanderingProgrammer {
+
+ private static final String FILE = "./measurements.txt";
+
+ private static class ChunkReader implements Iterator<ByteBuffer> {
+
+ private static final long CHUNK_SIZE = 1_024 * 1_024;
+
+ private final FileChannel channel;
+ private final long size;
+ private long read;
+
+ public ChunkReader(Path path) throws Exception {
+ this.channel = FileChannel.open(path, StandardOpenOption.READ);
+ this.size = this.channel.size();
+ this.read = 0;
+ }
+
+ public long estimateIterations() {
+ return this.size / CHUNK_SIZE;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.nextChunkSize() > 0;
+ }
+
+ @Override
+ public ByteBuffer next() {
+ ByteBuffer buffer = null;
+ try {
+ buffer = this.channel.map(FileChannel.MapMode.READ_ONLY, this.read, this.nextChunkSize());
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ // Logic to clamp buffer to last complete line
+ int bufferSize = buffer.limit();
+ while (buffer.get(bufferSize - 1) != '\n') {
+ bufferSize--;
+ }
+ buffer.limit(bufferSize);
+ this.read += bufferSize;
+ return buffer;
+ }
+
+ private long nextChunkSize() {
+ return Math.min(CHUNK_SIZE, this.size - this.read);
+ }
+ }
+
+ private static record Row(String name, int value) {
+ }
+
+ private static class RowReader implements Iterator<Row> {
+
+ private final ByteBuffer buffer;
+ private final byte[] nameBuffer;
+
+ public RowReader(ByteBuffer buffer) {
+ this.buffer = buffer;
+ this.nameBuffer = new byte[100];
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.buffer.hasRemaining();
+ }
+
+ @Override
+ public Row next() {
+ var index = 0;
+ var current = buffer.get();
+ while (current != ';') {
+ this.nameBuffer[index] = current;
+ index++;
+ current = buffer.get();
+ }
+ var name = new String(this.nameBuffer, 0, index, StandardCharsets.UTF_8);
+
+ var negative = false;
+ var value = 0;
+ current = buffer.get();
+ while (current != '\n') {
+ if (current == '-') {
+ negative = true;
+ }
+ else if (current != '.') {
+ value = (value * 10) + (current - '0');
+ }
+ current = buffer.get();
+ }
+ if (negative) {
+ value *= -1;
+ }
+
+ return new Row(name, value);
+ }
+ }
+
+ private static class Measurement {
+
+ private int min;
+ private int max;
+ private long sum;
+ private int count;
+
+ public Measurement(int value) {
+ this.min = value;
+ this.max = value;
+ this.sum = value;
+ this.count = 1;
+ }
+
+ public Measurement merge(Measurement other) {
+ if (other.min < this.min) {
+ this.min = other.min;
+ }
+ if (other.max > this.max) {
+ this.max = other.max;
+ }
+ this.sum += other.sum;
+ this.count += other.count;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "%.1f/%.1f/%.1f",
+ this.min / 10.0,
+ (this.sum / 10.0) / this.count,
+ this.max / 10.0);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ run();
+ }
+
+ private static void run() throws Exception {
+ var reader = new ChunkReader(Paths.get(FILE));
+ var iterator = Spliterators.spliterator(reader, reader.estimateIterations(), Spliterator.IMMUTABLE);
+ var measurements = StreamSupport.stream(iterator, true)
+ .flatMap(buffer -> toMeasurements(buffer).entrySet().stream())
+ .collect(Collectors.toConcurrentMap(
+ entry -> entry.getKey(),
+ entry -> entry.getValue(),
+ Measurement::merge));
+ System.out.println(new TreeMap<>(measurements));
+ }
+
+ private static Map<String, Measurement> toMeasurements(ByteBuffer buffer) {
+ var iterator = Spliterators.spliteratorUnknownSize(new RowReader(buffer), Spliterator.IMMUTABLE);
+ return StreamSupport.stream(iterator, false)
+ .collect(Collectors.toMap(
+ row -> row.name(),
+ row -> new Measurement(row.value()),
+ Measurement::merge));
+ }
+}