Makefiles for Web Projects: Manage Your Environment Workflow

How I stopped worrying and loved Makefiles

Note: Makefile recipes must be indented with tabs, not spaces, otherwise you'll get syntax errors.

The Core of a Makefile: Targets

```makefile
up:
	cp .env.example .env
	docker compose up -d workspace

stop:
	docker compose stop

zsh:
	docker compose exec workspace zsh
```

This example has three targets: up, stop, and zsh. By default, Make treats the first target whose name doesn't start with a dot as the default goal, i.e. the project's primary workflow. Here, make and make up do the same thing.

The copy step above is not a typical Make use case, though. Make shines at deciding whether each target needs to run at all. For example, we often store secrets in .env. If .env already exists, we shouldn't overwrite it by copying .env.example over it again. Instead, we can turn .env into its own target and list it as a prerequisite of up:

```makefile
up: .env
	docker compose up -d workspace

.env:
	cp .env.example .env
```

By default, target names are treated as filenames; the name "make" implies building a target, and a recipe only runs when the target actually needs to be built (for example, when the file doesn't exist yet). In this example, running the up target first runs the .env target to create the file if it's missing, then starts the workspace container. If .env already exists, Make skips the .env target and starts the container directly. By the same logic, if a file named up exists in the directory, the up target won't run. To tell Make that certain targets aren't filenames but named workflows, declare them as phony targets:

```makefile
.PHONY: up stop zsh
```

Add Some Variables

Make also supports variables. Following the usual Unix environment-variable convention, we name them in SCREAMING_SNAKE_CASE, and we reference them wrapped in $().
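The article stops before showing a variable in use, so here is a minimal sketch of how the earlier targets might look once the repeated pieces are pulled into variables; the COMPOSE and WORKSPACE names are illustrative, not from the original:

```makefile
# Illustrative variable names, not from the original article.
# Recipe lines must be indented with tabs.
COMPOSE   = docker compose
WORKSPACE = workspace

up: .env
	$(COMPOSE) up -d $(WORKSPACE)

stop:
	$(COMPOSE) stop

zsh:
	$(COMPOSE) exec $(WORKSPACE) zsh

.env:
	cp .env.example .env

.PHONY: up stop zsh
```

With this, running make zsh expands $(COMPOSE) and $(WORKSPACE) before the recipe runs, so the command executed is docker compose exec workspace zsh.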

The One Billion Row Challenge in Go: from 1m45s to 3.4s in nine solutions

The One Billion Row Challenge in Go: from 1m45s to 3.4s in nine solutions https://github.com/benhoyt/go-1brc Baseline $ time cat measurements.txt >/dev/null 0m1.052s $ time wc measurements.txt 1000000000 1179173106 13795293380 measurements.txt 0m55.710s $ time gawk -b -f 1brc.awk measurements.txt >measurements.out 7m35.567s BEGIN { FS = ";" } { if (counts[$1]++) { mins[$1] = $2 < mins[$1] ? $2 : mins[$1] maxs[$1] = $2 > maxs[$1] ? $2 : maxs[$1] } else { mins[$1] = maxs[$1] = $2 # new entry } sums[$1] += $2 } END { printf "{" n = asorti(mins, sorted) for (i = 1; i <= n; i++) { station = sorted[i] min = mins[station] max = maxs[station] mean = sums[station] / counts[station] printf "%s=%.1f/%.1f/%.1f", station, min, mean, max if (i < n) { printf ", " } } printf "}\n" } Solution 1: simple and idiomatic Go // r1: simple, idiomatic Go using bufio.Scanner and strconv.ParseFloat // // ~1.004s for 10M rows package main import ( "bufio" "fmt" "io" "os" "sort" "strconv" "strings" ) func r1(inputPath string, output io.Writer) error { type stats struct { min, max, sum float64 count int64 } f, err := os.Open(inputPath) if err != nil { return err } defer f.Close() stationStats := make(map[string]stats) scanner := bufio.NewScanner(f) for scanner.Scan() { line := scanner.Text() station, tempStr, hasSemi := strings.Cut(line, ";") if !hasSemi { continue } temp, err := strconv.ParseFloat(tempStr, 64) if err != nil { return err } s, ok := stationStats[station] if !ok { s.min = temp s.max = temp s.sum = temp s.count = 1 } else { s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += temp s.count++ } stationStats[station] = s } stations := make([]string, 0, len(stationStats)) for station := range stationStats { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := stationStats[station] mean := s.sum / float64(s.count) fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max) } fmt.Fprint(output, "}\n") return nil } Solution 2: map with pointer values // r2: use stats pointer as map value to avoid double hashing // // ~921ms for 10M rows (1.09x as fast as r1) package main import ( "bufio" "fmt" "io" "os" "sort" "strconv" "strings" ) func r2(inputPath string, output io.Writer) error { type stats struct { min, max, sum float64 count int64 } f, err := os.Open(inputPath) if err != nil { return err } defer f.Close() stationStats := make(map[string]*stats) scanner := bufio.NewScanner(f) for scanner.Scan() { line := scanner.Text() station, tempStr, hasSemi := strings.Cut(line, ";") if !hasSemi { continue } temp, err := strconv.ParseFloat(tempStr, 64) if err != nil { return err } s := stationStats[station] if s == nil { stationStats[station] = &stats{ min: temp, max: temp, sum: temp, count: 1, } } else { s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += temp s.count++ } } stations := make([]string, 0, len(stationStats)) for station := range stationStats { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := stationStats[station] mean := s.sum / float64(s.count) fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max) } fmt.Fprint(output, "}\n") return nil } Solution 3: avoid strconv.ParseFloat // r3: parse temperatures manually instead of using strconv.ParseFloat // // ~517ms for 10M rows (1.94x as fast as r1) package main import ( "bufio" "bytes" "fmt" "io" "os" 
"sort" ) func r3(inputPath string, output io.Writer) error { type stats struct { min, max, sum float64 count int64 } f, err := os.Open(inputPath) if err != nil { return err } defer f.Close() stationStats := make(map[string]*stats) scanner := bufio.NewScanner(f) for scanner.Scan() { line := scanner.Bytes() station, tempBytes, hasSemi := bytes.Cut(line, []byte(";")) if !hasSemi { continue } negative := false index := 0 if tempBytes[index] == '-' { index++ negative = true } temp := float64(tempBytes[index] - '0') index++ if tempBytes[index] != '.' { temp = temp*10 + float64(tempBytes[index]-'0') index++ } index++ // skip '.' temp += float64(tempBytes[index]-'0') / 10 if negative { temp = -temp } s := stationStats[string(station)] if s == nil { stationStats[string(station)] = &stats{ min: temp, max: temp, sum: temp, count: 1, } } else { s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += temp s.count++ } } stations := make([]string, 0, len(stationStats)) for station := range stationStats { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := stationStats[station] mean := s.sum / float64(s.count) fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max) } fmt.Fprint(output, "}\n") return nil } Solution 4: fixed point integers // r4: use fixed point int32s (*10) instead of float64s // // ~491ms for 10M rows (2.04x as fast as r1) package main import ( "bufio" "bytes" "fmt" "io" "os" "sort" ) func r4(inputPath string, output io.Writer) error { type stats struct { min, max, count int32 sum int64 } f, err := os.Open(inputPath) if err != nil { return err } defer f.Close() stationStats := make(map[string]*stats) scanner := bufio.NewScanner(f) for scanner.Scan() { line := scanner.Bytes() station, tempBytes, hasSemi := bytes.Cut(line, []byte(";")) if !hasSemi { continue } negative := false index := 0 if tempBytes[index] == '-' { index++ negative = true } temp := int32(tempBytes[index] - '0') index++ if tempBytes[index] != '.' { temp = temp*10 + int32(tempBytes[index]-'0') index++ } index++ // skip '.' temp = temp*10 + int32(tempBytes[index]-'0') if negative { temp = -temp } s := stationStats[string(station)] if s == nil { stationStats[string(station)] = &stats{ min: temp, max: temp, sum: int64(temp), count: 1, } } else { s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += int64(temp) s.count++ } } stations := make([]string, 0, len(stationStats)) for station := range stationStats { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := stationStats[station] mean := float64(s.sum) / float64(s.count) / 10 fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10) } fmt.Fprint(output, "}\n") return nil } Solution 5: avoid bytes.Cut // r5: avoid bytes.Cut // // ~442ms for 10M rows (2.27x as fast as r1) package main import ( "bufio" "fmt" "io" "os" "sort" ) func r5(inputPath string, output io.Writer) error { type stats struct { min, max, count int32 sum int64 } f, err := os.Open(inputPath) if err != nil { return err } defer f.Close() stationStats := make(map[string]*stats) scanner := bufio.NewScanner(f) for scanner.Scan() { line := scanner.Bytes() end := len(line) tenths := int32(line[end-1] - '0') ones := int32(line[end-3] - '0') // line[end-2] is '.' 
var temp int32 var semicolon int if line[end-4] == ';' { temp = ones*10 + tenths semicolon = end - 4 } else if line[end-4] == '-' { temp = -(ones*10 + tenths) semicolon = end - 5 } else { tens := int32(line[end-4] - '0') if line[end-5] == ';' { temp = tens*100 + ones*10 + tenths semicolon = end - 5 } else { // '-' temp = -(tens*100 + ones*10 + tenths) semicolon = end - 6 } } station := line[:semicolon] s := stationStats[string(station)] if s == nil { stationStats[string(station)] = &stats{ min: temp, max: temp, sum: int64(temp), count: 1, } } else { s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += int64(temp) s.count++ } } stations := make([]string, 0, len(stationStats)) for station := range stationStats { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := stationStats[station] mean := float64(s.sum) / float64(s.count) / 10 fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10) } fmt.Fprint(output, "}\n") return nil } Solution 6: avoid bufio.Scanner // r6: don't use bufio.Scanner to avoid scanning some bytes twice // // ~399ms for 10M rows (2.52x as fast as r1) package main import ( "bytes" "fmt" "io" "os" "sort" ) func r6(inputPath string, output io.Writer) error { type stats struct { min, max, count int32 sum int64 } f, err := os.Open(inputPath) if err != nil { return err } defer f.Close() stationStats := make(map[string]*stats) buf := make([]byte, 1024*1024) readStart := 0 for { n, err := f.Read(buf[readStart:]) if err != nil && err != io.EOF { return err } if readStart+n == 0 { break } chunk := buf[:readStart+n] newline := bytes.LastIndexByte(chunk, '\n') if newline < 0 { break } remaining := chunk[newline+1:] chunk = chunk[:newline+1] for { station, after, hasSemi := bytes.Cut(chunk, []byte(";")) if !hasSemi { break } index := 0 negative := false if after[index] == '-' { negative = true index++ } temp := int32(after[index] - '0') index++ if after[index] != '.' { temp = temp*10 + int32(after[index]-'0') index++ } index++ // skip '.' 
temp = temp*10 + int32(after[index]-'0') index += 2 // skip last digit and '\n' if negative { temp = -temp } chunk = after[index:] s := stationStats[string(station)] if s == nil { stationStats[string(station)] = &stats{ min: temp, max: temp, sum: int64(temp), count: 1, } } else { s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += int64(temp) s.count++ } } readStart = copy(buf, remaining) } stations := make([]string, 0, len(stationStats)) for station := range stationStats { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := stationStats[station] mean := float64(s.sum) / float64(s.count) / 10 fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10) } fmt.Fprint(output, "}\n") return nil } Solution 7: custom hash table // r7: use custom hash table and hash station name as we look for ';' // // ~234ms for 10M rows (4.29x as fast as r1) package main import ( "bytes" "fmt" "io" "os" "sort" ) func r7(inputPath string, output io.Writer) error { type stats struct { min, max, count int32 sum int64 } f, err := os.Open(inputPath) if err != nil { return err } defer f.Close() type item struct { key []byte stat *stats } const numBuckets = 1 << 17 // number of hash buckets (power of 2) items := make([]item, numBuckets) // hash buckets, linearly probed size := 0 // number of active items in items slice buf := make([]byte, 1024*1024) readStart := 0 for { n, err := f.Read(buf[readStart:]) if err != nil && err != io.EOF { return err } if readStart+n == 0 { break } chunk := buf[:readStart+n] newline := bytes.LastIndexByte(chunk, '\n') if newline < 0 { break } remaining := chunk[newline+1:] chunk = chunk[:newline+1] for { const ( // FNV-1 64-bit constants from hash/fnv. offset64 = 14695981039346656037 prime64 = 1099511628211 ) var station, after []byte hash := uint64(offset64) i := 0 for ; i < len(chunk); i++ { c := chunk[i] if c == ';' { station = chunk[:i] after = chunk[i+1:] break } hash ^= uint64(c) // FNV-1a is XOR then * hash *= prime64 } if i == len(chunk) { break } index := 0 negative := false if after[index] == '-' { negative = true index++ } temp := int32(after[index] - '0') index++ if after[index] != '.' { temp = temp*10 + int32(after[index]-'0') index++ } index++ // skip '.' temp = temp*10 + int32(after[index]-'0') index += 2 // skip last digit and '\n' if negative { temp = -temp } chunk = after[index:] hashIndex := int(hash & uint64(numBuckets-1)) for { if items[hashIndex].key == nil { // Found empty slot, add new item (copying key). key := make([]byte, len(station)) copy(key, station) items[hashIndex] = item{ key: key, stat: &stats{ min: temp, max: temp, sum: int64(temp), count: 1, }, } size++ if size > numBuckets/2 { panic("too many items in hash table") } break } if bytes.Equal(items[hashIndex].key, station) { // Found matching slot, add to existing stats. s := items[hashIndex].stat s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += int64(temp) s.count++ break } // Slot already holds another key, try next slot (linear probe). 
hashIndex++ if hashIndex >= numBuckets { hashIndex = 0 } } } readStart = copy(buf, remaining) } stationItems := make([]item, 0, size) for _, item := range items { if item.key == nil { continue } stationItems = append(stationItems, item) } sort.Slice(stationItems, func(i, j int) bool { return string(stationItems[i].key) < string(stationItems[j].key) }) fmt.Fprint(output, "{") for i, item := range stationItems { if i > 0 { fmt.Fprint(output, ", ") } s := item.stat mean := float64(s.sum) / float64(s.count) / 10 fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", item.key, float64(s.min)/10, mean, float64(s.max)/10) } fmt.Fprint(output, "}\n") return nil } Solution 8: process chunks in parallel // r8: add some parallelism (but back to non-optimized r1 version) // // ~213ms for 10M rows (4.71x as fast as r1) package main import ( "bufio" "bytes" "fmt" "io" "os" "sort" "strconv" "strings" ) type r8Stats struct { min, max, sum float64 count int64 } func r8(inputPath string, output io.Writer) error { parts, err := splitFile(inputPath, maxGoroutines) if err != nil { return err } resultsCh := make(chan map[string]r8Stats) for _, part := range parts { go r8ProcessPart(inputPath, part.offset, part.size, resultsCh) } totals := make(map[string]r8Stats) for i := 0; i < len(parts); i++ { result := <-resultsCh for station, s := range result { ts, ok := totals[station] if !ok { totals[station] = r8Stats{ min: s.min, max: s.max, sum: s.sum, count: s.count, } continue } ts.min = min(ts.min, s.min) ts.max = max(ts.max, s.max) ts.sum += s.sum ts.count += s.count totals[station] = ts } } stations := make([]string, 0, len(totals)) for station := range totals { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := totals[station] mean := s.sum / float64(s.count) fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max) } fmt.Fprint(output, "}\n") return nil } func r8ProcessPart(inputPath string, fileOffset, fileSize int64, resultsCh chan map[string]r8Stats) { file, err := os.Open(inputPath) if err != nil { panic(err) } defer file.Close() _, err = file.Seek(fileOffset, io.SeekStart) if err != nil { panic(err) } f := io.LimitedReader{R: file, N: fileSize} stationStats := make(map[string]r8Stats) scanner := bufio.NewScanner(&f) for scanner.Scan() { line := scanner.Text() station, tempStr, hasSemi := strings.Cut(line, ";") if !hasSemi { continue } temp, err := strconv.ParseFloat(tempStr, 64) if err != nil { panic(err) } s, ok := stationStats[station] if !ok { s.min = temp s.max = temp s.sum = temp s.count = 1 } else { s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += temp s.count++ } stationStats[station] = s } resultsCh <- stationStats } type part struct { offset, size int64 } func splitFile(inputPath string, numParts int) ([]part, error) { const maxLineLength = 100 f, err := os.Open(inputPath) if err != nil { return nil, err } st, err := f.Stat() if err != nil { return nil, err } size := st.Size() splitSize := size / int64(numParts) buf := make([]byte, maxLineLength) parts := make([]part, 0, numParts) offset := int64(0) for i := 0; i < numParts; i++ { if i == numParts-1 { if offset < size { parts = append(parts, part{offset, size - offset}) } break } seekOffset := max(offset+splitSize-maxLineLength, 0) _, err := f.Seek(seekOffset, io.SeekStart) if err != nil { return nil, err } n, _ := io.ReadFull(f, buf) chunk := buf[:n] newline := bytes.LastIndexByte(chunk, '\n') if newline < 0 { return nil, 
fmt.Errorf("newline not found at offset %d", offset+splitSize-maxLineLength) } remaining := len(chunk) - newline - 1 nextOffset := seekOffset + int64(len(chunk)) - int64(remaining) parts = append(parts, part{offset, nextOffset - offset}) offset = nextOffset } return parts, nil } Solution 9: all optimisations plus parallelisation // r9: all the previous optimizations plus parallel execution // // 43ms for 10M rows (23.3x as fast as r1) package main import ( "bytes" "fmt" "io" "os" "sort" ) type r9Stats struct { min, max, count int32 sum int64 } func r9(inputPath string, output io.Writer) error { parts, err := splitFile(inputPath, maxGoroutines) if err != nil { return err } resultsCh := make(chan map[string]*r9Stats) for _, part := range parts { go r9ProcessPart(inputPath, part.offset, part.size, resultsCh) } totals := make(map[string]*r9Stats) for i := 0; i < len(parts); i++ { result := <-resultsCh for station, s := range result { ts := totals[station] if ts == nil { totals[station] = s continue } ts.min = min(ts.min, s.min) ts.max = max(ts.max, s.max) ts.sum += s.sum ts.count += s.count } } stations := make([]string, 0, len(totals)) for station := range totals { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := totals[station] mean := float64(s.sum) / float64(s.count) / 10 fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10) } fmt.Fprint(output, "}\n") return nil } func r9ProcessPart(inputPath string, fileOffset, fileSize int64, resultsCh chan map[string]*r9Stats) { file, err := os.Open(inputPath) if err != nil { panic(err) } defer file.Close() _, err = file.Seek(fileOffset, io.SeekStart) if err != nil { panic(err) } f := io.LimitedReader{R: file, N: fileSize} type item struct { key []byte stat *r9Stats } const numBuckets = 1 << 17 // number of hash buckets (power of 2) items := make([]item, numBuckets) // hash buckets, linearly probed size := 0 // number of active items in items slice buf := make([]byte, 1024*1024) readStart := 0 for { n, err := f.Read(buf[readStart:]) if err != nil && err != io.EOF { panic(err) } if readStart+n == 0 { break } chunk := buf[:readStart+n] newline := bytes.LastIndexByte(chunk, '\n') if newline < 0 { break } remaining := chunk[newline+1:] chunk = chunk[:newline+1] for { const ( // FNV-1 64-bit constants from hash/fnv. offset64 = 14695981039346656037 prime64 = 1099511628211 ) var station, after []byte hash := uint64(offset64) i := 0 for ; i < len(chunk); i++ { c := chunk[i] if c == ';' { station = chunk[:i] after = chunk[i+1:] break } hash ^= uint64(c) // FNV-1a is XOR then * hash *= prime64 } if i == len(chunk) { break } index := 0 negative := false if after[index] == '-' { negative = true index++ } temp := int32(after[index] - '0') index++ if after[index] != '.' { temp = temp*10 + int32(after[index]-'0') index++ } index++ // skip '.' temp = temp*10 + int32(after[index]-'0') index += 2 // skip last digit and '\n' if negative { temp = -temp } chunk = after[index:] hashIndex := int(hash & (numBuckets - 1)) for { if items[hashIndex].key == nil { // Found empty slot, add new item (copying key). 
key := make([]byte, len(station)) copy(key, station) items[hashIndex] = item{ key: key, stat: &r9Stats{ min: temp, max: temp, sum: int64(temp), count: 1, }, } size++ if size > numBuckets/2 { panic("too many items in hash table") } break } if bytes.Equal(items[hashIndex].key, station) { // Found matching slot, add to existing stats. s := items[hashIndex].stat s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += int64(temp) s.count++ break } // Slot already holds another key, try next slot (linear probe). hashIndex++ if hashIndex >= numBuckets { hashIndex = 0 } } } readStart = copy(buf, remaining) } result := make(map[string]*r9Stats, size) for _, item := range items { if item.key == nil { continue } result[string(item.key)] = item.stat } resultsCh <- result } Solution 10: all the previous optimizations plus faster semicolon finding and hashing // r10: all the previous optimizations plus faster semicolon finding and // hashing // // Translated from Java by Menno Finlay-Smits Ideas with ideas taken from // this fast Java solution: // // https://github.com/gunnarmorling/1brc/blob/main/src/main/java/dev/morling/onebrc/CalculateAverage_mtopolnik.java // // On my (Ben's) laptop I get these initial results: // // $ ./go-1brc -revision=1 ../1brc/data/measurements.txt >out-r1 // Processed 13156.2MB in 1m39.507011009s // $ ./go-1brc -revision=9 ../1brc/data/measurements.txt >out-r9 // Processed 13156.2MB in 2.893693843s # 34.4x as fast as the r1 above // $ ./go-1brc -revision=10 ../1brc/data/measurements.txt >out-r10 // Processed 13156.2MB in 2.497241029s # 39.8x as fast as the r1 above package main import ( "bytes" "encoding/binary" "fmt" "io" "math/bits" "os" "sort" ) const BroadcastSemicolon = 0x3B3B3B3B3B3B3B3B const Broadcast0x01 = 0x0101010101010101 const Broadcast0x80 = 0x8080808080808080 type r10Stats struct { min, max, count int32 sum int64 } func r10(inputPath string, output io.Writer) error { parts, err := splitFile(inputPath, maxGoroutines) if err != nil { return err } resultsCh := make(chan map[string]*r10Stats) for _, part := range parts { go r10ProcessPart(inputPath, part.offset, part.size, resultsCh) } totals := make(map[string]*r10Stats) for i := 0; i < len(parts); i++ { result := <-resultsCh for station, s := range result { ts := totals[station] if ts == nil { totals[station] = s continue } ts.min = min(ts.min, s.min) ts.max = max(ts.max, s.max) ts.sum += s.sum ts.count += s.count } } stations := make([]string, 0, len(totals)) for station := range totals { stations = append(stations, station) } sort.Strings(stations) fmt.Fprint(output, "{") for i, station := range stations { if i > 0 { fmt.Fprint(output, ", ") } s := totals[station] mean := float64(s.sum) / float64(s.count) / 10 fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, float64(s.min)/10, mean, float64(s.max)/10) } fmt.Fprint(output, "}\n") return nil } func r10ProcessPart(inputPath string, fileOffset, fileSize int64, resultsCh chan map[string]*r10Stats) { file, err := os.Open(inputPath) if err != nil { panic(err) } defer file.Close() _, err = file.Seek(fileOffset, io.SeekStart) if err != nil { panic(err) } f := io.LimitedReader{R: file, N: fileSize} type item struct { key []byte stat *r10Stats } const numBuckets = 1 << 17 // number of hash buckets (power of 2) items := make([]item, numBuckets) // hash buckets, linearly probed size := 0 // number of active items in items slice buf := make([]byte, 1024*1024) readStart := 0 for { n, err := f.Read(buf[readStart:]) if err != nil && err != io.EOF { panic(err) } if 
readStart+n == 0 { break } chunk := buf[:readStart+n] newline := bytes.LastIndexByte(chunk, '\n') if newline < 0 { break } remaining := chunk[newline+1:] chunk = chunk[:newline+1] chunkLoop: for { var hash uint64 var station, after []byte if len(chunk) < 8 { break chunkLoop } nameWord0 := binary.NativeEndian.Uint64(chunk) matchBits := semicolonMatchBits(nameWord0) if matchBits != 0 { // semicolon is in the first 8 bytes nameLen := calcNameLen(matchBits) nameWord0 = maskWord(nameWord0, matchBits) station = chunk[:nameLen] after = chunk[nameLen+1:] hash = calcHash(nameWord0) } else { // station name is longer so keep looking for the semicolon in // uint64 chunks nameLen := 8 hash = calcHash(nameWord0) for { if nameLen > len(chunk)-8 { break chunkLoop } lastNameWord := binary.NativeEndian.Uint64(chunk[nameLen:]) matchBits = semicolonMatchBits(lastNameWord) if matchBits != 0 { nameLen += calcNameLen(matchBits) station = chunk[:nameLen] after = chunk[nameLen+1:] break } nameLen += 8 } } index := 0 negative := false if after[index] == '-' { negative = true index++ } temp := int32(after[index] - '0') index++ if after[index] != '.' { temp = temp*10 + int32(after[index]-'0') index++ } index++ // skip '.' temp = temp*10 + int32(after[index]-'0') index += 2 // skip last digit and '\n' if negative { temp = -temp } chunk = after[index:] hashIndex := int(hash & (numBuckets - 1)) for { if items[hashIndex].key == nil { // Found empty slot, add new item (copying key). key := make([]byte, len(station)) copy(key, station) items[hashIndex] = item{ key: key, stat: &r10Stats{ min: temp, max: temp, sum: int64(temp), count: 1, }, } size++ if size > numBuckets/2 { panic("too many items in hash table") } break } if bytes.Equal(items[hashIndex].key, station) { // Found matching slot, add to existing stats. s := items[hashIndex].stat s.min = min(s.min, temp) s.max = max(s.max, temp) s.sum += int64(temp) s.count++ break } // Slot already holds another key, try next slot (linear probe). 
hashIndex++ if hashIndex >= numBuckets { hashIndex = 0 } } } readStart = copy(buf, remaining) } result := make(map[string]*r10Stats, size) for _, item := range items { if item.key == nil { continue } result[string(item.key)] = item.stat } resultsCh <- result } func calcNameLen(b uint64) int { return (bits.TrailingZeros64(b) >> 3) } func calcHash(word uint64) uint64 { return bits.RotateLeft64(word*0x51_7c_c1_b7_27_22_0a_95, 17) } func semicolonMatchBits(word uint64) uint64 { diff := word ^ BroadcastSemicolon return (diff - Broadcast0x01) & (^diff & Broadcast0x80) } func maskWord(word, matchBits uint64) uint64 { mask := matchBits ^ (matchBits - 1) return word & mask } The One Billion Row Challenge https://github.com/gunnarmorling/1brc/tree/main/src/main/go/AlexanderYastrebov Alexander Yastrebov package main import ( "bytes" "fmt" "log" "math" "os" "runtime" "sort" "sync" "syscall" ) type measurement struct { min, max, sum, count int64 } func main() { if len(os.Args) != 2 { log.Fatalf("Missing measurements filename") } measurements := processFile(os.Args[1]) ids := make([]string, 0, len(measurements)) for id := range measurements { ids = append(ids, id) } sort.Strings(ids) fmt.Print("{") for i, id := range ids { if i > 0 { fmt.Print(", ") } m := measurements[id] fmt.Printf("%s=%.1f/%.1f/%.1f", id, round(float64(m.min)/10.0), round(float64(m.sum)/10.0/float64(m.count)), round(float64(m.max)/10.0)) } fmt.Println("}") } func processFile(filename string) map[string]*measurement { f, err := os.Open(filename) if err != nil { log.Fatalf("Open: %v", err) } defer f.Close() fi, err := f.Stat() if err != nil { log.Fatalf("Stat: %v", err) } size := fi.Size() if size <= 0 || size != int64(int(size)) { log.Fatalf("Invalid file size: %d", size) } data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { log.Fatalf("Mmap: %v", err) } defer func() { if err := syscall.Munmap(data); err != nil { log.Fatalf("Munmap: %v", err) } }() return process(data) } func process(data []byte) map[string]*measurement { nChunks := runtime.NumCPU() chunkSize := len(data) / nChunks if chunkSize == 0 { chunkSize = len(data) } chunks := make([]int, 0, nChunks) offset := 0 for offset < len(data) { offset += chunkSize if offset >= len(data) { chunks = append(chunks, len(data)) break } nlPos := bytes.IndexByte(data[offset:], '\n') if nlPos == -1 { chunks = append(chunks, len(data)) break } else { offset += nlPos + 1 chunks = append(chunks, offset) } } var wg sync.WaitGroup wg.Add(len(chunks)) results := make([]map[string]*measurement, len(chunks)) start := 0 for i, chunk := range chunks { go func(data []byte, i int) { results[i] = processChunk(data) wg.Done() }(data[start:chunk], i) start = chunk } wg.Wait() measurements := make(map[string]*measurement) for _, r := range results { for id, rm := range r { m := measurements[id] if m == nil { measurements[id] = rm } else { m.min = min(m.min, rm.min) m.max = max(m.max, rm.max) m.sum += rm.sum m.count += rm.count } } } return measurements } func processChunk(data []byte) map[string]*measurement { // Use fixed size linear probe lookup table const ( // use power of 2 for fast modulo calculation, // should be larger than max number of keys which is 10_000 entriesSize = 1 << 14 // use FNV-1a hash fnv1aOffset64 = 14695981039346656037 fnv1aPrime64 = 1099511628211 ) type entry struct { m measurement hash uint64 vlen int value [128]byte // use power of 2 > 100 for alignment } entries := make([]entry, entriesSize) entriesCount := 0 // keep short 
and inlinable getMeasurement := func(hash uint64, value []byte) *measurement { i := hash & uint64(entriesSize-1) entry := &entries[i] // bytes.Equal could be commented to speedup assuming no hash collisions for entry.vlen > 0 && !(entry.hash == hash && bytes.Equal(entry.value[:entry.vlen], value)) { i = (i + 1) & uint64(entriesSize-1) entry = &entries[i] } if entry.vlen == 0 { entry.hash = hash entry.vlen = copy(entry.value[:], value) entriesCount++ } return &entry.m } // assume valid input for len(data) > 0 { idHash := uint64(fnv1aOffset64) semiPos := 0 for i, b := range data { if b == ';' { semiPos = i break } // calculate FNV-1a hash idHash ^= uint64(b) idHash *= fnv1aPrime64 } idData := data[:semiPos] data = data[semiPos+1:] var temp int64 // parseNumber { negative := data[0] == '-' if negative { data = data[1:] } _ = data[3] if data[1] == '.' { // 1.2\n temp = int64(data[0])*10 + int64(data[2]) - '0'*(10+1) data = data[4:] // 12.3\n } else { _ = data[4] temp = int64(data[0])*100 + int64(data[1])*10 + int64(data[3]) - '0'*(100+10+1) data = data[5:] } if negative { temp = -temp } } m := getMeasurement(idHash, idData) if m.count == 0 { m.min = temp m.max = temp m.sum = temp m.count = 1 } else { m.min = min(m.min, temp) m.max = max(m.max, temp) m.sum += temp m.count++ } } result := make(map[string]*measurement, entriesCount) for i := range entries { entry := &entries[i] if entry.m.count > 0 { result[string(entry.value[:entry.vlen])] = &entry.m } } return result } func round(x float64) float64 { return roundJava(x*10.0) / 10.0 } // roundJava returns the closest integer to the argument, with ties // rounding to positive infinity, see java's Math.round func roundJava(x float64) float64 { t := math.Trunc(x) if x < 0.0 && t-x == 0.5 { //return t } else if math.Abs(x-t) >= 0.5 { t += math.Copysign(1, x) } if t == 0 { // check -0 return 0.0 } return t } // parseNumber reads decimal number that matches "^-?[0-9]{1,2}[.][0-9]" pattern, // e.g.: -12.3, -3.4, 5.6, 78.9 and return the value*10, i.e. -123, -34, 56, 789. 
func parseNumber(data []byte) int64 { negative := data[0] == '-' if negative { data = data[1:] } var result int64 switch len(data) { // 1.2 case 3: result = int64(data[0])*10 + int64(data[2]) - '0'*(10+1) // 12.3 case 4: result = int64(data[0])*100 + int64(data[1])*10 + int64(data[3]) - '0'*(100+10+1) } if negative { return -result } return result } https://github.com/aytechnet/1brc François Pons package main import ( "bytes" "flag" "fmt" "log" "os" "runtime" "runtime/pprof" "slices" "sync" "syscall" "sync/atomic" "github.com/aytechnet/decimal" ) type ( measurement struct { hash atomic.Uint64 minT atomic.Int64 maxT atomic.Int64 sumT atomic.Int64 countT atomic.Int64 nameLen int nameBuf [208]byte } measurements struct { total atomic.Int64 numParsers int results [capacity]measurement } job struct { maxOffset atomic.Int64 bufLen int buf [bufSize]byte } ) const ( delta = 439 capacity = 1 << 16 // must be a power of 2 for modulo calculation // buffer size bufSize = 512 * 1024 // 1Mb // use FNV-1a hash fnv1aOffset64 = 14695981039346656037 fnv1aPrime64 = 1099511628211 ) func main() { var mode, filename, cpuprofile string var res measurements flag.StringVar(&mode, "mode", "default", "Which mode to use among 'mmap', 'seq' and 'default'") flag.StringVar(&filename, "file", "", "Measurements file to use") flag.StringVar(&cpuprofile, "cpuprofile", "", "Write cpuprofile to file") flag.IntVar(&res.numParsers, "parsers", runtime.NumCPU(), "Number of thread to use for parsing") flag.Parse() if filename == "" { flag.Usage() os.Exit(1) } if cpuprofile != "" { if f, err := os.Create(cpuprofile); err != nil { log.Fatal(err) } else { pprof.StartCPUProfile(f) defer pprof.StopCPUProfile() } } f, err := os.Open(filename) if err != nil { log.Fatalf("Open: %v", err) } defer f.Close() switch mode { case "mmap": res.processMmap(f) case "seq": res.processSeq(f) default: res.process(f) } } func (res *measurements) processMmap(f *os.File) { jobs := make([]job, res.numParsers) fi, err := f.Stat() if err != nil { log.Fatalf("Stat: %v", err) } size := fi.Size() chunkSize := size/int64(len(jobs)) if chunkSize <= 100 { log.Fatalf("Invalid file size: %d", size) } data, err := syscall.Mmap(int(f.Fd()), 0, int(size), syscall.PROT_READ, syscall.MAP_SHARED) if err != nil { log.Fatalf("Mmap: %v", err) } defer func() { if err := syscall.Munmap(data); err != nil { log.Fatalf("Munmap: %v", err) } }() offset := chunkSize for i := range jobs { j := &jobs[i] if i == len(jobs)-1 { j.maxOffset.Store(size) } else { j.maxOffset.Store(-offset) } offset += chunkSize } var wg sync.WaitGroup wg.Add(len(jobs)) offset = 0 for i := range jobs { go func(i int, offset int64){ defer wg.Done() j := &jobs[i] maxOffset := j.maxOffset.Load() if maxOffset < 0 { maxOffset = -maxOffset if nlPos := bytes.IndexByte(data[maxOffset:], '\n'); nlPos >= 0 { maxOffset += int64(nlPos+1) } } if offset > 0 { if nlPos := bytes.IndexByte(data[offset:maxOffset], '\n'); nlPos >= 0 { offset += int64(nlPos+1) } } res.processChunk(data[offset:maxOffset]) }(i, offset) offset += chunkSize } wg.Wait() res.printResults() } func (res *measurements) processSeq(f *os.File) { jobs := make([]job, res.numParsers) free_jobs := make(chan *job, len(jobs)) ready_jobs := make(chan *job, len(jobs)) var wg sync.WaitGroup wg.Add(len(jobs)) for i := range jobs { j := &jobs[i] free_jobs <- j go func(){ for j := range ready_jobs { res.processChunk(j.buf[:j.bufLen]) free_jobs <- j } wg.Done() }() } var prev_j *job pos := 0 nlPos := 0 for j := range free_jobs { if count, _ := f.Read(j.buf[pos:]); 
count > 0 { // finalize previous line from previous job if pos > 0 /* && prev_j != nil */ { copy(j.buf[:pos], prev_j.buf[bufSize-pos:]) } // prepare next buffer if nlPos = bytes.LastIndexByte(j.buf[:pos+count], '\n'); nlPos < 0 { log.Fatalf("buffer too small for a complete line"); } else { pos = pos + count - nlPos - 1 prev_j = j j.bufLen = nlPos + 1 } // spawn a new job on this buffer ready_jobs <- j } else { break } } close(ready_jobs) wg.Wait() close(free_jobs) res.printResults() } func (res *measurements) process(f *os.File) { jobs := make([]job, res.numParsers) fi, err := f.Stat() if err != nil { log.Fatalf("Stat: %v", err) } size := fi.Size() chunkSize := size/int64(len(jobs)) if chunkSize <= 100 { log.Fatalf("Invalid file size: %d", size) } offset := chunkSize for i := range jobs { j := &jobs[i] if i == len(jobs)-1 { j.maxOffset.Store(size) } else { j.maxOffset.Store(-offset) } offset += chunkSize } var wg sync.WaitGroup wg.Add(len(jobs)) offset = 0 for i := range jobs { go func(i int, offset int64){ defer wg.Done() j := &jobs[i] nlSkipFirst := offset > 0 for { maxLen := bufSize maxOffset := j.maxOffset.Load() if maxOffset < 0 { maxOffset = -maxOffset } if offset + int64(maxLen) > maxOffset { maxLen = int(maxOffset - offset) } if count, _ := f.ReadAt(j.buf[:maxLen], offset); count > 0 { pos := 0 if nlSkipFirst { if nlPos := bytes.IndexByte(j.buf[:maxLen], '\n'); nlPos >= 0 { pos = nlPos + 1 jobs[i-1].maxOffset.Store(offset + int64(pos)) nlSkipFirst = false } else { log.Fatalf("Unable to seek to next line at job n°%d", i) } } if nlPos := bytes.LastIndexByte(j.buf[pos:maxLen], '\n'); nlPos >= 0 { j.bufLen = pos + nlPos + 1 offset += int64(j.bufLen) res.processChunk(j.buf[pos:j.bufLen]) } else { log.Fatalf("Buffer too small at job n°%d", i) } } else { return } maxOffset = j.maxOffset.Load() for maxOffset < 0 { maxOffset = j.maxOffset.Load() } if offset >= maxOffset { return } } }(i, offset) offset += chunkSize } wg.Wait() res.printResults() } func (res *measurements) printResults() { log.Printf("Read %d entries", res.total.Load()) ids := make([]int, 0, capacity) for i := range res.results { m := &res.results[i] if m.nameLen > 0 { ids = append(ids, i) } } slices.SortFunc(ids, func(a,b int) int { return bytes.Compare(res.results[a].nameBuf[0:res.results[a].nameLen], res.results[b].nameBuf[0:res.results[b].nameLen]) }) count := 0 fmt.Print("{") for _, i := range ids { m := &res.results[i] var buf [128]byte // name is 100 chars max, each 3 decimals is 5 bytes max on output b := buf[0:0] if count > 0 { b = append(b, ',', ' ') } b = append(b, m.nameBuf[0:m.nameLen]...) 
b = append(b, '=') b = decimal.New(m.minT.Load(), -1).BytesToFixed(b, 1) b = append(b, '/') b = decimal.New(m.sumT.Load(), -1).Div(decimal.NewFromInt(m.countT.Load()-1)).BytesToFixed(b, 1) b = append(b, '/') b = decimal.New(m.maxT.Load(), -1).BytesToFixed(b, 1) count++ fmt.Print(string(b)) // fmt.Printf("%s=%s/%s/%s", m.nameBuf[0:m.nameLen], decimal.New(m.minT.Load(), -1).StringFixed(1), decimal.New(m.sumT.Load(), -1).Div(decimal.NewFromInt(m.countT.Load()-1)).StringFixed(1), decimal.New(m.maxT.Load(), -1).StringFixed(1)) count++ } fmt.Println("}") } func (res *measurements) processChunk(data []byte) { var total int64 // assume valid input for len(data) > 0 { i := 0 // compute FNV-1a hash idHash := uint64(fnv1aOffset64) for j, b := range data { if b == ';' { i = j break } // calculate FNV-1a hash idHash ^= uint64(b) idHash *= fnv1aPrime64 } if idHash == 0 { idHash = uint64(len(data)) } idData := data[:i] i++ // now i points to temperature var temp int64 // parseNumber { negative := data[i] == '-' if negative { i++ } temp = int64(data[i]-'0') i++ if data[i] != '.' { temp = temp*10 + int64(data[i]-'0') i++ } i++ // data[i] is '.' temp = temp*10 + int64(data[i]-'0') if negative { temp = -temp } data = data[i+2:] } // get measurement { i := idHash & uint64(capacity-1) entry := &res.results[i] for { if entry.hash.CompareAndSwap(0, idHash) { // make sure no race occurs as entry may be updated meanwhile as hash has been updated if entry.countT.Add(1) == 1 { entry.nameLen = len(idData) copy(entry.nameBuf[:], idData) entry.minT.Store(temp) entry.maxT.Store(temp) entry.sumT.Store(temp) entry.countT.Add(1) // unlock for update below } else { // wait for countT to be at least 2 for entry init to be complete for entry.countT.Load() < 2 {} // update existing entry minT := entry.minT.Load() for minT > temp { if entry.minT.CompareAndSwap(minT, temp) { break } else { minT = entry.minT.Load() } } maxT := entry.maxT.Load() for maxT < temp { if entry.maxT.CompareAndSwap(maxT, temp) { break } else { maxT = entry.maxT.Load() } } entry.sumT.Add(temp) entry.countT.Add(1) } break } else if entry.hash.Load() == idHash { // the entry is found and may be being updated by another thread // wait for countT to be at least 2 for entry init to be complete for entry.countT.Load() < 2 {} // now that name has been updated, check it is matching if len(idData) == entry.nameLen /* bytes.Compare(idData, entry.nameBuf[0:entry.nameLen]) == 0 */ { // update existing entry minT := entry.minT.Load() for minT > temp { if entry.minT.CompareAndSwap(minT, temp) { break } else { minT = entry.minT.Load() } } maxT := entry.maxT.Load() for maxT < temp { if entry.maxT.CompareAndSwap(maxT, temp) { break } else { maxT = entry.maxT.Load() } } entry.sumT.Add(temp) entry.countT.Add(1) break } else { // name does not match idData so jump to next entry i = (i + delta) & uint64(capacity-1) entry = &res.results[i] } } else { i = (i + delta) & uint64(capacity-1) entry = &res.results[i] } } } total++ } res.total.Add(total) } func (entry *measurement) update(temp int64) { minT := entry.minT.Load() for minT > temp { if entry.minT.CompareAndSwap(minT, temp) { break } else { minT = entry.minT.Load() } } maxT := entry.maxT.Load() for maxT < temp { if entry.maxT.CompareAndSwap(maxT, temp) { break } else { maxT = entry.maxT.Load() } } entry.sumT.Add(temp) entry.countT.Add(1) }