TL;DR: This article shows how Whisper works and some of the Linux programming tricks it uses.
What is Whisper
Many people are familiar with Graphite, the well-known metrics monitoring system.
Here is the introduction from the official Graphite website:
Graphite is an enterprise-scale monitoring tool that runs well on cheap hardware.
Graphite does two things:
- Store numeric time-series data
- Render graphs of this data on demand
Whisper is one of the core parts of Graphite. “Store numeric time-series data” involves two components: one receives the data from the network, which is the job of Carbon; the other writes the data to the physical disk (like a database), which is the job of Whisper.
Officially, Whisper is a time-series database library designed for the Graphite project, but it is general enough to be used as a time-series database on its own.
Important:
The following code and structure analysis is based on whisper==0.9.10; other versions may differ.
Overview of the Whisper database structure
Summary
Whisper is a single-binary-file database. Each file has three parts: Header / Archives / Data. The reference implementation (the Whisper used in Graphite) is written in Python and manipulates the binary file with the struct library.
Header
The Header of a Whisper file records meta-information: aggregationType / maxRetention / xff / archiveCount
aggregationType
Data type: long int (‘L’ in struct format). aggregationType controls how higher-precision data is aggregated into lower-precision data.
aggregationTypeToMethod = dict({
    1: 'average',
    2: 'sum',
    3: 'last',
    4: 'max',
    5: 'min',
    6: 'avg_zero'
})
maxRetention
Data type: long int. maxRetention is the maximum retention this database can hold, in seconds.
xff
xff is the abbreviation for xFilesFactor. Data type: float (‘f’ in struct format). When higher-precision data is aggregated into lower precision, if the proportion of valid data points is lower than this value, the aggregation result is set to None.
archiveCount
Data type: long int. archiveCount is the number of archives in the file.
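As a rough illustration of the header layout described above, the following sketch packs and unpacks the four metadata fields with struct. The format string "!2LfL" (network byte order, two unsigned longs, a float, an unsigned long) is assumed from the field types listed above, and the sample values are made up.

```python
import struct

# Assumed layout: aggregationType, maxRetention, xff, archiveCount
# in network byte order ("!"), so no padding is inserted.
METADATA_FORMAT = "!2LfL"
METADATA_SIZE = struct.calcsize(METADATA_FORMAT)  # 4 + 4 + 4 + 4 bytes

# Hypothetical values: 'average' aggregation, one day of retention,
# an xFilesFactor of 0.5 and two archives.
packed = struct.pack(METADATA_FORMAT, 1, 86400, 0.5, 2)

aggregation_type, max_retention, xff, archive_count = struct.unpack(
    METADATA_FORMAT, packed
)
print(METADATA_SIZE, aggregation_type, max_retention, xff, archive_count)
```

With the "!" prefix the sizes are fixed regardless of platform, which is presumably why a file written on one machine can be read on another.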
Archives
summary
Archives stores the meta-information of Data; alternatively, Data can be seen as a part of Archives. Each archive stores data at a different precision and retention.
The Archives field has several strict limitations:
- There must be at least one archive
- After sorting, archive precisions must be strictly monotonic (no two archives may share a precision)
- The precision of a lower-precision archive must be a multiple of the higher-precision one (and not equal to it)
- Archive retention must be strictly monotonic too: the lower the precision, the longer the retention
- A higher-precision archive must have enough points to consolidate into one point of the next lower-precision archive
Here is an example for the last limitation: Higher: 1s/20 Lower: 60s/1
This example satisfies the first four limitations but not the last one: one 60-second point needs 60 one-second points, and the higher-precision archive holds only 20.
These limitations are checked by:
if not archiveList:
    raise InvalidConfiguration("You must specify at least one archive configuration!")
archiveList.sort(key=lambda a: a[0])  # Sort by precision (secondsPerPoint)
for i, archive in enumerate(archiveList):
    if i == len(archiveList) - 1:
        break
    nextArchive = archiveList[i + 1]
    if not archive[0] < nextArchive[0]:
        raise InvalidConfiguration("A Whisper database may not be configured having "
                                   "two archives with the same precision (archive%d: %s, archive%d: %s)" %
                                   (i, archive, i + 1, nextArchive))
    if nextArchive[0] % archive[0] != 0:
        raise InvalidConfiguration("Higher precision archives' precision "
                                   "must evenly divide all lower precision archives' precision "
                                   "(archive%d: %s, archive%d: %s)" %
                                   (i, archive[0], i + 1, nextArchive[0]))
    retention = archive[0] * archive[1]
    nextRetention = nextArchive[0] * nextArchive[1]
    if not nextRetention > retention:
        raise InvalidConfiguration("Lower precision archives must cover "
                                   "larger time intervals than higher precision archives "
                                   "(archive%d: %s seconds, archive%d: %s seconds)" %
                                   (i, retention, i + 1, nextRetention))
    archivePoints = archive[1]
    pointsPerConsolidation = nextArchive[0] // archive[0]
    if not archivePoints >= pointsPerConsolidation:
        raise InvalidConfiguration("Each archive must have at least enough points "
                                   "to consolidate to the next archive (archive%d consolidates %d of "
                                   "archive%d's points but it has only %d total points)" %
                                   (i + 1, pointsPerConsolidation, i, archivePoints))
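To see the rules in action on the 1s/20 and 60s/1 example, here is a simplified sketch of the same checks. check_archives is a hypothetical helper, not part of Whisper: it collects messages instead of raising, and it sorts a copy rather than the caller's list.

```python
def check_archives(archive_list):
    """Return rule violations for a list of (secondsPerPoint, points) pairs."""
    if not archive_list:
        return ["at least one archive is required"]
    problems = []
    ordered = sorted(archive_list, key=lambda a: a[0])  # input is left untouched
    for higher, lower in zip(ordered, ordered[1:]):
        if not higher[0] < lower[0]:
            problems.append("duplicate precision")
            continue
        if lower[0] % higher[0] != 0:
            problems.append("precision is not a multiple")
        if not lower[0] * lower[1] > higher[0] * higher[1]:
            problems.append("retention does not grow")
        if higher[1] < lower[0] // higher[0]:
            problems.append("not enough points to consolidate")
    return problems


bad = check_archives([(1, 20), (60, 1)])    # the example from the text
good = check_archives([(1, 60), (60, 60)])  # 60 one-second points fill one minute
print(bad, good)
```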
structure
Each archive has three fields: offset / secondsPerPoint / points
offset
Data type: long int. offset is the byte offset of the archive data from the very beginning of the file.
secondsPerPoint
Data type: long int. secondsPerPoint is the precision of the archive. Because it is an integer number of seconds, the highest possible precision is one second per data point.
points
Data type: long int. points is the capacity of this archive, in data points.
derived attributes
A few derived attributes are not stored as fields but can be computed from them.
retention
retention = points * secondsPerPoint
size
The physical size of the Data field of the archive.
size = points * pointSize
pointSize is covered in the Data part; it is a fixed size in Whisper.
Data
summary
The Data part is where the actual data is stored. It is composed of Points, each containing a time and a value.
Point
Each point has two parts: Interval and data
Interval
Data type: long int, a UNIX timestamp
Data
Data type: double (‘d’ in struct format), storing the metric value
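The point layout, together with the derived retention and size attributes from the Archives part, can be sketched as follows. The format string "!Ld" is assumed from the two field types above, and the archive numbers are made up for illustration.

```python
import struct

# Assumed layout: unsigned long timestamp + double value, network byte order.
POINT_FORMAT = "!Ld"
POINT_SIZE = struct.calcsize(POINT_FORMAT)  # 4 + 8 bytes, no padding with "!"

# Round-trip one hypothetical point.
packed_point = struct.pack(POINT_FORMAT, 1500000000, 42.5)
interval, value = struct.unpack(POINT_FORMAT, packed_point)

# Hypothetical archive: one point per minute, 1440 points.
seconds_per_point, points = 60, 1440
retention = points * seconds_per_point  # seconds covered by the archive
size = points * POINT_SIZE              # bytes occupied in the Data part
print(POINT_SIZE, interval, value, retention, size)
```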
Conclusion
ASCII Art Chart
+-------------------------------------------------------------------------+
|AT|MR|xff|AC|offset|SPP|points|   ...   |Interval|data|        ...       |
+-------------------------------------------------------------------------+
|            |   Archive One   |   ...   |  Point One  |        ...       |
+-------------------------------------------------------------------------+
|   Header   |         Archives          |              Data              |
+-------------------------------------------------------------------------+
|                              Whisper file                               |
+-------------------------------------------------------------------------+
AT: aggregationType
MR: maxRetention
AC: archiveCount
SPP: secondsPerPoint
Create process
prerequisites
There are two required parameters:
- path : the path of the database on disk
- archiveList : the list of all archives; it cannot be modified after creation
Two optional parameters:
- xFilesFactor = None
- aggregationMethod = None
check archiveList
# Validate archive configurations...
validateArchiveList(archiveList)
A trap in the implementation
The function validateArchiveList looks like it only validates archiveList, but it also modifies archiveList by sorting it in place with archiveList.sort(key=lambda a: a[0]). This is not a good design: unless you read the implementation of validateArchiveList, you will never know that archiveList has already been sorted. And because the following code depends on the order of archiveList, you cannot understand how that code works without knowing this.
See also the summary section of the Archives part.
Write Header
aggregationType = struct.pack(longFormat, aggregationMethodToType.get(aggregationMethod, 1))
oldest = max([secondsPerPoint * points for secondsPerPoint, points in archiveList])
maxRetention = struct.pack(longFormat, oldest)
xFilesFactor = struct.pack(floatFormat, float(xFilesFactor))
archiveCount = struct.pack(longFormat, len(archiveList))
packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
fh.write(packedMetadata)
Write ArchiveList
The key point of this process is computing the offset of each archive.
headerSize = metadataSize + (archiveInfoSize * len(archiveList))
archiveOffsetPointer = headerSize
for secondsPerPoint, points in archiveList:
    archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
    fh.write(archiveInfo)
    archiveOffsetPointer += (points * pointSize)
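As a worked example of the offset computation, the sketch below derives the offsets for a made-up archive list. The three format strings are assumptions based on the field types described earlier, and archive_offsets is a hypothetical helper.

```python
import struct

METADATA_SIZE = struct.calcsize("!2LfL")    # assumed header layout
ARCHIVE_INFO_SIZE = struct.calcsize("!3L")  # offset, secondsPerPoint, points
POINT_SIZE = struct.calcsize("!Ld")         # timestamp, value


def archive_offsets(archive_list):
    """Return (offset of each archive, total file size) for a sorted archive list."""
    header_size = METADATA_SIZE + ARCHIVE_INFO_SIZE * len(archive_list)
    offsets = []
    pointer = header_size  # the first archive starts right after the header
    for _seconds_per_point, points in archive_list:
        offsets.append(pointer)
        pointer += points * POINT_SIZE
    return offsets, pointer


# Hypothetical configuration: 1 day at 60 s and 1 week at 1 h.
offsets, file_size = archive_offsets([(60, 1440), (3600, 168)])
print(offsets, file_size)
```

Because every point has the same size, the file size is known before a single data point is written, which is what makes preallocation possible.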
Write Data
Because the database has just been created, there is no real data yet, so the process fills the data area with placeholder bytes: \x00.
if CAN_FALLOCATE and useFallocate:
    remaining = archiveOffsetPointer - headerSize
    fallocate(fh, headerSize, remaining)
elif sparse:
    fh.seek(archiveOffsetPointer - 1)
    fh.write(b'\x00')
else:
    remaining = archiveOffsetPointer - headerSize
    chunksize = 16384
    zeroes = b'\x00' * chunksize
    while remaining > chunksize:
        fh.write(zeroes)
        remaining -= chunksize
    fh.write(zeroes[:remaining])
IO optimization
As you can see, Whisper uses several methods to optimize IO operations.
Fallocate
This is a Linux-specific system call. The function prototype is int fallocate(int fd, int mode, off_t offset, off_t len);. fallocate() allows the caller to directly manipulate the allocated disk space for the file referred to by fd for the byte range starting at offset and continuing for len bytes.
For more information, please see man fallocate
Sparse File
With a sparse file, the space is not actually allocated on disk; the file system only records metadata representing the length of the file. The advantage is that creation is very fast. The disadvantage is that disk space is allocated only when data is first written, which may lead to disk fragmentation and, in turn, slower reads and writes.
For more information, please see Sparse file on Wikipedia
Write by chunk
Writing in chunks whose size is a multiple of the disk sector size is usually the most efficient. Many modern disks use 4096-byte (4 KB) sectors, and 16384 is 4 times 4 KB, so writing in chunks of this size is efficient too.
For more information, please see Disk sector on Wikipedia
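The sparse and chunked strategies can be compared with a small sketch. create_sparse and create_chunked are hypothetical helpers that mirror the two branches above on a temporary file; whether the sparse file really occupies less disk space depends on the file system.

```python
import os
import tempfile


def create_sparse(path, size):
    """Seek to the last byte and write it; the rest may stay unallocated."""
    with open(path, "wb") as fh:
        fh.seek(size - 1)
        fh.write(b"\x00")


def create_chunked(path, size, chunksize=16384):
    """Write zeroes in fixed-size chunks, then the remaining tail."""
    zeroes = b"\x00" * chunksize
    remaining = size
    with open(path, "wb") as fh:
        while remaining > chunksize:
            fh.write(zeroes)
            remaining -= chunksize
        fh.write(zeroes[:remaining])


with tempfile.TemporaryDirectory() as tmp:
    sparse_path = os.path.join(tmp, "sparse.wsp")
    chunked_path = os.path.join(tmp, "chunked.wsp")
    create_sparse(sparse_path, 50000)
    create_chunked(chunked_path, 50000)
    sparse_size = os.path.getsize(sparse_path)
    chunked_size = os.path.getsize(chunked_path)
    with open(sparse_path, "rb") as a, open(chunked_path, "rb") as b:
        same_content = a.read() == b.read()
print(sparse_size, chunked_size, same_content)
```

Both files have the same logical length and read back as zeroes; they differ only in how much work is done at creation time and when the blocks get allocated.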
Query process
preprocess time range
According to the max retention, shrink the query time range:
if now is None:
    now = int(time.time())
if untilTime is None:
    untilTime = now
fromTime = int(fromTime)
untilTime = int(untilTime)
# Here we try and be flexible and return as much data as we can.
# If the range of data is from too far in the past or fully in the future, we
# return nothing
if fromTime > untilTime:
    raise InvalidTimeInterval("Invalid time interval: from time '%s' is after until time '%s'" % (fromTime, untilTime))
oldestTime = now - header['maxRetention']
# Range is in the future
if fromTime > now:
    return None
# Range is beyond retention
if untilTime < oldestTime:
    return None
# Range requested is partially beyond retention, adjust
if fromTime < oldestTime:
    fromTime = oldestTime
# Range is partially in the future, adjust
if untilTime > now:
    untilTime = now
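The clamping logic can be condensed into a small function for experimentation. clamp_range is a hypothetical name, and ValueError stands in for the InvalidTimeInterval exception used by Whisper.

```python
def clamp_range(from_time, until_time, now, max_retention):
    """Shrink a query range to [now - max_retention, now], or return None."""
    if from_time > until_time:
        raise ValueError("from time is after until time")
    oldest = now - max_retention
    if from_time > now or until_time < oldest:
        return None  # entirely in the future or entirely beyond retention
    return max(from_time, oldest), min(until_time, now)


# Hypothetical database: now = 1000, max retention = 100 seconds.
partial = clamp_range(850, 1200, now=1000, max_retention=100)
future = clamp_range(1100, 1200, now=1000, max_retention=100)
expired = clamp_range(100, 200, now=1000, max_retention=100)
print(partial, future, expired)
```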
Find the archive
Whisper picks the most precise archive whose retention covers the requested start time:
diff = now - fromTime
for archive in header['archives']:
    if archive['retention'] >= diff:
        break
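A minimal sketch of this selection, assuming the archives are listed from highest to lowest precision as they are after validation. pick_archive is a hypothetical helper; returning the last archive when nothing matches mirrors what the loop variable holds when the for loop ends without a break.

```python
def pick_archive(archives, now, from_time):
    """Return the first (most precise) archive whose retention covers from_time."""
    diff = now - from_time
    for archive in archives:
        if archive["retention"] >= diff:
            return archive
    return archives[-1]


# Hypothetical archives: 1 day at 60 s, 1 week at 1 h.
archives = [
    {"secondsPerPoint": 60, "retention": 86400},
    {"secondsPerPoint": 3600, "retention": 604800},
]
recent = pick_archive(archives, now=1000000, from_time=1000000 - 3600)
older = pick_archive(archives, now=1000000, from_time=1000000 - 200000)
print(recent["secondsPerPoint"], older["secondsPerPoint"])
```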
Align the data points
Align the query times to interval boundaries:
fromInterval = int(fromTime - (fromTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
untilInterval = int(untilTime - (untilTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
In summary, Whisper always rounds up to the next interval boundary after the query time.
As a special case, if the start interval equals the end interval, the next interval is always included:
if fromInterval == untilInterval:
    # Zero-length time range: always include the next point
    untilInterval += archive['secondsPerPoint']
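A few concrete numbers make the alignment easier to follow. query_intervals is a hypothetical helper that combines the two steps above for a given step (secondsPerPoint).

```python
def query_intervals(from_time, until_time, step):
    """Round both times up to the next interval boundary."""
    from_interval = from_time - (from_time % step) + step
    until_interval = until_time - (until_time % step) + step
    if from_interval == until_interval:
        # Zero-length range: always include the next point
        until_interval += step
    return from_interval, until_interval


same_slot = query_intervals(125, 130, 60)   # both fall into the 120..180 slot
two_slots = query_intervals(125, 250, 60)
on_boundary = query_intervals(120, 120, 60)
print(same_slot, two_slots, on_boundary)
```

Note that a time sitting exactly on a boundary (120 here) is still moved to the next boundary (180).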
Compute offset
# Determine fromOffset
timeDistance = fromInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
fromOffset = archive['offset'] + (byteDistance % archive['size'])
# Determine untilOffset
timeDistance = untilInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
untilOffset = archive['offset'] + (byteDistance % archive['size'])
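Trick on the % operation in Python
Whisper uses % to achieve the wrap-around effect:
byteDistance % archive['size']
For -archive['size'] <= byteDistance < archive['size'], this is equivalent to
if byteDistance >= 0:
    return byteDistance
else:
    return archive['size'] + byteDistance
More generally, the result always lands in the range from 0 (inclusive) to archive['size'] (exclusive), however far byteDistance is from zero in either direction.
The trick (or feature) of the % operation in Python is that the result takes the sign of the divisor:
print(-3 % 5)
# output 2
print(3 % 5)
# output 3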
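To make the wrap-around concrete, here is a sketch that treats an archive as a ring of fixed-size slots. slot_offset is a hypothetical helper, the archive numbers are made up, and the 12-byte point size is assumed from the point layout described earlier.

```python
POINT_SIZE = 12  # assumed: 4-byte timestamp + 8-byte double


def slot_offset(archive_offset, archive_size, base_interval, interval, step):
    """File offset of the slot for `interval`, wrapping inside the archive."""
    point_distance = (interval - base_interval) // step
    byte_distance = point_distance * POINT_SIZE
    return archive_offset + (byte_distance % archive_size)


# Hypothetical archive: starts at byte 40, holds 5 points (60 bytes), step 60 s,
# and its first slot currently stores the interval 600.
first = slot_offset(40, 60, 600, 600, 60)    # distance 0
second = slot_offset(40, 60, 600, 660, 60)   # one slot forward
wrapped = slot_offset(40, 60, 600, 900, 60)  # 5 slots forward wraps to the start
before = slot_offset(40, 60, 600, 540, 60)   # one slot back lands in the last slot
print(first, second, wrapped, before)
```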
Read data
# Now we unpack the series data we just read (anything faster than unpack?)
byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
points = len(seriesString) // pointSize
seriesFormat = byteOrder + (pointTypes * points)
unpackedSeries = struct.unpack(seriesFormat, seriesString)
# And finally we construct a list of values (optimize this!)
valueList = [None] * points # Pre-allocate entire list for speed
currentInterval = fromInterval
step = archive['secondsPerPoint']
for i in xrange(0, len(unpackedSeries), 2):
    pointTime = unpackedSeries[i]
    if pointTime == currentInterval:
        pointValue = unpackedSeries[i+1]
        valueList[i//2] = pointValue  # In-place reassignment is faster than append()
    currentInterval += step
timeInfo = (fromInterval, untilInterval, step)
return (timeInfo, valueList)
A key design point here is that when Whisper reads the data, it checks whether the interval read from disk equals the expected timestamp. If they are not equal, there is a data point in the slot but it is not valid: it is outdated data, so Whisper uses None as the value. This check is very important: it lets Whisper write to the disk non-contiguously while the query result stays correct.
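The effect of this check is easy to reproduce on already-unpacked points. values_from_points is a hypothetical helper working on (timestamp, value) tuples rather than on the flat tuple that struct.unpack returns.

```python
def values_from_points(points, from_interval, step):
    """Keep a value only if its stored timestamp matches the expected slot time."""
    values = [None] * len(points)
    expected = from_interval
    for i, (timestamp, value) in enumerate(points):
        if timestamp == expected:
            values[i] = value
        expected += step
    return values


# The middle slot still holds a point from an earlier trip around the ring.
stored = [(600, 1.0), (60, 9.9), (720, 3.0)]
result = values_from_points(stored, from_interval=600, step=60)
print(result)
```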
Write / Update data process
Whisper has two update interfaces, update() and update_many(); the latter supports writing multiple data points in one call. The Carbon application almost always uses update_many(), which has better IO performance. The following text covers update_many(), but don’t worry about update(): they are almost the same thing.
prerequisites
- path : database file location
- points : a list of points ((timestamp, value) pairs)
Order the points
Sort the points by timestamp in descending order, so that newer points are at the head of the list.
points = [(int(t), float(v)) for (t, v) in points]
points.sort(key=lambda p: p[0], reverse=True) # Order points by timestamp, newest first
Group the data point by retention
for point in points:
    age = now - point[0]
    while currentArchive['retention'] < age:  # We can't fit any more points in this archive
        if currentPoints:  # Commit all the points we've found that it can fit
            currentPoints.reverse()  # Put points in chronological order
            __archive_update_many(fh, header, currentArchive, currentPoints)
            currentPoints = []
        try:
            currentArchive = next(archives)
        except StopIteration:
            currentArchive = None
            break
    if not currentArchive:
        break  # Drop remaining points that don't fit in the database
    currentPoints.append(point)
if currentArchive and currentPoints:  # Don't forget to commit after we've checked all the archives
    currentPoints.reverse()
    __archive_update_many(fh, header, currentArchive, currentPoints)
It is not easy to see what this operation does: data is written from the higher-precision archive to the lower-precision ones. If a point falls within the retention of the current archive, it is written to that archive; if it is older than the retention, it is left for the next (lower-precision) archive.
It is important that, before being passed to the writer function __archive_update_many, each batch of points is reversed by currentPoints.reverse(), which puts it back in chronological order.
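The grouping can be tried out without touching a file. The sketch below is a simplified model, not Whisper's code: group_by_archive is a hypothetical helper that returns the batches instead of writing them, and it assumes a non-empty archive list ordered from highest to lowest precision.

```python
def group_by_archive(points, archives, now):
    """Return [(archive retention, points in chronological order), ...]."""
    ordered = sorted(points, key=lambda p: p[0], reverse=True)  # newest first
    groups = []
    archive_iter = iter(archives)
    current = next(archive_iter)
    batch = []
    for point in ordered:
        age = now - point[0]
        while current is not None and current["retention"] < age:
            if batch:  # commit what fits into the current archive
                groups.append((current["retention"], batch[::-1]))
                batch = []
            current = next(archive_iter, None)
        if current is None:
            break  # remaining points are older than every archive
        batch.append(point)
    if current is not None and batch:
        groups.append((current["retention"], batch[::-1]))
    return groups


archives = [{"retention": 100}, {"retention": 1000}]
points = [(1990, 1.0), (1950, 2.0), (1500, 3.0), (500, 4.0)]
groups = group_by_archive(points, archives, now=2000)
print(groups)
```

The point at timestamp 500 is 1500 seconds old, older than the longest retention, so it is dropped.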
Align the data points
step = archive['secondsPerPoint']
alignedPoints = [(timestamp - (timestamp % step), value)
                 for (timestamp, value) in points]
Group the data by timestamp gap
# Create a packed string for each contiguous sequence of points
packedStrings = []
previousInterval = None
currentString = b""
lenAlignedPoints = len(alignedPoints)
for i in xrange(0, lenAlignedPoints):
    # Take last point in run of points with duplicate intervals
    if i + 1 < lenAlignedPoints and alignedPoints[i][0] == alignedPoints[i + 1][0]:
        continue
    (interval, value) = alignedPoints[i]
    # The point is the first one, or it directly follows the previous interval
    if (not previousInterval) or (interval == previousInterval + step):
        currentString += struct.pack(pointFormat, interval, value)
        previousInterval = interval
    else:  # The run of contiguous timestamps is broken
        numberOfPoints = len(currentString) // pointSize
        startInterval = previousInterval - (step * (numberOfPoints - 1))
        packedStrings.append((startInterval, currentString))
        currentString = struct.pack(pointFormat, interval, value)
        previousInterval = interval
if currentString:
    numberOfPoints = len(currentString) // pointSize
    startInterval = previousInterval - (step * (numberOfPoints - 1))
    packedStrings.append((startInterval, currentString))
Important:
Take last point in run of points with duplicate intervals
This is a very important feature: when several points align to the same interval, only the last one is written.
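The alignment and grouping steps can be sketched on plain tuples, leaving out the struct packing. contiguous_runs is a hypothetical helper; it expects points in chronological order, as they are when they reach the writer function.

```python
def contiguous_runs(points, step):
    """Align points to `step` and split them into runs of consecutive intervals."""
    aligned = [(t - (t % step), v) for t, v in points]
    runs = []
    current = []
    for i, (interval, value) in enumerate(aligned):
        # Take the last point in a run of points with duplicate intervals
        if i + 1 < len(aligned) and aligned[i + 1][0] == interval:
            continue
        # A gap in the timestamps closes the current run
        if current and interval != current[-1][0] + step:
            runs.append(current)
            current = []
        current.append((interval, value))
    if current:
        runs.append(current)
    return runs


# 61 and 65 both align to 60, so only the value 2.0 survives for that slot.
runs = contiguous_runs([(61, 1.0), (65, 2.0), (125, 3.0), (300, 4.0)], step=60)
print(runs)
```

Each run can then be written with a single seek and a single write, which is presumably the point of grouping.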
Write data
# Read base point and determine where our writes will start
fh.seek(archive['offset'])
packedBasePoint = fh.read(pointSize)
(baseInterval, baseValue) = struct.unpack(pointFormat, packedBasePoint)
if baseInterval == 0:  # This file's first update
    baseInterval = packedStrings[0][0]  # Use our first string as the base, so we start at the start
# Write all of our packed strings in locations determined by the baseInterval
for (interval, packedString) in packedStrings:
    timeDistance = interval - baseInterval
    pointDistance = timeDistance // step
    byteDistance = pointDistance * pointSize
    myOffset = archive['offset'] + (byteDistance % archive['size'])
    fh.seek(myOffset)
    archiveEnd = archive['offset'] + archive['size']
    bytesBeyond = (myOffset + len(packedString)) - archiveEnd
    if bytesBeyond > 0:
        fh.write(packedString[:-bytesBeyond])
        assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (
            archiveEnd, fh.tell(), bytesBeyond, len(packedString))
        fh.seek(archive['offset'])
        fh.write(
            packedString[-bytesBeyond:])  # Safe because it can't exceed the archive (retention checking logic above)
    else:
        fh.write(packedString)
Note: the write function also wraps around the end of the archive.
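The wrap-around write can be modelled on an in-memory buffer. ring_write is a hypothetical helper that uses a bytearray in place of the archive region of the file, and it assumes the data is no longer than the buffer.

```python
def ring_write(buffer, offset, data):
    """Write `data` at `offset`; bytes past the end continue at the start."""
    beyond = offset + len(data) - len(buffer)
    if beyond > 0:
        buffer[offset:] = data[:-beyond]  # part that fits before the end
        buffer[:beyond] = data[-beyond:]  # remainder goes to the beginning
    else:
        buffer[offset:offset + len(data)] = data
    return buffer


wrapped = ring_write(bytearray(b"........"), 6, b"ABCD")
plain = ring_write(bytearray(b"........"), 2, b"XY")
print(wrapped, plain)
```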
Aggregator
Each archive tries to aggregate its data into the next (lower-precision) archive:
# Now we propagate the updates to lower-precision archives
higher = archive
lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]
for lower in lowerArchives:
    fit = lambda i: i - (i % lower['secondsPerPoint'])
    lowerIntervals = [fit(p[0]) for p in alignedPoints]
    uniqueLowerIntervals = set(lowerIntervals)
    propagateFurther = False
    for interval in uniqueLowerIntervals:
        if __propagate(fh, header, interval, higher, lower):
            propagateFurther = True
    if not propagateFurther:
        break
    higher = lower
Aggregation method on single point
def __propagate(fh, header, timestamp, higher, lower):
    aggregationMethod = header['aggregationMethod']
    xff = header['xFilesFactor']
    lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
    lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']
    fh.seek(higher['offset'])
    packedPoint = fh.read(pointSize)
    (higherBaseInterval, higherBaseValue) = struct.unpack(pointFormat, packedPoint)
    if higherBaseInterval == 0:
        higherFirstOffset = higher['offset']
    else:
        timeDistance = lowerIntervalStart - higherBaseInterval
        pointDistance = timeDistance // higher['secondsPerPoint']
        byteDistance = pointDistance * pointSize
        higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])
    higherPoints = lower['secondsPerPoint'] // higher['secondsPerPoint']
    higherSize = higherPoints * pointSize
    relativeFirstOffset = higherFirstOffset - higher['offset']
    relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
    higherLastOffset = relativeLastOffset + higher['offset']
    fh.seek(higherFirstOffset)
    if higherFirstOffset < higherLastOffset:  # We don't wrap the archive
        seriesString = fh.read(higherLastOffset - higherFirstOffset)
    else:  # We do wrap the archive
        higherEnd = higher['offset'] + higher['size']
        seriesString = fh.read(higherEnd - higherFirstOffset)
        fh.seek(higher['offset'])
        seriesString += fh.read(higherLastOffset - higher['offset'])
    # Now we unpack the series data we just read
    byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
    points = len(seriesString) // pointSize
    seriesFormat = byteOrder + (pointTypes * points)
    unpackedSeries = struct.unpack(seriesFormat, seriesString)
    # And finally we construct a list of values
    neighborValues = [None] * points
    currentInterval = lowerIntervalStart
    step = higher['secondsPerPoint']
    for i in xrange(0, len(unpackedSeries), 2):
        pointTime = unpackedSeries[i]
        if pointTime == currentInterval:
            neighborValues[i // 2] = unpackedSeries[i + 1]
        currentInterval += step
    # Propagate aggregateValue to propagate from neighborValues if we have enough known points
    knownValues = [v for v in neighborValues if v is not None]
    if not knownValues:
        return False
    knownPercent = float(len(knownValues)) / float(len(neighborValues))
    if knownPercent >= xff:  # We have enough data to propagate a value!
        aggregateValue = aggregate(aggregationMethod, knownValues, neighborValues)
        myPackedPoint = struct.pack(pointFormat, lowerIntervalStart, aggregateValue)
        fh.seek(lower['offset'])
        packedPoint = fh.read(pointSize)
        (lowerBaseInterval, lowerBaseValue) = struct.unpack(pointFormat, packedPoint)
        if lowerBaseInterval == 0:  # First propagated update to this lower archive
            fh.seek(lower['offset'])
            fh.write(myPackedPoint)
        else:  # Not our first propagated update to this lower archive
            timeDistance = lowerIntervalStart - lowerBaseInterval
            pointDistance = timeDistance // lower['secondsPerPoint']
            byteDistance = pointDistance * pointSize
            lowerOffset = lower['offset'] + (byteDistance % lower['size'])
            fh.seek(lowerOffset)
            fh.write(myPackedPoint)
        return True
    else:
        return False
If aggregation fails because too few points are valid (the known fraction is below the xff threshold), propagation to the following lower-precision archives is cancelled.
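The role of xff is easiest to see on a plain list of values. consolidate is a hypothetical helper that covers only a few of the aggregation methods; it returns None where __propagate would return False without writing anything.

```python
def consolidate(values, xff, method="average"):
    """Aggregate `values` (None = missing) if enough of them are known."""
    known = [v for v in values if v is not None]
    if not known:
        return None
    if len(known) / len(values) < xff:
        return None  # too many missing points for this xFilesFactor
    if method == "average":
        return sum(known) / len(known)
    if method == "sum":
        return sum(known)
    if method == "last":
        return known[-1]
    if method == "max":
        return max(known)
    if method == "min":
        return min(known)
    raise ValueError("unknown aggregation method: %s" % method)


enough = consolidate([1.0, None, 3.0, None], xff=0.5)      # 2 of 4 known
too_few = consolidate([1.0, None, None, None], xff=0.5)    # 1 of 4 known
total = consolidate([1.0, 2.0, None], xff=0.5, method="sum")
print(enough, too_few, total)
```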
MISC
IO operation
The author of Whisper put some effort into IO optimization. When opening the file, Whisper uses fadvise to tell the OS how to optimize IO.
if CAN_FADVISE and FADVISE_RANDOM:
    posix_fadvise(fh.fileno(), 0, 0, POSIX_FADV_RANDOM)
Let’s see what the manual says about fadvise:
Allows an application to tell the kernel how it expects to use a file handle, so that the kernel can choose appropriate read-ahead and caching techniques for access to the corresponding file.
fadvise has several options:
- FADV_NORMAL : No special treatment
- FADV_RANDOM : Expect page references in random order
- FADV_SEQUENTIAL : Expect page references in sequential order
- FADV_WILLNEED : Expect access in the near future
- FADV_DONTNEED : Do not expect access in the near future. Subsequent access of pages in this range will succeed, but will result either in reloading of the memory contents from the underlying mapped file or zero-fill-in-demand pages for mappings without an underlying file
- FADV_NOREUSE : Access data only once
For more information, please see man posix_fadvise
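In Python 3 the same hint is available through the standard library as os.posix_fadvise on platforms that support it. The sketch below is only an illustration: advise_random is a hypothetical helper, and it simply reports False when the call is unavailable or rejected.

```python
import os
import tempfile


def advise_random(fh):
    """Hint that `fh` will be accessed in random order; return True on success."""
    if not (hasattr(os, "posix_fadvise") and hasattr(os, "POSIX_FADV_RANDOM")):
        return False  # not available on this platform
    try:
        # offset 0 and length 0 mean "the whole file"
        os.posix_fadvise(fh.fileno(), 0, 0, os.POSIX_FADV_RANDOM)
    except OSError:
        return False
    return True


with tempfile.TemporaryFile() as fh:
    fh.write(b"\x00" * 4096)
    fh.flush()
    advised = advise_random(fh)
print(advised)
```

The call is purely advisory: it never changes what a read returns, only how the kernel may schedule read-ahead.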