aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/dev/morling/onebrc
diff options
context:
space:
mode:
authorDimitar Dimitrov <dimitar.dimitrov@gmail.com>2024-01-03 22:53:38 +0900
committerGunnar Morling <gunnar.morling@googlemail.com>2024-01-03 21:04:44 +0100
commit57cfa54c684222a43e7b618dc34367537d969153 (patch)
tree4b14d1ee3c64965fef8fba0e5dcb0e26963ad1ed /src/main/java/dev/morling/onebrc
parent2458f056d63579da624ba02eebcea3e9e8ccda49 (diff)
ddimtirov - single-threaded datastructures tuning - reading to char buffers, one pass, no allocation processing
Diffstat (limited to 'src/main/java/dev/morling/onebrc')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java271
1 files changed, 193 insertions, 78 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java b/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java
index 285ab00..9183893 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_ddimtirov.java
@@ -15,117 +15,232 @@
*/
package dev.morling.onebrc;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.*;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
+import java.util.Map;
import java.util.TreeMap;
-import java.util.stream.Collector;
-
-import static java.util.stream.Collectors.groupingByConcurrent;
// gunnar morling - 2:10
// roy van rijn - 1:01
-// 0:53
+// 0:37
public class CalculateAverage_ddimtirov {
- public record InputLine(String station, int value) {
- public static InputLine fromLine(String line) {
- int endOfText = line.indexOf(";");
- String station = line.substring(0, endOfText);
+ @SuppressWarnings("RedundantSuppression")
+ public static void main(String[] args) throws IOException {
+// var start = Instant.now();
+ Instant start = null;
+ var path = Path.of("./measurements.txt");
+ var bufferSize = 512 * 64; // 64 blocks
+ var tracker = new Tracker();
+ var charset = StandardCharsets.UTF_8;
+
+ // Files.lines() is optimized for files that can be indexed by int
+ // For larger files it falls back to buffered reader, which we now
+ // use directly to be able to tweak the buffer size.
+ try (var stream = Files.newInputStream(path); var reader = new InputStreamReader(stream, charset)) {
+ var buffered = new RecordReader(reader, bufferSize);
- int startOfWhole = endOfText + 1;
- int sign;
- if (line.charAt(startOfWhole) == '-') {
- sign = -1;
- startOfWhole++;
- } else {
- sign = 1;
+ InputRecord record = null;
+ while (true) {
+ record = buffered.readRecord(record);
+ if (record==null) break;
+
+ tracker.process(record);
}
+ }
+
+ System.out.println(tracker.stationsToMetrics());
+
+ //noinspection ConstantValue
+ if (start!=null) System.err.println(Duration.between(start, Instant.now()));
+
+ assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(tracker.stationsToMetrics().toString());
+ }
+
+ /**
+ * <p>Reads records we can use to do the filtering from a stream of characters.
+ * Takes care of framing, and I/O buffering.
+ * This class in combination with {@link RecordReader} allows us to fully hide the
+ * I/O and internal representation.
+ *
+ * <p>If used with recycled {@link InputRecord} instances, this class allocates no memory
+ * after instantiation. The class is stateful, and the internals are not threadsafe
+ * - use from single thread or with proper synchronization.
+ */
+ static class RecordReader {
+ /**
+ * The source of input data
+ */
+ private final Readable input;
+
+ /**
+ * Used for i/o buffering and record parsing.
+ * @see #buf
+ */
+ private final CharBuffer buffer;
+
+ /**
+ * <p>Cached backing array from {@link #buffer}.
+ * <p>This is optimization because {@link CharBuffer#array()} was showing on the CPU profile.
+ */
+ private final char[] buf;
+
+ public RecordReader(Readable input, int bufferSize) {
+ this.input = input;
+ buffer = CharBuffer.allocate(bufferSize).flip();
+ buf = buffer.array();
+ }
+
+ public InputRecord readRecord(InputRecord recycled) throws IOException {
+ var record = parseRecord(recycled);
+ if (record!=null) return record;
- int endOfWhole = line.lastIndexOf(".");
- var whole = unsafeParsePositiveInt(line, startOfWhole, endOfWhole);
- var decimal = unsafeParsePositiveInt(line,endOfWhole+1, line.length());
- int fixpoint10 = (whole * 10 + decimal) * sign;
+ if (input.read(buffer.compact())==-1) return null;
+ buffer.flip();
- return new InputLine(station, fixpoint10);
+ return parseRecord(recycled);
}
- static int unsafeParsePositiveInt(String s, int start, int end) {
- int acc = 0;
- for (int i = start; i<end; i++) {
- if (acc != 0) acc *= 10;
- char c = s.charAt(i);
+
+ private InputRecord parseRecord(InputRecord recycled) {
+ var lim = buffer.limit();
+ if (buffer.isEmpty()) return null;
+
+ var nameOff = buffer.position();
+ var buff = buf;
+ while (buff[nameOff]=='\n' || buff[nameOff]=='\r' || buff[nameOff]==' ' ) {
+ nameOff++;
+ if (nameOff>=lim) return null;
+ }
+
+ var nameHash = 0;
+ var nameLen = 0;
+ while (buff[nameOff+nameLen]!=';') {
+ nameHash = nameHash*31 + buff[nameOff+nameLen];
+ nameLen++;
+ if (nameOff+nameLen>=lim) return null;
+ }
+
+ //noinspection DuplicateExpressions
+ assert new String(buf, nameOff, nameLen).hashCode()==nameHash
+ : "'%s'@%d !-> %d".formatted(new String(buf, nameOff, nameLen), new String(buf, nameOff, nameLen).hashCode(), nameHash);
+
+ var valCursor = nameOff + nameLen +1;
+ int signum = 1;
+ var acc = 0;
+ while (true) {
+ if (valCursor >= lim) {
+ return null;
+ }
+ char c = buff[valCursor++];
+ if (c == '\n' || c == '\r') {
+ break;
+ }
+ if (c=='.') continue;
+ if (acc == 0) {
+ if (c == '-') {
+ signum = -1;
+ continue;
+ }
+ } else {
+ acc *= 10;
+ }
var v = c - '0';
assert v>=0 && v<=9 : String.format("Character '%s', value %,d", c, v);
acc += v;
}
- return acc;
+
+ buffer.position(valCursor);
+
+ var record = recycled!=null ? recycled : new InputRecord(buf);
+ record.init(nameHash, nameOff, nameLen, acc*signum);
+ return record;
}
}
- private static class OutputMetrics {
- private int min = Integer.MAX_VALUE;
- private int max = Integer.MIN_VALUE;
- private long sum;
- private long count;
-
- @SuppressWarnings("ManualMinMaxCalculation")
- public OutputMetrics combine(OutputMetrics o) {
- var r = new OutputMetrics();
- r.min = min < o.min ? min : o.min;
- r.max = max > o.max ? max : o.max;
- r.sum = sum + o.sum;
- r.count = count + o.count;
- return r;
- }
- public void accumulate(InputLine m) {
- if (m.value < min) min = m.value;
- if (m.value > max) max = m.value;
- sum += m.value;
- count++;
+ static class Tracker {
+ private static final int ADDRESS_NO_CLASH_MODULUS = 49999;
+ private static final int OFFSET_MIN = 0;
+ private static final int OFFSET_MAX = 1;
+ private static final int OFFSET_COUNT = 2;
+
+ private final int[] minMaxCount = new int[ADDRESS_NO_CLASH_MODULUS * 3];
+ private final long[] sums = new long[ADDRESS_NO_CLASH_MODULUS];
+ private final String[] names = new String[ADDRESS_NO_CLASH_MODULUS];
+
+ public void process(InputRecord r) {
+ var i = Math.abs(r.nameHash) % ADDRESS_NO_CLASH_MODULUS;
+
+ if (names[i]==null) names[i] = r.name();
+
+ sums[i] += r.value;
+
+ int mmcIndex = i * 3;
+ var min = minMaxCount[mmcIndex + OFFSET_MIN];
+ var max = minMaxCount[mmcIndex + OFFSET_MAX];
+ if (r.value < min) minMaxCount[mmcIndex + OFFSET_MIN] = r.value;
+ if (r.value > max) minMaxCount[mmcIndex + OFFSET_MAX] = r.value;
+
+ minMaxCount[mmcIndex + OFFSET_COUNT]++;
}
- @Override
- public String toString() {
- var min = this.min / 10.0;
- var mean = Math.round(this.sum / (double) count) / 10.0;
- var max = this.max / 10.0;
- return min + "/" + mean + "/" + max;
+
+
+ public Map<String, String> stationsToMetrics() {
+ var m = new TreeMap<String, String>();
+ for (int i = 0; i < names.length; i++) {
+ var name = names[i];
+ if (name==null) continue;
+
+ var min = minMaxCount[i*3] / 10.0;
+ var max = minMaxCount[i*3+1] / 10.0;
+ var count = minMaxCount[i*3+2];
+ var sum = sums[i];
+ var mean = Math.round((double) sum / count) / 10.0;
+
+ m.put(name, min + "/" + mean + "/" + max);
+ }
+ return m;
}
+
}
- public static void main(String[] args) throws IOException {
-// var start = Instant.now();
- Instant start = null;
- var path = Path.of("./measurements.txt");
- var buffer = 8192 * 20;
+ static class InputRecord {
+ private final char[] chars;
+ private int idOffset;
+ private int idLength;
- // Files.lines() is optimized for files that can be indexed by int
- // For larger files it falls back to buffered reader, which we now
- // use directly to be able to tweak the buffer size.
- try (var reader = new BufferedReader(new InputStreamReader(Files.newInputStream(path)), buffer)) {
- var stationsToMetrics = reader.lines()
- .map(InputLine::fromLine)
- .parallel()
- .collect(groupingByConcurrent(InputLine::station, Collector.of(
- OutputMetrics::new,
- OutputMetrics::accumulate,
- OutputMetrics::combine,
- OutputMetrics::toString
- )));
- System.out.println(new TreeMap<>(stationsToMetrics));
- assert Files.readAllLines(Path.of("expected_result.txt")).getFirst().equals(new TreeMap<>(stationsToMetrics).toString());
+ public int value; // fixpoint scaled by 10
+ public int nameHash;
+
+ public InputRecord(char[] chars) {
+ this.chars = chars;
}
- //noinspection ConstantValue
- if (start!=null) System.err.println(Duration.between(start, Instant.now()));
- }
+ public void init(int nameHash, int nameOffset, int nameLength, int fixpointValue) {
+ assert nameOffset+nameLength<chars.length : String.format("idOffset+idLength=%d < chars.length=%d", nameOffset+nameLength, chars.length);
+ this.idOffset = nameOffset;
+ this.idLength = nameLength;
+ this.value = fixpointValue;
+ this.nameHash = nameHash;
+ }
+
+ public String name() {
+ return new String(chars, idOffset, idLength);
+ }
+
+ @Override
+ public String toString() {
+ return name() + ";" + (value / 10.0);
+ }
+ }
}