TL;DR: This article walks through Whisper's implementation at the code level, along with some of the programming tricks it uses.
What is Whisper
Many people are familiar with the well-known metrics monitoring system Graphite.
The official introduction from the Graphite project reads roughly as follows:
Graphite is an enterprise-scale monitoring tool that runs well on cheap hardware.
Graphite does two things:
- Store numeric time-series data
- Render graphs of this data on demand
Whisper is one of Graphite's core components. The "store numeric time-series data" job is split across two components: Carbon, which receives data over the network, and Whisper, which stores it on disk.
More formally, Whisper is a time-series database (or rather, a library) built for the Graphite project, though it can also be used on its own, or embedded, as a general-purpose time-series store.
Important:
The code and structure analysis below is based on whisper==0.9.10; other versions may differ.
Database structure overview
Overview
A Whisper database is a single file, which can be divided into three parts: Header, Archives, and Data.
Each part is made of C-compatible data structures; the implementation uses Python's struct library to pack and unpack them.
Each archive corresponds to a storage region with a different precision.
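For reference, here is a minimal sketch of the struct format strings found in the whisper source (network byte order '!' means standard sizes and no padding; sizes are computed with struct.calcsize):
import struct

# Format strings as defined in whisper's source
metadataFormat = "!2LfL"     # aggregationType, maxRetention, xFilesFactor, archiveCount
archiveInfoFormat = "!3L"    # offset, secondsPerPoint, points
pointFormat = "!Ld"          # interval (timestamp), value

print(struct.calcsize(metadataFormat))     # 16 -> header metadata size in bytes
print(struct.calcsize(archiveInfoFormat))  # 12 -> size of each archive entry
print(struct.calcsize(pointFormat))        # 12 -> size of each data point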
Header
The Header consists of four fields: aggregationType, maxRetention, xff, and archiveCount.
aggregationType
Data type: Long int (aka 'L' in struct format). It selects the strategy (algorithm) used when aggregating from higher-precision to lower-precision archives. The available strategies are:
aggregationTypeToMethod = dict({
1: 'average',
2: 'sum',
3: 'last',
4: 'max',
5: 'min',
6: 'avg_zero'
})
maxRetention
Data type: Long int. The maximum time span, in seconds, that this database can store.
xff
Full name: xFilesFactor. Data type: Float (aka 'f' in struct format). When aggregating from higher precision to lower precision, if the fraction of known (non-None) points falls below this threshold, no aggregated value is written for that interval, so the lower-precision slot stays unset.
archiveCount
Data type: Long int. The number of archives.
Archives
Overview
When creating a database file, whisper performs the following checks on the archive list:
- There must be at least one archive
- Archive precision must strictly decrease from one archive to the next; two archives may not share the same precision
- Precisions must be integer-related: every lower precision (larger secondsPerPoint) must be an integer multiple of every higher precision
- Archive retentions must strictly increase; they may not be equal
- Each higher-precision archive must hold enough points to complete at least one consolidation into the next archive
Example:
- Higher: 1s/20
- Lower: 60s/1
This satisfies the first four rules but fails the last one: consolidating a single 60-second point needs 60 one-second points, yet the higher-precision archive only holds 20.
The validation code is as follows:
if not archiveList:
raise InvalidConfiguration("You must specify at least one archive configuration!")
archiveList.sort(key=lambda a: a[0]) # Sort by precision (secondsPerPoint)
for i, archive in enumerate(archiveList):
if i == len(archiveList) - 1:
break
nextArchive = archiveList[i + 1]
if not archive[0] < nextArchive[0]:
raise InvalidConfiguration("A Whisper database may not be configured having "
"two archives with the same precision (archive%d: %s, archive%d: %s)" %
(i, archive, i + 1, nextArchive))
if nextArchive[0] % archive[0] != 0:
raise InvalidConfiguration("Higher precision archives' precision "
"must evenly divide all lower precision archives' precision "
"(archive%d: %s, archive%d: %s)" %
(i, archive[0], i + 1, nextArchive[0]))
retention = archive[0] * archive[1]
nextRetention = nextArchive[0] * nextArchive[1]
if not nextRetention > retention:
raise InvalidConfiguration("Lower precision archives must cover "
"larger time intervals than higher precision archives "
"(archive%d: %s seconds, archive%d: %s seconds)" %
(i, retention, i + 1, nextRetention))
archivePoints = archive[1]
pointsPerConsolidation = nextArchive[0] // archive[0]
if not archivePoints >= pointsPerConsolidation:
raise InvalidConfiguration("Each archive must have at least enough points "
"to consolidate to the next archive (archive%d consolidates %d of "
"archive%d's points but it has only %d total points)" %
(i + 1, pointsPerConsolidation, i, archivePoints))
Structure
Each archive describes a storage region of a given precision and consists of three fields: offset, secondsPerPoint, points
offset
Data type: Long int. The byte offset of this archive's data region within the file.
secondsPerPoint
Data type: Long int. The sampling duration each point represents, i.e. the archive's precision; the finest possible precision is one point per second.
points
Data type: Long int. The number of data points.
Derived fields
These fields are not stored in the file themselves; they are computed from the others.
retention
'retention': secondsPerPoint * points
The time span this archive covers, in seconds.
size
'size': points * pointSize
The number of bytes this archive's data region occupies.
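As a quick illustration (my own numbers, not from the whisper source), for an archive configured as 60 seconds per point with 1440 points:
secondsPerPoint, points = 60, 1440    # hypothetical archive: 1-minute precision for one day
pointSize = 12                        # struct.calcsize("!Ld")

retention = secondsPerPoint * points  # 86400 seconds, i.e. one day
size = points * pointSize             # 17280 bytes of data for this archive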
Data
This part holds the actual data points. Points are laid out linearly in the file, and each point consists of two fields: Interval and data.
Interval
Data type: Long int. A timestamp, in seconds since the UNIX epoch (aka 1970-01-01 00:00 UTC).
data
Data type: double (aka 'd' in struct format). The actual metric value.
Summary
ASCII art diagram
+-------------------------------------------------------------------------+
|AT|MR|xff|AC|offset|SPP|points| ... |Interval|data| ... |
+-------------------------------------------------------------------------+
| | Archive One | ... | Point One | ... |
+-------------------------------------------------------------------------+
| Header | Archives | Data |
+-------------------------------------------------------------------------+
| Whisper file |
+-------------------------------------------------------------------------+
AT: aggregationType
MR: maxRetention
AC: archiveCount
SPP: secondsPerPoint
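To see this layout on a real file, the whisper module exposes info(), which returns the parsed header; a short usage sketch (the file path is hypothetical):
import whisper

header = whisper.info('/tmp/example.wsp')
print(header['aggregationMethod'], header['maxRetention'], header['xFilesFactor'])
for archive in header['archives']:
    # offset, secondsPerPoint and points come straight from the file;
    # retention and size are the derived fields described above
    print(archive['offset'], archive['secondsPerPoint'], archive['points'],
          archive['retention'], archive['size'])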
Creating a database
Parameters
Key parameters:
- path: the path of the database file
- archiveList
- xFilesFactor=None
- aggregationMethod=None
archiveList
# Validate archive configurations...
validateArchiveList(archiveList)
The validation rules are the ones described in the Archives section above.
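A minimal usage sketch of whisper.create(); the path and the archive configuration (10s for 6 hours, 1min for 7 days, 10min for roughly 5 years) are my own example:
import whisper

archives = [(10, 2160), (60, 10080), (600, 262974)]
whisper.create('/tmp/example.wsp', archives,
               xFilesFactor=0.5, aggregationMethod='average')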
Writing the Header
aggregationType = struct.pack(longFormat, aggregationMethodToType.get(aggregationMethod, 1))
oldest = max([secondsPerPoint * points for secondsPerPoint, points in archiveList])
maxRetention = struct.pack(longFormat, oldest)
xFilesFactor = struct.pack(floatFormat, float(xFilesFactor))
archiveCount = struct.pack(longFormat, len(archiveList))
packedMetadata = aggregationType + maxRetention + xFilesFactor + archiveCount
fh.write(packedMetadata)
Writing the archive list
The important part here is how each archive's offset is computed:
headerSize = metadataSize + (archiveInfoSize * len(archiveList))
archiveOffsetPointer = headerSize
for secondsPerPoint, points in archiveList:
archiveInfo = struct.pack(archiveInfoFormat, archiveOffsetPointer, secondsPerPoint, points)
fh.write(archiveInfo)
archiveOffsetPointer += (points * pointSize)
Zero-filling the Data region with \x00
if CAN_FALLOCATE and useFallocate:
remaining = archiveOffsetPointer - headerSize
fallocate(fh, headerSize, remaining)
elif sparse:
fh.seek(archiveOffsetPointer - 1)
fh.write(b'\x00')
else:
remaining = archiveOffsetPointer - headerSize
chunksize = 16384
zeroes = b'\x00' * chunksize
while remaining > chunksize:
fh.write(zeroes)
remaining -= chunksize
fh.write(zeroes[:remaining])
Several IO optimizations show up here:
fallocate
This is a Linux-specific system call with the prototype int fallocate(int fd, int mode, off_t offset, off_t len);. It lets the caller directly allocate the disk space for the region of the file starting at offset and spanning len bytes, which is faster than allocating it by writing data.
For more information, see the fallocate man page.
sparse file
The file's data is not actually allocated on disk; the filesystem only records that the file has this size, and real disk space is allocated only when data is actually written, which is why such a file is called sparse. The advantage is that creation is very fast; the downside is that later writes may leave the file fragmented on disk, making both writes and reads slower than with normal allocation.
For more information, see the Wikipedia article on sparse files.
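A small demonstration of the idea, independent of whisper: seeking past the end of a file and writing a single byte produces a sparse file on filesystems that support it:
import os

# Create a "1 MiB" file that occupies almost no disk blocks
with open('/tmp/sparse_demo.dat', 'wb') as fh:
    fh.seek(1024 * 1024 - 1)
    fh.write(b'\x00')

st = os.stat('/tmp/sparse_demo.dat')
print(st.st_size)          # logical size: 1048576 bytes
print(st.st_blocks * 512)  # space actually allocated, typically far smaller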
write by chunk
At the lowest level, disks operate in sectors. If writes line up with the sector size, unnecessary read-modify-write overhead is avoided. Modern disks commonly use 4 KiB sectors, so writing in 4 KiB chunks, or multiples of 4 KiB, can improve performance (the chunked loop above uses 16 KiB chunks).
For more information, see the Wikipedia article on disk sectors.
An implementation pitfall
There is one gotcha in the implementation:
validateArchiveList(archiveList)
sorts archiveList in place (archiveList.sort(key=lambda a: a[0])). If you do not read the function body, it is easy to miss this side effect.
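A tiny sketch of that side effect, using a plain list rather than whisper itself:
archiveList = [(60, 1440), (1, 3600)]  # caller's list, higher precision listed second
archiveList.sort(key=lambda a: a[0])   # what validateArchiveList does internally
print(archiveList)                     # [(1, 3600), (60, 1440)]: the caller's list was mutated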
Querying the database
Clamping the query range using the maximum retention
if now is None:
now = int(time.time())
if untilTime is None:
untilTime = now
fromTime = int(fromTime)
untilTime = int(untilTime)
# Here we try and be flexible and return as much data as we can.
# If the range of data is from too far in the past or fully in the future, we
# return nothing
if fromTime > untilTime:
raise InvalidTimeInterval("Invalid time interval: from time '%s' is after until time '%s'" % (fromTime, untilTime))
oldestTime = now - header['maxRetention']
# Range is in the future
if fromTime > now:
return None
# Range is beyond retention
if untilTime < oldestTime:
return None
# Range requested is partially beyond retention, adjust
if fromTime < oldestTime:
fromTime = oldestTime
# Range is partially in the future, adjust
if untilTime > now:
untilTime = now
Finding the highest-precision archive that covers the query range
diff = now - fromTime
for archive in header['archives']:
if archive['retention'] >= diff:
break
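A worked example of this selection, with hypothetical archives:
now = 1700000000
fromTime = now - 90000  # asking for roughly the last 25 hours
archives = [            # sorted by precision, highest first, as in the header
    {'secondsPerPoint': 10,  'retention': 21600},      # 6 hours
    {'secondsPerPoint': 60,  'retention': 604800},     # 7 days
    {'secondsPerPoint': 600, 'retention': 157784400},  # ~5 years
]

diff = now - fromTime
for archive in archives:
    if archive['retention'] >= diff:
        break
print(archive['secondsPerPoint'])  # prints 60: the 1-minute archive is the best fit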
Aligning the time range to intervals
fromInterval = int(fromTime - (fromTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
untilInterval = int(untilTime - (untilTime % archive['secondsPerPoint'])) + archive['secondsPerPoint']
In short, each boundary is always rounded up to the next interval.
In particular, a query whose start and end fall on the same interval is adjusted so that it always includes the next step:
if fromInterval == untilInterval:
# Zero-length time range: always include the next point
untilInterval += archive['secondsPerPoint']
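For example, with a 60-second archive (numbers are my own), a boundary is rounded up like this:
secondsPerPoint = 60
fromTime = 1700000123  # falls inside the interval starting at 1700000100

fromInterval = int(fromTime - (fromTime % secondsPerPoint)) + secondsPerPoint
print(fromInterval)    # 1700000160, rounded up to the next interval boundary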
Computing offsets
# Determine fromOffset
timeDistance = fromInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
fromOffset = archive['offset'] + (byteDistance % archive['size'])
# Determine untilOffset
timeDistance = untilInterval - baseInterval
pointDistance = timeDistance // archive['secondsPerPoint']
byteDistance = pointDistance * pointSize
untilOffset = archive['offset'] + (byteDistance % archive['size'])
A trick with Python's % operator
byteDistance % archive['size'] wraps the byte distance back into the archive, which is equivalent to:
if byteDistance >= 0:
return byteDistance
else:
return archive['size'] + byteDistance
This works because of the semantics of Python's modulo operator %, whose result always takes the sign of the divisor:
print(-3 % 5)
# prints 2
print(3 % 5)
# prints 3
Reading the data
# Now we unpack the series data we just read (anything faster than unpack?)
byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
points = len(seriesString) // pointSize
seriesFormat = byteOrder + (pointTypes * points)
unpackedSeries = struct.unpack(seriesFormat, seriesString)
# And finally we construct a list of values (optimize this!)
valueList = [None] * points # Pre-allocate entire list for speed
currentInterval = fromInterval
step = archive['secondsPerPoint']
for i in xrange(0, len(unpackedSeries), 2):
pointTime = unpackedSeries[i]
if pointTime == currentInterval:
pointValue = unpackedSeries[i + 1]
valueList[i // 2] = pointValue # In-place reassignment is faster than append()
currentInterval += step
timeInfo = (fromInterval, untilInterval, step)
return (timeInfo, valueList)
The important detail here: while reading, each stored timestamp (Interval) is compared against the expected timestamp; if they differ, the slot is considered to have no value and is returned as None. This behavior matters because writes can then be sparse rather than contiguous, and reads still return correct results.
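From the caller's point of view all of this is wrapped by whisper.fetch(); a usage sketch (the file path is hypothetical):
import time
import whisper

now = int(time.time())
(timeInfo, values) = whisper.fetch('/tmp/example.wsp', now - 3600, now)
(start, end, step) = timeInfo
for i, value in enumerate(values):  # value is None wherever no point was stored
    print(start + i * step, value)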
Updating the database
Whisper offers both a single-point update API, update(), and a multi-point update API, update_many(). Carbon uses the multi-point API; the two are implemented similarly, except that update_many() applies updates in batches and is therefore more IO-efficient, so this article covers update_many().
Parameters
- path: the path of the database file
- points: a list of (timestamp, value) points
Sorting
Sort the points by timestamp in descending order, so that the newest points come first:
points = [(int(t), float(v)) for (t, v) in points]
points.sort(key=lambda p: p[0], reverse=True) # Order points by timestamp, newest first
Reading the file header
Reading the header yields the following structure:
info = {
'aggregationMethod': aggregationTypeToMethod.get(aggregationType, 'average'),
'maxRetention': maxRetention,
'xFilesFactor': xff,
'archives': archives,
}
Grouping points by archive retention and writing each group
for point in points:
age = now - point[0]
while currentArchive['retention'] < age: # We can't fit any more points in this archive
if currentPoints: # Commit all the points we've found that it can fit
currentPoints.reverse() # Put points in chronological order
__archive_update_many(fh, header, currentArchive, currentPoints)
currentPoints = []
try:
currentArchive = next(archives)
except StopIteration:
currentArchive = None
break
if not currentArchive:
break # Drop remaining points that don't fit in the database
currentPoints.append(point)
if currentArchive and currentPoints: # Don't forget to commit after we've checked all the archives
currentPoints.reverse()
__archive_update_many(fh, header, currentArchive, currentPoints)
Note that before each group is handed to the actual write routine, the points are reversed again, so the oldest points come first.
PS: this part of the code is not easy to follow.
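A usage sketch of the public API; the path and the values are my own example:
import time
import whisper

now = int(time.time())
# A batch of (timestamp, value) pairs; gaps and out-of-order points are allowed
points = [(now - 120, 1.0), (now - 60, 2.5), (now, 3.0)]
whisper.update_many('/tmp/example.wsp', points)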
The per-archive write
The concrete implementation is __archive_update_many(fh, header, archive, points)
Aligning the data points
step = archive['secondsPerPoint']
alignedPoints = [(timestamp - (timestamp % step), value)
for (timestamp, value) in points]
Grouping points into contiguous runs
# Create a packed string for each contiguous sequence of points
packedStrings = []
previousInterval = None
currentString = b""
lenAlignedPoints = len(alignedPoints)
for i in xrange(0, lenAlignedPoints):
# Take last point in run of points with duplicate intervals
if i + 1 < lenAlignedPoints and alignedPoints[i][0] == alignedPoints[i + 1][0]:
continue
(interval, value) = alignedPoints[i]
# First point, or this interval is contiguous with the previous one
if (not previousInterval) or (interval == previousInterval + step):
currentString += struct.pack(pointFormat, interval, value)
previousInterval = interval
else: # The run of contiguous intervals is broken here
numberOfPoints = len(currentString) // pointSize
startInterval = previousInterval - (step * (numberOfPoints - 1))
packedStrings.append((startInterval, currentString))
currentString = struct.pack(pointFormat, interval, value)
previousInterval = interval
if currentString:
numberOfPoints = len(currentString) // pointSize
startInterval = previousInterval - (step * (numberOfPoints - 1))
packedStrings.append((startInterval, currentString))
One point worth noting:
After alignment, duplicate intervals are de-duplicated and only the last point of each run of duplicates is kept. This is an important property.
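A small sketch of alignment plus de-duplication, outside of whisper, with a 60-second step and made-up timestamps:
step = 60
points = [(1700000105, 1.0), (1700000130, 2.0), (1700000170, 3.0)]

# Align every timestamp down to the start of its interval
alignedPoints = [(t - (t % step), v) for (t, v) in points]
print(alignedPoints)  # [(1700000100, 1.0), (1700000100, 2.0), (1700000160, 3.0)]

# The loop above keeps only the last point of each duplicate interval,
# so (1700000100, 2.0) wins and (1700000100, 1.0) is dropped.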
Writing the data
# Read base point and determine where our writes will start
fh.seek(archive['offset'])
packedBasePoint = fh.read(pointSize)
(baseInterval, baseValue) = struct.unpack(pointFormat, packedBasePoint)
if baseInterval == 0: # This file's first update
baseInterval = packedStrings[0][0] # Use our first string as the base, so we start at the start
# Write all of our packed strings in locations determined by the baseInterval
for (interval, packedString) in packedStrings:
timeDistance = interval - baseInterval
pointDistance = timeDistance // step
byteDistance = pointDistance * pointSize
myOffset = archive['offset'] + (byteDistance % archive['size'])
fh.seek(myOffset)
archiveEnd = archive['offset'] + archive['size']
bytesBeyond = (myOffset + len(packedString)) - archiveEnd
if bytesBeyond > 0:
fh.write(packedString[:-bytesBeyond])
assert fh.tell() == archiveEnd, "archiveEnd=%d fh.tell=%d bytesBeyond=%d len(packedString)=%d" % (
archiveEnd, fh.tell(), bytesBeyond, len(packedString))
fh.seek(archive['offset'])
fh.write(
packedString[-bytesBeyond:]) # Safe because it can't exceed the archive (retention checking logic above)
else:
fh.write(packedString)
Note: writes here can wrap around the end of the archive.
Propagating to the next lower-precision archive
# Now we propagate the updates to lower-precision archives
higher = archive
lowerArchives = [arc for arc in header['archives'] if arc['secondsPerPoint'] > archive['secondsPerPoint']]
for lower in lowerArchives:
fit = lambda i: i - (i % lower['secondsPerPoint'])
lowerIntervals = [fit(p[0]) for p in alignedPoints]
uniqueLowerIntervals = set(lowerIntervals)
propagateFurther = False
for interval in uniqueLowerIntervals:
if __propagate(fh, header, interval, higher, lower):
propagateFurther = True
if not propagateFurther:
break
higher = lower
Propagating a single point
def __propagate(fh, header, timestamp, higher, lower):
aggregationMethod = header['aggregationMethod']
xff = header['xFilesFactor']
lowerIntervalStart = timestamp - (timestamp % lower['secondsPerPoint'])
lowerIntervalEnd = lowerIntervalStart + lower['secondsPerPoint']
fh.seek(higher['offset'])
packedPoint = fh.read(pointSize)
(higherBaseInterval, higherBaseValue) = struct.unpack(pointFormat, packedPoint)
if higherBaseInterval == 0:
higherFirstOffset = higher['offset']
else:
timeDistance = lowerIntervalStart - higherBaseInterval
pointDistance = timeDistance // higher['secondsPerPoint']
byteDistance = pointDistance * pointSize
higherFirstOffset = higher['offset'] + (byteDistance % higher['size'])
higherPoints = lower['secondsPerPoint'] // higher['secondsPerPoint']
higherSize = higherPoints * pointSize
relativeFirstOffset = higherFirstOffset - higher['offset']
relativeLastOffset = (relativeFirstOffset + higherSize) % higher['size']
higherLastOffset = relativeLastOffset + higher['offset']
fh.seek(higherFirstOffset)
if higherFirstOffset < higherLastOffset: # We don't wrap the archive
seriesString = fh.read(higherLastOffset - higherFirstOffset)
else: # We do wrap the archive
higherEnd = higher['offset'] + higher['size']
seriesString = fh.read(higherEnd - higherFirstOffset)
fh.seek(higher['offset'])
seriesString += fh.read(higherLastOffset - higher['offset'])
# Now we unpack the series data we just read
byteOrder, pointTypes = pointFormat[0], pointFormat[1:]
points = len(seriesString) // pointSize
seriesFormat = byteOrder + (pointTypes * points)
unpackedSeries = struct.unpack(seriesFormat, seriesString)
# And finally we construct a list of values
neighborValues = [None] * points
currentInterval = lowerIntervalStart
step = higher['secondsPerPoint']
for i in xrange(0, len(unpackedSeries), 2):
pointTime = unpackedSeries[i]
if pointTime == currentInterval:
neighborValues[i // 2] = unpackedSeries[i + 1]
currentInterval += step
# Propagate aggregateValue to propagate from neighborValues if we have enough known points
knownValues = [v for v in neighborValues if v is not None]
if not knownValues:
return False
knownPercent = float(len(knownValues)) / float(len(neighborValues))
if knownPercent >= xff: # We have enough data to propagate a value!
aggregateValue = aggregate(aggregationMethod, knownValues, neighborValues)
myPackedPoint = struct.pack(pointFormat, lowerIntervalStart, aggregateValue)
fh.seek(lower['offset'])
packedPoint = fh.read(pointSize)
(lowerBaseInterval, lowerBaseValue) = struct.unpack(pointFormat, packedPoint)
if lowerBaseInterval == 0: # First propagated update to this lower archive
fh.seek(lower['offset'])
fh.write(myPackedPoint)
else: # Not our first propagated update to this lower archive
timeDistance = lowerIntervalStart - lowerBaseInterval
pointDistance = timeDistance // lower['secondsPerPoint']
byteDistance = pointDistance * pointSize
lowerOffset = lower['offset'] + (byteDistance % lower['size'])
fh.seek(lowerOffset)
fh.write(myPackedPoint)
return True
else:
return False
If aggregation into one lower-precision archive produces no value at all (for example because the fraction of known points falls below xff), propagation to the remaining, even lower-precision archives is cancelled.
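The aggregate() helper is not shown above; here is a rough sketch of what the aggregation methods compute (my own simplification, not whisper's exact code; in particular, only avg_zero needs the full neighborValues window including the None slots):
def aggregate_sketch(aggregationMethod, knownValues, neighborValues):
    # knownValues: the points that actually exist in the higher-precision window
    # neighborValues: the full window, with None for missing slots
    if aggregationMethod == 'average':
        return float(sum(knownValues)) / len(knownValues)
    elif aggregationMethod == 'sum':
        return sum(knownValues)
    elif aggregationMethod == 'last':
        return knownValues[-1]
    elif aggregationMethod == 'max':
        return max(knownValues)
    elif aggregationMethod == 'min':
        return min(knownValues)
    elif aggregationMethod == 'avg_zero':
        return float(sum(v or 0.0 for v in neighborValues)) / len(neighborValues)
    raise ValueError("Unrecognized aggregation method %s" % aggregationMethod)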
Trick
When opening the file, Whisper uses fadvise on Linux to advise the operating system how the file will be accessed, so the kernel can choose a suitable access strategy.
if CAN_FADVISE and FADVISE_RANDOM:
posix_fadvise(fh.fileno(), 0, 0, POSIX_FADV_RANDOM)
fadvise
Its man page description roughly says:
it allows an application to announce how it intends to use a file descriptor, so that the operating system can choose the most suitable read-ahead and caching strategy for accessing that file.
fadvise accepts several advice values:
- FADV_NORMAL: no special treatment
- FADV_RANDOM: expect page references in random order
- FADV_SEQUENTIAL: expect page references in sequential order
- FADV_WILLNEED: expect access in the near future
- FADV_DONTNEED: do not expect access in the near future
- FADV_NOREUSE: the data will be accessed only once
For more information about fadvise, see man fadvise.
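On Python 3.3+ the same advice can be issued straight from the standard library via os.posix_fadvise; a small sketch (the file path is hypothetical):
import os

fd = os.open('/tmp/example.wsp', os.O_RDWR)
try:
    # Tell the kernel that access will be random, which disables aggressive read-ahead
    os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_RANDOM)
finally:
    os.close(fd)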