/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;
import sun.misc.Unsafe;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
/**
* Results on Mac mini (Apple M2 with 8-core CPU / 8GB unified memory):
*
* using AIO and multiple threads:
* 120.15s user 4.33s system 710% cpu 17.522 total
*
* reduce the number of memory copies:
* 45.87s user 2.82s system 530% cpu 9.185 total
*
* processing byte array backwards and using bitwise operation to find specific byte (inspired by thomaswue):
* 25.38s user 3.44s system 342% cpu 8.406 total
*
*
* @author Xylitol
*/
@SuppressWarnings("unchecked")
public class CalculateAverage_C5H12O5 {
// Processor count reported by the JVM when the class is initialized; fragment() divides
// the file size by this value to pick the chunk length.
private static final int AVAILABLE_PROCESSOR_NUM = Runtime.getRuntime().availableProcessors();
// 1024 / 16 = 64 slots in total, divided evenly among the processors. With 16MB buffers
// (see BYTE_BUFFER_CAPACITY) that is the "1GB memory max" budget noted below.
// NOTE(review): integer division makes this 0 when more than 64 processors are reported;
// confirm the queue that consumes this value tolerates a zero capacity.
private static final int TRANSFER_QUEUE_CAPACITY = 1024 / 16 / AVAILABLE_PROCESSOR_NUM; // 1GB memory max
// 16 * 1024 * 1024 bytes.
private static final int BYTE_BUFFER_CAPACITY = 1024 * 1024 * 16; // 16MB one time
// NOTE(review): presumably a sizing hint for the result maps; the usage is outside this
// view - confirm against the code that reads it.
private static final int EXPECTED_MAPPINGS_NUM = 10000;
/**
 * Fragments the file into newline-aligned chunks.
 *
 * <p>The file is first divided into {@code AVAILABLE_PROCESSOR_NUM} equally sized pieces;
 * each tentative split point is then moved forward to the byte just after the next
 * {@code '\n'} so that no line is cut in half.
 *
 * @param path the file to fragment
 * @return the chunk boundaries in ascending order: every element except the last is the
 *         offset of the byte following a newline, and the last element is always the file
 *         size (a single element equal to the size is returned when no split was possible)
 * @throws IOException if the file size cannot be determined or the file cannot be read
 */
private static long[] fragment(Path path) throws IOException {
    long size = Files.size(path);
    long chunk = size / AVAILABLE_PROCESSOR_NUM;
    List<Long> positions = new ArrayList<>();
    try (RandomAccessFile file = new RandomAccessFile(path.toFile(), "r")) {
        long position = chunk;
        for (int i = 0; i < AVAILABLE_PROCESSOR_NUM - 1; i++) {
            if (position >= size) {
                break;
            }
            file.seek(position);
            // move the position to the next newline byte, stopping at end of file
            int current;
            while ((current = file.read()) != '\n' && current != -1) {
                position++;
            }
            if (current == -1) {
                // No newline between this split point and the end of the file (e.g. the
                // last line is not newline-terminated). read() keeps returning -1 from
                // here on, so stop scanning; the file size is appended below instead.
                break;
            }
            // the chunk ends right after the newline that was just consumed
            positions.add(++position);
            position += chunk;
        }
    }
    // make sure the final boundary is the end of the file
    if (positions.isEmpty() || positions.getLast() < size) {
        positions.add(size);
    }
    return positions.stream().mapToLong(Long::longValue).toArray();
}
public static void main(String[] args) throws Exception {
// fragment the input file
Path path = Path.of("./measurements.txt");
long[] positions = fragment(path);
// start the calculation tasks
FutureTask