This winter Gunnar Morling launched the One Billion Row Challenge. The task is to process a text file with one billion temperature measurements:

Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2
Cracow;12.6
...

We need to calculate the min, mean, and max values per station, sort the stations alphabetically, and print the result like this:

{Abha=5.0/18.0/27.4, Abidjan=15.7/26.0/34.1, ...

Rules and limits are described in the original repository.

I missed the deadline for the challenge, but that let me explore the problem at my own pace. Here I briefly describe my solution; you can find the code here.
For me, the challenge was mostly an excuse to play with go tool pprof and learn something new.

TL;DR

There are two datasets: standard (413 unique stations, 1B records) and extended (10k unique stations, 1B records). I benchmarked my code on a cheap dedicated server with an AMD Ryzen 5 3600 6-Core processor. Then I compared my solution with the other Go solutions I found and with the fastest Java implementation by @thomaswue:

Submitter      1B rows, 413 stations (s)   1B rows, 10k stations (s)
elh            8.792                       13.686
lunemec        8.288                       14.232
yastrebov      4.547                       7.482
benhoyt        4.468                       5.182
my solution    4.132                       6.345
thomaswue      1.424                       4.379

How I tested and benchmarked

  1. All tests from the original challenge must pass.
  2. Benchmarks must run on a dedicated server; otherwise the results are unstable. For this purpose, I rented a Hetzner AX41 with an AMD Ryzen 5 3600 6-Core (Zen 2) and 64 GB DDR4.
  3. CPU profiling and benchmarking should be run separately.
  4. Programs are run from RAM to eliminate disk I/O overhead (as in the original challenge).
  5. SMT and Turbo Boost should be disabled (as in the original challenge). You can do this as root with the following commands:
    echo off | tee /sys/devices/system/cpu/smt/control
    echo 0 | tee /sys/devices/system/cpu/cpufreq/boost
    
  6. The hyperfine program is used for measuring execution time:
    hyperfine --warmup 0 --runs 5 --export-json=timing.json --show-output "./bin/solution ../measurements.txt"
    
  7. This command was used for benchmarking:
    go test -run '^$' -bench '^Benchmark_process$' -benchtime 10s -count 6 -cpu 4
    
    Results should be compared with the benchstat program.

Table of results

Below is a table with all my optimisation steps from slowest to fastest:

Step                                 1B rows, 413 stations (s)   1B rows, 10k stations (s)
Baseline                             365.244                     478.238
Simplified string split              219.192                     340.735
Improved float parsing               198.809                     309.275
Mmap syscall                         163.017                     279.218
Faster bytes to string conversion    82.697                      137.846
Numerical keys in map                51.433                      75.089
Concurrency                          8.834                       21.758
Custom hash table                    6.350                       9.309
Integers instead of float            6.024                       8.787
Bitwise search                       4.132                       6.345

Optimizations

Baseline implementation

In the baseline implementation, I read the file line by line with bufio.Scanner, split each line with strings.Split(), and parsed the temperatures with strconv.ParseFloat(). All measurements were stored in a map with string keys. As expected, this is very slow: it took about 6 minutes to process the dataset with 413 unique stations.

On the CPU profile above, you can see our main bottlenecks: string operations (split, parse float) and bufio. First of all, we’ll get rid of them.

Simplified string split

strings.Split checks some edge cases, contains a couple of nested loops, and allocates a slice on each call. To avoid this overhead, we can replace it with a simple for loop:

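func split(s string) (string, string) {
	// Scan from the end: the temperature is at most 5 bytes long ("-99.9"),
	// so the ';' is found after only a few iterations.
	for i := len(s) - 1; i >= 0; i-- {
		if s[i] == ';' {
			return s[:i], s[i+1:]
		}
	}

	return "", ""
}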

The benchstat command confirms a performance boost:

cpu: AMD Ryzen 5 3600 6-Core Processor              
           │ profiles/naive/benchmark.txt │ profiles/customSplit/benchmark.txt │
           │            sec/op            │   sec/op     vs base               │
_process-4                    373.0m ± 4%   243.5m ± 3%  -34.72% (p=0.002 n=6)

           │ profiles/naive/benchmark.txt │ profiles/customSplit/benchmark.txt  │
           │             B/op             │     B/op      vs base               │
_process-4                   47.35Mi ± 0%   16.83Mi ± 0%  -64.46% (p=0.002 n=6)

           │ profiles/naive/benchmark.txt │ profiles/customSplit/benchmark.txt │
           │          allocs/op           │  allocs/op   vs base               │
_process-4                    2.002M ± 0%   1.002M ± 0%  -49.95% (p=0.002 n=6)

Improved float parsing

Like other standard functions, strconv.ParseFloat contains a lot of edge-case checks. In our case, all values are valid and lie between -99.9 and 99.9, with exactly one digit after the decimal point. So we can write a custom float parser (commit):

func parseFloat64(s string) float64 {
	var (
		sign   float64 = 1
		result float64
	)

	if s[0] == '-' {
		sign = -1
		s = s[1:]
	}

	if len(s) == 3 {
		result = float64(s[0]-'0') + float64(s[2]-'0')*0.1
	} else {
		result = float64(s[0]-'0')*10 + float64(s[1]-'0') + float64(s[3]-'0')*0.1
	}

	return sign * result
}

Here we take each character and subtract the ASCII code of '0'. The ASCII code of the character '0' is 0x30, of '1' is 0x31, and so on, so subtracting '0' turns a digit character into its numeric value (for example, '7' - '0' = 7).

mmap syscall instead of bufio.Scanner

By using the mmap syscall, we can map the entire file into virtual memory in a single call. This eliminates the many read syscalls that bufio.Scanner issues to refill its buffer. We also avoid the extra copy from the kernel's page cache into a user-space buffer. After the mapping, we can work with the file data as a plain byte slice (commit):

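import (
	"os"

	"golang.org/x/sys/unix"
)

func getMeasurements(filePath string) (map[string]cityMeasurement, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	stat, err := f.Stat()
	if err != nil {
		return nil, err
	}

	// Map the whole file read-only in a single call.
	data, err := unix.Mmap(int(f.Fd()), 0, int(stat.Size()), unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		return nil, err
	}
	defer unix.Munmap(data)

	// type of data is []byte
	...
}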

Of course, mmap is not always faster than sequential reads (for example, the first access to every page triggers a page fault), so we should benchmark it.

Faster bytes to string conversion

If we look at the CPU profile, we can see how much time is spent in runtime.slicebytetostring:

This function converts byte slices into Go strings. Go strings are immutable, and internally a string is just a pointer and a length (the layout mirrors reflect.StringHeader):

type StringHeader struct {
	Data uintptr // pointer to the underlying bytes
	Len  int     // number of bytes
}

Converting a byte slice to a string allocates a new backing array of the same length, copies all the bytes into it, and returns a string that points to the copy. Instead, we can make the string point directly at the existing bytes:

import "unsafe"

// bytesToString returns a string that shares memory with b (Go 1.20+).
func bytesToString(b []byte) string {
	return unsafe.String(unsafe.SliceData(b), len(b))
}

This technique is also used in Kubernetes and Prometheus. There is just one catch: the slice b (and its underlying array) must not be modified or released while the string is in use. Otherwise, we may get nondeterministic results or a SIGSEGV. Because the strings now point into the mapped file, I moved the mmap call to the process function, so that the mapping outlives every string that points into it (commit).

goos: linux
goarch: amd64
pkg: github.com/agoosev/1brc
cpu: AMD Ryzen 5 3600 6-Core Processor              
           │ profiles/mmap/benchmark.txt │ profiles/fastString/benchmark.txt  │
           │           sec/op            │   sec/op     vs base               │
_process-4                  160.43m ± 3%   75.22m ± 2%  -53.11% (p=0.002 n=6)

           │ profiles/mmap/benchmark.txt │  profiles/fastString/benchmark.txt  │
           │            B/op             │     B/op      vs base               │
_process-4                10511.7Ki ± 0%   917.9Ki ± 0%  -91.27% (p=0.002 n=6)

           │ profiles/mmap/benchmark.txt │ profiles/fastString/benchmark.txt  │
           │          allocs/op          │  allocs/op   vs base               │
_process-4                1002.083k ± 0%   2.076k ± 0%  -99.79% (p=0.002 n=6)

Besides halving the run time, this change cut the number of allocations by 99.8%.

Numerical keys in map

Now our main bottleneck is map access: the mapassign_* and mapaccess* functions. In Go, a map is a hash table built on an array of buckets. Whenever we read or write a map with a string key, the runtime hashes the key, selects a bucket, and then locates the element within that bucket by comparing keys. I assumed string keys carry an additional cost here (hashing and comparing variable-length keys), so replacing them with a precomputed numeric hash should give a performance boost. For the hash, I used FNV-1a, which is easy to implement (commit):

const (
	fnv1aOffset uint64 = 0xcbf29ce484222325
	fnv1aPrime  uint64 = 0x100000001b3
)

func stringHash(s string) uint64 {
	hash := fnv1aOffset
	for _, b := range []byte(s) {
		hash ^= uint64(b)
		hash *= fnv1aPrime
	}

	return hash
}

Concurrency

The next obvious step is to process the file concurrently. I tried several concurrency designs (a few workers synchronized through a channel, or a few workers sharing one mutex-protected storage), but all of them significantly slowed the code down. Instead, we split the data slice into chunks; each worker processes its own chunk and stores the results in its own map. Then we merge these maps into one, like this (commit):

func getMeasurements(data []byte) (map[uint64]cityMeasurement, error) {
	var wg sync.WaitGroup

	workersCount := runtime.GOMAXPROCS(0)
	chunkSize := len(data) / workersCount
	if chunkSize == 0 {
		chunkSize = len(data)
	}

	borders := getBorders(data, chunkSize, workersCount) // here we split data to workersCount chunks

	start := 0
	results := make([]map[uint64]cityMeasurement, len(borders))
	for i, border := range borders {
		wg.Add(1)

		go func(workerNumber, start, end int) {
			results[workerNumber] = processChunk(data[start:end]) // process each chunk in personal worker and save the result in the map
			wg.Done()
		}(i, start, border)

		start = border
	}

	wg.Wait()
	results = filterResults(results)

	if len(results) > 1 {
		for _, storage := range results[1:] {
			... //merge all maps with map from worker #0
		}

	}

	return results[0], nil
}

Custom hash table instead of maps

Next, I replaced maps with a custom hash table, an approach I had also seen in other solutions. The idea is to store all data in one slice and use open addressing to handle collisions (commit). The capacity of this table should be a power of two and greater than the number of elements it holds.


const storageCapacity = 16384

func getIndex(hash uint64, s []*cityMeasurement) (uint64, bool) {
	// faster version of hash % storageCapacity. 
	// Works only if storageCapacity is power of 2
	index := hash & (storageCapacity - 1) 

	for s[index] != nil && s[index].hash != hash { 
		index = (index + 1) & (storageCapacity - 1)
	}

	return index, s[index] != nil
}

If we hit a collision (s[index].hash != hash), we move to the next slot, wrapping around at the end of the table, until we find either the matching element or an unused slot. This technique is known as linear probing. You can read more about open addressing and collision resolution techniques here.
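Here is a self-contained sketch of a table built around getIndex; the entry type and the add helper are assumptions for illustration. The hashes 1 and 1+storageCapacity share the same starting slot, so the second one is forced to probe to the next slot. The probing loop only terminates because the table always keeps at least one empty slot, which is why the capacity must exceed the number of stations.

```go
package main

import "fmt"

const storageCapacity = 16384 // power of two, larger than the number of stations

// entry is a hypothetical slot payload (temperatures in tenths of a degree).
type entry struct {
	hash       uint64
	name       string
	min, max   int32
	sum, count int64
}

// getIndex is the linear-probing lookup from above.
func getIndex(hash uint64, s []*entry) (uint64, bool) {
	index := hash & (storageCapacity - 1)
	for s[index] != nil && s[index].hash != hash {
		index = (index + 1) & (storageCapacity - 1) // wrap around at the end
	}
	return index, s[index] != nil
}

// add inserts a new station into the first free slot or updates an existing one.
func add(s []*entry, hash uint64, name string, temp int32) {
	index, found := getIndex(hash, s)
	if !found {
		s[index] = &entry{hash: hash, name: name, min: temp, max: temp, sum: int64(temp), count: 1}
		return
	}
	e := s[index]
	if temp < e.min {
		e.min = temp
	}
	if temp > e.max {
		e.max = temp
	}
	e.sum += int64(temp)
	e.count++
}

func main() {
	table := make([]*entry, storageCapacity)
	add(table, 1, "Hamburg", 120)
	add(table, 1+storageCapacity, "Bulawayo", 89) // collides with hash 1
	add(table, 1, "Hamburg", 100)

	i, _ := getIndex(1, table)
	j, _ := getIndex(1+storageCapacity, table)
	fmt.Println(i, j, table[i].name, table[i].count, table[j].name) // 1 2 Hamburg 2 Bulawayo
}
```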

As you can see, this approach worked:

goos: linux
goarch: amd64
pkg: github.com/agoosev/1brc
cpu: AMD Ryzen 5 3600 6-Core Processor              
           │ profiles/concurrency/benchmark.txt │ profiles/openAddressing/benchmark.txt │
           │               sec/op               │     sec/op      vs base               │
_process-4                         14.104m ± 1%      9.548m ± 3%  -32.30% (p=0.002 n=6)

           │ profiles/concurrency/benchmark.txt │ profiles/openAddressing/benchmark.txt │
           │                B/op                │      B/op       vs base               │
_process-4                         3.936Mi ± 0%     1.149Mi ± 0%  -70.81% (p=0.002 n=6)

           │ profiles/concurrency/benchmark.txt │ profiles/openAddressing/benchmark.txt │
           │             allocs/op              │   allocs/op     vs base               │
_process-4                          2.102k ± 0%      3.745k ± 0%  +78.16% (p=0.002 n=6)

Integers instead of float

Next, I assumed that integer arithmetic would be faster than floating-point arithmetic. Each temperature has exactly one fractional digit, so we can parse it as an integer number of tenths and divide by 10 only at the end. I replaced parseFloat64 with parseInt32 (commit):

func parseInt32(b []byte) int32 {
	var (
		sign   int32 = 1
		result int32
	)

	if b[0] == '-' {
		sign = -1
		b = b[1:]
	}

	if len(b) == 3 {
		result = int32(b[0]-'0')*10 + int32(b[2]-'0')
	} else {
		result = int32(b[0]-'0')*100 + int32(b[1]-'0')*10 + int32(b[3]-'0')
	}

	return sign * result
}
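With integers, everything stays in tenths of a degree and is converted back only for printing. A minimal sketch, assuming a hypothetical stats type and formatTenths helper. The sum is an int64 because it can overflow int32 in the worst case: with 1B rows over 413 stations, a station gets about 2.4M readings on average, and 2.4M × 999 tenths already exceeds the int32 maximum of about 2.1 × 10^9.

```go
package main

import "fmt"

// stats keeps min, max and sum in tenths of a degree.
type stats struct {
	min, max int32
	sum      int64 // int64: the total can overflow int32
	count    int64
}

func (s *stats) add(t int32) {
	if s.count == 0 || t < s.min {
		s.min = t
	}
	if s.count == 0 || t > s.max {
		s.max = t
	}
	s.sum += int64(t)
	s.count++
}

// formatTenths prints an integer number of tenths as a decimal, e.g. -5 -> "-0.5".
func formatTenths(v int64) string {
	sign := ""
	if v < 0 {
		sign = "-"
		v = -v
	}
	return fmt.Sprintf("%s%d.%d", sign, v/10, v%10)
}

// String divides by 10 only here, when the result is printed.
func (s *stats) String() string {
	mean := float64(s.sum) / float64(s.count) / 10
	return fmt.Sprintf("%s/%.1f/%s", formatTenths(int64(s.min)), mean, formatTenths(int64(s.max)))
}

func main() {
	var s stats
	for _, t := range []int32{120, -5, 89} { // 12.0, -0.5, 8.9
		s.add(t)
	}
	fmt.Println(s.String()) // -0.5/6.8/12.0
}
```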

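Bitwise search

The final step replaces byte-by-byte scanning for separators with a bitwise search over 8 bytes at a time. Again, I saw this approach in other solutions and in the excellent book "Hacker's Delight" by Henry S. Warren: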

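// word holds 8 bytes of input in little-endian order, for example:
// word := *(*uint64)(unsafe.Pointer(&data[i]))
// (this load assumes at least 8 readable bytes starting at data[i]).
func findPosition(word uint64, symbol byte, offset int) (int, bool) {
	// Broadcast symbol into all 8 bytes.
	var mask uint64 = 0x0101010101010101 * uint64(symbol)
	// Bytes equal to symbol become 0x00.
	xorResult := word ^ mask
	// The lowest set bit marks the first zero byte
	// (higher bits may be false positives and are ignored below).
	found := (xorResult - 0x0101010101010101) &^ xorResult & 0x8080808080808080
	if found == 0 {
		return 0, false
	}

	// Count the bytes up to and including the first match, minus 1,
	// which gives the index of the matching byte within the word.
	result := ((((found - 1) & 0x0101010101010101) * 0x0101010101010101) >> 56) - 1

	return int(result) + offset, true
}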

With this function, we can do an almost branchless search for the symbol we need. A description of how it works deserves a separate post.
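To show how findPosition fits into a scanning loop, here is a sketch of a hypothetical indexSemicolon helper. It loads 8 bytes at a time with binary.LittleEndian.Uint64, a bounds-checked alternative to the unsafe pointer load, which also fixes the byte order findPosition relies on (byte 0 in the lowest bits). The last bytes that don't fill a whole word are checked one by one, so the loop never reads past the end of the slice.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// findPosition returns the index of the first byte equal to symbol in word.
func findPosition(word uint64, symbol byte, offset int) (int, bool) {
	mask := 0x0101010101010101 * uint64(symbol)
	xorResult := word ^ mask
	found := (xorResult - 0x0101010101010101) &^ xorResult & 0x8080808080808080
	if found == 0 {
		return 0, false
	}
	result := ((((found - 1) & 0x0101010101010101) * 0x0101010101010101) >> 56) - 1
	return int(result) + offset, true
}

// indexSemicolon finds ';' in line, testing 8 bytes per iteration.
func indexSemicolon(line []byte) int {
	i := 0
	for ; i+8 <= len(line); i += 8 {
		word := binary.LittleEndian.Uint64(line[i:])
		if pos, ok := findPosition(word, ';', i); ok {
			return pos
		}
	}
	// Fewer than 8 bytes left: fall back to a simple loop.
	for ; i < len(line); i++ {
		if line[i] == ';' {
			return i
		}
	}
	return -1
}

func main() {
	for _, s := range []string{"Hamburg;12.0", "St. John's;15.2", "Ouagadougou;28.3"} {
		fmt.Println(indexSemicolon([]byte(s))) // 7, 10, 11
	}
}
```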

Our code got faster because the main loop now has fewer branches and reads 8 bytes per iteration. I also switched from FNV-1a to the hash/maphash package, because it is now possible to compute the hash over the whole city name at once. maphash looks fast enough that I didn't worry about it further. The commit with the changes is here.

Conclusion

Thanks to this challenge, I learned some new things and confirmed some of my assumptions. This is my summary:

  • Never benchmark on a laptop or in the cloud if you want reasonably accurate results. Many factors can affect the result: thermal throttling, other applications, a "noisy neighbor" in the cloud, etc. Bartlomiej Plotka wrote a lot about effective benchmarking in his book "Efficient Go".
  • Standard Go functions are fine, but they usually handle a lot of edge cases. A specialized algorithm that skips them may be faster.
  • Concurrency synchronization (mutexes, channels) is not free; it's always a trade-off between usability and performance.
  • Performance optimizations almost always come at the cost of readability (yes, findPosition, I'm looking at you).

I also found a couple of interesting topics that I want to dive into:

  • Improving performance with SIMD
  • Different bitwise tricks. In other solutions, I saw an algorithm for branchless integer parsing, but it didn't work for me. That's a good reason to figure it out.