Baseline
$ time cat measurements.txt >/dev/null
0m1.052s
$ time wc measurements.txt
1000000000 1179173106 13795293380 measurements.txt
0m55.710s
$ time gawk -b -f 1brc.awk measurements.txt >measurements.out
7m35.567s
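The 1brc.awk program it runs: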
BEGIN {
FS = ";"
}
{
if (counts[$1]++) {
mins[$1] = $2 < mins[$1] ? $2 : mins[$1]
maxs[$1] = $2 > maxs[$1] ? $2 : maxs[$1]
} else {
mins[$1] = maxs[$1] = $2 # new entry
}
sums[$1] += $2
}
END {
printf "{"
n = asorti(mins, sorted)
for (i = 1; i <= n; i++) {
station = sorted[i]
min = mins[station]
max = maxs[station]
mean = sums[station] / counts[station]
printf "%s=%.1f/%.1f/%.1f", station, min, mean, max
if (i < n) {
printf ", "
}
}
printf "}\n"
}
Solution 1: simple and idiomatic Go
// r1: simple, idiomatic Go using bufio.Scanner and strconv.ParseFloat
//
// ~1.004s for 10M rows
package main
import (
"bufio"
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
)
func r1(inputPath string, output io.Writer) error {
type stats struct {
min, max, sum float64
count int64
}
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
stationStats := make(map[string]stats)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
station, tempStr, hasSemi := strings.Cut(line, ";")
if !hasSemi {
continue
}
temp, err := strconv.ParseFloat(tempStr, 64)
if err != nil {
return err
}
s, ok := stationStats[station]
if !ok {
s.min = temp
s.max = temp
s.sum = temp
s.count = 1
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += temp
s.count++
}
stationStats[station] = s
}
stations := make([]string, 0, len(stationStats))
for station := range stationStats {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := stationStats[station]
mean := s.sum / float64(s.count)
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max)
}
fmt.Fprint(output, "}\n")
return nil
}
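A couple of notes on r1: it relies on the min and max builtins added in Go 1.21, and none of these listings include a main function (the original repo selects a revision with a -revision flag, as seen in the benchmark output quoted under solution 10). A minimal driver, hypothetical rather than the repo's actual code, might look like:

    func main() {
        // Hypothetical driver; the real repo wires the rN functions up
        // behind a -revision flag and prints timing information.
        if len(os.Args) < 2 {
            fmt.Fprintln(os.Stderr, "usage: go-1brc measurements.txt")
            os.Exit(1)
        }
        out := bufio.NewWriter(os.Stdout)
        defer out.Flush()
        if err := r1(os.Args[1], out); err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }
    }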
Solution 2: map with pointer values
// r2: use stats pointer as map value to avoid double hashing
//
// ~921ms for 10M rows (1.09x as fast as r1)
package main
import (
"bufio"
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
)
func r2(inputPath string, output io.Writer) error {
type stats struct {
min, max, sum float64
count int64
}
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
stationStats := make(map[string]*stats)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
station, tempStr, hasSemi := strings.Cut(line, ";")
if !hasSemi {
continue
}
temp, err := strconv.ParseFloat(tempStr, 64)
if err != nil {
return err
}
s := stationStats[station]
if s == nil {
stationStats[station] = &stats{
min: temp,
max: temp,
sum: temp,
count: 1,
}
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += temp
s.count++
}
}
stations := make([]string, 0, len(stationStats))
for station := range stationStats {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := stationStats[station]
mean := s.sum / float64(s.count)
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max)
}
fmt.Fprint(output, "}\n")
return nil
}
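The gain comes from hashing: with map[string]stats, the read-modify-write costs two hash lookups per row (one to read the struct, one to store the updated copy back), while map[string]*stats needs one lookup and then mutates through the pointer. A sketch of the difference (not from the repo):

    s, ok := m[station] // map[string]stats: lookup #1
    s.count++           // mutates a local copy
    m[station] = s      // lookup #2 to store the copy back

    p := m[station] // map[string]*stats: the only lookup
    p.count++       // mutates the shared struct in place

The price is one *stats allocation per station, but there are only a handful of stations relative to millions of rows.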
Solution 3: avoid strconv.ParseFloat
// r3: parse temperatures manually instead of using strconv.ParseFloat
//
// ~517ms for 10M rows (1.94x as fast as r1)
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"sort"
)
func r3(inputPath string, output io.Writer) error {
type stats struct {
min, max, sum float64
count int64
}
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
stationStats := make(map[string]*stats)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Bytes()
station, tempBytes, hasSemi := bytes.Cut(line, []byte(";"))
if !hasSemi {
continue
}
negative := false
index := 0
if tempBytes[index] == '-' {
index++
negative = true
}
temp := float64(tempBytes[index] - '0')
index++
if tempBytes[index] != '.' {
temp = temp*10 + float64(tempBytes[index]-'0')
index++
}
index++ // skip '.'
temp += float64(tempBytes[index]-'0') / 10
if negative {
temp = -temp
}
s := stationStats[string(station)]
if s == nil {
stationStats[string(station)] = &stats{
min: temp,
max: temp,
sum: temp,
count: 1,
}
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += temp
s.count++
}
}
stations := make([]string, 0, len(stationStats))
for station := range stationStats {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := stationStats[station]
mean := s.sum / float64(s.count)
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max)
}
fmt.Fprint(output, "}\n")
return nil
}
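To make the manual parser concrete, here is a trace of the loop above on tempBytes = []byte("-12.3") (my trace, following the code exactly):

    // tempBytes[0] == '-'         -> negative = true, index = 1
    // temp = float64('1'-'0') = 1 -> index = 2
    // tempBytes[2] == '2' != '.'  -> temp = 1*10 + 2 = 12, index = 3
    // skip '.'                    -> index = 4
    // temp += float64('3'-'0')/10 -> temp = 12.3
    // negative                    -> temp = -12.3

It only handles the shapes 1BRC guarantees (one or two integer digits, exactly one decimal digit, optional minus), which is why it can beat the fully general strconv.ParseFloat.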
Solution 4: fixed point integers
// r4: use fixed point int32s (*10) instead of float64s
//
// ~491ms for 10M rows (2.04x as fast as r1)
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"sort"
)
func r4(inputPath string, output io.Writer) error {
type stats struct {
min, max, count int32
sum int64
}
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
stationStats := make(map[string]*stats)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Bytes()
station, tempBytes, hasSemi := bytes.Cut(line, []byte(";"))
if !hasSemi {
continue
}
negative := false
index := 0
if tempBytes[index] == '-' {
index++
negative = true
}
temp := int32(tempBytes[index] - '0')
index++
if tempBytes[index] != '.' {
temp = temp*10 + int32(tempBytes[index]-'0')
index++
}
index++ // skip '.'
temp = temp*10 + int32(tempBytes[index]-'0')
if negative {
temp = -temp
}
s := stationStats[string(station)]
if s == nil {
stationStats[string(station)] = &stats{
min: temp,
max: temp,
sum: int64(temp),
count: 1,
}
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += int64(temp)
s.count++
}
}
stations := make([]string, 0, len(stationStats))
for station := range stationStats {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := stationStats[station]
mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10)
}
fmt.Fprint(output, "}\n")
return nil
}
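With fixed point, "12.3" is stored as the integer 123 (tenths of a degree), so the hot loop is pure integer arithmetic; the divisions by 10 happen once per station at print time rather than once per row:

    // per row:      temp = 123                              // "12.3" in tenths
    // per station:  float64(s.min) / 10                     // 123 -> 12.3
    //               float64(s.sum) / float64(s.count) / 10  // mean in degrees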
Solution 5: avoid bytes.Cut
// r5: avoid bytes.Cut
//
// ~442ms for 10M rows (2.27x as fast as r1)
package main
import (
"bufio"
"fmt"
"io"
"os"
"sort"
)
func r5(inputPath string, output io.Writer) error {
type stats struct {
min, max, count int32
sum int64
}
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
stationStats := make(map[string]*stats)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Bytes()
end := len(line)
tenths := int32(line[end-1] - '0')
ones := int32(line[end-3] - '0') // line[end-2] is '.'
var temp int32
var semicolon int
if line[end-4] == ';' {
temp = ones*10 + tenths
semicolon = end - 4
} else if line[end-4] == '-' {
temp = -(ones*10 + tenths)
semicolon = end - 5
} else {
tens := int32(line[end-4] - '0')
if line[end-5] == ';' {
temp = tens*100 + ones*10 + tenths
semicolon = end - 5
} else { // '-'
temp = -(tens*100 + ones*10 + tenths)
semicolon = end - 6
}
}
station := line[:semicolon]
s := stationStats[string(station)]
if s == nil {
stationStats[string(station)] = &stats{
min: temp,
max: temp,
sum: int64(temp),
count: 1,
}
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += int64(temp)
s.count++
}
}
stations := make([]string, 0, len(stationStats))
for station := range stationStats {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := stationStats[station]
mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10)
}
fmt.Fprint(output, "}\n")
return nil
}
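Scanning from the end of the line works because every temperature ends with digit, '.', digit, so only four shapes are possible. A case table derived from the branches above (scanner.Bytes returns the line without its trailing '\n'):

    // temperature   line[end-4]   line[end-5]   temp
    // ";1.2"        ';'           (name byte)   ones*10 + tenths
    // "-1.2"        '-'           ';'           -(ones*10 + tenths)
    // ";12.3"       tens digit    ';'           tens*100 + ones*10 + tenths
    // "-12.3"       tens digit    '-'           -(tens*100 + ones*10 + tenths)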
Solution 6: avoid bufio.Scanner
// r6: don't use bufio.Scanner to avoid scanning some bytes twice
//
// ~399ms for 10M rows (2.52x as fast as r1)
package main
import (
"bytes"
"fmt"
"io"
"os"
"sort"
)
func r6(inputPath string, output io.Writer) error {
type stats struct {
min, max, count int32
sum int64
}
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
stationStats := make(map[string]*stats)
buf := make([]byte, 1024*1024)
readStart := 0
for {
n, err := f.Read(buf[readStart:])
if err != nil && err != io.EOF {
return err
}
if readStart+n == 0 {
break
}
chunk := buf[:readStart+n]
newline := bytes.LastIndexByte(chunk, '\n')
if newline < 0 {
break
}
remaining := chunk[newline+1:]
chunk = chunk[:newline+1]
for {
station, after, hasSemi := bytes.Cut(chunk, []byte(";"))
if !hasSemi {
break
}
index := 0
negative := false
if after[index] == '-' {
negative = true
index++
}
temp := int32(after[index] - '0')
index++
if after[index] != '.' {
temp = temp*10 + int32(after[index]-'0')
index++
}
index++ // skip '.'
temp = temp*10 + int32(after[index]-'0')
index += 2 // skip last digit and '\n'
if negative {
temp = -temp
}
chunk = after[index:]
s := stationStats[string(station)]
if s == nil {
stationStats[string(station)] = &stats{
min: temp,
max: temp,
sum: int64(temp),
count: 1,
}
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += int64(temp)
s.count++
}
}
readStart = copy(buf, remaining)
}
stations := make([]string, 0, len(stationStats))
for station := range stationStats {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := stationStats[station]
mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10)
}
fmt.Fprint(output, "}\n")
return nil
}
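The read loop is a chunk-and-carry pattern: find the last newline in the buffer, parse everything before it, and copy the partial trailing line to the front so the next Read appends after it. Schematically (with a hypothetical partial line "Istanbul;1"):

    // buf: [ ...complete lines...\n | "Istanbul;1" ]
    //                              ^ newline = bytes.LastIndexByte(chunk, '\n')
    // chunk     = buf[:newline+1]      // parsed this iteration
    // remaining = buf[newline+1:]      // the partial line, carried over
    // readStart = copy(buf, remaining) // next Read fills buf[readStart:]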
Solution 7: custom hash table
// r7: use custom hash table and hash station name as we look for ';'
//
// ~234ms for 10M rows (4.29x as fast as r1)
package main
import (
"bytes"
"fmt"
"io"
"os"
"sort"
)
func r7(inputPath string, output io.Writer) error {
type stats struct {
min, max, count int32
sum int64
}
f, err := os.Open(inputPath)
if err != nil {
return err
}
defer f.Close()
type item struct {
key []byte
stat *stats
}
const numBuckets = 1 << 17 // number of hash buckets (power of 2)
items := make([]item, numBuckets) // hash buckets, linearly probed
size := 0 // number of active items in items slice
buf := make([]byte, 1024*1024)
readStart := 0
for {
n, err := f.Read(buf[readStart:])
if err != nil && err != io.EOF {
return err
}
if readStart+n == 0 {
break
}
chunk := buf[:readStart+n]
newline := bytes.LastIndexByte(chunk, '\n')
if newline < 0 {
break
}
remaining := chunk[newline+1:]
chunk = chunk[:newline+1]
for {
const (
// FNV-1 64-bit constants from hash/fnv.
offset64 = 14695981039346656037
prime64 = 1099511628211
)
var station, after []byte
hash := uint64(offset64)
i := 0
for ; i < len(chunk); i++ {
c := chunk[i]
if c == ';' {
station = chunk[:i]
after = chunk[i+1:]
break
}
hash ^= uint64(c) // FNV-1a is XOR then *
hash *= prime64
}
if i == len(chunk) {
break
}
index := 0
negative := false
if after[index] == '-' {
negative = true
index++
}
temp := int32(after[index] - '0')
index++
if after[index] != '.' {
temp = temp*10 + int32(after[index]-'0')
index++
}
index++ // skip '.'
temp = temp*10 + int32(after[index]-'0')
index += 2 // skip last digit and '\n'
if negative {
temp = -temp
}
chunk = after[index:]
hashIndex := int(hash & uint64(numBuckets-1))
for {
if items[hashIndex].key == nil {
// Found empty slot, add new item (copying key).
key := make([]byte, len(station))
copy(key, station)
items[hashIndex] = item{
key: key,
stat: &stats{
min: temp,
max: temp,
sum: int64(temp),
count: 1,
},
}
size++
if size > numBuckets/2 {
panic("too many items in hash table")
}
break
}
if bytes.Equal(items[hashIndex].key, station) {
// Found matching slot, add to existing stats.
s := items[hashIndex].stat
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += int64(temp)
s.count++
break
}
// Slot already holds another key, try next slot (linear probe).
hashIndex++
if hashIndex >= numBuckets {
hashIndex = 0
}
}
}
readStart = copy(buf, remaining)
}
stationItems := make([]item, 0, size)
for _, item := range items {
if item.key == nil {
continue
}
stationItems = append(stationItems, item)
}
sort.Slice(stationItems, func(i, j int) bool {
return string(stationItems[i].key) < string(stationItems[j].key)
})
fmt.Fprint(output, "{")
for i, item := range stationItems {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := item.stat
mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", item.key, float64(s.min)/10, mean, float64(s.max)/10)
}
fmt.Fprint(output, "}\n")
return nil
}
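The inlined hash is FNV-1a with the same constants as the standard library's hash/fnv; the trick is computing it during the scan for ';' so each name byte is touched once. Pulled out as a standalone function for reference (name mine):

    // Standalone FNV-1a, equivalent to the inlined loop in r7.
    func fnv1aHash(data []byte) uint64 {
        const offset64, prime64 = 14695981039346656037, 1099511628211
        hash := uint64(offset64)
        for _, c := range data {
            hash ^= uint64(c) // FNV-1a: XOR first,
            hash *= prime64   // then multiply (FNV-1 does the reverse)
        }
        return hash
    }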
Solution 8: process chunks in parallel
// r8: add some parallelism (but back to the non-optimized r1 version)
//
// ~213ms for 10M rows (4.71x as fast as r1)
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"sort"
"strconv"
"strings"
)
type r8Stats struct {
min, max, sum float64
count int64
}
func r8(inputPath string, output io.Writer) error {
parts, err := splitFile(inputPath, maxGoroutines)
if err != nil {
return err
}
resultsCh := make(chan map[string]r8Stats)
for _, part := range parts {
go r8ProcessPart(inputPath, part.offset, part.size, resultsCh)
}
totals := make(map[string]r8Stats)
for i := 0; i < len(parts); i++ {
result := <-resultsCh
for station, s := range result {
ts, ok := totals[station]
if !ok {
totals[station] = r8Stats{
min: s.min,
max: s.max,
sum: s.sum,
count: s.count,
}
continue
}
ts.min = min(ts.min, s.min)
ts.max = max(ts.max, s.max)
ts.sum += s.sum
ts.count += s.count
totals[station] = ts
}
}
stations := make([]string, 0, len(totals))
for station := range totals {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := totals[station]
mean := s.sum / float64(s.count)
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max)
}
fmt.Fprint(output, "}\n")
return nil
}
func r8ProcessPart(inputPath string, fileOffset, fileSize int64, resultsCh chan map[string]r8Stats) {
file, err := os.Open(inputPath)
if err != nil {
panic(err)
}
defer file.Close()
_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
panic(err)
}
f := io.LimitedReader{R: file, N: fileSize}
stationStats := make(map[string]r8Stats)
scanner := bufio.NewScanner(&f)
for scanner.Scan() {
line := scanner.Text()
station, tempStr, hasSemi := strings.Cut(line, ";")
if !hasSemi {
continue
}
temp, err := strconv.ParseFloat(tempStr, 64)
if err != nil {
panic(err)
}
s, ok := stationStats[station]
if !ok {
s.min = temp
s.max = temp
s.sum = temp
s.count = 1
} else {
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += temp
s.count++
}
stationStats[station] = s
}
resultsCh <- stationStats
}
type part struct {
offset, size int64
}
func splitFile(inputPath string, numParts int) ([]part, error) {
const maxLineLength = 100
f, err := os.Open(inputPath)
if err != nil {
return nil, err
}
st, err := f.Stat()
if err != nil {
return nil, err
}
size := st.Size()
splitSize := size / int64(numParts)
buf := make([]byte, maxLineLength)
parts := make([]part, 0, numParts)
offset := int64(0)
for i := 0; i < numParts; i++ {
if i == numParts-1 {
if offset < size {
parts = append(parts, part{offset, size - offset})
}
break
}
seekOffset := max(offset+splitSize-maxLineLength, 0)
_, err := f.Seek(seekOffset, io.SeekStart)
if err != nil {
return nil, err
}
n, _ := io.ReadFull(f, buf)
chunk := buf[:n]
newline := bytes.LastIndexByte(chunk, '\n')
if newline < 0 {
return nil, fmt.Errorf("newline not found at offset %d", offset+splitSize-maxLineLength)
}
remaining := len(chunk) - newline - 1
nextOffset := seekOffset + int64(len(chunk)) - int64(remaining)
parts = append(parts, part{offset, nextOffset - offset})
offset = nextOffset
}
return parts, nil
}
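One note on this listing: maxGoroutines is referenced but never defined here; it comes from elsewhere in the repo. A reasonable stand-in, an assumption rather than the repo's actual definition, is one part per CPU core:

    // Assumed definition of maxGoroutines (the repo sets it elsewhere);
    // needs the "runtime" import.
    var maxGoroutines = runtime.NumCPU()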
Solution 9: all optimizations plus parallelization
// r9: all the previous optimizations plus parallel execution
//
// 43ms for 10M rows (23.3x as fast as r1)
package main
import (
"bytes"
"fmt"
"io"
"os"
"sort"
)
type r9Stats struct {
min, max, count int32
sum int64
}
func r9(inputPath string, output io.Writer) error {
parts, err := splitFile(inputPath, maxGoroutines)
if err != nil {
return err
}
resultsCh := make(chan map[string]*r9Stats)
for _, part := range parts {
go r9ProcessPart(inputPath, part.offset, part.size, resultsCh)
}
totals := make(map[string]*r9Stats)
for i := 0; i < len(parts); i++ {
result := <-resultsCh
for station, s := range result {
ts := totals[station]
if ts == nil {
totals[station] = s
continue
}
ts.min = min(ts.min, s.min)
ts.max = max(ts.max, s.max)
ts.sum += s.sum
ts.count += s.count
}
}
stations := make([]string, 0, len(totals))
for station := range totals {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := totals[station]
mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10)
}
fmt.Fprint(output, "}\n")
return nil
}
func r9ProcessPart(inputPath string, fileOffset, fileSize int64, resultsCh chan map[string]*r9Stats) {
file, err := os.Open(inputPath)
if err != nil {
panic(err)
}
defer file.Close()
_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
panic(err)
}
f := io.LimitedReader{R: file, N: fileSize}
type item struct {
key []byte
stat *r9Stats
}
const numBuckets = 1 << 17 // number of hash buckets (power of 2)
items := make([]item, numBuckets) // hash buckets, linearly probed
size := 0 // number of active items in items slice
buf := make([]byte, 1024*1024)
readStart := 0
for {
n, err := f.Read(buf[readStart:])
if err != nil && err != io.EOF {
panic(err)
}
if readStart+n == 0 {
break
}
chunk := buf[:readStart+n]
newline := bytes.LastIndexByte(chunk, '\n')
if newline < 0 {
break
}
remaining := chunk[newline+1:]
chunk = chunk[:newline+1]
for {
const (
// FNV-1 64-bit constants from hash/fnv.
offset64 = 14695981039346656037
prime64 = 1099511628211
)
var station, after []byte
hash := uint64(offset64)
i := 0
for ; i < len(chunk); i++ {
c := chunk[i]
if c == ';' {
station = chunk[:i]
after = chunk[i+1:]
break
}
hash ^= uint64(c) // FNV-1a is XOR then *
hash *= prime64
}
if i == len(chunk) {
break
}
index := 0
negative := false
if after[index] == '-' {
negative = true
index++
}
temp := int32(after[index] - '0')
index++
if after[index] != '.' {
temp = temp*10 + int32(after[index]-'0')
index++
}
index++ // skip '.'
temp = temp*10 + int32(after[index]-'0')
index += 2 // skip last digit and '\n'
if negative {
temp = -temp
}
chunk = after[index:]
hashIndex := int(hash & (numBuckets - 1))
for {
if items[hashIndex].key == nil {
// Found empty slot, add new item (copying key).
key := make([]byte, len(station))
copy(key, station)
items[hashIndex] = item{
key: key,
stat: &r9Stats{
min: temp,
max: temp,
sum: int64(temp),
count: 1,
},
}
size++
if size > numBuckets/2 {
panic("too many items in hash table")
}
break
}
if bytes.Equal(items[hashIndex].key, station) {
// Found matching slot, add to existing stats.
s := items[hashIndex].stat
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += int64(temp)
s.count++
break
}
// Slot already holds another key, try next slot (linear probe).
hashIndex++
if hashIndex >= numBuckets {
hashIndex = 0
}
}
}
readStart = copy(buf, remaining)
}
result := make(map[string]*r9Stats, size)
for _, item := range items {
if item.key == nil {
continue
}
result[string(item.key)] = item.stat
}
resultsCh <- result
}
Solution 10: all the previous optimizations plus faster semicolon finding and hashing
// r10: all the previous optimizations plus faster semicolon finding and
// hashing
//
// Translated from Java by Menno Finlay-Smits, with ideas taken from
// this fast Java solution:
//
// https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_mtopolnik.java
//
// On my (Ben's) laptop I get these initial results:
//
// $ ./go-1brc -revision=1 ../1brc/data/measurements.txt >out-r1
// Processed 13156.2MB in 1m39.507011009s
// $ ./go-1brc -revision=9 ../1brc/data/measurements.txt >out-r9
// Processed 13156.2MB in 2.893693843s # 34.4x as fast as the r1 above
// $ ./go-1brc -revision=10 ../1brc/data/measurements.txt >out-r10
// Processed 13156.2MB in 2.497241029s # 39.8x as fast as the r1 above
package main
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"math/bits"
"os"
"sort"
)
const BroadcastSemicolon = 0x3B3B3B3B3B3B3B3B
const Broadcast0x01 = 0x0101010101010101
const Broadcast0x80 = 0x8080808080808080
type r10Stats struct {
min, max, count int32
sum int64
}
func r10(inputPath string, output io.Writer) error {
parts, err := splitFile(inputPath, maxGoroutines)
if err != nil {
return err
}
resultsCh := make(chan map[string]*r10Stats)
for _, part := range parts {
go r10ProcessPart(inputPath, part.offset, part.size, resultsCh)
}
totals := make(map[string]*r10Stats)
for i := 0; i < len(parts); i++ {
result := <-resultsCh
for station, s := range result {
ts := totals[station]
if ts == nil {
totals[station] = s
continue
}
ts.min = min(ts.min, s.min)
ts.max = max(ts.max, s.max)
ts.sum += s.sum
ts.count += s.count
}
}
stations := make([]string, 0, len(totals))
for station := range totals {
stations = append(stations, station)
}
sort.Strings(stations)
fmt.Fprint(output, "{")
for i, station := range stations {
if i > 0 {
fmt.Fprint(output, ", ")
}
s := totals[station]
mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10)
}
fmt.Fprint(output, "}\n")
return nil
}
func r10ProcessPart(inputPath string, fileOffset, fileSize int64, resultsCh chan map[string]*r10Stats) {
file, err := os.Open(inputPath)
if err != nil {
panic(err)
}
defer file.Close()
_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
panic(err)
}
f := io.LimitedReader{R: file, N: fileSize}
type item struct {
key []byte
stat *r10Stats
}
const numBuckets = 1 << 17 // number of hash buckets (power of 2)
items := make([]item, numBuckets) // hash buckets, linearly probed
size := 0 // number of active items in items slice
buf := make([]byte, 1024*1024)
readStart := 0
for {
n, err := f.Read(buf[readStart:])
if err != nil && err != io.EOF {
panic(err)
}
if readStart+n == 0 {
break
}
chunk := buf[:readStart+n]
newline := bytes.LastIndexByte(chunk, '\n')
if newline < 0 {
break
}
remaining := chunk[newline+1:]
chunk = chunk[:newline+1]
chunkLoop:
for {
var hash uint64
var station, after []byte
if len(chunk) < 8 {
break chunkLoop
}
nameWord0 := binary.NativeEndian.Uint64(chunk)
matchBits := semicolonMatchBits(nameWord0)
if matchBits != 0 {
// semicolon is in the first 8 bytes
nameLen := calcNameLen(matchBits)
nameWord0 = maskWord(nameWord0, matchBits)
station = chunk[:nameLen]
after = chunk[nameLen+1:]
hash = calcHash(nameWord0)
} else {
// station name is longer so keep looking for the semicolon in
// uint64 chunks
nameLen := 8
hash = calcHash(nameWord0)
for {
if nameLen > len(chunk)-8 {
break chunkLoop
}
lastNameWord := binary.NativeEndian.Uint64(chunk[nameLen:])
matchBits = semicolonMatchBits(lastNameWord)
if matchBits != 0 {
nameLen += calcNameLen(matchBits)
station = chunk[:nameLen]
after = chunk[nameLen+1:]
break
}
nameLen += 8
}
}
index := 0
negative := false
if after[index] == '-' {
negative = true
index++
}
temp := int32(after[index] - '0')
index++
if after[index] != '.' {
temp = temp*10 + int32(after[index]-'0')
index++
}
index++ // skip '.'
temp = temp*10 + int32(after[index]-'0')
index += 2 // skip last digit and '\n'
if negative {
temp = -temp
}
chunk = after[index:]
hashIndex := int(hash & (numBuckets - 1))
for {
if items[hashIndex].key == nil {
// Found empty slot, add new item (copying key).
key := make([]byte, len(station))
copy(key, station)
items[hashIndex] = item{
key: key,
stat: &r10Stats{
min: temp,
max: temp,
sum: int64(temp),
count: 1,
},
}
size++
if size > numBuckets/2 {
panic("too many items in hash table")
}
break
}
if bytes.Equal(items[hashIndex].key, station) {
// Found matching slot, add to existing stats.
s := items[hashIndex].stat
s.min = min(s.min, temp)
s.max = max(s.max, temp)
s.sum += int64(temp)
s.count++
break
}
// Slot already holds another key, try next slot (linear probe).
hashIndex++
if hashIndex >= numBuckets {
hashIndex = 0
}
}
}
readStart = copy(buf, remaining)
}
result := make(map[string]*r10Stats, size)
for _, item := range items {
if item.key == nil {
continue
}
result[string(item.key)] = item.stat
}
resultsCh <- result
}
func calcNameLen(b uint64) int {
return (bits.TrailingZeros64(b) >> 3)
}
func calcHash(word uint64) uint64 {
return bits.RotateLeft64(word*0x51_7c_c1_b7_27_22_0a_95, 17)
}
func semicolonMatchBits(word uint64) uint64 {
diff := word ^ BroadcastSemicolon
return (diff - Broadcast0x01) & (^diff & Broadcast0x80)
}
func maskWord(word, matchBits uint64) uint64 {
mask := matchBits ^ (matchBits - 1)
return word & mask
}
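The helpers reward a worked example. semicolonMatchBits is the classic SWAR zero-byte test applied to word ^ BroadcastSemicolon: any byte equal to ';' becomes 0x00 in diff, and (diff - 0x01...) & (^diff & 0x80...) sets exactly that byte's high bit. Tracing the first eight bytes of "abc;12.3" on a little-endian machine (my trace, not from the source):

    // word      = 0x332E32313B636261        // "abc;12.3", little-endian
    // diff      = word ^ BroadcastSemicolon // byte 3 (the ';') becomes 0x00
    // matchBits = 0x0000000080000000        // high bit of byte 3 set
    // calcNameLen(matchBits) = 31 >> 3 = 3  // len("abc")
    // maskWord(word, matchBits) zeroes bytes 4..7, so the hash of the first
    // name word never mixes in bytes that belong to the temperature.

calcHash then multiplies by a large odd constant and rotates, which is much cheaper than FNV-1a over every byte; it only hashes the first (masked) eight bytes of the name, and the bytes.Equal check in the table resolves any collisions.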
Alexander Yastrebov
package main
import (
"bytes"
"fmt"
"log"
"math"
"os"
"runtime"
"sort"
"sync"
"syscall"
)
type measurement struct {
min, max, sum, count int64
}
func main() {
if len(os.Args) != 2 {
log.Fatalf("Missing measurements filename")
}
measurements := processFile(os.Args[1])
ids := make([]string, 0, len(measurements))
for id := range measurements {
ids = append(ids, id)
}
sort.Strings(ids)
fmt.Print("{")
for i, id := range ids {
if i > 0 {
fmt.Print(", ")
}
m := measurements[id]
fmt.Printf("%s=%.1f/%.1f/%.1f", id, round(float64(m.min)/10.0), round(float64(m.sum)/10.0/float64(m.count)), round(float64(m.max)/10.0))
}
fmt.Println("}")
}
func processFile(filename string) map[string]*measurement {
f, err := os.Open(filename)
if err != nil {
log.Fatalf("Open: %v", err)
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
log.Fatalf("Stat: %v", err)
}
size := fi.Size()
if size <= 0 || size != int64(int(size)) {
log.Fatalf("Invalid file size: %d", size)
}
data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
log.Fatalf("Mmap: %v", err)
}
defer func() {
if err := syscall.Munmap(data); err != nil {
log.Fatalf("Munmap: %v", err)
}
}()
return process(data)
}
func process(data []byte) map[string]*measurement {
nChunks := runtime.NumCPU()
chunkSize := len(data) / nChunks
if chunkSize == 0 {
chunkSize = len(data)
}
chunks := make([]int, 0, nChunks)
offset := 0
for offset < len(data) {
offset += chunkSize
if offset >= len(data) {
chunks = append(chunks, len(data))
break
}
nlPos := bytes.IndexByte(data[offset:], '\n')
if nlPos == -1 {
chunks = append(chunks, len(data))
break
} else {
offset += nlPos + 1
chunks = append(chunks, offset)
}
}
var wg sync.WaitGroup
wg.Add(len(chunks))
results := make([]map[string]*measurement, len(chunks))
start := 0
for i, chunk := range chunks {
go func(data []byte, i int) {
results[i] = processChunk(data)
wg.Done()
}(data[start:chunk], i)
start = chunk
}
wg.Wait()
measurements := make(map[string]*measurement)
for _, r := range results {
for id, rm := range r {
m := measurements[id]
if m == nil {
measurements[id] = rm
} else {
m.min = min(m.min, rm.min)
m.max = max(m.max, rm.max)
m.sum += rm.sum
m.count += rm.count
}
}
}
return measurements
}
func processChunk(data []byte) map[string]*measurement {
// Use fixed size linear probe lookup table
const (
// use power of 2 for fast modulo calculation,
// should be larger than max number of keys which is 10_000
entriesSize = 1 << 14
// use FNV-1a hash
fnv1aOffset64 = 14695981039346656037
fnv1aPrime64 = 1099511628211
)
type entry struct {
m measurement
hash uint64
vlen int
value [128]byte // use power of 2 > 100 for alignment
}
entries := make([]entry, entriesSize)
entriesCount := 0
// keep short and inlinable
getMeasurement := func(hash uint64, value []byte) *measurement {
i := hash & uint64(entriesSize-1)
entry := &entries[i]
// the bytes.Equal check could be commented out for extra speed, assuming no hash collisions
for entry.vlen > 0 && !(entry.hash == hash && bytes.Equal(entry.value[:entry.vlen], value)) {
i = (i + 1) & uint64(entriesSize-1)
entry = &entries[i]
}
if entry.vlen == 0 {
entry.hash = hash
entry.vlen = copy(entry.value[:], value)
entriesCount++
}
return &entry.m
}
// assume valid input
for len(data) > 0 {
idHash := uint64(fnv1aOffset64)
semiPos := 0
for i, b := range data {
if b == ';' {
semiPos = i
break
}
// calculate FNV-1a hash
idHash ^= uint64(b)
idHash *= fnv1aPrime64
}
idData := data[:semiPos]
data = data[semiPos+1:]
var temp int64
// parseNumber
{
negative := data[0] == '-'
if negative {
data = data[1:]
}
_ = data[3] // bounds-check elimination hint
if data[1] == '.' {
// 1.2\n
temp = int64(data[0])*10 + int64(data[2]) - '0'*(10+1)
data = data[4:]
} else {
// 12.3\n
_ = data[4] // bounds-check elimination hint
temp = int64(data[0])*100 + int64(data[1])*10 + int64(data[3]) - '0'*(100+10+1)
data = data[5:]
}
if negative {
temp = -temp
}
}
m := getMeasurement(idHash, idData)
if m.count == 0 {
m.min = temp
m.max = temp
m.sum = temp
m.count = 1
} else {
m.min = min(m.min, temp)
m.max = max(m.max, temp)
m.sum += temp
m.count++
}
}
result := make(map[string]*measurement, entriesCount)
for i := range entries {
entry := &entries[i]
if entry.m.count > 0 {
result[string(entry.value[:entry.vlen])] = &entry.m
}
}
return result
}
func round(x float64) float64 {
return roundJava(x*10.0) / 10.0
}
// roundJava returns the closest integer to the argument, with ties
// rounding toward positive infinity; see Java's Math.round
func roundJava(x float64) float64 {
t := math.Trunc(x)
if x < 0.0 && t-x == 0.5 {
//return t
} else if math.Abs(x-t) >= 0.5 {
t += math.Copysign(1, x)
}
if t == 0 { // check -0
return 0.0
}
return t
}
// parseNumber reads a decimal number matching the pattern "^-?[0-9]{1,2}[.][0-9]",
// e.g. -12.3, -3.4, 5.6, 78.9, and returns the value*10, i.e. -123, -34, 56, 789.
func parseNumber(data []byte) int64 {
negative := data[0] == '-'
if negative {
data = data[1:]
}
var result int64
switch len(data) {
// 1.2
case 3:
result = int64(data[0])*10 + int64(data[2]) - '0'*(10+1)
// 12.3
case 4:
result = int64(data[0])*100 + int64(data[1])*10 + int64(data[3]) - '0'*(100+10+1)
}
if negative {
return -result
}
return result
}
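Two details here are easy to miss. parseNumber folds the per-digit '0' subtractions into a single constant, and roundJava differs from Go's math.Round on negative ties. Checking both by hand (my arithmetic, not from the source):

    // parseNumber([]byte("1.2")):  '1'*10 + '2' - '0'*(10+1)
    //                             = 490 + 50 - 528 = 12     // 1.2 * 10
    // parseNumber([]byte("12.3")): '1'*100 + '2'*10 + '3' - '0'*(100+10+1)
    //                             = 4900 + 500 + 51 - 5328 = 123
    // roundJava(-1.5) == -1  // ties round toward +Inf, like Java's Math.round
    // math.Round(-1.5) == -2 // Go rounds ties away from zero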
François Pons
package main
import (
"bytes"
"flag"
"fmt"
"log"
"os"
"runtime"
"runtime/pprof"
"slices"
"sync"
"syscall"
"sync/atomic"
"github.com/aytechnet/decimal"
)
type (
measurement struct {
hash atomic.Uint64
minT atomic.Int64
maxT atomic.Int64
sumT atomic.Int64
countT atomic.Int64
nameLen int
nameBuf [208]byte
}
measurements struct {
total atomic.Int64
numParsers int
results [capacity]measurement
}
job struct {
maxOffset atomic.Int64
bufLen int
buf [bufSize]byte
}
)
const (
delta = 439 // odd probe step, so the probe sequence visits every bucket
capacity = 1 << 16 // must be a power of 2 for modulo calculation
// buffer size
bufSize = 512 * 1024 // 512 KiB
// use FNV-1a hash
fnv1aOffset64 = 14695981039346656037
fnv1aPrime64 = 1099511628211
)
func main() {
var mode, filename, cpuprofile string
var res measurements
flag.StringVar(&mode, "mode", "default", "Which mode to use among 'mmap', 'seq' and 'default'")
flag.StringVar(&filename, "file", "", "Measurements file to use")
flag.StringVar(&cpuprofile, "cpuprofile", "", "Write cpuprofile to file")
flag.IntVar(&res.numParsers, "parsers", runtime.NumCPU(), "Number of threads to use for parsing")
flag.Parse()
if filename == "" {
flag.Usage()
os.Exit(1)
}
if cpuprofile != "" {
if f, err := os.Create(cpuprofile); err != nil {
log.Fatal(err)
} else {
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
}
}
f, err := os.Open(filename)
if err != nil {
log.Fatalf("Open: %v", err)
}
defer f.Close()
switch mode {
case "mmap":
res.processMmap(f)
case "seq":
res.processSeq(f)
default:
res.process(f)
}
}
func (res *measurements) processMmap(f *os.File) {
jobs := make([]job, res.numParsers)
fi, err := f.Stat()
if err != nil {
log.Fatalf("Stat: %v", err)
}
size := fi.Size()
chunkSize := size/int64(len(jobs))
if chunkSize <= 100 {
log.Fatalf("Invalid file size: %d", size)
}
data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
log.Fatalf("Mmap: %v", err)
}
defer func() {
if err := syscall.Munmap(data); err != nil {
log.Fatalf("Munmap: %v", err)
}
}()
offset := chunkSize
for i := range jobs {
j := &jobs[i]
if i == len(jobs)-1 {
j.maxOffset.Store(size)
} else {
j.maxOffset.Store(-offset)
}
offset += chunkSize
}
var wg sync.WaitGroup
wg.Add(len(jobs))
offset = 0
for i := range jobs {
go func(i int, offset int64){
defer wg.Done()
j := &jobs[i]
maxOffset := j.maxOffset.Load()
if maxOffset < 0 {
maxOffset = -maxOffset
if nlPos := bytes.IndexByte(data[maxOffset:], '\n'); nlPos >= 0 {
maxOffset += int64(nlPos+1)
}
}
if offset > 0 {
if nlPos := bytes.IndexByte(data[offset:maxOffset], '\n'); nlPos >= 0 {
offset += int64(nlPos+1)
}
}
res.processChunk(data[offset:maxOffset])
}(i, offset)
offset += chunkSize
}
wg.Wait()
res.printResults()
}
func (res *measurements) processSeq(f *os.File) {
jobs := make([]job, res.numParsers)
free_jobs := make(chan *job, len(jobs))
ready_jobs := make(chan *job, len(jobs))
var wg sync.WaitGroup
wg.Add(len(jobs))
for i := range jobs {
j := &jobs[i]
free_jobs <- j
go func(){
for j := range ready_jobs {
res.processChunk(j.buf[:j.bufLen])
free_jobs <- j
}
wg.Done()
}()
}
var prev_j *job
pos := 0
nlPos := 0
for j := range free_jobs {
if count, _ := f.Read(j.buf[pos:]); count > 0 {
// finalize previous line from previous job
if pos > 0 /* && prev_j != nil */ {
copy(j.buf[:pos], prev_j.buf[bufSize-pos:])
}
// prepare next buffer
if nlPos = bytes.LastIndexByte(j.buf[:pos+count], '\n'); nlPos < 0 {
log.Fatalf("buffer too small for a complete line");
} else {
pos = pos + count - nlPos - 1
prev_j = j
j.bufLen = nlPos + 1
}
// spawn a new job on this buffer
ready_jobs <- j
} else {
break
}
}
close(ready_jobs)
wg.Wait()
close(free_jobs)
res.printResults()
}
func (res *measurements) process(f *os.File) {
jobs := make([]job, res.numParsers)
fi, err := f.Stat()
if err != nil {
log.Fatalf("Stat: %v", err)
}
size := fi.Size()
chunkSize := size/int64(len(jobs))
if chunkSize <= 100 {
log.Fatalf("Invalid file size: %d", size)
}
offset := chunkSize
for i := range jobs {
j := &jobs[i]
if i == len(jobs)-1 {
j.maxOffset.Store(size)
} else {
j.maxOffset.Store(-offset)
}
offset += chunkSize
}
var wg sync.WaitGroup
wg.Add(len(jobs))
offset = 0
for i := range jobs {
go func(i int, offset int64){
defer wg.Done()
j := &jobs[i]
nlSkipFirst := offset > 0
for {
maxLen := bufSize
maxOffset := j.maxOffset.Load()
if maxOffset < 0 {
maxOffset = -maxOffset
}
if offset + int64(maxLen) > maxOffset {
maxLen = int(maxOffset - offset)
}
if count, _ := f.ReadAt(j.buf[:maxLen], offset); count > 0 {
pos := 0
if nlSkipFirst {
if nlPos := bytes.IndexByte(j.buf[:maxLen], '\n'); nlPos >= 0 {
pos = nlPos + 1
jobs[i-1].maxOffset.Store(offset + int64(pos))
nlSkipFirst = false
} else {
log.Fatalf("Unable to seek to next line at job n°%d", i)
}
}
if nlPos := bytes.LastIndexByte(j.buf[pos:maxLen], '\n'); nlPos >= 0 {
j.bufLen = pos + nlPos + 1
offset += int64(j.bufLen)
res.processChunk(j.buf[pos:j.bufLen])
} else {
log.Fatalf("Buffer too small at job n°%d", i)
}
} else {
return
}
maxOffset = j.maxOffset.Load()
for maxOffset < 0 {
maxOffset = j.maxOffset.Load()
}
if offset >= maxOffset {
return
}
}
}(i, offset)
offset += chunkSize
}
wg.Wait()
res.printResults()
}
func (res *measurements) printResults() {
log.Printf("Read %d entries", res.total.Load())
ids := make([]int, 0, capacity)
for i := range res.results {
m := &res.results[i]
if m.nameLen > 0 {
ids = append(ids, i)
}
}
slices.SortFunc(ids, func(a,b int) int {
return bytes.Compare(res.results[a].nameBuf[0:res.results[a].nameLen], res.results[b].nameBuf[0:res.results[b].nameLen])
})
count := 0
fmt.Print("{")
for _, i := range ids {
m := &res.results[i]
var buf [128]byte // name is 100 bytes max; each of the three numbers is at most 5 bytes on output
b := buf[0:0]
if count > 0 {
b = append(b, ',', ' ')
}
b = append(b, m.nameBuf[0:m.nameLen]...)
b = append(b, '=')
b = decimal.New(m.minT.Load(), -1).BytesToFixed(b, 1)
b = append(b, '/')
b = decimal.New(m.sumT.Load(), -1).Div(decimal.NewFromInt(m.countT.Load()-1)).BytesToFixed(b, 1)
b = append(b, '/')
b = decimal.New(m.maxT.Load(), -1).BytesToFixed(b, 1)
count++
fmt.Print(string(b))
}
fmt.Println("}")
}
func (res *measurements) processChunk(data []byte) {
var total int64
// assume valid input
for len(data) > 0 {
i := 0
// compute FNV-1a hash
idHash := uint64(fnv1aOffset64)
for j, b := range data {
if b == ';' {
i = j
break
}
// calculate FNV-1a hash
idHash ^= uint64(b)
idHash *= fnv1aPrime64
}
if idHash == 0 {
// hash 0 marks an empty slot in the results table, so remap it
idHash = uint64(len(data))
}
idData := data[:i]
i++ // now i points to temperature
var temp int64
// parseNumber
{
negative := data[i] == '-'
if negative {
i++
}
temp = int64(data[i]-'0')
i++
if data[i] != '.' {
temp = temp*10 + int64(data[i]-'0')
i++
}
i++ // data[i] is '.'
temp = temp*10 + int64(data[i]-'0')
if negative {
temp = -temp
}
data = data[i+2:]
}
// get measurement
{
i := idHash & uint64(capacity-1)
entry := &res.results[i]
for {
if entry.hash.CompareAndSwap(0, idHash) {
// we won the slot; other goroutines can already see the hash, so use countT as an init barrier
if entry.countT.Add(1) == 1 {
entry.nameLen = len(idData)
copy(entry.nameBuf[:], idData)
entry.minT.Store(temp)
entry.maxT.Store(temp)
entry.sumT.Store(temp)
entry.countT.Add(1) // unlock for update below
} else {
// wait for countT to be at least 2 for entry init to be complete
for entry.countT.Load() < 2 {}
// update existing entry
minT := entry.minT.Load()
for minT > temp {
if entry.minT.CompareAndSwap(minT, temp) {
break
} else {
minT = entry.minT.Load()
}
}
maxT := entry.maxT.Load()
for maxT < temp {
if entry.maxT.CompareAndSwap(maxT, temp) {
break
} else {
maxT = entry.maxT.Load()
}
}
entry.sumT.Add(temp)
entry.countT.Add(1)
}
break
} else if entry.hash.Load() == idHash {
// the entry is found and may be being updated by another thread
// wait for countT to be at least 2 for entry init to be complete
for entry.countT.Load() < 2 {}
// now that the name has been written, check that it matches
if len(idData) == entry.nameLen /* bytes.Compare(idData, entry.nameBuf[0:entry.nameLen]) == 0 */ {
// update existing entry
minT := entry.minT.Load()
for minT > temp {
if entry.minT.CompareAndSwap(minT, temp) {
break
} else {
minT = entry.minT.Load()
}
}
maxT := entry.maxT.Load()
for maxT < temp {
if entry.maxT.CompareAndSwap(maxT, temp) {
break
} else {
maxT = entry.maxT.Load()
}
}
entry.sumT.Add(temp)
entry.countT.Add(1)
break
} else {
// name does not match idData so jump to next entry
i = (i + delta) & uint64(capacity-1)
entry = &res.results[i]
}
} else {
i = (i + delta) & uint64(capacity-1)
entry = &res.results[i]
}
}
}
total++
}
res.total.Add(total)
}
func (entry *measurement) update(temp int64) {
minT := entry.minT.Load()
for minT > temp {
if entry.minT.CompareAndSwap(minT, temp) {
break
} else {
minT = entry.minT.Load()
}
}
maxT := entry.maxT.Load()
for maxT < temp {
if entry.maxT.CompareAndSwap(maxT, temp) {
break
} else {
maxT = entry.maxT.Load()
}
}
entry.sumT.Add(temp)
entry.countT.Add(1)
}