An Introduction to the Implementation of the Whisper Time-Series Database

TL;DR: This article walks through the implementation of Whisper at the code level, along with some of the programming tricks it uses.

What is Whisper

Many people are familiar with the well-known metrics monitoring system Graphite.

The official Graphite documentation introduces it roughly as follows:

Graphite is an enterprise-scale monitoring tool that runs well on cheap hardware.

Graphite does two things:

  • Store numeric time-series data
  • Render graphs of this data on demand

Whisper is one of Graphite's core components. The "store numeric time-series data" job is split across two components: Carbon, which receives data over the network, and Whisper, which writes it to disk.

More formally, Whisper is a time-series database (or rather a library) built for the Graphite project, but it can also be used on its own or embedded as a general-purpose time-series database.

Important:

The code and structure analysis below is based on whisper==0.9.10; other versions may differ.

Database Structure Overview

Overview

A Whisper database consists of a single file, which can be divided into three parts: Header, Archives, and Data.
Each part is made up of C-compatible data structures; the implementation uses Python's struct module to pack and unpack them.
Each archive corresponds to a storage region with a different precision.

The Header consists of four fields: aggregationType, maxRetention, xff, archiveCount

aggregationType

Data type: long int (aka 'L' in struct format). Controls the strategy (algorithm) used when aggregating from higher precision to lower precision.
The available strategies are:

aggregationTypeToMethod = dict({
  1: 'average',
  2: 'sum',
  3: 'last',
  4: 'max',
  5: 'min',
  6: 'avg_zero'
})

maxRetention

Data type: long int. The maximum time span, in seconds, that this database can store.

xff

Full name: xFilesFactor. Data type: float (aka 'f' in struct format). When aggregating from higher precision to lower precision, if the fraction of known data points is below this threshold, the aggregated result is set to None.

archiveCount

Data type: long int. The number of archives.
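Taken together, these four fields can be read back with a single struct.unpack call. The following is a minimal sketch of parsing just the metadata block (the format string mirrors the field types listed above; the helper name and path are illustrative only):

import struct

metadataFormat = "!2LfL"   # aggregationType, maxRetention, xff, archiveCount (big-endian)
metadataSize = struct.calcsize(metadataFormat)

def read_metadata(path):
    """Read only the four header fields of a Whisper file."""
    with open(path, 'rb') as fh:
        packed = fh.read(metadataSize)
    aggregationType, maxRetention, xff, archiveCount = struct.unpack(metadataFormat, packed)
    return aggregationType, maxRetention, xff, archiveCount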

Archives

Overview

When creating a database file, whisper performs the following checks on the archives:

  • There must be at least one archive
  • Archive precision must be strictly decreasing in order; no two archives may share the same precision
  • Archive precisions must be integer multiples of each other: each lower-precision archive's secondsPerPoint must be an integer multiple of every higher-precision archive's
  • Archive retention must be strictly increasing; no two archives may cover the same time span
  • Each higher-precision archive must have enough points to complete at least one consolidation into the next archive

For example:

  1. Higher: 1s/20
  2. Lower: 60s/1

This satisfies the first four rules but fails the last one: consolidating one 60-second point requires 60 one-second points, yet the higher archive holds only 20.

The actual validation code is:

if not archiveList:
  raise InvalidConfiguration("You must specify at least one archive configuration!")

archiveList.sort(key=lambda a: a[0])  # Sort by precision (secondsPerPoint)

for i, archive in enumerate(archiveList):
  if i == len(archiveList) - 1:
    break

  nextArchive = archiveList[i + 1]
  if not archive[0] < nextArchive[0]:
    raise InvalidConfiguration("A Whisper database may not be configured having "
                               "two archives with the same precision (archive%d: %s, archive%d: %s)" %
                               (i, archive, i + 1, nextArchive))

  if nextArchive[0] % archive[0] != 0:
    raise InvalidConfiguration("Higher precision archives' precision "
                               "must evenly divide all lower precision archives' precision "
                               "(archive%d: %s, archive%d: %s)" %
                               (i, archive[0], i + 1, nextArchive[0]))

  retention = archive[0] * archive[1]
  nextRetention = nextArchive[0] * nextArchive[1]

  if not nextRetention > retention:
    raise InvalidConfiguration("Lower precision archives must cover "
                               "larger time intervals than higher precision archives "
                               "(archive%d: %s seconds, archive%d: %s seconds)" %
                               (i, retention, i + 1, nextRetention))

  archivePoints = archive[1]
  pointsPerConsolidation = nextArchive[0] // archive[0]
  if not archivePoints >= pointsPerConsolidation:
    raise InvalidConfiguration("Each archive must have at least enough points "
                               "to consolidate to the next archive (archive%d consolidates %d of "
                               "archive%d's points but it has only %d total points)" %
                               (i + 1, pointsPerConsolidation, i, archivePoints))
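To see the example above rejected in practice, the same rules can be exercised through whisper's validateArchiveList() helper; a sketch assuming whisper==0.9.10 is installed:

import whisper

good = [(1, 120), (60, 1440)]   # 1s resolution for 2 minutes, then 60s for 1 day
bad = [(1, 20), (60, 1)]        # the example above: 20 one-second points cannot fill one 60s point

whisper.validateArchiveList(good)          # passes silently

try:
    whisper.validateArchiveList(bad)
except whisper.InvalidConfiguration as e:  # raised by the last check shown above
    print("rejected:", e)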

Structure

Each archive describes a storage region of a different precision and consists of three fields: offset, secondsPerPoint, points

offset

Data type: long int. The byte offset of the corresponding data region within the file.

secondsPerPoint

Data type: long int. The sampling interval, in seconds, that each point represents, i.e. the archive's precision. The highest possible precision is therefore one point per second.

points

Data type: long int. The number of data points.

Derived fields

These fields are not stored in the file; they are computed from the fields above.

retention

'retention': secondsPerPoint * points

The time span, in seconds, that this archive retains.

size

'size': points * pointSize

The number of bytes the archive's data region occupies.

Data

这个部分表示具体的数据点,数据点线性排列在文件中,每个数据点有两个部分构成:Interval, data

Interval

数据类型:Long int,表示时间戳(从UNIX纪元 (aka 1970-01-01 00:00 UTC) 开始的秒数)

data

数据类型:double (aka ‘d’ in struct format),表示具体的metric数值

Summary

ASCII art diagram

+-------------------------------------------------------------------------+
|AT|MR|xff|AC|offset|SPP|points| ... |Interval|data| ... |
+-------------------------------------------------------------------------+
| | Archive One | ... | Point One | ... |
+-------------------------------------------------------------------------+
| Header | Archives | Data |
+-------------------------------------------------------------------------+
| Whisper file |
+-------------------------------------------------------------------------+

AT: aggregationType
MR: maxRetention
AC: archiveCount
SPP: secondsPerPoint
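The layout above can be inspected at runtime with whisper's info() helper, which parses the header and the archive table into a dict. A usage sketch (the path is hypothetical):

import whisper

info = whisper.info('/tmp/example.wsp')    # hypothetical path
print(info['aggregationMethod'], info['maxRetention'], info['xFilesFactor'])
for archive in info['archives']:
    # offset, secondsPerPoint and points are read from the file;
    # retention and size are the derived fields described earlier
    print(archive['offset'], archive['secondsPerPoint'], archive['points'],
          archive['retention'], archive['size'])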

Creating a Database

Parameters

Key parameters:

  • path — the path of the database file
  • archiveList
  • xFilesFactor=None
  • aggregationMethod=None

archiveList

# Validate archive configurations...
validateArchiveList(archiveList)

For the validation rules, see the checks described in the Archives section above.
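For reference, creating a database through the public API looks roughly like this (a sketch; the path and retention choices are arbitrary):

import whisper

# 10-second resolution for 6 hours, then 1-minute resolution for 7 days.
archiveList = [(10, 2160), (60, 10080)]
whisper.create('/tmp/example.wsp', archiveList,    # hypothetical path
               xFilesFactor=0.5, aggregationMethod='average')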

Writing the Header

aggregationType = struct.pack(longFormat, aggregationMethodToType.get(aggregationMethod, 1))
oldest = max([secondsPerPoint * points for secondsPerPoint, points in archiveList])
maxRetention = struct.pack(longFormat, oldest)
xFilesFactor = struct.pack(floatFormat, float(xFilesFactor))
archiveCount = struct.pack(longFormat, len(archiveList))
packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
fh.write(packedMetadata)

Writing the Archive List

The important part here is how each archive's offset is computed.

headerSize = metadataSize + (archiveInfoSize * len(archiveList))
archiveOffsetPointer = headerSize

for secondsPerPoint, points in archiveList:
  archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
  fh.write(archiveInfo)
  archiveOffsetPointer += (points * pointSize)

Filling the Data region with \x00

if CAN_FALLOCATE and useFallocate:
  remaining = archiveOffsetPointer - headerSize
  fallocate(fh, headerSize, remaining)
elif sparse:
  fh.seek(archiveOffsetPointer - 1)
  fh.write(b'\x00')
else:
  remaining = archiveOffsetPointer - headerSize
  chunksize = 16384
  zeroes = b'\x00' * chunksize
  while remaining > chunksize:
    fh.write(zeroes)
    remaining -= chunksize
  fh.write(zeroes[:remaining])

Several I/O optimizations are used here.
fallocate

This is a Linux-specific system call with the prototype int fallocate(int fd, int mode, off_t offset, off_t len);. It lets the caller allocate disk space for the byte range from offset to offset+len directly, which is faster than allocating it by writing to the file.

For more information, see the fallocate man page.

sparse file

The file is not actually allocated on disk; the filesystem merely records that the file has this size, and disk space is only allocated when data is actually written, which makes the file sparse. The advantage is that creation is very fast, but later writes may fragment the file, making both writes and reads slower than with ordinary allocation.

For more information, see the Wikipedia article on sparse files.
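The sparse branch in the code above boils down to seeking past the end of the region and writing a single byte. The same idea as a standalone sketch (the path is hypothetical):

# Create a ~1 MiB sparse file by seeking past the end and writing one byte.
size = 1024 * 1024
with open('/tmp/sparse.dat', 'wb') as fh:   # hypothetical path
    fh.seek(size - 1)
    fh.write(b'\x00')

# On most filesystems, `du -h /tmp/sparse.dat` reports far less than 1 MiB,
# while `ls -l` reports the full logical size.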

write by chunk

At the lowest level, disks work in sectors; writing data in sizes aligned with the sector size avoids unnecessary overhead. Modern disks commonly use 4K sectors, so writing in 4K or multiples of 4K yields a performance boost.

For more information, see the Wikipedia article on disk sectors.

An Implementation Pitfall

There is one trap in this code path:

validateArchiveList(archiveList)

Inside this function, archiveList is sorted in place via archiveList.sort(key=lambda a: a[0]); without reading the function body it is easy to miss this important side effect, illustrated by the sketch below.
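A minimal illustration of that side effect, assuming the whisper package is installed:

import whisper

archiveList = [(60, 1440), (1, 120)]         # intentionally out of order
whisper.validateArchiveList(archiveList)
print(archiveList)                           # [(1, 120), (60, 1440)] -- sorted in place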

Querying the Database

Clamping the query range to the maximum retention

if now is None:
  now = int(time.time())
if untilTime is None:
  untilTime = now
fromTime = int(fromTime)
untilTime = int(untilTime)

# Here we try and be flexible and return as much data as we can.
# If the range of data is from too far in the past or fully in the future, we
# return nothing
if fromTime > untilTime:
  raise InvalidTimeInterval("Invalid time interval: from time '%s' is after until time '%s'" % (fromTime, untilTime))

oldestTime = now - header['maxRetention']
# Range is in the future
if fromTime > now:
  return None
# Range is beyond retention
if untilTime < oldestTime:
  return None
# Range requested is partially beyond retention, adjust
if fromTime < oldestTime:
  fromTime = oldestTime
# Range is partially in the future, adjust
if untilTime > now:
  untilTime = now

Finding the highest-precision archive that covers the query range

diff = now - fromTime
for archive in header['archives']:
  if archive['retention'] >= diff:
    break

Aligning the time range to intervals

fromInterval = int(fromTime - (fromTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
untilInterval = int(untilTime - (untilTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']

In short, each endpoint is always rounded up to the nearest following interval boundary.

In particular, a query whose start and end fall on the same interval is widened so that it always covers at least one step:

if fromInterval == untilInterval:
  # Zero-length time range: always include the next point
  untilInterval += archive['secondsPerPoint']
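A small worked example of the alignment arithmetic (the numbers are made up):

secondsPerPoint = 60
fromTime, untilTime = 1000, 1010   # hypothetical timestamps inside the same 60s bucket

fromInterval = int(fromTime - (fromTime % secondsPerPoint)) + secondsPerPoint     # 1020
untilInterval = int(untilTime - (untilTime % secondsPerPoint)) + secondsPerPoint  # 1020

if fromInterval == untilInterval:
    untilInterval += secondsPerPoint   # widened so the range covers one full step
print(fromInterval, untilInterval)     # 1020 1080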

Computing the offsets

# Determine fromOffset
timeDistance = fromInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
fromOffset = archive['offset'] + (byteDistance % archive['size'])

# Determine untilOffset
timeDistance = untilInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
untilOffset = archive['offset'] + (byteDistance % archive['size'])

A trick with Python's % operator

byteDistance % archive['size']

achieves the wrap-around effect; as long as |byteDistance| is less than archive['size'], it is equivalent to:

if byteDistance >= 0:
  return byteDistance
else:
  return archive['size'] + byteDistance

This works because of how Python's modulo operator % handles negative operands:

print(-3 % 5)
# prints 2
print(3 % 5)
# prints 3

Reading the data

# Now we unpack the series data we just read (anything faster than unpack?)
byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
points = len(seriesString) // pointSize
seriesFormat = byteOrder + (pointTypes * points)
unpackedSeries = struct.unpack(seriesFormat, seriesString)

# And finally we construct a list of values (optimize this!)
valueList = [None] * points  # Pre-allocate entire list for speed
currentInterval = fromInterval
step = archive['secondsPerPoint']

for i in xrange(0, len(unpackedSeries), 2):
  pointTime = unpackedSeries[i]
  if pointTime == currentInterval:
    pointValue = unpackedSeries[i + 1]
    valueList[i // 2] = pointValue  # In-place reassignment is faster than append()
  currentInterval += step

timeInfo = (fromInterval, untilInterval, step)
return (timeInfo, valueList)

The important point here is that, while reading, each stored timestamp (Interval) is compared against the expected timestamp; if they differ, the slot is considered to hold no value and is reported as None. This is an important behavior: writes can be sparse rather than strictly contiguous, and reads still return correct results because stale points left over in reused slots are filtered out.
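This behavior is easy to observe through the public API. In the sketch below (hypothetical path, arbitrary timestamps), only slots whose stored Interval matches the expected one come back with values; everything else is None:

import time
import whisper

path = '/tmp/example.wsp'                       # hypothetical path (must not exist yet)
whisper.create(path, [(60, 60)])                # 1-minute resolution for 1 hour

now = int(time.time())
whisper.update(path, 1.0, timestamp=now - 600)  # two non-contiguous points
whisper.update(path, 2.0, timestamp=now - 120)

(timeInfo, values) = whisper.fetch(path, now - 900, now)
print(values)   # mostly None, with 1.0 and 2.0 only in the slots that were written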

Updating the Database

Whisper exposes both a single-point update API, update(), and a multi-point update API, update_many(). Carbon uses the multi-point API. The two implementations are similar, but the latter updates in batches and is more I/O efficient, so this article discusses update_many().

Parameters

  • path — the file path
  • points is a list of (timestamp,value) points

Sorting

The points are sorted by timestamp in descending order, so the newest points come first.

points = [(int(t), float(v)) for (t, v) in points]
points.sort(key=lambda p: p[0], reverse=True) # Order points by timestamp, newest first

Reading the file header

Reading the header yields the following data structure:

info = {
  'aggregationMethod': aggregationTypeToMethod.get(aggregationType, 'average'),
  'maxRetention': maxRetention,
  'xFilesFactor': xff,
  'archives': archives,
}

Grouping points by archive retention for writing

for point in points:
  age = now - point[0]

  while currentArchive['retention'] < age:  # We can't fit any more points in this archive
    if currentPoints:  # Commit all the points we've found that it can fit
      currentPoints.reverse()  # Put points in chronological order
      __archive_update_many(fh, header, currentArchive, currentPoints)
      currentPoints = []
    try:
      currentArchive = next(archives)
    except StopIteration:
      currentArchive = None
      break

  if not currentArchive:
    break  # Drop remaining points that don't fit in the database

  currentPoints.append(point)

if currentArchive and currentPoints:  # Don't forget to commit after we've checked all the archives
  currentPoints.reverse()
  __archive_update_many(fh, header, currentArchive, currentPoints)

Note that before the points are passed to the actual write function, their order is reversed so that the oldest points come first.

P.S. This piece of code is not easy to follow; the walkthrough below may help.
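To make the grouping concrete, here is a small standalone sketch that mimics the loop above with plain data instead of file I/O (the archives and points are made up; commit() stands in for __archive_update_many()):

# Sketch: two archives with made-up retentions of 1 hour and 1 day.
archives = iter([{'retention': 3600, 'name': 'high'},
                 {'retention': 86400, 'name': 'low'}])
now = 100000
# Points sorted newest-first, as update_many() does.
points = [(now - 10, 1.0), (now - 120, 2.0), (now - 7200, 3.0)]

def commit(archive, pts):
    print('write to %s: %s' % (archive['name'], pts))

currentArchive = next(archives)
currentPoints = []
for point in points:
    age = now - point[0]
    while currentArchive and currentArchive['retention'] < age:
        if currentPoints:
            currentPoints.reverse()          # oldest first before committing
            commit(currentArchive, currentPoints)
            currentPoints = []
        currentArchive = next(archives, None)
    if not currentArchive:
        break
    currentPoints.append(point)

if currentArchive and currentPoints:
    currentPoints.reverse()
    commit(currentArchive, currentPoints)

# Output:
#   write to high: [(99880, 2.0), (99990, 1.0)]
#   write to low: [(92800, 3.0)]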

Grouped write operation

The implementation lives in __archive_update_many(fh, header, archive, points).

Aligning the points
step = archive['secondsPerPoint']
alignedPoints = [(timestamp - (timestamp % step), value)
                 for (timestamp, value) in points]

Grouping points into contiguous runs
# Create a packed string for each contiguous sequence of points
packedStrings = []
previousInterval = None
currentString = b""
lenAlignedPoints = len(alignedPoints)
for i in xrange(0, lenAlignedPoints):
  # Take last point in run of points with duplicate intervals
  if i + 1 < lenAlignedPoints and alignedPoints[i][0] == alignedPoints[i + 1][0]:
    continue
  (interval, value) = alignedPoints[i]

  # First point, or the interval is contiguous with the previous one
  if (not previousInterval) or (interval == previousInterval + step):
    currentString += struct.pack(pointFormat, interval, value)
    previousInterval = interval
  else:  # The run of intervals is broken here
    numberOfPoints = len(currentString) // pointSize
    startInterval = previousInterval - (step * (numberOfPoints - 1))
    packedStrings.append((startInterval, currentString))
    currentString = struct.pack(pointFormat, interval, value)
    previousInterval = interval
if currentString:
  numberOfPoints = len(currentString) // pointSize
  startInterval = previousInterval - (step * (numberOfPoints - 1))
  packedStrings.append((startInterval, currentString))

Things to note:

After alignment, duplicate intervals are deduplicated and only the last point for each interval is kept; this is an important property, demonstrated by the sketch below.
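A minimal illustration of the alignment and deduplication behavior (arbitrary sample data, a step of 60 seconds):

step = 60
points = [(100, 1.0), (130, 2.0), (170, 3.0), (230, 4.0)]   # made-up (timestamp, value) pairs

alignedPoints = [(t - (t % step), v) for (t, v) in points]
print(alignedPoints)   # [(60, 1.0), (120, 2.0), (120, 3.0), (180, 4.0)]

# Keep only the last point of each run of duplicate intervals, as the loop above does.
deduped = []
for i, (interval, value) in enumerate(alignedPoints):
    if i + 1 < len(alignedPoints) and alignedPoints[i + 1][0] == interval:
        continue
    deduped.append((interval, value))
print(deduped)         # [(60, 1.0), (120, 3.0), (180, 4.0)]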

Writing the data
# Read base point and determine where our writes will start
fh.seek(archive['offset'])
packedBasePoint = fh.read(pointSize)
(baseInterval, baseValue) = struct.unpack(pointFormat, packedBasePoint)
if baseInterval == 0:  # This file's first update
  baseInterval = packedStrings[0][0]  # Use our first string as the base, so we start at the start

# Write all of our packed strings in locations determined by the baseInterval
for (interval, packedString) in packedStrings:
  timeDistance = interval - baseInterval
  pointDistance = timeDistance // step
  byteDistance = pointDistance * pointSize
  myOffset = archive['offset'] + (byteDistance % archive['size'])
  fh.seek(myOffset)
  archiveEnd = archive['offset'] + archive['size']
  bytesBeyond = (myOffset + len(packedString)) - archiveEnd

  if bytesBeyond > 0:
    fh.write(packedString[:-bytesBeyond])
    assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (
      archiveEnd, fh.tell(), bytesBeyond, len(packedString))
    fh.seek(archive['offset'])
    fh.write(
      packedString[-bytesBeyond:])  # Safe because it can't exceed the archive (retention checking logic above)
  else:
    fh.write(packedString)

Note: the writes here wrap around the end of the archive.

Propagating to the next archive
# Now we propagate the updates to lower-precision archives
higher = archive
lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]

for lower in lowerArchives:
  fit = lambda i: i - (i % lower['secondsPerPoint'])
  lowerIntervals = [fit(p[0]) for p in alignedPoints]
  uniqueLowerIntervals = set(lowerIntervals)
  propagateFurther = False
  for interval in uniqueLowerIntervals:
    if __propagate(fh, header, interval, higher, lower):
      propagateFurther = True

  if not propagateFurther:
    break
  higher = lower

Propagating a single interval
def __propagate(fh, header, timestamp, higher, lower):
  aggregationMethod = header['aggregationMethod']
  xff = header['xFilesFactor']

  lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
  lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']

  fh.seek(higher['offset'])
  packedPoint = fh.read(pointSize)
  (higherBaseInterval, higherBaseValue) = struct.unpack(pointFormat, packedPoint)

  if higherBaseInterval == 0:
    higherFirstOffset = higher['offset']
  else:
    timeDistance = lowerIntervalStart - higherBaseInterval
    pointDistance = timeDistance // higher['secondsPerPoint']
    byteDistance = pointDistance * pointSize
    higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])

  higherPoints = lower['secondsPerPoint'] // higher['secondsPerPoint']
  higherSize = higherPoints * pointSize
  relativeFirstOffset = higherFirstOffset - higher['offset']
  relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
  higherLastOffset = relativeLastOffset + higher['offset']
  fh.seek(higherFirstOffset)

  if higherFirstOffset < higherLastOffset:  # We don't wrap the archive
    seriesString = fh.read(higherLastOffset - higherFirstOffset)
  else:  # We do wrap the archive
    higherEnd = higher['offset'] + higher['size']
    seriesString = fh.read(higherEnd - higherFirstOffset)
    fh.seek(higher['offset'])
    seriesString += fh.read(higherLastOffset - higher['offset'])

  # Now we unpack the series data we just read
  byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
  points = len(seriesString) // pointSize
  seriesFormat = byteOrder + (pointTypes * points)
  unpackedSeries = struct.unpack(seriesFormat, seriesString)

  # And finally we construct a list of values
  neighborValues = [None] * points
  currentInterval = lowerIntervalStart
  step = higher['secondsPerPoint']

  for i in xrange(0, len(unpackedSeries), 2):
    pointTime = unpackedSeries[i]
    if pointTime == currentInterval:
      neighborValues[i // 2] = unpackedSeries[i + 1]
    currentInterval += step

  # Propagate aggregateValue to propagate from neighborValues if we have enough known points
  knownValues = [v for v in neighborValues if v is not None]
  if not knownValues:
    return False

  knownPercent = float(len(knownValues)) / float(len(neighborValues))
  if knownPercent >= xff:  # We have enough data to propagate a value!
    aggregateValue = aggregate(aggregationMethod, knownValues, neighborValues)
    myPackedPoint = struct.pack(pointFormat, lowerIntervalStart, aggregateValue)
    fh.seek(lower['offset'])
    packedPoint = fh.read(pointSize)
    (lowerBaseInterval, lowerBaseValue) = struct.unpack(pointFormat, packedPoint)

    if lowerBaseInterval == 0:  # First propagated update to this lower archive
      fh.seek(lower['offset'])
      fh.write(myPackedPoint)
    else:  # Not our first propagated update to this lower archive
      timeDistance = lowerIntervalStart - lowerBaseInterval
      pointDistance = timeDistance // lower['secondsPerPoint']
      byteDistance = pointDistance * pointSize
      lowerOffset = lower['offset'] + (byteDistance % lower['size'])
      fh.seek(lowerOffset)
      fh.write(myPackedPoint)

    return True

  else:
    return False

If aggregation into a lower-precision archive fails because the fraction of known points falls below xff, propagation to all subsequent lower-precision archives is cancelled (propagateFurther stays False and the loop breaks).
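A quick numeric illustration of the xff check (made-up numbers):

xff = 0.5                                              # as configured in the header
neighborValues = [1.0, None, None, 3.0, None, None]    # 6 higher-precision slots for one lower slot

knownValues = [v for v in neighborValues if v is not None]
knownPercent = float(len(knownValues)) / float(len(neighborValues))
print(knownPercent)   # 0.333... < 0.5, so nothing is written and False is returned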

Trick

When opening a file, Whisper uses fadvise on Linux to advise the operating system about the expected access pattern so that file access can be optimized accordingly.

if CAN_FADVISE and FADVISE_RANDOM:
  posix_fadvise(fh.fileno(), 0, 0, POSIX_FADV_RANDOM)

The fadvise man page describes it roughly as follows:

It allows an application to tell the kernel how it intends to use a file descriptor, so that the kernel can choose the most appropriate read-ahead and caching strategies for accessing that file.

fadvise has several options:

  • FADV_NORMAL: no special treatment
  • FADV_RANDOM: expect page references in random order
  • FADV_SEQUENTIAL: expect page references in sequential order
  • FADV_WILLNEED: expect access in the near future
  • FADV_DONTNEED: do not expect access in the near future
  • FADV_NOREUSE: the data will be accessed only once

For more information about fadvise, see the fadvise man page.
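In modern Python, the same hint can be issued through the standard library's os.posix_fadvise (available on Linux since Python 3.3); a minimal sketch with a hypothetical path:

import os

# Advise the kernel that this file will be accessed randomly,
# the same hint whisper gives for its database files.
fd = os.open('/tmp/example.wsp', os.O_RDWR)   # hypothetical path
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_RANDOM)
os.close(fd)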