1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;
/*
* # Main Speed Drivers
*
* Changes were made in this order, each header includes the runtime before and after the change,
* and whose implementation (if any) was used as a reference.
*
* ## Parallel Process Chunks [160.5 -> 18] [twobiers]
*
* Rather than reading data top to bottom and attempting to parallelize processing with batches
* of the parsed data, we read chunks of data (about 1 MB) and parallelize processing per chunk.
*
* Several implementations do this kind of processing using a FileChannel to map chunks to buffers;
* the reference above gave the idea to drive that mapping through an iterator.
*
* ## Share Byte Array when Deserializing [18 -> 6.5] [Various]
*
* When deserializing names after going through the effort of processing one byte at a time
* when processing a chunk of data we can re-use a single byte array to store the characters
* that make up the name. This removes the need to allocate and de-allocate memory for the buffer.
*
* We can then use the new String(byte[], 0, length) constructor to create the String without
* worrying about clearing the underlying byte array as we provide a length.
*
* For this one I did not use any particular implementation as a reference but have seen it in many.
*
* ## Store ints Compute Doubles at End [6.5 -> 6.2] [None]
*
* Since input has a single decimal only we can effectively ignore it, do all of our math with the
* numbers as integers, then only when printing out divide by 10.0 to get the correct values.
*
* The impact of this is small, maybe even nothing in this implementation, but keeping it in place.
*
* ## Use graal [6.2 -> 5.3] [None]
*
* Change from 21.0.1-tem to 21.0.1-graal.
*
* ## Process ByteBuffer for Name then Value [5.3 -> 4.7] [None]
*
* This started as a refactor and turned out to have noticeable runtime impact, which is nice.
*
* Rather than processing the ByteBuffer in a single while (current != '\n') with a condition
* to switch from getting the name to calculating the integer value on (current == ';') the
* logic was split into 2 separate loops.
*
* The first, while (current != ';') and a second, while (current != '\n').
*
* # For my Own Reference
*
* ## Constraints
*
* - Station name: non null UTF-8 string of length [1, 100] bytes
* - Temperature value: non null double [-99.9, 99.9] with one fractional digit
* - Station names: maximum of 10,000 unique names
*
* ## Run Commands
*
* ./mvnw clean verify && ./test.sh MeanderingProgrammer
*
* ./mvnw clean verify && ./calculate_average_MeanderingProgrammer.sh
*
* ## Runtimes
*
* Baseline: 2:40.597
* Current: 0:04.668
*/
/**
 * Computes the min/mean/max value per station name from {@code ./measurements.txt} and prints
 * the result as a map sorted by station name.
 *
 * <p>The file is memory-mapped in chunks of at most 1 MiB. Each chunk is aggregated into its own
 * map (chunks are processed by a parallel stream) and the per-chunk maps are then merged into a
 * single concurrent map.
 */
public class CalculateAverage_MeanderingProgrammer {

    private static final String FILE = "./measurements.txt";

    /**
     * Iterates over a file as a sequence of buffers, each of which ends on a complete line.
     *
     * <p>The reader owns the underlying {@link FileChannel} and must be closed once iteration
     * is finished. Buffers that were already mapped stay valid after the channel is closed.
     */
    private static class ChunkReader implements Iterator<ByteBuffer>, AutoCloseable {

        private static final long CHUNK_SIZE = 1_024 * 1_024;

        private final FileChannel channel;
        private final long size;
        // Number of bytes of the file that have already been handed out.
        private long read;

        /**
         * Opens the file at {@code path} for reading.
         *
         * @param path file to read
         * @throws IOException if the file cannot be opened or its size cannot be determined
         */
        public ChunkReader(Path path) throws IOException {
            this.channel = FileChannel.open(path, StandardOpenOption.READ);
            this.size = this.channel.size();
            this.read = 0;
        }

        /**
         * Returns a lower bound on the number of chunks: every chunk holds at most
         * {@code CHUNK_SIZE} bytes, and clamping to line boundaries only makes chunks smaller.
         */
        public long estimateIterations() {
            return this.size / CHUNK_SIZE;
        }

        @Override
        public boolean hasNext() {
            return this.nextChunkSize() > 0;
        }

        /**
         * Maps the next chunk of the file and trims it so that it ends with a {@code '\n'}.
         *
         * @return a buffer positioned at 0 whose limit is just past the last complete line
         * @throws NoSuchElementException if the whole file has already been consumed
         * @throws UncheckedIOException if the chunk cannot be mapped
         * @throws IllegalStateException if a non-final chunk contains no line terminator
         */
        @Override
        public ByteBuffer next() {
            long chunkSize = this.nextChunkSize();
            if (chunkSize <= 0) {
                throw new NoSuchElementException("No more chunks to read");
            }

            ByteBuffer buffer;
            try {
                buffer = this.channel.map(FileChannel.MapMode.READ_ONLY, this.read, chunkSize);
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }

            int limit = buffer.limit();
            boolean lastChunk = this.read + chunkSize == this.size;
            if (lastChunk && buffer.get(limit - 1) != '\n') {
                // The file does not end with a newline. The mapped buffer is read-only, so copy
                // the tail into a heap buffer and terminate the final line ourselves; this keeps
                // the row parser free of end-of-buffer checks.
                ByteBuffer padded = ByteBuffer.allocate(limit + 1);
                padded.put(buffer);
                padded.put((byte) '\n');
                padded.flip();
                this.read += chunkSize;
                return padded;
            }

            // Clamp the buffer to the last complete line; the remainder is re-read by the next chunk.
            int end = limit;
            while (end > 0 && buffer.get(end - 1) != '\n') {
                end--;
            }
            if (end == 0) {
                throw new IllegalStateException(
                        "No line terminator within " + chunkSize + " bytes at offset " + this.read);
            }
            buffer.limit(end);
            this.read += end;
            return buffer;
        }

        /** Closes the underlying file channel. */
        @Override
        public void close() throws IOException {
            this.channel.close();
        }

        // Bytes to map next: a full chunk, or whatever is left of the file.
        private long nextChunkSize() {
            return Math.min(CHUNK_SIZE, this.size - this.read);
        }
    }

    /** One parsed line: the station name and its value with the decimal point dropped. */
    private record Row(String name, int value) {
    }

    /**
     * Parses the {@code name;value\n} rows contained in a buffer.
     *
     * <p>The buffer must end with {@code '\n'}; names longer than 100 bytes are not supported.
     */
    private static class RowReader implements Iterator<Row> {

        private final ByteBuffer buffer;
        // Scratch space shared by all rows of this reader to avoid one allocation per name.
        private final byte[] nameBuffer;

        public RowReader(ByteBuffer buffer) {
            this.buffer = buffer;
            this.nameBuffer = new byte[100];
        }

        @Override
        public boolean hasNext() {
            return this.buffer.hasRemaining();
        }

        /**
         * Reads one row, advancing the buffer past its terminating {@code '\n'}.
         *
         * @return the row, with e.g. {@code "-12.3"} parsed as {@code -123}
         */
        @Override
        public Row next() {
            // First loop: bytes up to ';' form the name.
            var index = 0;
            var current = this.buffer.get();
            while (current != ';') {
                this.nameBuffer[index] = current;
                index++;
                current = this.buffer.get();
            }
            var name = new String(this.nameBuffer, 0, index, StandardCharsets.UTF_8);

            // Second loop: bytes up to '\n' form the value; '.' is skipped so digits accumulate
            // into an integer that is ten times the written value.
            var negative = false;
            var value = 0;
            current = this.buffer.get();
            while (current != '\n') {
                if (current == '-') {
                    negative = true;
                }
                else if (current != '.') {
                    value = (value * 10) + (current - '0');
                }
                current = this.buffer.get();
            }
            return new Row(name, negative ? -value : value);
        }
    }

    /** Running min/max/sum/count of the integer (times ten) values seen for one station. */
    private static class Measurement {

        private int min;
        private int max;
        private long sum;
        private long count;

        public Measurement(int value) {
            this.min = value;
            this.max = value;
            this.sum = value;
            this.count = 1;
        }

        /**
         * Folds {@code other} into this measurement.
         *
         * @param other measurement to absorb; it is not modified
         * @return this instance, mutated
         */
        public Measurement merge(Measurement other) {
            if (other.min < this.min) {
                this.min = other.min;
            }
            if (other.max > this.max) {
                this.max = other.max;
            }
            this.sum += other.sum;
            this.count += other.count;
            return this;
        }

        /**
         * Formats as {@code min/mean/max}, converting the stored integers back by dividing by 10.
         * {@code Locale.ROOT} keeps the decimal separator a '.' regardless of the default locale.
         */
        @Override
        public String toString() {
            return String.format(
                    Locale.ROOT,
                    "%.1f/%.1f/%.1f",
                    this.min / 10.0,
                    (this.sum / 10.0) / this.count,
                    this.max / 10.0);
        }
    }

    public static void main(String[] args) throws Exception {
        run();
    }

    /**
     * Aggregates every chunk of the input file and prints the merged result sorted by name.
     *
     * @throws Exception if the input file cannot be opened or closed
     */
    private static void run() throws Exception {
        Map<String, Measurement> measurements;
        // All chunks are mapped and consumed inside collect(), so the channel can be closed
        // as soon as the stream has been evaluated.
        try (var reader = new ChunkReader(Paths.get(FILE))) {
            var chunks = Spliterators.spliterator(reader, reader.estimateIterations(), Spliterator.IMMUTABLE);
            measurements = StreamSupport.stream(chunks, true)
                    .flatMap(buffer -> toMeasurements(buffer).entrySet().stream())
                    .collect(Collectors.toConcurrentMap(
                            Map.Entry::getKey,
                            Map.Entry::getValue,
                            Measurement::merge));
        }
        System.out.println(new TreeMap<>(measurements));
    }

    /**
     * Aggregates all rows of a single chunk.
     *
     * @param buffer chunk ending on a complete line
     * @return one measurement per station name found in the chunk
     */
    private static Map<String, Measurement> toMeasurements(ByteBuffer buffer) {
        var rows = Spliterators.spliteratorUnknownSize(new RowReader(buffer), Spliterator.IMMUTABLE);
        return StreamSupport.stream(rows, false)
                .collect(Collectors.toMap(
                        Row::name,
                        row -> new Measurement(row.value()),
                        Measurement::merge));
    }
}
|