TL;DR: This article shows how Whisper works and covers some Linux programming tricks it uses.
What is Whisper?
Many people are familiar with the famous metric monitoring system: Graphite.
Here is an introduction from Graphite's official website:
Graphite is an enterprise-scale monitoring tool that runs well on cheap hardware.
Graphite does two things:
- Store numeric time-series data
 - Render graphs of this data on demand
 
Whisper is one of the core parts of Graphite. "Store numeric time-series data" involves two components: one receives the data from the network, which is the job of Carbon; the other writes the data to physical disk (like a database), which is the job of Whisper.
Officially, Whisper is a time-series database (or library) designed for the Graphite project, but it is still quite general-purpose as a time-series database.
Important:
The following code / structure analysis is based on whisper==0.9.10; other versions may differ.
Overview of the Whisper database structure
Summary
Whisper is a single-binary-file database; each file has three parts: Header / Archives / Data. The reference implementation (i.e. Whisper in Graphite) is written in Python and manipulates the binary file with the struct library.
Header
The Header of a Whisper file records meta-information such as aggregationType / maxRetention / xff / archiveCount.
aggregationType
Data type: long int (i.e. 'L' in struct format). aggregationType controls how higher-precision data is aggregated into lower-precision data.
aggregationTypeToMethod = dict({
    1: 'average',
    2: 'sum',
    3: 'last',
    4: 'max',
    5: 'min',
    6: 'avg_zero'
})
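For reference, here is a minimal sketch of what each method computes. It is illustrative only: it assumes knownValues is the list of non-None higher-precision values and allValues includes the None slots; whisper's real aggregate() helper differs slightly between versions.
def aggregate_sketch(method, knownValues, allValues):
    # Simplified stand-in for whisper's aggregate() helper (illustration only).
    if method == 'average':
        return float(sum(knownValues)) / len(knownValues)
    elif method == 'sum':
        return float(sum(knownValues))
    elif method == 'last':
        return knownValues[-1]
    elif method == 'max':
        return max(knownValues)
    elif method == 'min':
        return min(knownValues)
    elif method == 'avg_zero':
        # Treat unknown (None) slots as zero when averaging.
        return float(sum(knownValues)) / len(allValues)
    raise ValueError("Unrecognized aggregation method %s" % method)

print(aggregate_sketch('average', [1.0, 3.0], [1.0, None, 3.0, None]))   # 2.0
print(aggregate_sketch('avg_zero', [1.0, 3.0], [1.0, None, 3.0, None]))  # 1.0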
maxRetention
Data type: long int. maxRetention is the maximum retention this database can hold, in seconds.
xff
xff is the abbreviation of xFilesFactor. Data type: float (i.e. 'f' in struct format). When higher-precision data is aggregated into lower precision, if the proportion of valid (known) points is lower than this value, no aggregated value is written to the lower-precision archive.
archiveCount
Data type: long int. archiveCount records the number of archives.
Archives
summary
Archives store the meta-information describing the Data section (you could also say Data is part of the Archives). Different archives mean different precisions and retentions for the stored data.
The Archives field has several strict limitations:
- There must be at least one archive
- Archive precisions must be strictly increasing once sorted (no two archives may share the same precision)
- A lower-precision archive's precision must be a multiple of the higher one's (not equal to it)
- Archive retentions must also be strictly increasing: the lower the precision, the longer the retention
- A higher-precision archive must have enough points to consolidate one point of the next lower-precision archive
 
Here is an example that violates the last limitation: Higher: 1s/20, Lower: 60s/1.
This example satisfies the first four limitations but not the last one: consolidating a single 60-second point requires 60 one-second points, yet the higher archive holds only 20.
These limitations are checked by the following code:
if not archiveList:
    raise InvalidConfiguration("You must specify at least one archive configuration!")
archiveList.sort(key=lambda a: a[0])  # Sort by precision (secondsPerPoint)
for i, archive in enumerate(archiveList):
    if i == len(archiveList) - 1:
        break
    nextArchive = archiveList[i + 1]
    if not archive[0] < nextArchive[0]:
        raise InvalidConfiguration("A Whisper database may not be configured having "
                                   "two archives with the same precision (archive%d: %s, archive%d: %s)" %
                                   (i, archive, i + 1, nextArchive))
    if nextArchive[0] % archive[0] != 0:
        raise InvalidConfiguration("Higher precision archives' precision "
                                   "must evenly divide all lower precision archives' precision "
                                   "(archive%d: %s, archive%d: %s)" %
                                   (i, archive[0], i + 1, nextArchive[0]))
    retention = archive[0] * archive[1]
    nextRetention = nextArchive[0] * nextArchive[1]
    if not nextRetention > retention:
        raise InvalidConfiguration("Lower precision archives must cover "
                                   "larger time intervals than higher precision archives "
                                   "(archive%d: %s seconds, archive%d: %s seconds)" %
                                   (i, retention, i + 1, nextRetention))
    archivePoints = archive[1]
    pointsPerConsolidation = nextArchive[0] // archive[0]
    if not archivePoints >= pointsPerConsolidation:
        raise InvalidConfiguration("Each archive must have at least enough points "
                                   "to consolidate to the next archive (archive%d consolidates %d of "
                                   "archive%d's points but it has only %d total points)" %
                                   (i + 1, pointsPerConsolidation, i, archivePoints))
structure
Each archive has three fields: offset / secondsPerPoint / points
offset
Data type: long int. offset indicates the byte offset of this archive's data from the very beginning of the file.
secondsPerPoint
Data type: long int. secondsPerPoint is the precision of the archive. Obviously the maximum precision is one second per data point.
points
Data type: long int. points indicates the capacity (number of data points) of this archive.
derived fields
A few derived attributes are not stored as fields, but can be computed from the fields above:
retention
retention = points * secondsPerPoint
size
The physical size of the archive's Data region:
size = points * pointSize
pointSize is covered in the Data part; it is a fixed size in Whisper.
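For example, a hypothetical 1-second archive that holds one day of points (pointSize is 12 bytes, as explained in the Data part below):
secondsPerPoint = 1
points = 86400                          # hypothetical: one day of 1-second points
pointSize = 12                          # each point packs to 4 + 8 bytes ('!Ld')

retention = points * secondsPerPoint    # 86400 seconds = 1 day
size = points * pointSize               # 1036800 bytes, roughly 1 MB on disk
print(retention, size)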
Data
summary
The Data part is where the actual data is stored. It is composed of Points, each of which contains time and value information.
Point
Each point has two parts: Interval and data.
Interval
Data type: long int; a UNIX timestamp.
Data
Data type: double (i.e. 'd' in struct format); stores the metric value.
Conclusion
ASCII Art Chart
+-------------------------------------------------------------------------+
|AT|MR|xff|AC|offset|SPP|points|      ...      |Interval|data|     ...    |
+-------------------------------------------------------------------------+
|            |   Archive One   |      ...      |  Point One  |     ...    |
+-------------------------------------------------------------------------+
|    Header  |             Archives            |            Data          |
+-------------------------------------------------------------------------+
|                              Whisper file                               |
+-------------------------------------------------------------------------+
AT: aggregationType
MR: maxRetention
AC: archiveCount
SPP: secondsPerPoint
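To make the layout concrete, here is a minimal sketch that reads the Header and Archives sections of an existing .wsp file using the struct formats of whisper 0.9.x (!2LfL for the metadata, !3L per archive info). It mirrors what whisper's internal __readHeader does, simplified for illustration:
import struct

metadataFormat = "!2LfL"       # aggregationType, maxRetention, xFilesFactor, archiveCount
archiveInfoFormat = "!3L"      # offset, secondsPerPoint, points
pointSize = struct.calcsize("!Ld")

def read_header(path):
    with open(path, 'rb') as fh:
        metadata = fh.read(struct.calcsize(metadataFormat))
        aggregationType, maxRetention, xff, archiveCount = struct.unpack(metadataFormat, metadata)
        archives = []
        for _ in range(archiveCount):
            packed = fh.read(struct.calcsize(archiveInfoFormat))
            offset, secondsPerPoint, points = struct.unpack(archiveInfoFormat, packed)
            archives.append({
                'offset': offset,
                'secondsPerPoint': secondsPerPoint,
                'points': points,
                'retention': secondsPerPoint * points,   # derived
                'size': points * pointSize,              # derived
            })
        return {
            'aggregationType': aggregationType,
            'maxRetention': maxRetention,
            'xFilesFactor': xff,
            'archiveCount': archiveCount,
            'archives': archives,
        }

# Usage, assuming 'example.wsp' already exists:
# print(read_header('example.wsp'))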
Create process
Parameters
There are two required parameters:
- path : the path of the database file on disk
- archiveList : the list of all archives; it cannot be modified after creation
 
And two optional parameters:
- xFilesFactor = None
- aggregationMethod = None
 
check archiveList
# Validate archive configurations...
validateArchiveList(archiveList)
A trap in the implementation
validateArchiveList looks like it only validates archiveList, but it also mutates it by sorting: archiveList.sort(key=lambda a: a[0]). This is not a good design: without reading the implementation of validateArchiveList you would never know that archiveList has already been sorted, and because the following code depends on the order of archiveList, missing this detail makes the code hard to follow.
See also the summary section of the Archives part.
Write Header
aggregationType = struct.pack(longFormat, aggregationMethodToType.get(aggregationMethod, 1))
oldest = max([secondsPerPoint * points for secondsPerPoint, points in archiveList])
maxRetention = struct.pack(longFormat, oldest)
xFilesFactor = struct.pack(floatFormat, float(xFilesFactor))
archiveCount = struct.pack(longFormat, len(archiveList))
packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
fh.write(packedMetadata)
Write ArchiveList
The key point of this process is the offset computation.
headerSize = metadataSize + (archiveInfoSize * len(archiveList))
archiveOffsetPointer = headerSize
for secondsPerPoint, points in archiveList:
    archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
    fh.write(archiveInfo)
    archiveOffsetPointer += (points * pointSize)
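As a worked example, take a hypothetical archiveList of [(1, 60), (60, 1440)] (one minute at 1-second precision, one day at 60-second precision). With a 16-byte metadata block, 12 bytes per archive info and 12 bytes per point, the offsets come out as follows:
import struct

metadataSize = struct.calcsize("!2LfL")     # 16 bytes
archiveInfoSize = struct.calcsize("!3L")    # 12 bytes
pointSize = struct.calcsize("!Ld")          # 12 bytes

archiveList = [(1, 60), (60, 1440)]         # hypothetical retention config

headerSize = metadataSize + archiveInfoSize * len(archiveList)   # 16 + 24 = 40
archiveOffsetPointer = headerSize
for secondsPerPoint, points in archiveList:
    print(secondsPerPoint, points, archiveOffsetPointer)   # 40, then 40 + 60 * 12 = 760
    archiveOffsetPointer += points * pointSize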
Write Data
Because the database has just been created there is no real data yet, so the process fills the data region with placeholder bytes: \x00.
if CAN_FALLOCATE and useFallocate:
    remaining = archiveOffsetPointer - headerSize
    fallocate(fh, headerSize, remaining)
elif sparse:
    fh.seek(archiveOffsetPointer - 1)
    fh.write(b'\x00')
else:
    remaining = archiveOffsetPointer - headerSize
    chunksize = 16384
    zeroes = b'\x00' * chunksize
    while remaining > chunksize:
        fh.write(zeroes)
        remaining -= chunksize
    fh.write(zeroes[:remaining])
IO optimization
As you can see, Whisper uses several methods to optimize this IO operation.
Fallocate
This is a Linux-specific system call. The function prototype is int fallocate(int fd, int mode, off_t offset, off_t len);. fallocate() allows the caller to directly manipulate the allocated disk space for the file referred to by fd for the byte range starting at offset and continuing for len bytes.
For more information, please see man fallocate
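Whisper reaches fallocate through a small ctypes wrapper; for illustration only, roughly the same preallocation can be done on Python 3.3+ with the standard library's os.posix_fallocate (a sketch, not whisper's actual code):
import os

path = 'example.wsp'                    # hypothetical path
with open(path, 'wb') as fh:
    fh.write(b'\x00' * 40)              # pretend this is the 40-byte header/archive area
    # Reserve the data region (720 bytes here) without writing it byte by byte.
    os.posix_fallocate(fh.fileno(), 40, 720)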
Sparse File
A sparse file is not actually allocated on disk; the filesystem only records metadata describing the file's length. The advantage is very fast allocation at creation time. The disadvantage is that the disk space is only allocated when data is first written, which can cause disk fragmentation and therefore slower writes and reads.
For more information, please see Sparse file on Wikipedia
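A quick way to see the effect on Linux (a sketch; st_blocks is counted in 512-byte units, and not every filesystem stores the file sparsely):
import os, tempfile

# Create a sparse file the same way whisper's sparse branch does:
# seek past the end and write a single byte.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, 'r+b') as fh:
    fh.seek(1024 * 1024 - 1)
    fh.write(b'\x00')

st = os.stat(path)
print(st.st_size)           # apparent size: 1 MiB
print(st.st_blocks * 512)   # actually allocated bytes: usually far less
os.remove(path)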
Write by chunk
Writing in chunks is most effective when your data is several times larger than a disk sector. Modern disks typically use 4096-byte (4 KB) sectors; 4 × 4 KB = 16384, so writing in 16384-byte chunks is efficient.
For more information, please see Disk sector on Wikipedia
Query process
preprocess time range
Based on the maximum retention, shrink the query time range:
if now is None:
    now = int(time.time())
if untilTime is None:
    untilTime = now
fromTime = int(fromTime)
untilTime = int(untilTime)
# Here we try and be flexible and return as much data as we can.
# If the range of data is from too far in the past or fully in the future, we
# return nothing
if fromTime > untilTime:
    raise InvalidTimeInterval("Invalid time interval: from time '%s' is after until time '%s'" % (fromTime, untilTime))
oldestTime = now - header['maxRetention']
# Range is in the future
if fromTime > now:
    return None
# Range is beyond retention
if untilTime < oldestTime:
    return None
# Range requested is partially beyond retention, adjust
if fromTime < oldestTime:
    fromTime = oldestTime
# Range is partially in the future, adjust
if untilTime > now:
    untilTime = now
Find the archive
Whisper tries to find the most precise archive whose retention still covers the requested range:
diff = now - fromTime
for archive in header['archives']:
    if archive['retention'] >= diff:
        break
Align the data points
Round the query times up to interval boundaries:
fromInterval = int(fromTime - (fromTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
untilInterval = int(untilTime - (untilTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
In short, Whisper always picks the next interval boundary after the query time.
As a special case, if the start interval equals the end interval, the next interval is still included:
if fromInterval == untilInterval:
    # Zero-length time range: always include the next point
    untilInterval += archive['secondsPerPoint']
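For example, with a hypothetical 60-second archive and fromTime = 1001, untilTime = 1010:
secondsPerPoint = 60                    # hypothetical archive precision
fromTime, untilTime = 1001, 1010

fromInterval = int(fromTime - (fromTime % secondsPerPoint)) + secondsPerPoint     # 1020
untilInterval = int(untilTime - (untilTime % secondsPerPoint)) + secondsPerPoint  # 1020

if fromInterval == untilInterval:
    untilInterval += secondsPerPoint    # zero-length range widened to 1080

print(fromInterval, untilInterval)      # 1020 1080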
Compute offset
# Determine fromOffset
timeDistance = fromInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
fromOffset = archive['offset'] + (byteDistance % archive['size'])
# Determine untilOffset
timeDistance = untilInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
untilOffset = archive['offset'] + (byteDistance % archive['size'])
A trick with the % operator in Python
Whisper uses % to achieve the wrap-around effect:
byteDistance % archive['size']
which, for distances within one archive size, is equivalent to:
if byteDistance >= 0:
    return byteDistance
else:
    return archive['size'] + byteDistance
The trick (or feature) of Python's % operator is that the result always takes the sign of the divisor:
print(-3 % 5)
#  output 2
print(3 % 5)
#  output 3
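Putting the two pieces together, here is a hypothetical example of how a negative point distance wraps to the tail of the archive:
pointSize = 12
archive = {'offset': 40, 'secondsPerPoint': 60, 'size': 1440 * pointSize}   # hypothetical archive

baseInterval = 1200    # interval stored in the archive's first slot
fromInterval = 1080    # two points *before* the base interval

timeDistance = fromInterval - baseInterval                     # -120
pointDistance = timeDistance // archive['secondsPerPoint']     # -2
byteDistance = pointDistance * pointSize                       # -24
fromOffset = archive['offset'] + (byteDistance % archive['size'])

print(byteDistance % archive['size'])   # 17256: wrapped to 24 bytes before the archive end
print(fromOffset)                       # 17296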
Read data
# Now we unpack the series data we just read (anything faster than unpack?)
byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
points = len(seriesString) // pointSize
seriesFormat = byteOrder + (pointTypes * points)
unpackedSeries = struct.unpack(seriesFormat, seriesString)
# And finally we construct a list of values (optimize this!)
valueList = [None] * points  # Pre-allocate entire list for speed
currentInterval = fromInterval
step = archive['secondsPerPoint']
for i in xrange(0, len(unpackedSeries), 2):
    pointTime = unpackedSeries[i]
    if pointTime == currentInterval:
        pointValue = unpackedSeries[i + 1]
        valueList[i // 2] = pointValue  # In-place reassignment is faster than append()
    currentInterval += step
timeInfo = (fromInterval, untilInterval, step)
return (timeInfo, valueList)
A key design point in this process: when Whisper reads the data it checks whether the interval read from disk equals the expected timestamp. If they do not match, the slot contains stale data left over from an earlier wrap-around, so the point is not valid and Whisper uses None as the value. This check is very important: it lets Whisper write to the disk non-contiguously while the query results remain correct.
Write / Update data process
Whisper has two update interfaces: update() and update_many(); the latter supports writing multiple data points in a single call. The Carbon application almost always uses update_many(), which has better IO performance. The following text covers update_many(); don't worry about update(), they are almost the same thing.
Parameters
- path : the database file location
- points : a list of (timestamp, value) pairs
 
Order the points
Sort the points by timestamp in descending order, so that the newest points end up at the head of the list.
points = [(int(t), float(v)) for (t, v) in points]
points.sort(key=lambda p: p[0], reverse=True)  # Order points by timestamp, newest first
Group the data points by retention
now = int(time.time())
archives = iter(header['archives'])  # archives are ordered from highest to lowest precision
currentArchive = next(archives)
currentPoints = []
for point in points:
    age = now - point[0]
    while currentArchive['retention'] < age:  # We can't fit any more points in this archive
        if currentPoints:  # Commit all the points we've found that it can fit
            currentPoints.reverse()  # Put points in chronological order
            __archive_update_many(fh, header, currentArchive, currentPoints)
            currentPoints = []
        try:
            currentArchive = next(archives)
        except StopIteration:
            currentArchive = None
            break
    if not currentArchive:
        break  # Drop remaining points that don't fit in the database
    currentPoints.append(point)
if currentArchive and currentPoints:  # Don't forget to commit after we've checked all the archives
    currentPoints.reverse()
    __archive_update_many(fh, header, currentArchive, currentPoints)
This loop is not easy to follow at first: data is written from the highest-precision archive towards lower-precision ones. If a point falls within the current archive's retention it is written to that archive; if it falls outside the retention, it is left for the next (lower-precision) archive.
Importantly, before each batch of points is handed to the writer function __archive_update_many, it is reversed back into chronological order by currentPoints.reverse().
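For instance, with two hypothetical archives (60 seconds per point for 1 hour, 300 seconds per point for 1 day) and two points that are 30 minutes and 5 hours old, each point lands in a different archive. This sketch only shows which archive accepts which point; the real loop batches the points and calls __archive_update_many once per archive:
import time

now = int(time.time())
archives = [{'secondsPerPoint': 60, 'retention': 3600},     # hypothetical 60s:1h
            {'secondsPerPoint': 300, 'retention': 86400}]   # hypothetical 300s:1d

points = [(now - 30 * 60, 1.0),     # 30 minutes old -> fits the 60s archive
          (now - 5 * 3600, 2.0)]    # 5 hours old    -> only fits the 300s archive
points.sort(key=lambda p: p[0], reverse=True)               # newest first, as update_many does

for timestamp, value in points:
    age = now - timestamp
    target = next((a for a in archives if a['retention'] >= age), None)
    print(age, target and target['secondsPerPoint'])        # 1800 -> 60, 18000 -> 300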
Arrange the data points
step = archive['secondsPerPoint']
alignedPoints = [(timestamp - (timestamp % step), value)
                for (timestamp, value) in points]
Group the data by timestamp gap
# Create a packed string for each contiguous sequence of points
packedStrings = []
previousInterval = None
currentString = b""
lenAlignedPoints = len(alignedPoints)
for i in xrange(0, lenAlignedPoints):
    # Take last point in run of points with duplicate intervals
    if i + 1 < lenAlignedPoints and alignedPoints[i][0] == alignedPoints[i + 1][0]:
        continue
    (interval, value) = alignedPoints[i]
    # if this is the first point, or it directly continues the previous interval
    if (not previousInterval) or (interval == previousInterval + step):
        currentString += struct.pack(pointFormat, interval, value)
        previousInterval = interval
    else:  # the run of contiguous timestamps is broken; flush and start a new run
        numberOfPoints = len(currentString) // pointSize
        startInterval = previousInterval - (step * (numberOfPoints - 1))
        packedStrings.append((startInterval, currentString))
        currentString = struct.pack(pointFormat, interval, value)
        previousInterval = interval
if currentString:
    numberOfPoints = len(currentString) // pointSize
    startInterval = previousInterval - (step * (numberOfPoints - 1))
    packedStrings.append((startInterval, currentString))
Important:
"Take last point in run of points with duplicate intervals": when several submitted points align to the same interval, only the last one in the list is kept.
This is a very important feature.
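A small sketch of that duplicate-skipping rule with hypothetical values: two raw points align to the same 60-second interval, and only the later one in the (chronologically ordered) list survives:
step = 60
alignedPoints = [(1200, 1.0), (1200, 2.0), (1260, 3.0)]   # hypothetical aligned points

kept = []
for i in range(len(alignedPoints)):
    # Same skip rule as above: drop a point if the next one has the same interval.
    if i + 1 < len(alignedPoints) and alignedPoints[i][0] == alignedPoints[i + 1][0]:
        continue
    kept.append(alignedPoints[i])

print(kept)   # [(1200, 2.0), (1260, 3.0)] -- the later duplicate wins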
Write data
# Read base point and determine where our writes will start
fh.seek(archive['offset'])
packedBasePoint = fh.read(pointSize)
(baseInterval, baseValue) = struct.unpack(pointFormat, packedBasePoint)
if baseInterval == 0:  # This file's first update
    baseInterval = packedStrings[0][0]  # Use our first string as the base, so we start at the start
# Write all of our packed strings in locations determined by the baseInterval
for (interval, packedString) in packedStrings:
    timeDistance = interval - baseInterval
    pointDistance = timeDistance // step
    byteDistance = pointDistance * pointSize
    myOffset = archive['offset'] + (byteDistance % archive['size'])
    fh.seek(myOffset)
    archiveEnd = archive['offset'] + archive['size']
    bytesBeyond = (myOffset + len(packedString)) - archiveEnd
    if bytesBeyond > 0:
        fh.write(packedString[:-bytesBeyond])
        assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (
            archiveEnd, fh.tell(), bytesBeyond, len(packedString))
        fh.seek(archive['offset'])
        # Safe because it can't exceed the archive (retention checking logic above)
        fh.write(packedString[-bytesBeyond:])
    else:
        fh.write(packedString)
Note: the write logic wraps around the end of the archive, just like the read logic.
Aggregator
After writing, each archive tries to propagate (aggregate) its updates to the next lower-precision archive:
# Now we propagate the updates to lower-precision archives
higher = archive
lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]
for lower in lowerArchives:
    fit = lambda i: i - (i % lower['secondsPerPoint'])
    lowerIntervals = [fit(p[0]) for p in alignedPoints]
    uniqueLowerIntervals = set(lowerIntervals)
    propagateFurther = False
    for interval in uniqueLowerIntervals:
        if __propagate(fh, header, interval, higher, lower):
            propagateFurther = True
    if not propagateFurther:
        break
    higher = lower
Aggregation of a single point
def __propagate(fh, header, timestamp, higher, lower):
    aggregationMethod = header['aggregationMethod']
    xff = header['xFilesFactor']
    lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
    lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']
    fh.seek(higher['offset'])
    packedPoint = fh.read(pointSize)
    (higherBaseInterval, higherBaseValue) = struct.unpack(pointFormat, packedPoint)
    if higherBaseInterval == 0:
        higherFirstOffset = higher['offset']
    else:
        timeDistance = lowerIntervalStart - higherBaseInterval
        pointDistance = timeDistance // higher['secondsPerPoint']
        byteDistance = pointDistance * pointSize
        higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])
    higherPoints = lower['secondsPerPoint'] // higher['secondsPerPoint']
    higherSize = higherPoints * pointSize
    relativeFirstOffset = higherFirstOffset - higher['offset']
    relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
    higherLastOffset = relativeLastOffset + higher['offset']
    fh.seek(higherFirstOffset)
    if higherFirstOffset < higherLastOffset:  # We don't wrap the archive
        seriesString = fh.read(higherLastOffset - higherFirstOffset)
    else:  # We do wrap the archive
        higherEnd = higher['offset'] + higher['size']
        seriesString = fh.read(higherEnd - higherFirstOffset)
        fh.seek(higher['offset'])
        seriesString += fh.read(higherLastOffset - higher['offset'])
    # Now we unpack the series data we just read
    byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
    points = len(seriesString) // pointSize
    seriesFormat = byteOrder + (pointTypes * points)
    unpackedSeries = struct.unpack(seriesFormat, seriesString)
    # And finally we construct a list of values
    neighborValues = [None] * points
    currentInterval = lowerIntervalStart
    step = higher['secondsPerPoint']
    for i in xrange(0, len(unpackedSeries), 2):
        pointTime = unpackedSeries[i]
        if pointTime == currentInterval:
            neighborValues[i // 2] = unpackedSeries[i + 1]
        currentInterval += step
    # Propagate aggregateValue to propagate from neighborValues if we have enough known points
    knownValues = [v for v in neighborValues if v is not None]
    if not knownValues:
        return False
    knownPercent = float(len(knownValues)) / float(len(neighborValues))
    if knownPercent >= xff:  # We have enough data to propagate a value!
        aggregateValue = aggregate(aggregationMethod, knownValues, neighborValues)
        myPackedPoint = struct.pack(pointFormat, lowerIntervalStart, aggregateValue)
        fh.seek(lower['offset'])
        packedPoint = fh.read(pointSize)
        (lowerBaseInterval, lowerBaseValue) = struct.unpack(pointFormat, packedPoint)
        if lowerBaseInterval == 0:  # First propagated update to this lower archive
            fh.seek(lower['offset'])
            fh.write(myPackedPoint)
        else:  # Not our first propagated update to this lower archive
            timeDistance = lowerIntervalStart - lowerBaseInterval
            pointDistance = timeDistance // lower['secondsPerPoint']
            byteDistance = pointDistance * pointSize
            lowerOffset = lower['offset'] + (byteDistance % lower['size'])
            fh.seek(lowerOffset)
            fh.write(myPackedPoint)
        return True
    else:
        return False
If the xff check fails (too many invalid points, i.e. the ratio of known points is below the threshold), no aggregated value is written and propagation to the next lower level is cancelled.
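A quick sanity check of the xff rule with hypothetical numbers: four higher-precision slots, two of them known, and xFilesFactor = 0.5:
xff = 0.5
neighborValues = [1.0, None, 3.0, None]   # hypothetical higher-precision slots

knownValues = [v for v in neighborValues if v is not None]
knownPercent = float(len(knownValues)) / float(len(neighborValues))   # 0.5

if knownPercent >= xff:
    print('propagate, e.g. average =', sum(knownValues) / len(knownValues))   # 2.0
else:
    print('skip: not enough known points, lower archives are not updated')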
MISC
IO operation
Whisper's author also put some effort into IO optimisation. When opening a file, Whisper uses fadvise to tell the OS how it will access the file:
if CAN_FADVISE and FADVISE_RANDOM:
    posix_fadvise(fh.fileno(), 0, 0, POSIX_FADV_RANDOM)
Let's see what the manual says about fadvise:
Allows an application to tell the kernel how it expects to use a file handle, so that the kernel can choose appropriate read-ahead and caching techniques for access to the corresponding file.
fadvise has several options:
- FADV_NORMAL: No special treatment
- FADV_RANDOM: Expect page references in random order
- FADV_SEQUENTIAL: Expect page references in sequential order
- FADV_WILLNEED: Expect access in the near future
- FADV_DONTNEED: Do not expect access in the near future. Subsequent access of pages in this range will succeed, but will result either in reloading of the memory contents from the underlying mapped file or zero-fill-on-demand pages for mappings without an underlying file
- FADV_NOREUSE: Access data only once
For more information, please see man fadvise
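On Python 3.3+ the same advice can be given through the standard library (a sketch, not whisper's own fadvise wrapper):
import os

with open('example.wsp', 'rb') as fh:   # hypothetical existing database file
    # Whisper's access pattern is random seeks, so disable aggressive read-ahead.
    os.posix_fadvise(fh.fileno(), 0, 0, os.POSIX_FADV_RANDOM)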