TL;DR This article walks through Whisper's implementation at the code level and some of the programming tricks it uses.

What is Whisper

Many people are familiar with the well-known metrics monitoring system: Graphite.

The official Graphite documentation introduces it roughly as follows:

Graphite is an enterprise-scale monitoring tool that runs well on cheap hardware.

Graphite does two things:

  • Store numeric time-series data
  • Render graphs of this data on demand

Whisper is one of Graphite's core components. "Storing numeric time-series data" is handled by two parts: Carbon, which receives data over the network, and Whisper, which persists it to disk.

More formally, Whisper is the time-series database (or library) built for the Graphite project, but it can also be used standalone, or embedded, as a general-purpose time-series database.

Important:

The code and structure analysis below is based on whisper==0.9.10; other versions may differ.

Database structure overview

Overview

A Whisper database is a single file, which can be divided into three parts: Header, Archives, and Data. Each part is a C-compatible data structure; in the implementation, whisper uses the struct module to pack and unpack them. Each archive corresponds to a storage region with its own precision.

The Header consists of four fields: aggregationType, maxRetention, xff, archiveCount.

aggregationType

Data type: long int (aka 'L' in struct format). Controls the strategy (algorithm) used when aggregating from higher precision to lower precision. The available strategies are:

aggregationTypeToMethod = dict({
  1: 'average',
  2: 'sum',
  3: 'last',
  4: 'max',
  5: 'min',
  6: 'avg_zero'
})
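
For intuition, here is a minimal sketch of what these strategies compute from the known higher-precision values (my own illustration, not whisper's code; whisper's real aggregation helper is also passed the full list of slots so that avg_zero can count the unknown ones):

def aggregate_sketch(method, knownValues, neighborValues):
    # knownValues: non-None values; neighborValues: all slots, including None
    if method == 'average':
        return float(sum(knownValues)) / len(knownValues)
    elif method == 'sum':
        return float(sum(knownValues))
    elif method == 'last':
        return knownValues[-1]
    elif method == 'max':
        return max(knownValues)
    elif method == 'min':
        return min(knownValues)
    elif method == 'avg_zero':
        # Unknown slots are treated as zero
        return float(sum(knownValues)) / len(neighborValues)
    raise ValueError("Unrecognized aggregation method %s" % method)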

maxRetention

Data type: long int. The maximum time span (in seconds) that this database can store.

xff

Full name: xFilesFactor. Data type: float (aka 'f' in struct format). When aggregating from higher precision to lower precision, if the fraction of known data points falls below this threshold, the aggregated result is set to None.

archiveCount

Data type: long int. The number of archives.

Archives

Overview

When creating a database file, whisper performs the following checks on the archives:

  • There must be at least one archive
  • Archive precision must strictly decrease from one archive to the next (i.e. secondsPerPoint strictly increases); no two archives may have the same precision
  • Precisions must be related by integer multiples: each lower-precision archive's secondsPerPoint must be an integer multiple of every higher-precision archive's
  • Archive retentions must strictly increase; no two may be equal
  • Each higher-precision archive must have enough points to complete at least one consolidation into the next archive

For example:

  1. Higher: 1s/20
  2. Lower: 60s/1

This satisfies the first four rules but fails the last one: consolidating a single 60 s point requires 60 points of the 1 s archive, which only holds 20.

The actual validation code is as follows:

if not archiveList:
    raise InvalidConfiguration("You must specify at least one archive configuration!")

archiveList.sort(key=lambda a: a[0])  # Sort by precision (secondsPerPoint)

for i, archive in enumerate(archiveList):
    if i == len(archiveList) - 1:
        break

    nextArchive = archiveList[i + 1]
    if not archive[0] < nextArchive[0]:
        raise InvalidConfiguration("A Whisper database may not be configured having "
                                   "two archives with the same precision (archive%d: %s, archive%d: %s)" %
                                   (i, archive, i + 1, nextArchive))

    if nextArchive[0] % archive[0] != 0:
        raise InvalidConfiguration("Higher precision archives' precision "
                                   "must evenly divide all lower precision archives' precision "
                                   "(archive%d: %s, archive%d: %s)" %
                                   (i, archive[0], i + 1, nextArchive[0]))

    retention = archive[0] * archive[1]
    nextRetention = nextArchive[0] * nextArchive[1]

    if not nextRetention > retention:
        raise InvalidConfiguration("Lower precision archives must cover "
                                   "larger time intervals than higher precision archives "
                                   "(archive%d: %s seconds, archive%d: %s seconds)" %
                                   (i, retention, i + 1, nextRetention))

    archivePoints = archive[1]
    pointsPerConsolidation = nextArchive[0] // archive[0]
    if not archivePoints >= pointsPerConsolidation:
        raise InvalidConfiguration("Each archive must have at least enough points "
                                   "to consolidate to the next archive (archive%d consolidates %d of "
                                   "archive%d's points but it has only %d total points)" %
                                   (i + 1, pointsPerConsolidation, i, archivePoints))
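
As a quick check of the last rule, the failing example above can be handed straight to whisper's public validateArchiveList() (a minimal sketch; the exact exception message varies between versions):

import whisper

# (secondsPerPoint, points): the 1s/20 and 60s/1 example from above
try:
    whisper.validateArchiveList([(1, 20), (60, 1)])
except whisper.InvalidConfiguration as e:
    print("rejected:", e)   # fails the "enough points to consolidate" rule

# A configuration that satisfies all five rules passes silently
whisper.validateArchiveList([(1, 60), (60, 60)])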

Structure

Each archive describes a storage region with its own precision. An archive entry has three fields: offset, secondsPerPoint, points.

offset

Data type: long int. The offset, within the file, of the data region belonging to this archive.

secondsPerPoint

Data type: long int. The sampling interval each point represents, i.e. the archive's precision. Clearly the highest possible precision is one point per second.

points

Data type: long int. The number of data points.

Derived metrics

These metrics are not stored in the file; they are computed from the others.

retention

'retention': secondsPerPoint * points

The time span, in seconds, that this archive retains.

size

'size': points * pointSize

The number of bytes the archive's data region occupies.
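
A quick worked example of both derived metrics, using whisper's pointFormat constant ('!Ld', 12 bytes per point):

import struct

pointFormat = "!Ld"                        # one point: Long interval + double value
pointSize = struct.calcsize(pointFormat)   # 12 bytes

secondsPerPoint, points = 60, 1440         # e.g. a "one point per minute, for one day" archive
print(secondsPerPoint * points)            # retention: 86400 seconds
print(points * pointSize)                  # size: 17280 bytes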

Data

This part holds the actual data points, laid out linearly in the file. Each data point consists of two fields: Interval and data.

Interval

Data type: long int. The timestamp, in seconds since the UNIX epoch (aka 1970-01-01 00:00 UTC).

data

Data type: double (aka 'd' in struct format). The actual metric value.

Summary

ASCII art diagram

+-------------------------------------------------------------------------+
|AT|MR|xff|AC|offset|SPP|points|      ...      |Interval|data|     ...    |
+-------------------------------------------------------------------------+
|            |   Archive One   |      ...      |  Point One  |     ...    |
+-------------------------------------------------------------------------+
|    Header  |             Archives            |            Data          |
+-------------------------------------------------------------------------+
|                              Whisper file                               |
+-------------------------------------------------------------------------+

AT: aggregationType
MR: maxRetention
AC: archiveCount
SPP: secondsPerPoint
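
To make the layout concrete, here is a hedged sketch of reading the Header and the archive entries with struct, using whisper's own format constants (the real __readHeader also validates the values and supports optional header caching):

import struct

metadataFormat = "!2LfL"     # aggregationType, maxRetention, xFilesFactor, archiveCount
archiveInfoFormat = "!3L"    # offset, secondsPerPoint, points
metadataSize = struct.calcsize(metadataFormat)
archiveInfoSize = struct.calcsize(archiveInfoFormat)

def read_header(path):
    with open(path, 'rb') as fh:
        aggregationType, maxRetention, xff, archiveCount = struct.unpack(
            metadataFormat, fh.read(metadataSize))
        archives = []
        for _ in range(archiveCount):
            offset, secondsPerPoint, points = struct.unpack(
                archiveInfoFormat, fh.read(archiveInfoSize))
            archives.append({'offset': offset,
                             'secondsPerPoint': secondsPerPoint,
                             'points': points,
                             'retention': secondsPerPoint * points})
    return aggregationType, maxRetention, xff, archiveCount, archives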

Creating a database

Parameters

The key parameters are:

  • path: the database file path
  • archiveList
  • xFilesFactor=None
  • aggregationMethod=None

archiveList

# Validate archive configurations...
validateArchiveList(archiveList)

The validation rules are the ones described in the Archives section above.

Writing the header

aggregationType = struct.pack(longFormat, aggregationMethodToType.get(aggregationMethod, 1))
oldest = max([secondsPerPoint * points for secondsPerPoint, points in archiveList])
maxRetention = struct.pack(longFormat, oldest)
xFilesFactor = struct.pack(floatFormat, float(xFilesFactor))
archiveCount = struct.pack(longFormat, len(archiveList))
packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
fh.write(packedMetadata)

Writing the archive list

The important part here is how each offset is computed:

headerSize = metadataSize + (archiveInfoSize * len(archiveList))
archiveOffsetPointer = headerSize

for secondsPerPoint, points in archiveList:
    archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
    fh.write(archiveInfo)
    archiveOffsetPointer += (points * pointSize)

Zero-filling the Data region with \x00

if CAN_FALLOCATE and useFallocate:
    remaining = archiveOffsetPointer - headerSize
    fallocate(fh, headerSize, remaining)
elif sparse:
    fh.seek(archiveOffsetPointer - 1)
    fh.write(b'\x00')
else:
    remaining = archiveOffsetPointer - headerSize
    chunksize = 16384
    zeroes = b'\x00' * chunksize
    while remaining > chunksize:
        fh.write(zeroes)
        remaining -= chunksize
    fh.write(zeroes[:remaining])

There are several possible I/O optimizations here.

fallocate

This is a Linux-specific system call with the prototype int fallocate(int fd, int mode, off_t offset, off_t len);. It lets the caller allocate disk space for the byte range [offset, offset + len) of a file directly, which is faster than allocating it by writing data.

For more information, see the fallocate(2) man page.

sparse file

The file is not actually allocated on disk; the filesystem only records that it has this size, and disk space is allocated when data is really written, so the file is sparse. The advantage is that creation is very fast, but later real writes may fragment the file, making reads and writes slower than with a normally allocated file.

For more information, see the Wikipedia article on sparse files.
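
A small illustration of the sparse path in the code above (seek to the last byte of the region and write a single \x00), assuming a Linux/POSIX filesystem; st_blocks shows how little space is really allocated:

import os

path = "/tmp/sparse_demo.wsp"     # hypothetical demo file
with open(path, "wb") as fh:
    fh.seek(1024 * 1024 - 1)      # pretend the data region is 1 MiB
    fh.write(b"\x00")             # only this last byte is materialized

st = os.stat(path)
print(st.st_size)                 # apparent size: 1048576 bytes
print(st.st_blocks * 512)         # allocated size: typically a single filesystem block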

write by chunk

At the lowest level, disks work in sectors. If writes are aligned with the sector size, the drive avoids unnecessary extra work. Modern disks commonly use 4 KiB sectors, so writing in multiples of 4 KiB (the code above uses 16 KiB chunks) gives better performance.

For more information, see the Wikipedia article on disk sectors.

Implementation pitfall

The implementation contains one trap:

validateArchiveList(archiveList)

sorts archiveList in place inside the function (archiveList.sort(key=lambda a: a[0])). If you don't read the function body, this side effect on the caller's list is easy to miss.

Querying the database

Clamping the query range using the maximum retention

if now is None:
    now = int(time.time())
if untilTime is None:
    untilTime = now
fromTime = int(fromTime)
untilTime = int(untilTime)

# Here we try and be flexible and return as much data as we can.
# If the range of data is from too far in the past or fully in the future, we
# return nothing
if fromTime > untilTime:
    raise InvalidTimeInterval("Invalid time interval: from time '%s' is after until time '%s'" % (fromTime, untilTime))

oldestTime = now - header['maxRetention']
# Range is in the future
if fromTime > now:
    return None
# Range is beyond retention
if untilTime < oldestTime:
    return None
# Range requested is partially beyond retention, adjust
if fromTime < oldestTime:
    fromTime = oldestTime
# Range is partially in the future, adjust
if untilTime > now:
    untilTime = now

Finding the highest-precision archive that covers the query range

diff = now - fromTime
for archive in header['archives']:
    if archive['retention'] >= diff:
        break

Aligning the time range to intervals

fromInterval = int(fromTime - (fromTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
untilInterval = int(untilTime - (untilTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']

In short, each boundary is always rounded up to the next interval boundary after the given timestamp.

In particular, a query whose start and end fall on the same interval is adjusted so that it always covers at least one step:

if fromInterval == untilInterval:
    # Zero-length time range: always include the next point
    untilInterval += archive['secondsPerPoint']
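
A quick numeric example of this rounding (hypothetical values):

secondsPerPoint = 60
fromTime = untilTime = 1000   # a zero-length range, hypothetical timestamps

fromInterval = int(fromTime - (fromTime % secondsPerPoint)) + secondsPerPoint      # 1020
untilInterval = int(untilTime - (untilTime % secondsPerPoint)) + secondsPerPoint   # 1020
if fromInterval == untilInterval:
    untilInterval += secondsPerPoint                                               # 1080
print(fromInterval, untilInterval)   # 1020 1080 -> the range covers exactly one step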

Computing the offsets

# Determine fromOffset
timeDistance = fromInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
fromOffset = archive['offset'] + (byteDistance % archive['size'])

# Determine untilOffset
timeDistance = untilInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
untilOffset = archive['offset'] + (byteDistance % archive['size'])

A trick with Python's % operator

byteDistance % archive['size']

achieves a wrap-around effect. When abs(byteDistance) < archive['size'], it is equivalent to:

def wrapped_offset(byteDistance, size):
    if byteDistance >= 0:
        return byteDistance
    else:
        return size + byteDistance

The reason is how Python's modulo operator % behaves with negative operands:

print(-3 % 5)
# prints 2
print(3 % 5)
# prints 3

Reading the data points

# Now we unpack the series data we just read (anything faster than unpack?)
byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
points = len(seriesString) // pointSize
seriesFormat = byteOrder + (pointTypes * points)
unpackedSeries = struct.unpack(seriesFormat, seriesString)

# And finally we construct a list of values (optimize this!)
valueList = [None] * points  # Pre-allocate entire list for speed
currentInterval = fromInterval
step = archive['secondsPerPoint']

for i in xrange(0, len(unpackedSeries), 2):
    pointTime = unpackedSeries[i]
    if pointTime == currentInterval:
        pointValue = unpackedSeries[i + 1]
        valueList[i // 2] = pointValue  # In-place reassignment is faster than append()
    currentInterval += step

timeInfo = (fromInterval, untilInterval, step)
return (timeInfo, valueList)

The important detail here is that, while reading, the stored timestamp (Interval) of each slot is compared against the expected timestamp; if they differ, the slot is considered to have no value and None is returned for it. This behavior matters: it allows writes to be sparse (points don't have to be written contiguously) while still guaranteeing correct results, because stale data left over from a previous trip around the ring buffer is simply ignored.
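
An end-to-end illustration of this behavior through whisper's public API (a sketch with a hypothetical path; create/update/fetch are the module-level entry points):

import time
import whisper

path = "/tmp/demo.wsp"   # hypothetical path
whisper.create(path, [(60, 60)], xFilesFactor=0.5, aggregationMethod='average')

now = int(time.time())
whisper.update(path, 42.0, timestamp=now)          # write a single point

timeInfo, values = whisper.fetch(path, now - 600, now)
print(timeInfo)   # (fromInterval, untilInterval, step)
print(values)     # mostly None; 42.0 only in the slot whose stored Interval matches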

Updating the database

Whisper offers both a single-point update API, update(), and a multi-point API, update_many(). Carbon uses the multi-point API; the two are implemented similarly, but the latter updates in batches and is therefore more I/O-efficient, so this article discusses update_many().

Parameters

  • path: the file path
  • points: a list of (timestamp, value) points

Sorting

The points are sorted by timestamp in descending order, so that the newest points come first:

points = [(int(t), float(v)) for (t, v) in points]
points.sort(key=lambda p: p[0], reverse=True)  # Order points by timestamp, newest first

Reading the file header

Reading the header returns the following data structure:

info = {
    'aggregationMethod': aggregationTypeToMethod.get(aggregationType, 'average'),
    'maxRetention': maxRetention,
    'xFilesFactor': xff,
    'archives': archives,
}

Grouping the points by archive retention and writing each group

for point in points:
    age = now - point[0]

    while currentArchive['retention'] < age:  # We can't fit any more points in this archive
        if currentPoints:  # Commit all the points we've found that it can fit
            currentPoints.reverse()  # Put points in chronological order
            __archive_update_many(fh, header, currentArchive, currentPoints)
            currentPoints = []
        try:
            currentArchive = next(archives)
        except StopIteration:
            currentArchive = None
            break

    if not currentArchive:
        break  # Drop remaining points that don't fit in the database

    currentPoints.append(point)

if currentArchive and currentPoints:  # Don't forget to commit after we've checked all the archives
    currentPoints.reverse()
    __archive_update_many(fh, header, currentArchive, currentPoints)

Note that before each group is handed to the actual write function, the points are reversed, so the oldest data comes first.

PS: this block of code is not easy to follow.
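
To make the intent clearer, here is a hedged restatement of that loop as a standalone sketch: walk the newest-first points and assign each one to the first (highest-precision) archive whose retention can still hold it (the helper name is mine, not whisper's):

def partition_by_retention(points, archives, now):
    # points: newest-first (timestamp, value); archives: ordered high -> low precision
    groups = [[] for _ in archives]
    archiveIndex = 0
    for timestamp, value in points:
        age = now - timestamp
        # Fall through to lower-precision archives until the point fits the retention
        while archiveIndex < len(archives) and archives[archiveIndex]['retention'] < age:
            archiveIndex += 1
        if archiveIndex == len(archives):
            break                              # too old for every archive: drop the rest
        groups[archiveIndex].append((timestamp, value))
    # Each non-empty group is later reversed to chronological order before writing
    return groups

Because the points are sorted newest first, the archive index only ever moves forward, so every point ends up in exactly one archive: the highest-precision archive that can still retain it.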

Writing one group

The concrete implementation is __archive_update_many(fh, header, archive, points).

Aligning the points

step = archive['secondsPerPoint']
alignedPoints = [(timestamp - (timestamp % step), value)
                 for (timestamp, value) in points]

Grouping the points into contiguous runs

# Create a packed string for each contiguous sequence of points
packedStrings = []
previousInterval = None
currentString = b""
lenAlignedPoints = len(alignedPoints)
for i in xrange(0, lenAlignedPoints):
    # Take last point in run of points with duplicate intervals
    if i + 1 < lenAlignedPoints and alignedPoints[i][0] == alignedPoints[i + 1][0]:
        continue
    (interval, value) = alignedPoints[i]

    # First point, or this interval is contiguous with the previous one
    if (not previousInterval) or (interval == previousInterval + step):
        currentString += struct.pack(pointFormat, interval, value)
        previousInterval = interval
    else:  # The run of contiguous intervals is broken
        numberOfPoints = len(currentString) // pointSize
        startInterval = previousInterval - (step * (numberOfPoints - 1))
        packedStrings.append((startInterval, currentString))
        currentString = struct.pack(pointFormat, interval, value)
        previousInterval = interval
if currentString:
    numberOfPoints = len(currentString) // pointSize
    startInterval = previousInterval - (step * (numberOfPoints - 1))
    packedStrings.append((startInterval, currentString))

A point worth noting:

After alignment, duplicate intervals are deduplicated and only the last point for each interval is kept; this is an important property.
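
A small illustration of both behaviors (keep-last deduplication and splitting into contiguous runs), with hypothetical aligned points and step = 60:

step = 60
alignedPoints = [(0, 1.0), (60, 2.0), (60, 2.5), (240, 9.0)]

# Duplicate interval 60: only the last point (60, 2.5) survives.
# Contiguity: 60 == 0 + step, but 240 != 60 + step, so two packed strings result:
#   (startInterval=0,   points (0, 1.0) and (60, 2.5))
#   (startInterval=240, point  (240, 9.0))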

Writing the data

# Read base point and determine where our writes will start
fh.seek(archive['offset'])
packedBasePoint = fh.read(pointSize)
(baseInterval, baseValue) = struct.unpack(pointFormat, packedBasePoint)
if baseInterval == 0:  # This file's first update
    baseInterval = packedStrings[0][0]  # Use our first string as the base, so we start at the start

# Write all of our packed strings in locations determined by the baseInterval
for (interval, packedString) in packedStrings:
    timeDistance = interval - baseInterval
    pointDistance = timeDistance // step
    byteDistance = pointDistance * pointSize
    myOffset = archive['offset'] + (byteDistance % archive['size'])
    fh.seek(myOffset)
    archiveEnd = archive['offset'] + archive['size']
    bytesBeyond = (myOffset + len(packedString)) - archiveEnd

    if bytesBeyond > 0:
        fh.write(packedString[:-bytesBeyond])
        assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (
        archiveEnd, fh.tell(), bytesBeyond, len(packedString))
        fh.seek(archive['offset'])
        fh.write(
            packedString[-bytesBeyond:])  # Safe because it can't exceed the archive (retention checking logic above)
    else:
        fh.write(packedString)

Note: the writes here wrap around the end of the archive (the data region is a ring buffer).
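
A worked example of the wrap arithmetic above, with hypothetical numbers (an archive of 10 points, so a 120-byte data region):

pointSize = 12
step = 60
archive = {'offset': 40, 'size': 10 * pointSize}            # data region: bytes 40..160

baseInterval = 1000020
interval = 1000560                                          # start of a 3-point packed string
packedStringLen = 3 * pointSize                             # 36 bytes

timeDistance = interval - baseInterval                      # 540
pointDistance = timeDistance // step                        # 9 -> the last slot of the ring
byteDistance = pointDistance * pointSize                    # 108
myOffset = archive['offset'] + (byteDistance % archive['size'])   # 148
archiveEnd = archive['offset'] + archive['size']            # 160
bytesBeyond = (myOffset + packedStringLen) - archiveEnd     # 24

# One point (12 bytes) is written at offsets 148..160, then the remaining two points
# (24 bytes) wrap around and are written at offsets 40..64.
print(myOffset, archiveEnd, bytesBeyond)                    # 148 160 24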

Propagating to the next lower-precision archive

# Now we propagate the updates to lower-precision archives
higher = archive
lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]

for lower in lowerArchives:
    fit = lambda i: i - (i % lower['secondsPerPoint'])
    lowerIntervals = [fit(p[0]) for p in alignedPoints]
    uniqueLowerIntervals = set(lowerIntervals)
    propagateFurther = False
    for interval in uniqueLowerIntervals:
        if __propagate(fh, header, interval, higher, lower):
            propagateFurther = True

    if not propagateFurther:
        break
    higher = lower

Propagating a single interval

def __propagate(fh, header, timestamp, higher, lower):
    aggregationMethod = header['aggregationMethod']
    xff = header['xFilesFactor']

    lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
    lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']

    fh.seek(higher['offset'])
    packedPoint = fh.read(pointSize)
    (higherBaseInterval, higherBaseValue) = struct.unpack(pointFormat, packedPoint)

    if higherBaseInterval == 0:
        higherFirstOffset = higher['offset']
    else:
        timeDistance = lowerIntervalStart - higherBaseInterval
        pointDistance = timeDistance // higher['secondsPerPoint']
        byteDistance = pointDistance * pointSize
        higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])

    higherPoints = lower['secondsPerPoint'] // higher['secondsPerPoint']
    higherSize = higherPoints * pointSize
    relativeFirstOffset = higherFirstOffset - higher['offset']
    relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
    higherLastOffset = relativeLastOffset + higher['offset']
    fh.seek(higherFirstOffset)

    if higherFirstOffset < higherLastOffset:  # We don't wrap the archive
        seriesString = fh.read(higherLastOffset - higherFirstOffset)
    else:  # We do wrap the archive
        higherEnd = higher['offset'] + higher['size']
        seriesString = fh.read(higherEnd - higherFirstOffset)
        fh.seek(higher['offset'])
        seriesString += fh.read(higherLastOffset - higher['offset'])

    # Now we unpack the series data we just read
    byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
    points = len(seriesString) // pointSize
    seriesFormat = byteOrder + (pointTypes * points)
    unpackedSeries = struct.unpack(seriesFormat, seriesString)

    # And finally we construct a list of values
    neighborValues = [None] * points
    currentInterval = lowerIntervalStart
    step = higher['secondsPerPoint']

    for i in xrange(0, len(unpackedSeries), 2):
        pointTime = unpackedSeries[i]
        if pointTime == currentInterval:
            neighborValues[i // 2] = unpackedSeries[i + 1]
        currentInterval += step

    # Compute an aggregate value from neighborValues if we have enough known points
    knownValues = [v for v in neighborValues if v is not None]
    if not knownValues:
        return False

    knownPercent = float(len(knownValues)) / float(len(neighborValues))
    if knownPercent >= xff:  # We have enough data to propagate a value!
        aggregateValue = aggregate(aggregationMethod, knownValues, neighborValues)
        myPackedPoint = struct.pack(pointFormat, lowerIntervalStart, aggregateValue)
        fh.seek(lower['offset'])
        packedPoint = fh.read(pointSize)
        (lowerBaseInterval, lowerBaseValue) = struct.unpack(pointFormat, packedPoint)

        if lowerBaseInterval == 0:  # First propagated update to this lower archive
            fh.seek(lower['offset'])
            fh.write(myPackedPoint)
        else:  # Not our first propagated update to this lower archive
            timeDistance = lowerIntervalStart - lowerBaseInterval
            pointDistance = timeDistance // lower['secondsPerPoint']
            byteDistance = pointDistance * pointSize
            lowerOffset = lower['offset'] + (byteDistance % lower['size'])
            fh.seek(lowerOffset)
            fh.write(myPackedPoint)

        return True

    else:
        return False

If a lower-precision archive receives no propagated value at all for the affected intervals (for example because the fraction of known points falls below xFilesFactor), propagation to the remaining, even lower-precision archives is skipped.
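
A quick numeric example of the xFilesFactor check inside __propagate (hypothetical values, a 60 s to 360 s consolidation, i.e. 6 higher-precision slots):

xff = 0.5
neighborValues = [10.0, None, None, 12.0, None, None]   # only 2 of 6 slots are known

knownValues = [v for v in neighborValues if v is not None]
knownPercent = float(len(knownValues)) / float(len(neighborValues))   # 0.333...
print(knownPercent >= xff)   # False -> nothing is written to the lower archive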

Trick

When opening a file, Whisper uses fadvise on Linux to advise the operating system about the access pattern, so that the OS can pick a suitable optimization strategy for the file.

if CAN_FADVISE and FADVISE_RANDOM:
    posix_fadvise(fh.fileno(), 0, 0, POSIX_FADV_RANDOM)

The fadvise man page describes it roughly as follows:

It allows an application to tell the operating system how it will use a file descriptor, so that the OS can choose the most appropriate read-ahead and caching strategy for that file.

fadvise takes one of several advice values:

  • FADV_NORMAL: no special treatment
  • FADV_RANDOM: expect page references in random order
  • FADV_SEQUENTIAL: expect page references in sequential order
  • FADV_WILLNEED: expect access in the near future
  • FADV_DONTNEED: do not expect access in the near future
  • FADV_NOREUSE: the data will be accessed only once

For more information about fadvise, see the posix_fadvise(2) man page.
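
For reference, on Python 3.3+ the same advice can be given without a third-party binding through os.posix_fadvise (a sketch; the call is only available on platforms that expose the syscall):

import os

fh = open("/tmp/demo.wsp", "rb")   # hypothetical whisper file
if hasattr(os, "posix_fadvise"):   # Linux / POSIX only
    os.posix_fadvise(fh.fileno(), 0, 0, os.POSIX_FADV_RANDOM)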