aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authorPanagiotis Drakatos <PanagiotisDrakatos@users.noreply.github.com>2024-01-29 22:16:40 +0200
committerGitHub <noreply@github.com>2024-01-29 21:16:40 +0100
commit31a6740ef1a376ff086a337060aa5ed0468a7b26 (patch)
tree7355a728c9d6605667cb22c25aeba8950fc3f93e /src/main/java
parent1281e77be4baf9f49f068098bc6fce4071e40b91 (diff)
New Fresh Solution to Optimize Execution time (#641)
* CalculateAverage_pdrakatos * Rename to be valid with rules * CalculateAverage_pdrakatos * Rename to be valid with rules * Changes on scripts execution * Fixing bugs causing scripts not to be executed * Changes on prepare make it compatible * Fixing passing all tests * Increase direct memory allocation buffer * Fixing memory problem causes heap space exception * Fresh solution to optimize performance of the execution
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java85
1 files changed, 56 insertions, 29 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java b/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java
index ecf2b70..9ab7a22 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java
@@ -15,8 +15,10 @@
*/
package dev.morling.onebrc;
+import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
@@ -26,18 +28,27 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class CalculateAverage_PanagiotisDrakatos {
+
private static final String FILE = "./measurements.txt";
+ private static final long SEGMENT_SIZE = 4 * 1024 * 1024;
+ private static final long COMMA_PATTERN = 0x3B3B3B3B3B3B3B3BL;
+ private static final long DOT_BITS = 0x10101000;
+ private static final long MAGIC_MULTIPLIER = (100 * 0x1000000 + 10 * 0x10000 + 1);
+
private static TreeMap<String, MeasurementObject> sortedCities;
public static void main(String[] args) throws IOException {
SeekableByteRead(FILE);
System.out.println(sortedCities);
+ boolean DEBUG = true;
}
private static void SeekableByteRead(String path) throws IOException {
- FileInputStream fileInputStream = new FileInputStream(FILE);
+ FileInputStream fileInputStream = new FileInputStream(new File(FILE));
FileChannel fileChannel = fileInputStream.getChannel();
- Optional<Map<String, MeasurementObject>> optimistic = SplitSeekableByteChannel(fileChannel)
+ Optional<Map<String, MeasurementObject>> optimistic = getFileSegments(new File(FILE), fileChannel)
+ .stream()
+ .map(CalculateAverage_PanagiotisDrakatos::SplitSeekableByteChannel)
.parallel()
.map(CalculateAverage_PanagiotisDrakatos::MappingByteBufferToData)
.reduce(CalculateAverage_PanagiotisDrakatos::combineMaps);
@@ -46,37 +57,53 @@ public class CalculateAverage_PanagiotisDrakatos {
}
- private static Stream<ByteBuffer> SplitSeekableByteChannel(FileChannel channel) throws IOException {
- return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<ByteBuffer>() {
- private static final long MAP_SIZE = 1024 * 1024 * 10L;
-
- private long position = 0;
- private long length = channel.size();
+ record FileSegment(long start, long end, FileChannel fileChannel) {
+ }
- @Override
- public boolean hasNext() {
- while (position < length) {
- return true;
- }
- return false;
+ private static List<FileSegment> getFileSegments(final File file, final FileChannel fileChannel) throws IOException {
+ final int numberOfSegments = Runtime.getRuntime().availableProcessors();
+ final long fileSize = file.length();
+ final long segmentSize = fileSize / numberOfSegments;
+ final List<FileSegment> segments = new ArrayList<>();
+ if (segmentSize < 1000) {
+ segments.add(new FileSegment(0, fileSize, fileChannel));
+ return segments;
+ }
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
+ long segStart = 0;
+ long segEnd = segmentSize;
+ while (segStart < fileSize) {
+ segEnd = findSegment(randomAccessFile, segEnd, fileSize);
+ segments.add(new FileSegment(segStart, segEnd, fileChannel));
+ segStart = segEnd; // Just re-use the end and go from there.
+ segEnd = Math.min(fileSize, segEnd + segmentSize);
}
+ }
+ return segments;
+ }
- @Override
- public ByteBuffer next() {
- try {
- MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, Math.min(MAP_SIZE, length - position));
- int end = buffer.limit() - 1;
- while (buffer.get(end) != '\n') {
- end--;
- }
- position += end + 1;
- return buffer.slice(0, end);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
+ private static long findSegment(RandomAccessFile raf, long location, final long fileSize) throws IOException {
+ raf.seek(location);
+ while (location < fileSize) {
+ location++;
+ if (raf.read() == '\n')
+ return location;
+ }
+ return location;
+ }
+
+ private static ByteBuffer SplitSeekableByteChannel(FileSegment segment) {
+ try {
+ MappedByteBuffer buffer = segment.fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segment.end - segment.start());
+ int end = buffer.limit() - 1;
+ while (buffer.get(end) != '\n') {
+ end--;
}
- }, Spliterator.IMMUTABLE), false);
+ return buffer.slice(0, end);
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
public static ByteBuffer concat(ByteBuffer[] buffers) {