An introduction to the implementation of Whisper, the time-series database

TL;DR This article shows how Whisper works and some of the Linux programming tricks it uses.

What is Whisper

Many people are familiar with Graphite, the well-known metrics monitoring system.

Here is an introduction from the official Graphite website:

Graphite is an enterprise-scale monitoring tool that runs well on cheap hardware.

Graphite does two things:

  • Store numeric time-series data
  • Render graphs of this data on demand

Whisper is one of the core parts of Graphite. "Store numeric time-series data" involves two components: one receives the data from the network, which is the job of Carbon; the other writes the data to the physical disk (like a database), which is the job of Whisper.

Officially, Whisper is a time-series database library designed for the Graphite project, but it is general enough to be used as a time-series database on its own.

Important:

The following code and structure analysis is based on whisper==0.9.10; other versions may behave differently.

Overview of the structure of a Whisper database

Summary

A Whisper database is a single binary file. Each file has three parts: Header / Archives / Data. The reference implementation (i.e. Whisper in Graphite) is written in Python and manipulates the binary file with the struct library.

The header of a Whisper file records meta-information: aggregationType / maxRetention / xff / archiveCount

aggregationType

Data type: long int (aka ‘L’ in struct format). aggregationType controls how higher-precision data is aggregated into lower precision.

aggregationTypeToMethod = dict({
    1: 'average',
    2: 'sum',
    3: 'last',
    4: 'max',
    5: 'min',
    6: 'avg_zero'
})

maxRetention

Data type: long int. maxRetention is the longest retention this database can hold, in seconds.

xff

xff is the abbreviation for xFilesFactor. Data type: float (aka ‘f’ in struct format). When higher-precision data is aggregated into lower precision, if the proportion of valid data points is lower than this value, the aggregation result is set to None.

archiveCount

Data type: long int. archiveCount is the number of archives in the file.
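
To make the header layout concrete, here is a minimal sketch that packs and unpacks the four header fields with struct. The format string "!2LfL" (network byte order, two longs, one float, one long) is my assumption based on the field types listed above, and pack_header / unpack_header are hypothetical helper names, not functions from Whisper.

```python
import struct

# Assumed layout: aggregationType, maxRetention, xFilesFactor, archiveCount
metadataFormat = "!2LfL"
metadataSize = struct.calcsize(metadataFormat)


def pack_header(aggregation_type, max_retention, xff, archive_count):
    """Pack the four header fields into bytes (hypothetical helper)."""
    return struct.pack(metadataFormat, aggregation_type, max_retention,
                       xff, archive_count)


def unpack_header(raw):
    """Read the four header fields back from the start of a byte string."""
    agg, max_ret, xff, count = struct.unpack(metadataFormat, raw[:metadataSize])
    return {
        "aggregationType": agg,
        "maxRetention": max_ret,
        "xFilesFactor": xff,
        "archiveCount": count,
    }


raw = pack_header(1, 86400, 0.5, 2)
header = unpack_header(raw)
print(metadataSize, header)
```

With the standard sizes that "!" selects, each of the four fields takes 4 bytes, so the fixed part of the header is 16 bytes long.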

Archives

Summary

The Archives section stores the meta-information that describes the Data section; each archive entry points to one region of Data. Different archives provide different precision and retention for the stored data.

The archive list has several strict limitations:

  • There must be at least one archive
  • Archive precisions must be strictly monotonic once sorted (no two archives may share the same precision)
  • The precision of a lower-precision archive must be a multiple of the higher-precision archive's (and not equal to it)
  • Archive retentions must be strictly monotonic too: the lower the precision, the longer the retention
  • A higher-precision archive must have enough points to consolidate into one point of the next lower-precision archive

Here is an example for the last limitation, written as secondsPerPoint/points:
Higher: 1s/20
Lower: 60s/1

This example satisfies the first four limitations but not the last one: one point of the lower archive consolidates 60 points of the higher archive, which holds only 20.

Those limitations are checked by:

if not archiveList:
    raise InvalidConfiguration("You must specify at least one archive configuration!")

archiveList.sort(key=lambda a: a[0])  # Sort by precision (secondsPerPoint)

for i, archive in enumerate(archiveList):
    if i == len(archiveList) - 1:
        break

    nextArchive = archiveList[i + 1]
    if not archive[0] < nextArchive[0]:
        raise InvalidConfiguration("A Whisper database may not be configured having "
                                   "two archives with the same precision (archive%d: %s, archive%d: %s)" %
                                   (i, archive, i + 1, nextArchive))

    if nextArchive[0] % archive[0] != 0:
        raise InvalidConfiguration("Higher precision archives' precision "
                                   "must evenly divide all lower precision archives' precision "
                                   "(archive%d: %s, archive%d: %s)" %
                                   (i, archive[0], i + 1, nextArchive[0]))

    retention = archive[0] * archive[1]
    nextRetention = nextArchive[0] * nextArchive[1]

    if not nextRetention > retention:
        raise InvalidConfiguration("Lower precision archives must cover "
                                   "larger time intervals than higher precision archives "
                                   "(archive%d: %s seconds, archive%d: %s seconds)" %
                                   (i, retention, i + 1, nextRetention))

    archivePoints = archive[1]
    pointsPerConsolidation = nextArchive[0] // archive[0]
    if not archivePoints >= pointsPerConsolidation:
        raise InvalidConfiguration("Each archive must have at least enough points "
                                   "to consolidate to the next archive (archive%d consolidates %d of "
                                   "archive%d's points but it has only %d total points)" %
                                   (i + 1, pointsPerConsolidation, i, archivePoints))

Structure

Each archive entry has three fields: offset / secondsPerPoint / points

offset

Data type: long int. offset is the byte offset of the archive's data, counted from the very beginning of the file.

secondsPerPoint

Data type: long int. secondsPerPoint is the precision of the archive. Because it is a whole number of seconds, the highest possible precision is one second per data point.

points

Data type: long int. points is the capacity of the archive, i.e. the number of data points it can hold.

Derived attributes

A few derived attributes are not stored in these fields but can be computed from them.

retention

retention = points * secondsPerPoint

size

The number of bytes the archive's data occupies on disk.

size = points * pointSize

pointSize is covered in the Data part; it is a fixed size in Whisper.

Data

Summary

The Data part is where the actual values are stored. It is composed of points, each of which contains a timestamp and a value.

Point

Each point has two parts: interval and data.

Interval

Data type: long int, a UNIX timestamp

Data

Data type: double (aka ‘d’ in struct format); stores the metric value
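
As a small worked example of a point and of the derived archive attributes, the sketch below packs one point and computes retention and size for one archive. The format string "!Ld" is my assumption from the two field types above (plus network byte order), and archive_retention / archive_size are hypothetical helper names.

```python
import struct

# Assumed point layout: unsigned long timestamp followed by a double value
pointFormat = "!Ld"
pointSize = struct.calcsize(pointFormat)


def archive_retention(seconds_per_point, points):
    """How many seconds of history the archive covers."""
    return seconds_per_point * points


def archive_size(points):
    """How many bytes the archive's data region occupies."""
    return points * pointSize


packed = struct.pack(pointFormat, 1500000000, 42.5)
interval, value = struct.unpack(pointFormat, packed)

# One day of data at one point per minute
print(pointSize, archive_retention(60, 1440), archive_size(1440))
```

Under this layout a point is always 12 bytes, which is why the position of any point can be computed with plain arithmetic instead of an index.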

Conclusion

ASCII Art Chart

+----------------------------------------------------------------+
|AT|MR|xff|AC|offset|SPP|points|   ...   |Interval|data|   ...   |
+----------------------------------------------------------------+
|            |   Archive One   |   ...   |  Point One  |   ...   |
+----------------------------------------------------------------+
|   Header   |         Archives          |         Data          |
+----------------------------------------------------------------+
|                          Whisper file                          |
+----------------------------------------------------------------+

AT: aggregationType
MR: maxRetention
AC: archiveCount
SPP: secondsPerPoint

Creation process

Prerequisites

There are two required parameters:

  • path : the path of the database on disk
  • archiveList : the list of all archives; it cannot be modified after creation

And two optional parameters:

  • xFilesFactor = None
  • aggregationMethod = None

Check archiveList

# Validate archive configurations...
validateArchiveList(archiveList)

A trap in the implementation

The function validateArchiveList appears only to validate archiveList, but it also modifies it by sorting it in place with archiveList.sort(key=lambda a: a[0]). This is not a good design: anyone who does not read the implementation of validateArchiveList will never know that archiveList has already been sorted. And because the following code depends on the order of archiveList, you will not understand how that code works unless you know this.

See also the summary section of the Archives part.

Write Header

aggregationType = struct.pack(longFormat, aggregationMethodToType.get(aggregationMethod, 1))
oldest = max([secondsPerPoint * points for secondsPerPoint, points in archiveList])
maxRetention = struct.pack(longFormat, oldest)
xFilesFactor = struct.pack(floatFormat, float(xFilesFactor))
archiveCount = struct.pack(longFormat, len(archiveList))
packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
fh.write(packedMetadata)

Write ArchiveList

The key point of this step is computing each archive's offset.

headerSize = metadataSize + (archiveInfoSize * len(archiveList))
archiveOffsetPointer = headerSize

for secondsPerPoint, points in archiveList:
    archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
    fh.write(archiveInfo)
    archiveOffsetPointer += (points * pointSize)
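
The offset arithmetic is easier to follow with real numbers. The sketch below repeats the same computation for a two-archive database without touching the disk. The three format strings are my assumptions derived from the field types described earlier, and archive_offsets is a hypothetical helper.

```python
import struct

metadataSize = struct.calcsize("!2LfL")   # assumed header metadata layout
archiveInfoSize = struct.calcsize("!3L")  # assumed: offset, secondsPerPoint, points
pointSize = struct.calcsize("!Ld")        # assumed: timestamp, value


def archive_offsets(archive_list):
    """Return ([offset of each archive], total file size in bytes)."""
    header_size = metadataSize + archiveInfoSize * len(archive_list)
    offsets = []
    pointer = header_size
    for seconds_per_point, points in archive_list:
        offsets.append(pointer)
        pointer += points * pointSize
    return offsets, pointer


# 1 minute precision for 1 day, then 5 minute precision for 7 days
offsets, file_size = archive_offsets([(60, 1440), (300, 2016)])
print(offsets, file_size)
```

The first archive starts right after the 40-byte header (16 bytes of metadata plus two 12-byte archive entries), and each following archive starts where the previous one ends. The pointer left over after the loop is the total file size, which is what the next step relies on.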

Write Data

Because the database has just been created there is no real data yet, so this step reserves the data region and fills it with placeholder bytes: \x00.

if CAN_FALLOCATE and useFallocate:
    remaining = archiveOffsetPointer - headerSize
    fallocate(fh, headerSize, remaining)
elif sparse:
    fh.seek(archiveOffsetPointer - 1)
    fh.write(b'\x00')
else:
    remaining = archiveOffsetPointer - headerSize
    chunksize = 16384
    zeroes = b'\x00' * chunksize
    while remaining > chunksize:
        fh.write(zeroes)
        remaining -= chunksize
    fh.write(zeroes[:remaining])

IO optimization

As you can see, Whisper uses several methods to optimize the IO operations.

Fallocate

This is a Linux-specific system call. The function prototype is int fallocate(int fd, int mode, off_t offset, off_t len);. fallocate() allows the caller to directly manipulate the allocated disk space for the file referred to by fd for the byte range starting at offset and continuing for len bytes.

For more information, please see man fallocate

Sparse File

The file's blocks are not really allocated on disk; only some metadata recording the length of the file is written. The advantage of a sparse file is very fast allocation at creation time. The disadvantage is that disk space is allocated only when data is first written to the file, which may lead to disk fragmentation and, in turn, slower reads and writes.

For more information, please see Sparse file on Wikipedia

Write by chunk

Writing is most efficient when each write is a whole multiple of the disk sector size. Many modern disks use 4096-byte (4 KB) sectors, and 4 times 4096 is 16384, so writing in chunks of this size is efficient too.

For more information, please see Disk sector on Wikipedia
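
To compare the sparse and the chunked strategies, here is a minimal sketch that creates the same file both ways in a temporary directory. It is written for Python 3, and create_sparse / create_zero_filled are hypothetical helpers that only mirror the two branches shown above; the file size 41512 is just an example value.

```python
import os
import tempfile


def create_sparse(path, total_size):
    """Seek to the last byte and write one zero; the rest is a hole."""
    with open(path, "wb") as fh:
        fh.seek(total_size - 1)
        fh.write(b"\x00")


def create_zero_filled(path, total_size, chunksize=16384):
    """Write real zero bytes, one chunk at a time."""
    zeroes = b"\x00" * chunksize
    remaining = total_size
    with open(path, "wb") as fh:
        while remaining > chunksize:
            fh.write(zeroes)
            remaining -= chunksize
        fh.write(zeroes[:remaining])


with tempfile.TemporaryDirectory() as tmp:
    sparse_path = os.path.join(tmp, "sparse.wsp")
    filled_path = os.path.join(tmp, "filled.wsp")
    create_sparse(sparse_path, 41512)
    create_zero_filled(filled_path, 41512)
    sparse_size = os.path.getsize(sparse_path)
    filled_size = os.path.getsize(filled_path)
    with open(sparse_path, "rb") as a, open(filled_path, "rb") as b:
        same_content = a.read() == b.read()

print(sparse_size, filled_size, same_content)
```

Both files report the same length and read back as the same zero bytes. The difference is only in how many disk blocks are actually allocated, which depends on the filesystem.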

Query process

Preprocess the time range

Shrink the query time range according to the max retention:

if now is None:
    now = int(time.time())
if untilTime is None:
    untilTime = now
fromTime = int(fromTime)
untilTime = int(untilTime)

# Here we try and be flexible and return as much data as we can.
# If the range of data is from too far in the past or fully in the future, we
# return nothing
if fromTime > untilTime:
    raise InvalidTimeInterval("Invalid time interval: from time '%s' is after until time '%s'" % (fromTime, untilTime))

oldestTime = now - header['maxRetention']
# Range is in the future
if fromTime > now:
    return None
# Range is beyond retention
if untilTime < oldestTime:
    return None
# Range requested is partially beyond retention, adjust
if fromTime < oldestTime:
    fromTime = oldestTime
# Range is partially in the future, adjust
if untilTime > now:
    untilTime = now
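
The clamping rules can be condensed into one small function. This is only a sketch of the same logic with made-up numbers: clamp_range is a hypothetical name, and it raises ValueError where Whisper raises its own InvalidTimeInterval.

```python
def clamp_range(from_time, until_time, now, max_retention):
    """Return the (from, until) range Whisper would query, or None."""
    if from_time > until_time:
        raise ValueError("from time %s is after until time %s"
                         % (from_time, until_time))
    oldest_time = now - max_retention
    # Entirely in the future, or entirely older than the retention
    if from_time > now or until_time < oldest_time:
        return None
    # Partially out of range: trim both ends
    return (max(from_time, oldest_time), min(until_time, now))


print(clamp_range(0, 2000, now=1000, max_retention=600))
print(clamp_range(1100, 1200, now=1000, max_retention=600))
```

With now=1000 and a retention of 600 seconds, only timestamps 400 to 1000 can hold data, so the first call is trimmed to that window and the second returns None.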

Find the archive

Whisper picks the highest-precision archive whose retention covers the requested range:

diff = now - fromTime
for archive in header['archives']:
    if archive['retention'] >= diff:
        break
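
Here is the same selection as a standalone sketch. The archive dictionaries are example data, and select_archive is a hypothetical helper. It relies on the archives being ordered from highest to lowest precision, which the sort in validateArchiveList provides.

```python
def select_archive(archives, from_time, now):
    """Pick the first (most precise) archive that reaches back far enough."""
    diff = now - from_time
    for archive in archives:
        if archive["retention"] >= diff:
            return archive
    # Mirrors the loop above falling through: the last archive is used
    return archives[-1]


archives = [
    {"secondsPerPoint": 60, "retention": 86400},    # 1 day at 1 minute
    {"secondsPerPoint": 300, "retention": 604800},  # 7 days at 5 minutes
]
now = 1000000
recent = select_archive(archives, now - 3600, now)
older = select_archive(archives, now - 200000, now)
print(recent["secondsPerPoint"], older["secondsPerPoint"])
```

A query for the last hour is answered from the 60-second archive, while a query that starts more than a day ago has to fall back to the 300-second archive.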

Align the data points

Align the query times to the archive's intervals:

fromInterval = int(fromTime - (fromTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
untilInterval = int(untilTime - (untilTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']

In summary, Whisper always moves to the next interval boundary after the query time.

As a special case, if the start interval is the same as the end interval, the next interval is always included:

if fromInterval == untilInterval:
    # Zero-length time range: always include the next point
    untilInterval += archive['secondsPerPoint']
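
A few numbers make the alignment rule clearer. The sketch below applies the same two formulas; next_interval and query_intervals are hypothetical helper names.

```python
def next_interval(t, step):
    """Round t down to its interval, then move one interval forward."""
    return t - (t % step) + step


def query_intervals(from_time, until_time, step):
    from_interval = next_interval(from_time, step)
    until_interval = next_interval(until_time, step)
    if from_interval == until_interval:
        # Zero-length range: always include the next point
        until_interval += step
    return from_interval, until_interval


print(query_intervals(125, 310, 60))
print(query_intervals(125, 130, 60))
print(next_interval(120, 60))
```

Note that a time which already sits on a boundary, such as 120 with a 60-second step, is still moved forward to 180.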

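Compute offset

baseInterval is the timestamp stored in the first point slot of the archive; the position of every other point is computed relative to it.

# Determine fromOffset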
timeDistance = fromInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
fromOffset = archive['offset'] + (byteDistance % archive['size'])

# Determine untilOffset
timeDistance = untilInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
untilOffset = archive['offset'] + (byteDistance % archive['size'])

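A trick with the % operator in Python

Whisper uses % to achieve the wrap-around effect:

byteDistance % archive['size']

As long as -archive['size'] <= byteDistance < archive['size'], this is equal to

if byteDistance >= 0:
    wrapped = byteDistance
else:
    wrapped = archive['size'] + byteDistance

The feature of the % operator that makes this work is that in Python the result takes the sign of the divisor, so it is never negative for a positive archive size:

print(-3 % 5)
# output 2
print(3 % 5)
# output 3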
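
Putting the offset formula and the % trick together, the sketch below treats a tiny archive as a ring of five 12-byte slots. point_offset is a hypothetical helper that repeats the computation shown above, and all numbers are invented for illustration.

```python
def point_offset(archive_offset, archive_size, base_interval, interval,
                 step, point_size=12):
    """File offset of the slot that belongs to `interval`."""
    time_distance = interval - base_interval
    point_distance = time_distance // step
    byte_distance = point_distance * point_size
    # % wraps both past the end and before the start of the archive
    return archive_offset + (byte_distance % archive_size)


# Archive at byte 40 with 5 slots (60 bytes), 60 s per point, base at t=600
at_base = point_offset(40, 60, 600, 600, 60)
one_later = point_offset(40, 60, 600, 660, 60)
full_cycle = point_offset(40, 60, 600, 900, 60)
one_earlier = point_offset(40, 60, 600, 540, 60)
print(at_base, one_later, full_cycle, one_earlier)
```

A timestamp five steps after the base lands on the base slot again, and a timestamp one step before the base lands on the last slot of the archive, with no special-case code for either direction.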

Read data

# Now we unpack the series data we just read (anything faster than unpack?)
byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
points = len(seriesString) // pointSize
seriesFormat = byteOrder + (pointTypes * points)
unpackedSeries = struct.unpack(seriesFormat, seriesString)

# And finally we construct a list of values (optimize this!)
valueList = [None] * points  # Pre-allocate entire list for speed
currentInterval = fromInterval
step = archive['secondsPerPoint']

for i in xrange(0, len(unpackedSeries), 2):
    pointTime = unpackedSeries[i]
    if pointTime == currentInterval:
        pointValue = unpackedSeries[i+1]
        valueList[i//2] = pointValue  # In-place reassignment is faster than append()
    currentInterval += step

timeInfo = (fromInterval, untilInterval, step)
return (timeInfo, valueList)

A key design point in this process is that when Whisper reads the data, it checks whether the interval read from disk equals the expected timestamp. If they are not equal, the slot does hold a data point, but it is not valid: it is outdated data, so Whisper uses None as the value. This check is very important, because it lets Whisper write to the disk non-contiguously while the query result is still always correct.
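
The timestamp check can be demonstrated on a hand-built byte string. The sketch below is a simplified Python 3 variant of the loop above that decodes one point at a time with struct.unpack_from. decode_series is a hypothetical helper and "!Ld" is the assumed point format.

```python
import struct

pointFormat = "!Ld"  # assumed: timestamp + double value
pointSize = struct.calcsize(pointFormat)


def decode_series(series_bytes, from_interval, step):
    """Return one value per slot, or None where the timestamp is stale."""
    count = len(series_bytes) // pointSize
    values = [None] * count
    expected = from_interval
    for i in range(count):
        point_time, point_value = struct.unpack_from(
            pointFormat, series_bytes, i * pointSize)
        if point_time == expected:
            values[i] = point_value
        expected += step
    return values


raw = (struct.pack(pointFormat, 600, 1.0)
       + struct.pack(pointFormat, 360, 9.0)  # stale: left from an earlier cycle
       + struct.pack(pointFormat, 720, 3.0))
values = decode_series(raw, 600, 60)
print(values)
```

The middle slot holds a perfectly readable point, but its timestamp is 360 instead of the expected 660, so it is reported as None rather than as 9.0.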

Write / Update data process

Whisper has two update interfaces: update() and update_many(). The latter supports writing multiple data points in one call. Carbon almost always uses update_many(), which has better IO performance. The following text covers update_many(), but don't worry about update(): the two work in almost the same way.

Prerequisites

  • path : the database file location
  • points : a list of points ((timestamp, value) pairs)

Order the points

Order the points by timestamp in descending order, so that the newest point is at the head of the list.

points = [(int(t), float(v)) for (t, v) in points]
points.sort(key=lambda p: p[0], reverse=True) # Order points by timestamp, newest first

Group the data point by retention

for point in points:
    age = now - point[0]

    while currentArchive['retention'] < age:  # We can't fit any more points in this archive
        if currentPoints:  # Commit all the points we've found that it can fit
            currentPoints.reverse()  # Put points in chronological order
            __archive_update_many(fh, header, currentArchive, currentPoints)
            currentPoints = []
        try:
            currentArchive = next(archives)
        except StopIteration:
            currentArchive = None
            break

    if not currentArchive:
        break  # Drop remaining points that don't fit in the database

    currentPoints.append(point)

if currentArchive and currentPoints:  # Don't forget to commit after we've checked all the archives
    currentPoints.reverse()
    __archive_update_many(fh, header, currentArchive, currentPoints)

What this code does is not obvious at first sight: points are written from the highest-precision archive to the lowest-precision one. If a point's age falls within the current archive's retention, it is written to that archive; if it is beyond the retention, it is left for the next (lower-precision) archive.

Note that before each batch is passed to the writer function __archive_update_many, its order is reversed by currentPoints.reverse(), so the writer always receives the points in chronological order.
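
To watch the grouping happen, the sketch below runs the same idea on plain data and collects the batches instead of writing them. It is a simplified Python 3 rewrite under my own naming (group_by_archive is hypothetical), not the code Whisper runs.

```python
def group_by_archive(points, archives, now):
    """Return [(secondsPerPoint, chronological points)] batches."""
    points = sorted(points, key=lambda p: p[0], reverse=True)  # newest first
    batches = []
    archive_iter = iter(archives)
    current = next(archive_iter)
    current_points = []
    for point in points:
        age = now - point[0]
        # Too old for this archive: commit what we have and move on
        while current is not None and current["retention"] < age:
            if current_points:
                batches.append((current["secondsPerPoint"], current_points[::-1]))
                current_points = []
            current = next(archive_iter, None)
        if current is None:
            break  # older than every archive: dropped
        current_points.append(point)
    if current is not None and current_points:
        batches.append((current["secondsPerPoint"], current_points[::-1]))
    return batches


archives = [{"secondsPerPoint": 60, "retention": 3600},
            {"secondsPerPoint": 300, "retention": 86400}]
points = [(99990, 1.0), (99000, 2.0), (90000, 3.0), (1000, 4.0)]
batches = group_by_archive(points, archives, now=100000)
print(batches)
```

The two recent points go to the 60-second archive, the point that is 10000 seconds old goes to the 300-second archive, and the point that is older than every retention is dropped.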

Align the data points

step = archive['secondsPerPoint']
alignedPoints = [(timestamp - (timestamp % step), value)
                 for (timestamp, value) in points]

Group the data by timestamp gap

# Create a packed string for each contiguous sequence of points
packedStrings = []
previousInterval = None
currentString = b""
lenAlignedPoints = len(alignedPoints)
for i in xrange(0, lenAlignedPoints):
    # Take last point in run of points with duplicate intervals
    if i + 1 < lenAlignedPoints and alignedPoints[i][0] == alignedPoints[i + 1][0]:
        continue
    (interval, value) = alignedPoints[i]

    # The point either starts a run or directly continues the previous point
    if (not previousInterval) or (interval == previousInterval + step):
        currentString += struct.pack(pointFormat, interval, value)
        previousInterval = interval
    else:  # The run of contiguous timestamps is broken
        numberOfPoints = len(currentString) // pointSize
        startInterval = previousInterval - (step * (numberOfPoints - 1))
        packedStrings.append((startInterval, currentString))
        currentString = struct.pack(pointFormat, interval, value)
        previousInterval = interval
if currentString:
    numberOfPoints = len(currentString) // pointSize
    startInterval = previousInterval - (step * (numberOfPoints - 1))
    packedStrings.append((startInterval, currentString))

Important:

When several points align to the same interval, only the last point of that run is kept.

This is a very important feature.
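
The following sketch shows both behaviours, last duplicate wins and splitting into contiguous runs, on a short list of points. It keeps plain values instead of packed bytes so the result is readable; contiguous_runs is a hypothetical helper and the input is assumed to be in chronological order.

```python
def contiguous_runs(points, step):
    """Return [(start_interval, [values])] for each gap-free run."""
    aligned = [(t - (t % step), v) for t, v in points]
    runs = []
    previous = None
    for i, (interval, value) in enumerate(aligned):
        # Several points in one interval: skip all but the last
        if i + 1 < len(aligned) and aligned[i + 1][0] == interval:
            continue
        if previous is not None and interval == previous + step:
            runs[-1][1].append(value)         # continues the current run
        else:
            runs.append((interval, [value]))  # starts a new run
        previous = interval
    return runs


runs = contiguous_runs([(61, 1.0), (65, 2.0), (125, 3.0), (300, 4.0)], 60)
print(runs)
```

Timestamps 61 and 65 both align to interval 60, so only the value 2.0 survives. Interval 120 directly follows 60 and joins the same run, while interval 300 leaves a gap and therefore starts a second run that will be written with a separate seek.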

Write data

# Read base point and determine where our writes will start
fh.seek(archive['offset'])
packedBasePoint = fh.read(pointSize)
(baseInterval, baseValue) = struct.unpack(pointFormat, packedBasePoint)
if baseInterval == 0:  # This file's first update
    baseInterval = packedStrings[0][0]  # Use our first string as the base, so we start at the start

# Write all of our packed strings in locations determined by the baseInterval
for (interval, packedString) in packedStrings:
    timeDistance = interval - baseInterval
    pointDistance = timeDistance // step
    byteDistance = pointDistance * pointSize
    myOffset = archive['offset'] + (byteDistance % archive['size'])
    fh.seek(myOffset)
    archiveEnd = archive['offset'] + archive['size']
    bytesBeyond = (myOffset + len(packedString)) - archiveEnd

    if bytesBeyond > 0:
        fh.write(packedString[:-bytesBeyond])
        assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (
            archiveEnd, fh.tell(), bytesBeyond, len(packedString))
        fh.seek(archive['offset'])
        fh.write(packedString[-bytesBeyond:])  # Safe because it can't exceed the archive (retention checking logic above)
    else:
        fh.write(packedString)

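Note that the write function wraps around: when a packed string would run past the end of the archive, the part that does not fit is written at the start of the archive.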
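
The wrap-around write is easy to try on an in-memory buffer. The sketch below uses a bytearray in place of the file; ring_write is a hypothetical helper that splits the data the same way as the code above.

```python
def ring_write(buf, archive_offset, archive_size, my_offset, data):
    """Write data at my_offset, wrapping to the archive start if needed."""
    archive_end = archive_offset + archive_size
    bytes_beyond = (my_offset + len(data)) - archive_end
    if bytes_beyond > 0:
        head = data[:-bytes_beyond]  # the part that still fits
        tail = data[-bytes_beyond:]  # the part that wraps around
        buf[my_offset:archive_end] = head
        buf[archive_offset:archive_offset + len(tail)] = tail
    else:
        buf[my_offset:my_offset + len(data)] = data


# 2 bytes of "header" followed by an 8-byte archive
buf = bytearray(b"." * 10)
ring_write(buf, archive_offset=2, archive_size=8, my_offset=7, data=b"ABCDE")
print(bytes(buf))
```

Three bytes fit before the end of the archive and the remaining two continue at the archive's first byte, leaving the two header bytes untouched.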

Aggregator

After a write, each archive tries to aggregate the new data into the next lower-precision archive:

# Now we propagate the updates to lower-precision archives
higher = archive
lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]

for lower in lowerArchives:
    fit = lambda i: i - (i % lower['secondsPerPoint'])
    lowerIntervals = [fit(p[0]) for p in alignedPoints]
    uniqueLowerIntervals = set(lowerIntervals)
    propagateFurther = False
    for interval in uniqueLowerIntervals:
        if __propagate(fh, header, interval, higher, lower):
            propagateFurther = True

    if not propagateFurther:
        break
    higher = lower

Aggregation method on single point

def __propagate(fh, header, timestamp, higher, lower):
    aggregationMethod = header['aggregationMethod']
    xff = header['xFilesFactor']

    lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
    lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']

    fh.seek(higher['offset'])
    packedPoint = fh.read(pointSize)
    (higherBaseInterval, higherBaseValue) = struct.unpack(pointFormat, packedPoint)

    if higherBaseInterval == 0:
        higherFirstOffset = higher['offset']
    else:
        timeDistance = lowerIntervalStart - higherBaseInterval
        pointDistance = timeDistance // higher['secondsPerPoint']
        byteDistance = pointDistance * pointSize
        higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])

    higherPoints = lower['secondsPerPoint'] // higher['secondsPerPoint']
    higherSize = higherPoints * pointSize
    relativeFirstOffset = higherFirstOffset - higher['offset']
    relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
    higherLastOffset = relativeLastOffset + higher['offset']
    fh.seek(higherFirstOffset)

    if higherFirstOffset < higherLastOffset:  # We don't wrap the archive
        seriesString = fh.read(higherLastOffset - higherFirstOffset)
    else:  # We do wrap the archive
        higherEnd = higher['offset'] + higher['size']
        seriesString = fh.read(higherEnd - higherFirstOffset)
        fh.seek(higher['offset'])
        seriesString += fh.read(higherLastOffset - higher['offset'])

    # Now we unpack the series data we just read
    byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
    points = len(seriesString) // pointSize
    seriesFormat = byteOrder + (pointTypes * points)
    unpackedSeries = struct.unpack(seriesFormat, seriesString)

    # And finally we construct a list of values
    neighborValues = [None] * points
    currentInterval = lowerIntervalStart
    step = higher['secondsPerPoint']

    for i in xrange(0, len(unpackedSeries), 2):
        pointTime = unpackedSeries[i]
        if pointTime == currentInterval:
            neighborValues[i // 2] = unpackedSeries[i + 1]
        currentInterval += step

    # Propagate aggregateValue to propagate from neighborValues if we have enough known points
    knownValues = [v for v in neighborValues if v is not None]
    if not knownValues:
        return False

    knownPercent = float(len(knownValues)) / float(len(neighborValues))
    if knownPercent >= xff:  # We have enough data to propagate a value!
        aggregateValue = aggregate(aggregationMethod, knownValues, neighborValues)
        myPackedPoint = struct.pack(pointFormat, lowerIntervalStart, aggregateValue)
        fh.seek(lower['offset'])
        packedPoint = fh.read(pointSize)
        (lowerBaseInterval, lowerBaseValue) = struct.unpack(pointFormat, packedPoint)

        if lowerBaseInterval == 0:  # First propagated update to this lower archive
            fh.seek(lower['offset'])
            fh.write(myPackedPoint)
        else:  # Not our first propagated update to this lower archive
            timeDistance = lowerIntervalStart - lowerBaseInterval
            pointDistance = timeDistance // lower['secondsPerPoint']
            byteDistance = pointDistance * pointSize
            lowerOffset = lower['offset'] + (byteDistance % lower['size'])
            fh.seek(lowerOffset)
            fh.write(myPackedPoint)

        return True

    else:
        return False

If the aggregation fails because of xff (the proportion of valid points is below the threshold), aggregation into the following lower-precision levels is cancelled as well.
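
The effect of xff on one consolidated point can be shown without any file IO. In the sketch below, aggregate_with_xff is a hypothetical helper, and the bodies of the aggregation methods are my assumption of what names such as 'average' and 'last' mean; Whisper's own aggregate() function is not shown in this article.

```python
def aggregate_with_xff(neighbor_values, xff, method="average"):
    """Return the consolidated value, or None if too few points are known."""
    known = [v for v in neighbor_values if v is not None]
    if not known:
        return None
    known_fraction = len(known) / len(neighbor_values)
    if known_fraction < xff:
        return None  # not enough valid points: nothing is propagated
    if method == "average":
        return sum(known) / len(known)
    if method == "sum":
        return sum(known)
    if method == "last":
        return known[-1]
    if method == "max":
        return max(known)
    if method == "min":
        return min(known)
    raise ValueError("unknown aggregation method: %s" % method)


slots = [1.0, None, 3.0, None]          # half of the slots are valid
print(aggregate_with_xff(slots, 0.5))   # exactly at the threshold
print(aggregate_with_xff(slots, 0.75))  # threshold too strict
```

With half of the slots valid, an xff of 0.5 still produces an average, while an xff of 0.75 rejects the same data and no point is written to the lower archive.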

MISC

IO operation

Whisper's author put some effort into IO optimization. When opening the file, Whisper uses fadvise to tell the OS how to optimize the IO operations.

if CAN_FADVISE and FADVISE_RANDOM:
    posix_fadvise(fh.fileno(), 0, 0, POSIX_FADV_RANDOM)

Let's see what the manual says about fadvise:

Allows an application to tell the kernel how it expects to use a file handle, so that the kernel can choose appropriate read-ahead and caching techniques for access to the corresponding file.

fadvise has several options:

  • FADV_NORMAL : No special treatment
  • FADV_RANDOM : Expect page references in random order
  • FADV_SEQUENTIAL : Expect page references in sequential order
  • FADV_WILLNEED : Expect access in the near future
  • FADV_DONTNEED : Do not expect access in the near future. Subsequent access of pages in this range will succeed, but will result either in reloading of the memory contents from the underlying mapped file or zero-fill-in-demand pages for mappings without an underlying file
  • FADV_NOREUSE : Access data only once

For more information, please see man fadvise
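
If you want to try the hint yourself, Python 3 exposes the same system call in the standard library as os.posix_fadvise on Unix. The sketch below uses that stdlib call instead of the posix_fadvise import seen in the Whisper snippet above, and advise_random is a hypothetical helper that falls back silently where the call is missing or refused.

```python
import os
import tempfile


def advise_random(fh):
    """Hint random access for the whole file; return True if it was applied."""
    if not hasattr(os, "posix_fadvise"):
        return False  # platform without posix_fadvise
    try:
        # offset=0 and len=0 mean "the whole file"
        os.posix_fadvise(fh.fileno(), 0, 0, os.POSIX_FADV_RANDOM)
    except OSError:
        return False  # the filesystem or kernel refused the hint
    return True


with tempfile.TemporaryFile() as fh:
    fh.write(b"\x00" * 4096)
    fh.flush()
    applied = advise_random(fh)

print(applied)
```

The hint is only advice: reads and writes behave the same with or without it, so treating a failure as non-fatal is a reasonable choice here.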