aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCarlo <sana.carlo@gmail.com>2024-01-07 09:50:34 +0100
committerGitHub <noreply@github.com>2024-01-07 09:50:34 +0100
commit3ebc7d2b9c958ed951457f76dcd5651bdc3ef724 (patch)
tree083ec9f3ae77b0e9180a204326e93732e7d1a11d /src
parentc72a8e97693ee60723f24245b1efabb94bd20550 (diff)
Experiment from entangled90
* single thread memory mapped file reader, pool of processors * cleanup of inner classes of MetricProcessor * doubles are parsed without external functions, strings are lazily created from byte arrays * remove load() MappedByteBuffer in memory * fixed handling of newline * fix a bug & correct locale used * MappedByteBuffer size set to 1MB * fixed rounding * Do not use ArrayBlockingQueue.offer since it drops elements when queue is full * MappedByteBuffer size = 32 MB
Diffstat (limited to 'src')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_entangled90.java328
1 files changed, 328 insertions, 0 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_entangled90.java b/src/main/java/dev/morling/onebrc/CalculateAverage_entangled90.java
new file mode 100644
index 0000000..8adce3c
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_entangled90.java
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static java.lang.Math.round;
+
+public class CalculateAverage_entangled90 {
+
+ private static final String FILE = "./measurements.txt";
+
+ public static final boolean DEBUG = false;
+
+ public static void main(String[] args) throws IOException {
+ Scanner scanner = new Scanner();
+ long start = System.currentTimeMillis();
+ PooledChunkProcessor chunkProcessor = new PooledChunkProcessor(Runtime.getRuntime().availableProcessors());
+ scanner.scan(FILE, chunkProcessor, 0L, -1);
+ long finish = System.currentTimeMillis();
+ var map = chunkProcessor.result();
+ map.printResults();
+ if (DEBUG) {
+ System.out.println("Took: " + TimeUnit.MILLISECONDS.toSeconds(finish - start) + "seconds");
+ }
+ }
+}
+
+class AggregatedProcessor {
+ public double max = Double.NEGATIVE_INFINITY;
+ public double min = Double.POSITIVE_INFINITY;
+ private double sum = 0L;
+ public long count = 0L;
+
+ public void addMeasure(double value) {
+ max = Math.max(max, value);
+ min = Math.min(min, value);
+ sum += value;
+ count++;
+ }
+
+ public void combine(AggregatedProcessor processor) {
+ max = Math.max(max, processor.max);
+ min = Math.min(min, processor.min);
+ sum += processor.sum;
+ count += processor.count;
+ }
+
+ public double mean() {
+ return sum / count;
+ }
+
+ private static double round(double d) {
+ return Math.round(d * 10.0) / 10.0;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(Locale.US, "%.1f/%.1f/%.1f", round(min), round(mean()), round(max));
+ }
+}
+
+class ProcessorMap {
+ Map<BytesWrapper, AggregatedProcessor> processors = new HashMap<>(1024);
+
+ public void printResults() {
+ System.out.println("Processed: " + processors.entrySet().stream().mapToLong(e -> e.getValue().count).sum());
+ System.out.print("{");
+ System.out.print(
+ processors.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey()).map(Object::toString).collect(Collectors.joining(", ")));
+ System.out.println("}");
+ }
+
+ public void addMeasure(BytesWrapper city, double value) {
+ var processor = processors.get(city);
+ if (processor == null) {
+ processor = new AggregatedProcessor();
+ processors.put(city, processor);
+ }
+ processor.addMeasure(value);
+ }
+
+ private void combine(BytesWrapper city, AggregatedProcessor processor) {
+ var thisProcessor = processors.get(city);
+ if (thisProcessor == null) {
+ processors.put(city, processor);
+ }
+ else {
+ thisProcessor.combine(processor);
+ }
+ }
+
+ public static ProcessorMap combineAll(ProcessorMap... processors) {
+ var result = new ProcessorMap();
+ for (var p : processors) {
+ for (var entry : p.processors.entrySet()) {
+ result.combine(entry.getKey(), entry.getValue());
+ }
+ }
+ return result;
+ }
+}
+
+class PooledChunkProcessor implements Consumer<ByteBuffer> {
+ private final ArrayBlockingQueue<ByteBuffer> queue;
+ private final ProcessorMap[] results;
+ private final CountDownLatch latch;
+
+ private volatile boolean queueClosed = false;
+
+ public PooledChunkProcessor(int n) {
+ queue = new ArrayBlockingQueue<>(4 * 1024);
+ results = new ProcessorMap[n];
+ latch = new CountDownLatch(n);
+ for (int i = 0; i < n; i++) {
+ int finalI = i;
+ var t = new Thread(() -> {
+ var processor = new ChunkProcessor();
+ while (!Thread.interrupted()) {
+ try {
+ var element = queue.poll(1, TimeUnit.MILLISECONDS);
+ if (element != null)
+ processor.processChunk(element);
+ else if (queueClosed) {
+ if (CalculateAverage_entangled90.DEBUG) {
+ System.out.println("Queue closed, thread #" + finalI + " stopping");
+ }
+ break;
+ }
+ }
+ catch (InterruptedException e) {
+ break;
+ }
+ }
+ results[finalI] = processor.processors;
+ latch.countDown();
+ });
+ t.start();
+ }
+ }
+
+ @Override
+ public void accept(ByteBuffer byteBuffer) {
+ try {
+ queue.put(byteBuffer);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public ProcessorMap result() {
+ try {
+ queueClosed = true;
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ return ProcessorMap.combineAll(results);
+ }
+}
+
+class ChunkProcessor implements Consumer<ByteBuffer> {
+ ProcessorMap processors = new ProcessorMap();
+
+ public void processChunk(ByteBuffer bb) {
+ while (processRow(bb)) {
+ }
+ }
+
+ @Override
+ public void accept(ByteBuffer byteBuffer) {
+ processChunk(byteBuffer);
+ }
+
+ // true if you can continue
+ // false if input is missing
+ public boolean processRow(ByteBuffer bb) {
+ int colonIdx = findSemiColon(bb);
+ if (colonIdx < 0) {
+ return false;
+ }
+ while (bb.get(bb.position()) == '\n') {
+ bb.get();
+ }
+ var wrapper = wrapperFromBB(bb, colonIdx - bb.position());
+ bb.position(colonIdx + 1);
+ double value = parseDoubleNewLine(bb);
+ processors.addMeasure(wrapper, value);
+ return true;
+ }
+
+ private static BytesWrapper wrapperFromBB(ByteBuffer bb, int length) {
+ var bytes = new byte[length];
+ bb.get(bytes);
+ return new BytesWrapper(bytes);
+ }
+
+ // don't advance bb
+ private int findSemiColon(ByteBuffer bb) {
+ for (int i = bb.position(); i < bb.limit(); i++) {
+ if (bb.get(i) == (byte) ';')
+ return i;
+ }
+ return -1;
+ }
+
+ // parses double until new line and advances buffer
+ private static double parseDoubleNewLine(ByteBuffer bb) {
+ int result = 0;
+ int sign = 1;
+ byte c;
+ do {
+ c = bb.get();
+ switch (c) {
+ case '-':
+ sign = -1;
+ break;
+ case '.', ',', '\r', '\n':
+ break;
+ default:
+ result = result * 10 + (c - '0');
+ }
+ } while (c != '\n' && bb.position() < bb.limit());
+ return result * sign / 10.0;
+ }
+}
+
+class Scanner {
+ private static final int MAX_MAPPED_MEMORY = 32 * 1024 * 1024;
+
+ public void scan(String fileName, Consumer<ByteBuffer> consumer, long offset, long len) throws IOException {
+ try (var file = new RandomAccessFile(fileName, "r"); FileChannel channel = file.getChannel()) {
+ int chunkId = 0;
+ if (len < 0) {
+ len = file.length();
+ }
+
+ while (true) {
+ if (offset > 0) {
+ file.seek(offset);
+ }
+ MappedByteBuffer bb = channel.map(FileChannel.MapMode.READ_ONLY, offset, Math.min(MAX_MAPPED_MEMORY, len - offset));
+ var limitIdx = findLastNewLine(bb);
+ bb.limit(limitIdx);
+ // get last newline from the end
+ consumer.accept(bb);
+ chunkId++;
+ offset += limitIdx;
+ if (CalculateAverage_entangled90.DEBUG && chunkId % 10 == 0) {
+ System.out.println(" read chunk " + chunkId + " at pointer " + offset + "(" + ((int) (offset * 100 / len)) + "%)");
+ }
+
+ if (offset >= len - 1) {
+ break;
+ }
+ }
+ }
+ }
+
+ public int findLastNewLine(ByteBuffer bb) {
+ for (int i = bb.limit() - 1; i > bb.position(); i--) {
+ if (bb.get(i) == '\n') {
+ return i;
+ }
+ }
+ return -1;
+ }
+}
+
+class BytesWrapper implements Comparable<BytesWrapper> {
+ private final byte[] bytes;
+
+ public BytesWrapper(byte[] bytes) {
+ this.bytes = bytes;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof BytesWrapper) {
+ return Arrays.equals(bytes, ((BytesWrapper) obj).bytes);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(bytes);
+ }
+
+ @Override
+ public String toString() {
+ return new String(bytes);
+ }
+
+ @Override
+ public int compareTo(BytesWrapper bytesWrapper) {
+ return this.toString().compareTo(bytesWrapper.toString());
+ }
+} \ No newline at end of file