aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
authorMahadev K <79390504+mahadev-k@users.noreply.github.com>2024-01-28 22:56:44 +0530
committerGitHub <noreply@github.com>2024-01-28 18:26:44 +0100
commitf598d74594a49ca2fa4c3a1a2b87434e155adb9e (patch)
tree1927439c5e18bf7d794bea0e71e0407b27c4e38e /src/main
parentf5bddafaf735c812c6df5887e3969b0151e26eda (diff)
Mahadev virtual thread 1brc (#611)
* Read file with multiple virtual threads and process chunks of file data in parallel. * Updated logic to bucket every chunk of aggs into a vector and merge them into a TreeMap for printing. * Virtual Thread / File Channels Impl. * Renamed files with GHUsername. * Added statement to get vals before updating. * Added executable permission to the files.
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/dev/morling/onebrc/CalculateAverage_mahadev_k.java152
1 files changed, 152 insertions, 0 deletions
diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_mahadev_k.java b/src/main/java/dev/morling/onebrc/CalculateAverage_mahadev_k.java
new file mode 100644
index 0000000..4d4ccd5
--- /dev/null
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_mahadev_k.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2023 The original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package dev.morling.onebrc;
+
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+public class CalculateAverage_mahadev_k {
+
+ private static final String FILE = "./measurements.txt";
+
+ private static Map<String, MeasurementAggregator> stationMap = new ConcurrentSkipListMap<>();
+
+ private static double round(double value) {
+ return Math.round(value * 10.0) / 10.0;
+ }
+
+ private static class MeasurementAggregator {
+ double minima = Double.POSITIVE_INFINITY, maxima = Double.NEGATIVE_INFINITY, total = 0, count = 0;
+
+ public synchronized void accept(double value) {
+ if (minima > value)
+ minima = value;
+ if (maxima < value)
+ maxima = value;
+ total += value;
+ count++;
+ }
+
+ public double min() {
+ return round(minima);
+ }
+
+ public double max() {
+ return round(maxima);
+ }
+
+ public double avg() {
+ return round((Math.round(total * 10.0) / 10.0) / count);
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ int chunkSize = args.length == 1 ? Integer.parseInt(args[0]) : 1_000_000;
+ readAndProcess(chunkSize);
+ print();
+ }
+
+ public static void readAndProcess(int chunkSize) {
+ final ThreadFactory factory = Thread.ofVirtual().name("routine-", 0).factory();
+
+ try (RandomAccessFile file = new RandomAccessFile(FILE, "r")) {
+ try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
+
+ var channel = file.getChannel();
+ var size = channel.size();
+ long start = 0;
+ while (start <= size) {
+ long end = start + chunkSize;
+ String letter = "";
+ do {
+ end--;
+ ByteBuffer buffer = ByteBuffer.allocate(1);
+ channel.read(buffer, end);
+ buffer.flip();
+ letter = StandardCharsets.UTF_8.decode(buffer).toString();
+ } while (!letter.equals("\n"));
+
+ if (end < start)
+ end = start + chunkSize;
+
+ final long currentStart = start;
+ final long currentEnd = end;
+ executor.submit(() -> {
+ ByteBuffer buffer = ByteBuffer.allocate((int) (currentEnd - currentStart + 1));
+ try {
+ channel.read(buffer, currentStart);
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ buffer.flip();
+ String data = StandardCharsets.UTF_8.decode(buffer).toString();
+ processData(data);
+ });
+ start = end + 1;
+ }
+ }
+
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void processData(String dataBlock) {
+ StringTokenizer tokenizer = new StringTokenizer(dataBlock, "\n");
+ while (tokenizer.hasMoreElements()) {
+ StringTokenizer tokens = new StringTokenizer(tokenizer.nextToken(), ";");
+ String station = tokens.nextToken();
+ double value = Double.parseDouble(tokens.nextToken());
+ processMinMaxMean(station, value);
+ }
+ }
+
+ private static void processMinMaxMean(String station, double temp) {
+ var values = stationMap.get(station);
+ if (values == null) {
+ values = new MeasurementAggregator();
+ stationMap.putIfAbsent(station, values);
+ }
+ values = stationMap.get(station);
+ values.accept(temp);
+ }
+
+ public static void print() throws UnsupportedEncodingException {
+ System.setOut(new PrintStream(new FileOutputStream(FileDescriptor.out), true, StandardCharsets.UTF_8));
+ System.out.print("{");
+ int i = stationMap.size();
+ for (var kv : stationMap.entrySet()) {
+ System.out.printf("%s=%s/%s/%s", kv.getKey(), kv.getValue().min(), kv.getValue().avg(), kv.getValue().max());
+ if (i > 1)
+ System.out.print(", ");
+ i--;
+ }
+ System.out.println("}");
+ }
+} \ No newline at end of file