about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: karthikeyan97 <skarthikeyan046@gmail.com> 2024-01-17 03:16:11 +0530
committer: GitHub <noreply@github.com> 2024-01-16 22:46:11 +0100
commit: 455b85c5af1cba9ddb6e6dc686c091d1e000a432 (patch)
tree: 0cad173406f196624c87a7edbf32202b97599fd3 /src
parent: ffb09bf4bf0b41835b3340415be4f3c34565c126 (diff)
karthikeyan97 implementation (#417)
Co-authored-by: Karthikeyans <karthikeyan.sn@zohocorp.com>
Diffstat (limited to 'src')
-rw-r--r-- src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java 382
1 files changed, 382 insertions, 0 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java b/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java
new file mode 100644
index 0000000..c17e927
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_karthikeyan97.java
@@ -0,0 +1,382 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import static java.util.stream.Collectors.*;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import sun.misc.Unsafe;
+
+/**
+ * Reads {@code ./measurements.txt}, aggregates min / mean / max per station and prints the
+ * result as a map sorted by station name.
+ *
+ * <p>Each record is parsed as {@code <station bytes>;<number>\n}. The parser accumulates the
+ * digits of the number and skips the decimal point, so values are handled as integers scaled
+ * by ten (this assumes exactly one fractional digit per value) and divided by ten again when
+ * printed.
+ *
+ * <p>The file is cut into one newline-aligned segment per available processor and the
+ * segments are parsed in parallel; the per-segment results are merged on the calling thread.
+ */
+public class CalculateAverage_karthikeyan97 {
+
+    private static final String FILE = "./measurements.txt";
+
+    /** Files of at most this many bytes are processed as a single segment. */
+    private static final long SINGLE_SEGMENT_MAX_BYTES = 1000;
+
+    /**
+     * How many bytes before the nominal end of a segment the search for the closing newline
+     * starts. NOTE(review): presumably the longest possible record (100-byte name, ';',
+     * value, newline) - confirm against the input specification.
+     */
+    private static final int BOUNDARY_LOOKBACK = 107;
+
+    /** Capacity of the scratch array that collects the bytes of one station name. */
+    private static final int MAX_NAME_BYTES = 100;
+
+    /** Running min / max / sum / count of the (scaled by ten) values of one station. */
+    private static class MeasurementAggregator {
+        private double min = Double.POSITIVE_INFINITY;
+        private double max = Double.NEGATIVE_INFINITY;
+        private long sum;
+        private long count;
+
+        /** Folds a single scaled value into this aggregate. */
+        void add(double value) {
+            min = Math.min(min, value);
+            max = Math.max(max, value);
+            sum += (long) value;
+            count++;
+        }
+
+        /**
+         * Folds another aggregate into this one.
+         *
+         * @return this aggregate, so the method can be used as a merge function
+         */
+        MeasurementAggregator merge(MeasurementAggregator other) {
+            min = Math.min(min, other.min);
+            max = Math.max(max, other.max);
+            sum += other.sum;
+            count += other.count;
+            return this;
+        }
+
+        /** Formats the aggregate as {@code min/mean/max}, each converted back from tenths. */
+        @Override
+        public String toString() {
+            return round(min) + "/" + round((1.0 * sum) / count) + "/" + round(max);
+        }
+
+        private double round(double value) {
+            return Math.round(value) / 10.0;
+        }
+    }
+
+    /**
+     * Entry point: splits the input file, parses the segments in parallel and prints the
+     * merged statistics.
+     *
+     * @param args ignored
+     * @throws Exception if the file cannot be read or the page size cannot be obtained
+     */
+    public static void main(String[] args) throws Exception {
+        long[][] boundary;
+        // Read-only access is sufficient; "rw" would need write permission and would create
+        // an empty input file when it is missing.
+        try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) {
+            long length = raf.length();
+            int cores = length > SINGLE_SEGMENT_MAX_BYTES ? Runtime.getRuntime().availableProcessors() : 1;
+            boundary = splitIntoSegments(raf, length, cores);
+        }
+
+        int bufferSize = readBufferSize();
+
+        List<Map<modifiedbytearray, MeasurementAggregator>> partials = Arrays.stream(boundary).parallel().map(range -> {
+            try {
+                return processSegment(range, bufferSize);
+            }
+            catch (IOException e) {
+                // Fail the whole run instead of continuing with a missing segment.
+                throw new UncheckedIOException(e);
+            }
+        }).toList();
+
+        // TreeMap keyed by the decoded name gives the sorted output.
+        Map<String, MeasurementAggregator> sorted = new TreeMap<>();
+        for (Map<modifiedbytearray, MeasurementAggregator> partial : partials) {
+            for (Map.Entry<modifiedbytearray, MeasurementAggregator> entry : partial.entrySet()) {
+                sorted.merge(entry.getKey().getStationName(), entry.getValue(), MeasurementAggregator::merge);
+            }
+        }
+        System.out.println(sorted);
+    }
+
+    /**
+     * Computes {@code cores} consecutive, inclusive {@code [start, end]} byte ranges that
+     * together cover the file. Every range except the last ends on a {@code '\n'} byte; ranges
+     * with {@code end < start} are empty.
+     *
+     * @param raf    the open input file; its file pointer is moved
+     * @param length the file length in bytes
+     * @param cores  the number of ranges to produce, at least 1
+     */
+    private static long[][] splitIntoSegments(RandomAccessFile raf, long length, int cores) throws IOException {
+        long[][] boundary = new long[cores][2];
+        long segments = length / cores;
+        long before = -1;
+        for (int i = 0; i < cores - 1; i++) {
+            boundary[i][0] = before + 1;
+            // Never start the search at or before the previous boundary: with segments
+            // shorter than the look-back this would find an earlier newline again and make
+            // neighbouring ranges overlap.
+            long target = Math.max(before + 1, before + segments - BOUNDARY_LOOKBACK);
+            if (target < length) {
+                raf.seek(target);
+                int c;
+                do {
+                    c = raf.read();
+                } while (c != '\n' && c != -1); // -1: end of file reached without a newline
+                before = raf.getFilePointer() - 1;
+            }
+            else {
+                before = length - 1; // nothing left; the remaining ranges are empty
+            }
+            boundary[i][1] = before;
+        }
+        boundary[cores - 1][0] = before + 1;
+        boundary[cores - 1][1] = length - 1;
+        return boundary;
+    }
+
+    /** Returns ten times the page size reported by {@code sun.misc.Unsafe}, used as read buffer size. */
+    private static int readBufferSize() throws ReflectiveOperationException {
+        Field f = Unsafe.class.getDeclaredField("theUnsafe");
+        f.setAccessible(true);
+        Unsafe unsafe = (Unsafe) f.get(null);
+        return unsafe.pageSize() * 10;
+    }
+
+    /**
+     * Parses the records inside one byte range of the input file.
+     *
+     * @param range      inclusive {@code {start, end}} byte offsets; empty when {@code end < start}
+     * @param bufferSize size in bytes of the direct buffer used for reading
+     * @return the aggregates of this range, keyed by the raw station name bytes
+     * @throws IOException if the file cannot be opened or read
+     */
+    private static Map<modifiedbytearray, MeasurementAggregator> processSegment(long[] range, int bufferSize) throws IOException {
+        // The very high load factor keeps the table from being resized.
+        Map<modifiedbytearray, MeasurementAggregator> stations = new HashMap<>(12000, 100);
+
+        // Kept as long: a range can exceed Integer.MAX_VALUE bytes on machines with few cores.
+        long remaining = range[1] - range[0] + 1;
+
+        try (FileInputStream in = new FileInputStream(FILE);
+                FileChannel channel = in.getChannel()) {
+            channel.position(range[0]);
+            ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+            byte[] chunk = new byte[bufferSize];
+
+            byte[] name = new byte[MAX_NAME_BYTES];
+            int nameLength = 0;
+            int hash = 1;
+            modifiedbytearray station = null;
+            boolean inNumber = false;
+            boolean negative = false;
+            double value = 0;
+
+            while (remaining > 0) {
+                buffer.clear();
+                int read = channel.read(buffer);
+                if (read < 0) {
+                    break; // end of file before the range was exhausted
+                }
+                // Do not run past the end of this range into the next one.
+                int usable = (int) Math.min(read, remaining);
+                buffer.flip();
+                buffer.get(chunk, 0, usable);
+                remaining -= usable;
+
+                for (int p = 0; p < usable; p++) {
+                    byte b = chunk[p];
+                    if (b == ';') {
+                        // End of the name. The key keeps the array, so a fresh one is
+                        // started for the next record.
+                        station = new modifiedbytearray(name, nameLength, hash);
+                        name = new byte[MAX_NAME_BYTES];
+                        nameLength = 0;
+                        hash = 1;
+                        inNumber = true;
+                    }
+                    else if (b == '\n') {
+                        // End of the record: publish the value and reset the parser state.
+                        stations.computeIfAbsent(station, k -> new MeasurementAggregator())
+                                .add(negative ? -value : value);
+                        inNumber = false;
+                        negative = false;
+                        value = 0;
+                        hash = 1;
+                    }
+                    else if (inNumber) {
+                        if (b == '-') {
+                            negative = true;
+                        }
+                        else if (b != '.') {
+                            // The decimal point is skipped, leaving the value scaled by ten.
+                            value = value * 10 + (b - '0');
+                        }
+                    }
+                    else {
+                        // Name byte. Bytes of multi-byte UTF-8 sequences are negative as Java
+                        // bytes and therefore can never match ';' or '\n'; they are stored
+                        // and hashed like any other name byte.
+                        name[nameLength++] = b;
+                        hash = 31 * hash + b;
+                    }
+                }
+            }
+        }
+        return stations;
+    }
+
+}
+
+/**
+ * Hash-map key made of the first {@code length} bytes of a byte array plus a hash code
+ * supplied by the caller.
+ *
+ * <p>The array is stored by reference, not copied, so it must not be modified while the
+ * instance is used as a key.
+ */
+class modifiedbytearray {
+    private final int length;
+    private final byte[] arr;
+    // Left public and non-final so existing direct accesses keep compiling.
+    public int hashcode;
+
+    /**
+     * @param arr      backing array; only the first {@code length} bytes are significant
+     * @param length   number of significant bytes in {@code arr}
+     * @param hashcode hash of the significant bytes, as computed by the caller
+     */
+    modifiedbytearray(byte[] arr, int length, int hashcode) {
+        this.arr = arr;
+        this.length = length;
+        this.hashcode = hashcode;
+    }
+
+    /** Decodes the significant bytes as a UTF-8 string. */
+    public String getStationName() {
+        return new String(arr, 0, length, StandardCharsets.UTF_8);
+    }
+
+    /** Returns the backing array itself (not a copy); it may be longer than the name. */
+    public byte[] getArr() {
+        return this.arr;
+    }
+
+    @Override
+    public String toString() {
+        return getStationName();
+    }
+
+    /**
+     * Two keys are equal when they carry the same hash code and the same significant bytes.
+     * Any other object, including {@code null}, is simply unequal instead of raising a
+     * {@link ClassCastException} or {@link NullPointerException}.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof modifiedbytearray other)) {
+            return false;
+        }
+        // Comparing the hash first keeps equals consistent with hashCode().
+        return hashcode == other.hashcode
+                && Arrays.equals(arr, 0, length, other.arr, 0, other.length);
+    }
+
+    public int getHashcode() {
+        return hashcode;
+    }
+
+    @Override
+    public int hashCode() {
+        return hashcode;
+    }
+}