diff options
| author | Panagiotis Drakatos <PanagiotisDrakatos@users.noreply.github.com> | 2024-01-29 22:16:40 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-01-29 21:16:40 +0100 |
| commit | 31a6740ef1a376ff086a337060aa5ed0468a7b26 (patch) | |
| tree | 7355a728c9d6605667cb22c25aeba8950fc3f93e /src/main/java/dev/morling | |
| parent | 1281e77be4baf9f49f068098bc6fce4071e40b91 (diff) | |
New Fresh Solution to Optimize Execution time (#641)
* CalculateAverage_pdrakatos
* Rename to be valid with rules
* CalculateAverage_pdrakatos
* Rename to be valid with rules
* Changes on scripts execution
* Fixing bugs causing scripts not to be executed
* Changes on prepare make it compatible
* Fixing passing all tests
* Increase direct memory allocation buffer
* Fixing memory problem causes heap space exception
* Fresh solution to optimize performance of the execution
Diffstat (limited to 'src/main/java/dev/morling')
| -rw-r--r-- | src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java | 85 |
1 files changed, 56 insertions, 29 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java b/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java index ecf2b70..9ab7a22 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_PanagiotisDrakatos.java @@ -15,8 +15,10 @@ */ package dev.morling.onebrc; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; @@ -26,18 +28,27 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; public class CalculateAverage_PanagiotisDrakatos { + private static final String FILE = "./measurements.txt"; + private static final long SEGMENT_SIZE = 4 * 1024 * 1024; + private static final long COMMA_PATTERN = 0x3B3B3B3B3B3B3B3BL; + private static final long DOT_BITS = 0x10101000; + private static final long MAGIC_MULTIPLIER = (100 * 0x1000000 + 10 * 0x10000 + 1); + private static TreeMap<String, MeasurementObject> sortedCities; public static void main(String[] args) throws IOException { SeekableByteRead(FILE); System.out.println(sortedCities); + boolean DEBUG = true; } private static void SeekableByteRead(String path) throws IOException { - FileInputStream fileInputStream = new FileInputStream(FILE); + FileInputStream fileInputStream = new FileInputStream(new File(FILE)); FileChannel fileChannel = fileInputStream.getChannel(); - Optional<Map<String, MeasurementObject>> optimistic = SplitSeekableByteChannel(fileChannel) + Optional<Map<String, MeasurementObject>> optimistic = getFileSegments(new File(FILE), fileChannel) + .stream() + .map(CalculateAverage_PanagiotisDrakatos::SplitSeekableByteChannel) .parallel() .map(CalculateAverage_PanagiotisDrakatos::MappingByteBufferToData) .reduce(CalculateAverage_PanagiotisDrakatos::combineMaps); @@ -46,37 +57,53 @@ public class CalculateAverage_PanagiotisDrakatos { } - private static Stream<ByteBuffer> SplitSeekableByteChannel(FileChannel channel) throws IOException { - return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<ByteBuffer>() { - private static final long MAP_SIZE = 1024 * 1024 * 10L; - - private long position = 0; - private long length = channel.size(); + record FileSegment(long start, long end, FileChannel fileChannel) { + } - @Override - public boolean hasNext() { - while (position < length) { - return true; - } - return false; + private static List<FileSegment> getFileSegments(final File file, final FileChannel fileChannel) throws IOException { + final int numberOfSegments = Runtime.getRuntime().availableProcessors(); + final long fileSize = file.length(); + final long segmentSize = fileSize / numberOfSegments; + final List<FileSegment> segments = new ArrayList<>(); + if (segmentSize < 1000) { + segments.add(new FileSegment(0, fileSize, fileChannel)); + return segments; + } + try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + long segStart = 0; + long segEnd = segmentSize; + while (segStart < fileSize) { + segEnd = findSegment(randomAccessFile, segEnd, fileSize); + segments.add(new FileSegment(segStart, segEnd, fileChannel)); + segStart = segEnd; // Just re-use the end and go from there. + segEnd = Math.min(fileSize, segEnd + segmentSize); } + } + return segments; + } - @Override - public ByteBuffer next() { - try { - MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, Math.min(MAP_SIZE, length - position)); - int end = buffer.limit() - 1; - while (buffer.get(end) != '\n') { - end--; - } - position += end + 1; - return buffer.slice(0, end); - } - catch (IOException e) { - throw new RuntimeException(e); - } + private static long findSegment(RandomAccessFile raf, long location, final long fileSize) throws IOException { + raf.seek(location); + while (location < fileSize) { + location++; + if (raf.read() == '\n') + return location; + } + return location; + } + + private static ByteBuffer SplitSeekableByteChannel(FileSegment segment) { + try { + MappedByteBuffer buffer = segment.fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segment.end - segment.start()); + int end = buffer.limit() - 1; + while (buffer.get(end) != '\n') { + end--; } - }, Spliterator.IMMUTABLE), false); + return buffer.slice(0, end); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } } public static ByteBuffer concat(ByteBuffer[] buffers) { |
