1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
|
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.reflect.Field;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import sun.misc.Unsafe;
/**
* Changelog:
*
* Initial submission: 62000 ms
* Chunked reader: 16000 ms
* Optimized parser: 13000 ms
* Branchless methods: 11000 ms
* Adding memory mapped files: 6500 ms (based on bjhara's submission)
* Skipping string creation: 4700 ms
* Custom hashmap... 4200 ms
* Added SWAR token checks: 3900 ms
* Skipped String creation: 3500 ms (idea from kgonia)
* Improved String skip: 3250 ms
* Segmenting files: 3150 ms (based on spullara's code)
* Not using SWAR for EOL: 2850 ms
* Inlining hash calculation: 2450 ms
* Replacing branchless code: 2200 ms (sometimes we need to kill the things we love)
* Added unsafe memory access: 1900 ms (keeping the long[] small and local)
*
* Best performing JVM on MacBook M2 Pro: 21.0.1-graal
* `sdk use java 21.0.1-graal`
*
*/
public class CalculateAverage_royvanrijn {
private static final String FILE = "./measurements.txt";

// Raw off-heap memory reader; obtained reflectively because the field is not public API.
private static final Unsafe UNSAFE = initUnsafe();

// Words read via Unsafe are byte-reversed on big-endian hosts so the SWAR tricks
// below can always assume little-endian lane order.
private static final boolean isBigEndian = ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);

/**
 * Grabs the {@code sun.misc.Unsafe} singleton via reflection.
 *
 * @return the shared Unsafe instance
 * @throws RuntimeException wrapping the reflective failure if the field is
 *         unavailable (e.g. blocked by the runtime) — fatal for this program.
 */
private static Unsafe initUnsafe() {
    try {
        Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        return (Unsafe) theUnsafe.get(Unsafe.class);
    }
    catch (NoSuchFieldException | IllegalAccessException e) {
        throw new RuntimeException(e);
    }
}
/** Entry point: aggregates {@link #FILE} and prints the result map to stdout. */
public static void main(String[] args) throws Exception {
    new CalculateAverage_royvanrijn().run();
}
/**
 * Splits the input file into one segment per available core, parses the
 * segments in parallel, and merges the per-segment repositories into a
 * name-sorted map which is printed in the contest's expected format.
 */
public void run() throws Exception {
    // Calculate input segments.
    int numberOfChunks = Runtime.getRuntime().availableProcessors();
    long[] chunks = getSegments(numberOfChunks);

    // Parallel processing of segments. chunks[i]..chunks[i+1] are raw addresses
    // into the memory-mapped file; each adjacent pair is one worker's range.
    // Cities appearing in several segments are merged via Measurement::updateWith;
    // TreeMap gives the alphabetical output ordering.
    TreeMap<String, Measurement> results = IntStream.range(0, chunks.length - 1)
            .mapToObj(chunkIndex -> process(chunks[chunkIndex], chunks[chunkIndex + 1])).parallel()
            .flatMap(MeasurementRepository::get)
            .collect(Collectors.toMap(e -> e.city, MeasurementRepository.Entry::measurement, Measurement::updateWith, TreeMap::new));

    System.out.println(results);
}
/**
 * Memory-maps the whole input file and returns {@code numberOfChunks + 1} raw
 * addresses delimiting roughly equal segments, each starting on a row boundary.
 *
 * chunks[0] is the mapped base address, chunks[numberOfChunks] the end address;
 * segment i covers [chunks[i], chunks[i+1]). Interior boundaries are advanced
 * past the next '\n' so no row is split between workers.
 *
 * @param numberOfChunks desired segment count (one per core)
 * @return raw segment boundary addresses, ascending
 * @throws IOException if the file cannot be opened or mapped
 */
private static long[] getSegments(int numberOfChunks) throws IOException {
    try (var fileChannel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ)) {
        long fileSize = fileChannel.size();
        // Ceiling division so numberOfChunks segments always cover the file.
        long segmentSize = (fileSize + numberOfChunks - 1) / numberOfChunks;
        long[] chunks = new long[numberOfChunks + 1];
        // Arena.global() keeps the mapping alive for the JVM's lifetime; only the
        // raw address is retained and all reads go through Unsafe.
        long mappedAddress = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, Arena.global()).address();
        chunks[0] = mappedAddress;
        long endAddress = mappedAddress + fileSize;
        for (int i = 1; i < numberOfChunks; ++i) {
            long chunkAddress = mappedAddress + i * segmentSize;
            // Align to first row start.
            while (chunkAddress < endAddress && UNSAFE.getByte(chunkAddress++) != '\n') {
                // nop
            }
            // Clamp: with short files a boundary search can run off the end.
            chunks[i] = Math.min(chunkAddress, endAddress);
        }
        chunks[numberOfChunks] = endAddress;
        return chunks;
    }
}
/**
 * Sequentially parses every "city;value" row in the half-open address range
 * [fromAddress, toAddress) and accumulates the samples into a fresh,
 * thread-confined repository (one per worker; merged later in run()).
 */
private MeasurementRepository process(long fromAddress, long toAddress) {
    final MeasurementRepository repository = new MeasurementRepository();
    // Scratch buffer for the city-name words, reused across rows (16 longs = 128 bytes).
    final long[] dataBuffer = new long[16];
    long cursor = fromAddress;
    do {
        cursor = processEntity(dataBuffer, cursor, toAddress, repository);
    } while (cursor < toAddress);
    return repository;
}
// ';' replicated into all eight byte lanes, for the SWAR separator scan below.
private static final long SEPARATOR_PATTERN = compilePattern((byte) ';');

/**
 * Already looping the longs here, lets shoehorn in making a hash
 *
 * Scans one input row starting at {@code start}: consumes the city name eight
 * bytes at a time, folding each word into the hash and into {@code data}, until
 * the ';' separator is found, then hands off to
 * {@code process(start, separatorAddress, hash, data, repository)} for the
 * temperature. Returns the address where the next row begins, or {@code limit}
 * when the tail of the buffer held no separator.
 *
 * @param data scratch buffer receiving the city name as little-endian longs;
 *             the word containing ';' is masked so trailing bytes are cleared
 */
private long processEntity(final long[] data, final long start, final long limit, final MeasurementRepository measurementRepository) {
    int hash = 1;
    long i;
    int dataPtr = 0;
    // Fast path: whole 8-byte words, stopping 8 bytes short of limit to keep reads in range.
    for (i = start; i <= limit - 8; i += 8) {
        long word = UNSAFE.getLong(i);
        if (isBigEndian) {
            word = Long.reverseBytes(word); // Reversing the bytes is the cheapest way to do this
        }
        // Classic SWAR "find byte equal to pattern" (Bit Twiddling Hacks): after the
        // XOR, a lane is zero iff it held ';'; the subtract/AND sets that lane's 0x80 bit.
        final long match = word ^ SEPARATOR_PATTERN;
        long mask = ((match - 0x0101010101010101L) & ~match) & 0x8080808080808080L;
        if (mask != 0) {
            // Keep only the bytes before the separator.
            // NOTE(review): with ';' in the top lane, the arithmetic `>>` sign-extends and
            // leaves a deterministic residue bit in lane 7 (a `>>>` would zero it fully).
            // Benign: the same input always yields the same partialWord/hash, so table
            // lookups stay self-consistent — but worth confirming the intent.
            final long partialWord = word & ((mask >> 7) - 1);
            hash = longHashStep(hash, partialWord);
            data[dataPtr] = partialWord;
            // Byte offset of ';' within this word (trailing-zero count / 8).
            final int index = Long.numberOfTrailingZeros(mask) >> 3;
            return process(start, i + index, hash, data, measurementRepository);
        }
        data[dataPtr++] = word;
        hash = longHashStep(hash, word);
    }
    // Handle remaining bytes near the limit of the buffer:
    long partialWord = 0;
    int len = 0;
    for (; i < limit; i++) {
        byte read;
        if ((read = UNSAFE.getByte(i)) == ';') {
            hash = longHashStep(hash, partialWord);
            data[dataPtr] = partialWord;
            return process(start, i, hash, data, measurementRepository);
        }
        // Accumulate bytes little-endian into a long, mirroring the fast path's layout.
        partialWord = partialWord | ((long) read << (len << 3));
        len++;
    }
    return limit;
}
// The 0x10 bit is set in ASCII digits but clear in '.'; checking lanes 1..3
// (never lane 0, which may hold '-') locates the decimal point branchlessly.
private static final long DOT_BITS = 0x10101000;
// One multiply combines the hundreds/tens/units digits: d*100 + d*10 + d,
// positioned so the absolute value lands in bits 32..41 of the product.
private static final long MAGIC_MULTIPLIER = (100 * 0x1000000 + 10 * 0x10000 + 1);

/**
 * Branchless temperature parse + store. Reads the 8 bytes after the ';' at
 * {@code delimiterAddress}, decodes a value of the form -?\d{1,2}\.\d into an
 * int in tenths of a degree (e.g. "-12.3" -> -123), records the row in the
 * repository, and returns the address of the next row's first byte.
 * (This digit-merging multiply is the SWAR trick shared among 1BRC entries.)
 */
private long process(final long startAddress, final long delimiterAddress, final int hash, final long[] data, final MeasurementRepository measurementRepository) {
    long word = UNSAFE.getLong(delimiterAddress + 1);
    if (isBigEndian) {
        word = Long.reverseBytes(word); // normalize to little-endian lane order
    }
    final long invWord = ~word;
    // Bit index of the '.' byte; for legal input it is in lane 1, 2 or 3.
    final int decimalSepPos = Long.numberOfTrailingZeros(invWord & DOT_BITS);
    // All-ones when byte 0 is '-' (its 0x10 bit is clear), all-zeros otherwise.
    final long signed = (invWord << 59) >> 63;
    final long designMask = ~(signed & 0xFF); // blanks the '-' byte when present
    // Left-align the digits on the '.' position, then keep only the low nibbles
    // (ASCII '0'..'9' -> 0..9) of the three digit lanes.
    final long digits = ((word & designMask) << (28 - decimalSepPos)) & 0x0F000F0F00L;
    // Extract the merged absolute value (0..999) from bits 32..41 of the product.
    final long absValue = ((digits * MAGIC_MULTIPLIER) >>> 32) & 0x3FF;
    // Conditional negate: (x ^ -1) - -1 == -x, (x ^ 0) - 0 == x.
    final int measurement = (int) ((absValue ^ signed) - signed);
    // Store:
    measurementRepository.update(startAddress, data, (int) (delimiterAddress - startAddress), hash, measurement);
    // Next row = separator + value length ("x.x" -> dot in lane 1 -> +4, etc.) incl. '\n'.
    return delimiterAddress + (decimalSepPos >> 3) + 4; // Determine next start:
    // return nextAddress;
}
/**
 * Running aggregate (min/max/sum/count) for one weather station.
 * Values are integers in tenths of a degree (e.g. -12.3 stored as -123);
 * toString() converts back to the contest's "min/mean/max" format.
 */
static final class Measurement {
    int min, max, count;
    long sum;

    public Measurement() {
        // Sentinels outside the valid scaled range so the first sample always wins.
        this.min = 1000;
        this.max = -1000;
    }

    /** Folds a single scaled sample into this aggregate; returns this for chaining. */
    public Measurement updateWith(int sample) {
        count++;
        sum += sample;
        min = min(min, sample);
        max = max(max, sample);
        return this;
    }

    /** Merges another station aggregate into this one (used across segments). */
    public Measurement updateWith(Measurement other) {
        count += other.count;
        sum += other.sum;
        min = min(min, other.min);
        max = max(max, other.max);
        return this;
    }

    public String toString() {
        return round(min) + "/" + round((1.0 * sum) / count) + "/" + round(max);
    }

    // Round to one decimal by rounding the scaled value, then dividing by 10.
    private double round(double value) {
        return Math.round(value) / 10.0;
    }
}
// Branchless max: when a < b the sign mask (delta >> 31) is all-ones, so
// a - (delta & mask) == a - delta == b; otherwise the mask is zero and a wins.
// Can mis-compare when a - b overflows, which scaled temperatures never do.
static int max(final int a, final int b) {
    final int delta = a - b;
    return a - (delta & (delta >> 31));
}
// Branchless min, mirroring max() above: when a < b the mask selects delta and
// b + delta == a; otherwise b is returned unchanged. Same overflow caveat as max().
static int min(final int a, final int b) {
    final int delta = a - b;
    return b + (delta & (delta >> 31));
}
/**
 * One step of a 31-based polynomial hash over the key's 8-byte words.
 * {@link Long#hashCode(long)} is specified as {@code (int) (value ^ (value >>> 32))},
 * exactly the hand-written fold it replaces.
 */
private static int longHashStep(final int hash, final long word) {
    return hash * 31 + Long.hashCode(word);
}
/**
 * Broadcasts a byte into every lane of a long (e.g. ';' -> 0x3B3B3B3B3B3B3B3BL)
 * by OR-ing the sign-extending shifted copies together — identical to the
 * original eight-term expression for every possible byte value.
 */
private static long compilePattern(final byte value) {
    long pattern = value; // shift-0 term
    for (int shift = 8; shift < 64; shift += 8) {
        pattern |= (long) value << shift;
    }
    return pattern;
}
/**
 * A normal Java HashMap does all these safety things like boundary checks... we don't need that, we need speeeed.
 *
 * So I've written an extremely simple linear probing hashmap that should work well enough.
 *
 * Open addressing, linear probing, no resizing, no removal. If more distinct
 * keys than tableSize ever appeared, update() would loop forever — the fixed
 * 2^20 capacity is a deliberate, contest-scoped assumption.
 *
 * NOTE(review): this is a non-static inner class (it needs the enclosing
 * instance only for arrayEquals); it could become a static nested class if
 * arrayEquals were made static too.
 */
class MeasurementRepository {
    private int tableSize = 1 << 20; // large enough for the contest.
    private int tableMask = (tableSize - 1); // power-of-two size, so AND == modulo
    private MeasurementRepository.Entry[] table = new MeasurementRepository.Entry[tableSize];

    // One slot: raw address/length of the city's bytes, the word-array snapshot used
    // for key comparison, the precomputed hash, the decoded name, and the aggregate.
    record Entry(long address, long[] data, int length, int hash, String city, Measurement measurement) {
        @Override
        public String toString() {
            return city + "=" + measurement;
        }
    }

    /**
     * Records one scaled temperature sample for the city whose bytes live at
     * [address, address + length) in the mapped file.
     *
     * Equality check is hash + length + the complete 8-byte words; the partial
     * final word is covered only via the hash, so a same-length/same-hash tail
     * collision would (astronomically unlikely) merge two cities.
     */
    public void update(long address, long[] data, int length, int hash, int temperature) {
        int dataLength = length >> 3; // number of complete 8-byte words in the key
        int index = hash & tableMask;
        MeasurementRepository.Entry tableEntry;
        while ((tableEntry = table[index]) != null
                && (tableEntry.hash != hash || tableEntry.length != length || !arrayEquals(tableEntry.data, data, dataLength))) { // search for the right spot
            index = (index + 1) & tableMask;
        }
        if (tableEntry != null) {
            // Existing city: just fold in the sample.
            tableEntry.measurement.updateWith(temperature);
            return;
        }
        // --- This is a brand new entry, insert into the hashtable and do the extra calculations (once!) do slower calculations here.
        Measurement measurement = new Measurement();
        // Materialize the city name once, on first sight, from the mapped bytes.
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            bytes[i] = UNSAFE.getByte(address + i);
        }
        // NOTE(review): uses the platform default charset; on JDKs before 18 this
        // may not be UTF-8 — confirm against the expected input encoding.
        String city = new String(bytes);
        // Snapshot the caller's scratch buffer; it is reused for the next row.
        long[] dataCopy = new long[dataLength];
        System.arraycopy(data, 0, dataCopy, 0, dataLength);
        // And add entry:
        MeasurementRepository.Entry toAdd = new MeasurementRepository.Entry(address, dataCopy, length, hash, city, measurement);
        table[index] = toAdd;
        toAdd.measurement.updateWith(temperature);
    }

    /** Streams all occupied slots, in table (i.e. unspecified) order. */
    public Stream<MeasurementRepository.Entry> get() {
        return Arrays.stream(table).filter(Objects::nonNull);
    }
}
/**
 * Compares the first {@code length} longs of both arrays; used to confirm that
 * a hash-table hit really is the same city when hashes match.
 *
 * @return true when all {@code length} leading words are equal
 */
private boolean arrayEquals(final long[] a, final long[] b, final int length) {
    int i = 0;
    while (i < length) {
        if (a[i] != b[i]) {
            return false;
        }
        i++;
    }
    return true;
}
}
|