aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKeshavram Kuduwa <131107576+kuduwa-keshavram@users.noreply.github.com>2024-01-07 14:41:56 +0530
committerGitHub <noreply@github.com>2024-01-07 10:11:56 +0100
commit78b34156784777e0bb8bdfcee8461541e0824199 (patch)
tree4e2cc3865dbd4ab43acdb9b46ac3386d40b6055a /src
parentc13997c9e0e67dc7cbfe3aae2b72bb80998d6492 (diff)
Optimised Code to use FileSegments with ByteBuffer (#184)
* Keshavram Kuduwa's Submission * Resolves #102 and Code Optimizations * Resolves #102 and Code Optimizations * Optimised Code with Roy's Reference * Fixed Tests * Clean Up Code --------- Co-authored-by: Keshavram Kuduwa <keshavram.kuduwa@apptware.com>
Diffstat (limited to 'src')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java204
1 files changed, 156 insertions, 48 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java b/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java
index d9da874..d7f1a61 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_kuduwa_keshavram.java
@@ -15,53 +15,165 @@
*/
package dev.morling.onebrc;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteOrder;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.util.DoubleSummaryStatistics;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
-import java.util.function.Function;
-import java.util.function.ToDoubleFunction;
+import java.util.Spliterator;
+import java.util.Spliterators;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
public class CalculateAverage_kuduwa_keshavram {
private static final String FILE = "./measurements.txt";
- private static final Function<String, String> KEY_MAPPER = line -> {
- int pivot = line.indexOf(";");
- return line.substring(0, pivot);
- };
- private static final ToDoubleFunction<String> VALUE_MAPPER = line -> {
- int pivot = line.indexOf(";");
- return toDouble(line.substring(pivot + 1));
- };
+ private static final long LEFT_SHIFT_EIGHT = Long.MAX_VALUE / (1L << 8);
+ private static final long LEFT_SHIFT_FOUR = Long.MAX_VALUE / (1L << 4);
+ private static final long LEFT_SHIFT_TWO = Long.MAX_VALUE / (1L << 2);
+ private static final long LEFT_SHIFT_ONE = Long.MAX_VALUE / (1L << 1);
public static void main(String[] args) throws IOException, InterruptedException {
- try (Stream<String> lines = Files.lines(Path.of(FILE))) {
- Map<String, DoubleSummaryStatistics> resultMap = lines
- .parallel()
- .collect(
- Collectors.groupingBy(KEY_MAPPER, Collectors.summarizingDouble(VALUE_MAPPER)));
- System.out.println(
- resultMap.entrySet().stream()
- .sorted(Map.Entry.comparingByKey())
- .map(
- entry -> String.format(
- "%s=%.1f/%.1f/%.1f",
- entry.getKey(),
- entry.getValue().getMin(),
- entry.getValue().getAverage(),
- entry.getValue().getMax()))
- .collect(Collectors.joining(", ", "{", "}")));
+ Map<String, DoubleSummaryStatistics> resultMap = getFileSegments(new File(FILE)).stream()
+ .parallel()
+ .flatMap(
+ segment -> {
+ try (FileChannel fileChannel = (FileChannel) Files.newByteChannel(Path.of(FILE), StandardOpenOption.READ)) {
+ MappedByteBuffer byteBuffer = fileChannel.map(
+ MapMode.READ_ONLY, segment.start, segment.end - segment.start);
+ byteBuffer.order(ByteOrder.nativeOrder());
+ Iterator<Measurement> iterator = getMeasurementIterator(byteBuffer);
+ return StreamSupport.stream(
+ Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL), true);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .collect(
+ Collectors.groupingBy(
+ Measurement::city, Collectors.summarizingDouble(Measurement::temp)));
+ System.out.println(
+ resultMap.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey())
+ .map(
+ entry -> String.format(
+ "%s=%.1f/%.1f/%.1f",
+ entry.getKey(),
+ entry.getValue().getMin(),
+ entry.getValue().getAverage(),
+ entry.getValue().getMax()))
+ .collect(Collectors.joining(", ", "{", "}")));
+ }
+
+ private static Iterator<Measurement> getMeasurementIterator(MappedByteBuffer byteBuffer) {
+ return new Iterator<>() {
+
+ private int initialPosition;
+
+ private int delimiterIndex;
+
+ @Override
+ public boolean hasNext() {
+ boolean hasRemaining = byteBuffer.hasRemaining();
+ if (hasRemaining) {
+ initialPosition = byteBuffer.position();
+ delimiterIndex = 0;
+ while (true) {
+ byte b = byteBuffer.get();
+ if (b == 59) {
+ break;
+ }
+ delimiterIndex++;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Measurement next() {
+ byteBuffer.position(initialPosition);
+
+ byte[] city = new byte[delimiterIndex];
+ for (int j = 0; j < delimiterIndex; j++) {
+ city[j] = byteBuffer.get();
+ }
+
+ byteBuffer.get();
+ String temp = "";
+ while (true) {
+ char c = (char) byteBuffer.get();
+ if (c == '\n') {
+ break;
+ }
+ temp += c;
+ }
+ return new Measurement(new String(city), toDouble(temp));
+ }
+ };
+ }
+
+ private record FileSegment(long start, long end) {
+ }
+
+ private record Measurement(String city, double temp) {
+ }
+
+ private static List<FileSegment> getFileSegments(final File file) throws IOException {
+ final int numberOfSegments = Runtime.getRuntime().availableProcessors();
+ final long fileSize = file.length();
+ final long segmentSize = fileSize / numberOfSegments;
+ if (segmentSize < 1000) {
+ return List.of(new FileSegment(0, fileSize));
+ }
+
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+ int lastSegment = numberOfSegments - 1;
+ return IntStream.range(0, numberOfSegments)
+ .mapToObj(
+ i -> {
+ long segStart = i * segmentSize;
+ long segEnd = (i == lastSegment) ? fileSize : segStart + segmentSize;
+ try {
+ segStart = findSegment(i, 0, randomAccessFile, segStart, segEnd);
+ segEnd = findSegment(i, lastSegment, randomAccessFile, segEnd, fileSize);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return new FileSegment(segStart, segEnd);
+ })
+ .toList();
}
}
- private static final long MAX_VALUE_DIVIDE_10 = Long.MAX_VALUE / 10;
+ private static long findSegment(
+ final int i, final int skipSegment, RandomAccessFile raf, long location, final long fileSize)
+ throws IOException {
+ if (i != skipSegment) {
+ raf.seek(location);
+ while (location < fileSize) {
+ location++;
+ if (raf.read() == '\n')
+ return location;
+ }
+ }
+ return location;
+ }
private static double toDouble(String num) {
long value = 0;
- int exp = 0;
boolean negative = false;
int decimalPlaces = Integer.MIN_VALUE;
for (byte ch : num.getBytes()) {
@@ -80,32 +192,26 @@ public class CalculateAverage_kuduwa_keshavram {
}
}
- return asDouble(value, exp, negative, decimalPlaces);
+ return asDouble(value, negative, decimalPlaces);
}
- private static double asDouble(long value, int exp, boolean negative, int decimalPlaces) {
+ private static double asDouble(long value, boolean negative, int decimalPlaces) {
+ int exp = -48;
+ value <<= 48;
if (decimalPlaces > 0 && value < Long.MAX_VALUE / 2) {
- if (value < Long.MAX_VALUE / (1L << 32)) {
- exp -= 32;
- value <<= 32;
- }
- if (value < Long.MAX_VALUE / (1L << 16)) {
- exp -= 16;
- value <<= 16;
- }
- if (value < Long.MAX_VALUE / (1L << 8)) {
+ if (value < LEFT_SHIFT_EIGHT) {
exp -= 8;
value <<= 8;
}
- if (value < Long.MAX_VALUE / (1L << 4)) {
+ if (value < LEFT_SHIFT_FOUR) {
exp -= 4;
value <<= 4;
}
- if (value < Long.MAX_VALUE / (1L << 2)) {
+ if (value < LEFT_SHIFT_TWO) {
exp -= 2;
value <<= 2;
}
- if (value < Long.MAX_VALUE / (1L << 1)) {
+ if (value < LEFT_SHIFT_ONE) {
exp -= 1;
value <<= 1;
}
@@ -115,25 +221,27 @@ public class CalculateAverage_kuduwa_keshavram {
long mod = value % 5;
value /= 5;
int modDiv = 1;
- if (value < Long.MAX_VALUE / (1L << 4)) {
+ if (value < LEFT_SHIFT_FOUR) {
exp -= 4;
value <<= 4;
modDiv <<= 4;
}
- if (value < Long.MAX_VALUE / (1L << 2)) {
+ if (value < LEFT_SHIFT_TWO) {
exp -= 2;
value <<= 2;
modDiv <<= 2;
}
- if (value < Long.MAX_VALUE / (1L << 1)) {
+ if (value < LEFT_SHIFT_ONE) {
exp -= 1;
value <<= 1;
modDiv <<= 1;
}
- if (decimalPlaces > 1)
+ if (decimalPlaces > 1) {
value += modDiv * mod / 5;
- else
+ }
+ else {
value += (modDiv * mod + 4) / 5;
+ }
}
final double d = Math.scalb((double) value, exp);
return negative ? -d : d;