aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/dev/morling/onebrc
diff options
context:
space:
mode:
authorAbstractKamen <73014640+AbstractKamen@users.noreply.github.com>2024-01-10 22:48:12 +0200
committerGitHub <noreply@github.com>2024-01-10 21:48:12 +0100
commit7483b90cec1335db70c4e1bc4b72dc42bd444009 (patch)
tree8d4250d9137bc629c47717b7385584cd4e070946 /src/main/java/dev/morling/onebrc
parent209e005461c2b436d9454828f13ddd2732505410 (diff)
CalculateAverage_AbstractKamen
* initial commit * first attempt: segment the file and process it in parallel * remove commented stuff * custom parseDouble for this simple case * fixed some issues and improved parsing * format * Update calculate_average_AbstractKamen.sh --------- Co-authored-by: Gunnar Morling <gunnar.morling@googlemail.com>
Diffstat (limited to 'src/main/java/dev/morling/onebrc')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_AbstractKamen.java220
1 files changed, 220 insertions, 0 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_AbstractKamen.java b/src/main/java/dev/morling/onebrc/CalculateAverage_AbstractKamen.java
new file mode 100644
index 0000000..0c8016c
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_AbstractKamen.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.*;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+public class CalculateAverage_AbstractKamen {
+
+ private static final String FILE = "./measurements.txt";
+
+ private static class Measurement {
+ private int min = Integer.MAX_VALUE;
+ private int max = Integer.MIN_VALUE;
+ private int sum;
+ private long count;
+
+ public String toString() {
+ return round(min / 10.0) + "/" + round(sum / 10.0 / count) + "/" + round(max / 10.0);
+ }
+
+ private double round(double value) {
+ return Math.round(value * 10.0) / 10.0;
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ try (final FileChannel fc = FileChannel.open(Paths.get(FILE), StandardOpenOption.READ);
+ final RandomAccessFile raf = new RandomAccessFile(new File(FILE), "r")) {
+ final Map<String, Measurement> res = getParallelBufferStream(raf, fc)
+ .map(CalculateAverage_AbstractKamen::getMeasurements)
+ .flatMap(m -> m.entrySet().stream())
+ .collect(Collectors.collectingAndThen(
+ Collectors.toMap(Map.Entry::getKey,
+ Map.Entry::getValue,
+ CalculateAverage_AbstractKamen::aggregateMeasurements),
+ TreeMap::new));
+ System.out.println(res);
+ }
+ }
+
+ private static Measurement aggregateMeasurements(Measurement src, Measurement target) {
+ target.min = Math.min(src.min, target.min);
+ target.max = Math.max(src.max, target.max);
+ target.sum = src.sum + target.sum;
+ target.count = src.count + target.count;
+ return target;
+ }
+
+ private static Map<String, Measurement> getMeasurements(BufferSupplier getBuffer) {
+ final Map<String, Measurement> map = new HashMap<>(50_000);
+ final ByteBuffer byteBuffer = getBuffer.get();
+ final byte[] bytes = new byte[512];
+ while (byteBuffer.hasRemaining()) {
+ int nameLen = 0;
+ String name;
+ byte b;
+ while ((b = byteBuffer.get()) != ';') {
+ bytes[nameLen++] = b;
+ }
+ name = new String(bytes, 0, nameLen, StandardCharsets.UTF_8);
+ int valueLen = 0;
+ int neg = 1;
+ while (byteBuffer.hasRemaining() && ((b = byteBuffer.get()) != '\n')) {
+ if (b == '-') {
+ neg = -1;
+ }
+ else if (b == '.' || b == '\r') {
+ // skip the dot and retart char
+ }
+ else {
+ bytes[valueLen++] = b;
+ }
+ }
+ final int val = parseAsInt(valueLen, bytes);
+ takeMeasurement(val * neg, map, name);
+ }
+ return map;
+ }
+
+ private static int parseAsInt(int valueLen, byte[] bytes) {
+ int val;
+ switch (valueLen) {
+ case 2 -> val = (bytes[0] - 48) * 10 + (bytes[1] - 48);
+ case 3 -> val = (bytes[0] - 48) * 100 + (bytes[1] - 48) * 10 + (bytes[2] - 48);
+ default -> val = 0;
+ }
+ return val;
+ }
+
+ private static void takeMeasurement(int temperature, Map<String, Measurement> map, String name) {
+ Measurement measurement = map.get(name);
+ if (measurement != null) {
+ measurement.min = Math.min(measurement.min, temperature);
+ measurement.max = Math.max(measurement.max, temperature);
+ measurement.sum += temperature;
+ measurement.count++;
+ }
+ else {
+ measurement = new Measurement();
+ map.put(name, measurement);
+ measurement.min = temperature;
+ measurement.max = temperature;
+ measurement.sum = temperature;
+ measurement.count = 1;
+ }
+ }
+
+ private static Stream<BufferSupplier> getParallelBufferStream(RandomAccessFile raf, FileChannel fc) throws IOException {
+ final int availableProcessors = Runtime.getRuntime().availableProcessors();
+ return StreamSupport.stream(
+ StreamSupport.stream(
+ Spliterators.spliterator(
+ new BufferSupplierIterator(raf, fc, availableProcessors), availableProcessors,
+ Spliterator.IMMUTABLE | Spliterator.SIZED | Spliterator.SUBSIZED),
+ false)
+ .spliterator(),
+ true);
+ }
+
+}
+
/** Supplies the {@link ByteBuffer} that holds one chunk of the input file. */
interface BufferSupplier extends Supplier<ByteBuffer> {
}

/**
 * Hands out consecutive chunks of a file, front to back, as {@link BufferSupplier}s.
 *
 * <p>The nominal chunk length is the file size divided by the requested number of parts, capped
 * at 1 GiB. Each chunk is then extended to just past the next {@code '\n'}, or to the end of the
 * file if no line break follows. Mapping is deferred until a supplier is first asked for its buffer.
 */
class BufferSupplierIterator implements Iterator<BufferSupplier> {
    /** Upper bound for the nominal chunk length: 1 GiB. */
    private static final long MAX_CHUNK_BYTES = 1L << 30;

    private final RandomAccessFile raf;
    private final FileChannel fc;
    private final long fileLength;
    private final long chunkSize;
    /** Offset of the first byte that has not been handed out yet. */
    private long nextOffset;

    /**
     * @param raf           random-access view of the file, read to find line breaks
     * @param fc            channel on the same file, queried for its size and used for mapping
     * @param numberOfParts divisor applied to the file size to get the nominal chunk length
     * @throws IOException if the file size cannot be read
     */
    public BufferSupplierIterator(RandomAccessFile raf, FileChannel fc, int numberOfParts) throws IOException {
        this.raf = raf;
        this.fc = fc;
        this.fileLength = fc.size();
        this.chunkSize = Math.min(fileLength / numberOfParts, MAX_CHUNK_BYTES);
    }

    @Override
    public boolean hasNext() {
        return nextOffset < fileLength;
    }

    /**
     * Returns the supplier for the next chunk and advances past it.
     *
     * @throws NoSuchElementException if the whole file has been handed out
     * @throws RuntimeException       wrapping any {@link IOException} raised while locating the chunk end
     */
    @Override
    public BufferSupplier next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final long from = nextOffset;
        final long to;
        try {
            to = findChunkEnd(from);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        nextOffset = to;
        return new MappedChunk(fc, from, to - from);
    }

    /** Finds the exclusive end offset of the chunk beginning at {@code from}. */
    private long findChunkEnd(long from) throws IOException {
        long cursor = Math.min(from + chunkSize, fileLength);
        boolean lineBreakSeen = false;
        // Step one byte at a time until a line break has been consumed or the file ends.
        while (!lineBreakSeen && cursor < fileLength) {
            raf.seek(cursor);
            lineBreakSeen = raf.read() == '\n';
            cursor++;
        }
        return cursor;
    }

    /** Maps its file region read-only on the first {@link #get()} and returns that same buffer afterwards. */
    private static final class MappedChunk implements BufferSupplier {
        private final FileChannel channel;
        private final long offset;
        private final long length;
        private ByteBuffer mapped;

        MappedChunk(FileChannel channel, long offset, long length) {
            this.channel = channel;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public ByteBuffer get() {
            if (mapped == null) {
                try {
                    mapped = channel.map(MapMode.READ_ONLY, offset, length);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            return mapped;
        }
    }
}