通知设置 新通知
Druid中Segements保留和自动删除规则配置
空心菜 发表了文章 0 个评论 12673 次浏览 2016-10-20 22:43
经测试发现:
DeepStorage里所有的segements都需要在Historical节点中有一份。其实这样说是不严格的,有时候我们需要DeepStorage里所有的segements(或者某类datasource)在Historical节点中有一份或者n份。这样做的好处是,提高数据查询效率,那么这个n在哪里配置呢?
原来是在druid_rules表里面配置,默认情况下,druid_rules表里面只有一条数据,其中payload字段默认值如下:
[{"tieredReplicants":{"_default_tier":2},"type":"loadForever"}]
意思是 保证deepstorage里面的数据,在Historical节点集群存在两份,即副本为2,这两份数据一定保存在不同的Historical服务器。如果只有一台Historical服务器,那么则只会有一份数据,如果你添加一台Historical服务器,则就会在新的节点复制一份数据。
如果想修改默认的副本数,不需要数据备份,进行如下操作就好:
update druid_rules set payload='[{"tieredReplicants":{"_default_tier":1},"type":"loadForever"}]' where id="_default_2016-09-23T08:50:09.457Z";
只需把_default_tier的值改为1即可,id得看druid_rules表中的具体值。segment执行过程如下:
- 聚合任务生成segment
- 将segment push到Deep Storage
- Historical节点从 DeepStorage加载segment
- segment加载成功后,调用回调方法结束任务
所以,如果Historical节点硬盘上缓存的segment占满磁盘空间,任务会一直挂起, 最后任务数量达到MiddleManager节点的容量,导致任务排队。
那么现实业务中,如果DeepStorage里所有的segments 都需要在Historical节点中有一份,会非常 浪费空间,浪费空间就是浪费金钱。
很不能理解这种方式,并且我们对DeepStorage节点和Historical节点之间的关系一直都是这样理解的,当查询的数据不在Historical节点的时候,才会从DeepStorage加载。但是,现实是残酷的,现实不是这样的。
如何解决这个问题呢?这时我们就需要用到druid的数据保留和自动删除规则配置。 通过这个配置,我们可以为每个datasource配置一个据保留和自动删除规则。
这个配置可以通过druid提供的HTTP接口配置,也可以通过Coordinator界面配置,如下:
此配置的意思是: 我们为agentToic-1m设置了两个rule,第一个rule的意思是(Load-Period-P30D)保 留最近30天的数据。第二个rule的意思是(Drop-Forever)删除所有的数据。另外还需要填写,修改 配置的作者和注释。最后Save all rules。 通过如下界面查看,配置是否成功,或者通过查看MetadataStorage的druid_rules表查看配置是否成功。
最后,结合下图,观察左侧segment列表是否会发生变化(shareds的数量和intergvals的数量)。
经过验证,左侧列表只会展示最近30天的数据,通过查看MetadataStorage的druid_segments表, 发现30天以前的数据都被假删了,即used字段设置为了0,表示配置成功。
注意:
如果druid_segments表中的某条数据used字段为0,即此条数据对应的segment不再支持可查,同 时会再Historical节点删除。 如果上面的P30D改为P1M,意思是1个月,这个月不是自然月,而是最近30天的意思。 如果按照如上的方式设置了,再修改第一条配置规则,改为P50D,那么数据是不能恢复的,即还 是只会保留最近30天的数据。 一个笨的恢复数据方法是,可以通过修改MetadataStorage的druid_segments表中的used字段来恢复数据。
论坛:
If you configure a per datasource rule that drops data for the current month, and there i具体内容,参考官网:http://druid.io/docs/latest/operations/rule-configuration.html
s a default rule where everything is loaded, then yes, data for the current month is dropp ed and all older data is loaded. If you instead configure a load rule for the current month followed by a drop rule for everything else, then the current month of data is kept, and
all older data is dropped.
任务分配策略,会让Geek小A 明天写!
Python的数据序列化「Json & Pickle」
空心菜 发表了文章 0 个评论 3126 次浏览 2016-10-20 00:06
为什么需要数据序列化,我认为有如下两种原因:
一个原因是将对象(一切皆对象)的状态保持在存储媒介(硬盘、网盘......)中,以便可以在以后重新创建精确的副本,相当于镜像的概念,比如我们平时利用VMware虚拟机中的挂起功能,这个挂起功能就是利用数据的序列化,把虚拟机当前的状态序列化保存在本地磁盘的文件中,然后恢复的时候只需反序列化,把状态恢复即可。
另一个原因是通过值将对象从一个应用程序域发送到另一个应用程序域中。例如,你利用Python监控采集程序采集到的数据想传送给Zabbix处理。当两个进程在进行远程通信时,彼此可以发送各种类型的数据。无论是何种类型的数据,都会以二进制序列的形式在网络上传送。发送方需要把这个对象转换为字节序列,才能在网络上传送;接收方则需要把字节序列再恢复为对象。
序列化和反序列化:
- 序列化: 将数据结构或对象转换成二进制串的过程。
- 反序列化:将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程。
Json:用于字符串和 python数据类型间进行转换;Json模块提供了四个功能:dumps、dump、loads、load[list=1]Json Module
#!/usr/bin/env python3# _*_coding:utf-8_*_# Author: Lucky.chenimport jsoninfo = {'1MinLoad': 5, 'MemUse': '5G', 'DiskUse': '80G'}print('dumps 操作之前数据类型: %s' % type(info))JsonInfo = json.dumps(info)print(JsonInfo)# dumps 将数据通过特殊的形式转换为所有程序语言都识别的字符串print('dumps 操作之后数据类型: %s' % type(JsonInfo))# loads 将字符串通过特殊的形式转为python是数据类型 (将字符串转为字典)NewInfo = json.loads(JsonInfo)print('loads 操作之后数据类型为: %s' % type(NewInfo))print('分割线'.center(50, '-'))# dump 将数据通过特殊的形式转换为所有语言都识别的字符串并写入文件with open('SystemInfo.txt', 'w') as f: json.dump(info, f) print('dump file end!!')# load 从文件读取字符串并转换为python的数据类型with open('SystemInfo.txt', 'r') as f: LoadInfo = json.load(f) print('load file end, data type is %s' % type(LoadInfo), LoadInfo)结果如下:
dumps 操作之前数据类型:一个错误案例如下:{"MemUse": "5G", "DiskUse": "80G", "1MinLoad": 5}dumps 操作之后数据类型: loads 操作之后数据类型为: -----------------------分割线------------------------dump file end!!load file end, data type is {'MemUse': '5G', '1MinLoad': 5, 'DiskUse': '80G'}
#!/usr/bin/env python3# _*_coding:utf-8_*_# Author: Lucky.chenimport jsondef test(): print('Test Func')info = {'Name': 'crh', 'age': 18, 'Func': test}json.dumps(info)结果:
raise TypeError(repr(o) + " is not JSON serializable")TypeError:如上可知函数不能被json序列化。is not JSON serializable
pickle,用于python特有的类型 和 python的数据类型间进行转换Pickle模块同样提供了四个功能:dumps、dump、loads、load[list=1]Pickle Module
Pickle可以序列化一些较复杂的数据,和json的区别在于pickle序列化的时候,存放的是二进制的文件,所以打开一个文件的时候,我们要以二进制的格式打开。
实例如下:
#!/usr/bin/env python3结果如下:
# _*_coding:utf-8_*_
# Author: Lucky.chen
import pickle
def test(name):
print('%s write Test Func' % name)
info = {'Name': 'crh', 'age': 18, 'Func': test}
print('dumps 之前数据的类型为: %s' % type(info))
# pickle.dumps 将数据通过特殊的形式转换为只有python语言认识bytes类型(Python2.*中是字符串类型)
NewInfo = pickle.dumps(info)
print('dumps result is %s, data type is %s' % (NewInfo, type(NewInfo)))
# pickle.loads 将bytes通过特殊的形式转为python是数据类型
LoadInfo = pickle.loads(NewInfo)
print('loads result is %s, data type is %s' % (LoadInfo, type(LoadInfo)))
LoadInfo['Func']('crh')
print('分割线'.center(50, '-'))
# pickle.dump 将数据通过特殊的形式转换为只有python语言认识的字符串,并写入文件
with open('pickle.rb', 'wb') as f:
pickle.dump(info, f)
# pickle.load 从文件读取只有python语言认识的字符串并转换为python的数据类型
with open('pickle.rb', 'rb') as f:
Info = pickle.load(f)
print(Info, 'type is %s' % type(Info))
dumps 之前数据的类型为:
dumps result is b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x12X\x04\x00\x00\x00Nameq\x02X\x03\x00\x00\x00crhq\x03X\x04\x00\x00\x00Funcq\x04c__main__\ntest\nq\x05u.', data type is
loads result is {'age': 18, 'Name': 'crh', 'Func':}, data type is
crh write Test Func
-----------------------分割线------------------------
{'age': 18, 'Name': 'crh', 'Func':} type is
总结
很多情况下不同的程序之间传送数据我们一般通过文件的方式,但是这个方法是最原始的,而dumps可以直接让数据格式化传送给对方,但是不是所有的程序都是python的,所以只利用pickle是不现实的,比如一个python的程序需要发送一段数据给一个java程序开发的应用,这时候很多内存数据的交换,就得用json了。
并且josn能dump的结果更可读,那么有人就问了,那还用pickle做什么不直接用josn,是这样的josn只能把常用的数据类型序列化(列表、字典、列表、字符串、数字、),比如日期格式、类对象!josn就不行了。
为什么他不能序列化上面的东西呢?因为josn是跨语言的!注定了它只能规范出一些通用的数据类型的格式,统一标准。
Elasticsearch应用在数据中心的实时协议分析和安全威胁检测
小白菜 发表了文章 0 个评论 3999 次浏览 2016-10-17 21:08
数据中心面临的挑战
- 被DDOS攻击: 网络瘫痪,大面积影响业务
- 植入后门发包: 占用带宽资源,消耗成本
- 运营“黑盒子”: 无法分辨“好人”、“坏人”
- 监控粒度粗: 无法及时响应并定位事件
早期解决方案
- Cacti 利用SNMP监控交换 机出入口流量
- 交换机推送Sflow流量采样 数据,使用Solarwids监控
- 遇到DDOS时,使用手动 Sniffer抓包分析
- 消耗路由器CPU资源
- 100-1000:1采样比,监测粒度粗
- 业务和应用识别依赖端口号,无法识别日新月异 的业务类型
如何用数据驱动IDC运营
NSM架构设计
第二期改造后
10G下的NSM :
实际效果展示
NSM架构解析
实时协议分析:Bro日志类型
Flow: 数据格式
实时安全威胁检测引擎
Suricata Today
实时流量 +ELK + VirusTotal
构建10G+ NSM的几个关键点
1、抓包网卡
2、内核优化
3、驱动与rss
4、PF-Ring_zc
5、ntop、nprobe、ndpi
6、跨数据中心es
流量抓包与网卡
ELK部分的关键点
1、用Logstash Kafka input接收数据
2、数据量大,处理结构复杂时:
预设Kafka分区
开启多个Logstash实例,分别读取Kafka分区数据
分别写入不同es节点
3、多集群互联
跨数据中心es集群
10G NSM平台样例
万兆 实时 安全大数据架构
作者:张磊@Zooboa
Name node is in safe mode解决
Nock 发表了文章 0 个评论 3172 次浏览 2016-10-16 21:05
这是因为在分布式文件系统启动的时候,开始的时候会有安全模式,当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入安全模式。在实践过程中,系统启动的时候去修改和删除文件也会有安全模式不允许修改的出错提示,只需要等待一会儿即可。
可以通过以下命令来手动离开安全模式:
bin/hadoop dfsadmin -safemode leave用户可以通过dfsadmin -safemode value 来操作安全模式,参数value的说明如下:
enter - 进入安全模式
leave - 强制NameNode离开安全模式
get - 返回安全模式是否开启的信息
wait - 等待,一直到安全模式结束。
Elasticsearch备份和恢复
空心菜 发表了文章 1 个评论 6284 次浏览 2016-10-15 13:04
备份
备份数据之前,要创建一个仓库来保存数据,仓库的类型支持共享文件系统、Amazon S3、 HDFS和Azure Cloud。
Elasticsearch的一大特点就是使用简单,api也比较强大,备份也不例外。简单来说,备份分两步:
- 创建一个仓库
- 备份指定索引
一、创建存储仓库
共享文件系统实例如下:
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup上面代码解释:
{
"type": "fs",
"settings": {
"location": "/mount/EsDataBackupDir"
}
}
- 创建了一个名为EsBackup的存仓库
- 指定的备份方式为共享文件系统(type: fs)
- 指定共享存储的具体路径(location参数)
注意:共享存储路径,必须是所有的ES节点都可以访问的,最简单的就是nfs系统,然后每个节点都需要挂载到本地。
如上所示,创建存储仓库的时候,除了可以指定location参数以外,我们还可以指点max_snapshot_bytes_per_sec和max_restore_bytes_per_sec参数来限制备份和恢复时的速度,默认值都是20mb/s,假设我们有一个非常快的网络环境,我们可以增大默认值:
curl -XPOST http://127.0.0.1:9200/_snapshot/EsBackup注意:这是在第一段代码的基础上来增加配置,第一段代码利用的是PUT请求来创建存储库,这段代码则是利用POST请求来更新已经存在的存储库的settings配置。
{
"type": "fs",
"settings": {
"location": "/mount/EsDataBackupDir"
"max_snapshot_bytes_per_sec" : "50mb",
"max_restore_bytes_per_sec" : "50mb"
}
}
Amazon S3存储库实例如下:
curl -XPUT 'http://localhost:9200/_snapshot/s3-backup' -d '{参数名词解释:
"type": "s3",
"settings": {
"bucket": "esbackup",
"region": "cn-north-1",
"access_key": "xxooxxooxxoo",
"secret_key": "xxxxxxxxxooooooooooooyyyyyyyyy"
}
}'
- Type: 仓库类型
- Setting: 仓库的额外信息
- Region: AWS Region
- Access_key: 访问秘钥
- Secret_key: 私有访问秘钥
- Bucket: 存储桶名称
不同的ES版本支持的region参考:https://github.com/elastic/elasticsearch-cloud-aws#aws-cloud-plugin-for-elasticsearch
使用上面的命令,创建一个仓库(s3-backup),并且还创建了存储桶(esbackup),返回{"acknowledged":true} 信息证明创建成功。
确认存储桶是否创建成功:curl -XPOST http://localhost:9200/_snapshot/s3-backup/_verify
查看刚创建的存储桶:curl -XGET localhost:9200/_snapshot/s3-backup?pretty
查看所有的存储桶:curl -XGET localhost:9200/_snapshot/_all?pretty
删除一个快照存储桶:curl -XDELETE localhost:9200/_snapshot/s3-backup?pretty
二、备份索引
创建好存储仓库之后就可以开始备份了。一个仓库可以包含多个快照(snapshots),快照可以存所有的索引或者部分索引,当然也可以存储一个单独的索引。(要注意的一点就是快照只会备份open状态的索引,close状态的不会备份)
备份所有索引
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup/snapshot_all上面的代码会将所有正在运行的open状态的索引,备份到EsBacup仓库下一个叫snapshot_all的快照中。上面的api会立刻返回{"accepted":true},然后备份工作在后台运行。如果你想api同步执行,可以加wait_for_completion 标志:
curl -XPUT http://127.0.0.1:9200/_snapshot/EsBackup/snapshot_all?wait_for_completion=true上面的方法会在备份完全完成后才返回,如果快照数据量大的话,会花很长时间。
备份部分索引
默认是备份所有open状态的索引,如果你想只备份某些或者某个索引,可以指定indices参数来完成:
curl -XPUT 'http://localhost:9200/_snapshot/EsBackup/snapshot_12' -d '{ "indices": "index_1,index_2" }'
查看快照信息
查看快照信息,只需要发起GET请求就好:
GET _snapshot/my_backup/snapshot_2这将返回关于快照snapshot_2的详细信息:
{查看所有快照信息如下:
"snapshots": [
{
"snapshot": "snapshot_2",
"indices": [
".marvel_2014_28_10",
"index1",
"index2"
],
"state": "SUCCESS",
"start_time": "2014-09-02T13:01:43.115Z",
"start_time_in_millis": 1409662903115,
"end_time": "2014-09-02T13:01:43.439Z",
"end_time_in_millis": 1409662903439,
"duration_in_millis": 324,
"failures": [],
"shards": {
"total": 10,
"failed": 0,
"successful": 10
}
}
]
}
GET http://127.0.0.1:9200/_snapshot/my_backup/_all另外还有个一api可以看到更加详细的信息:
GET http://127.0.0.1:9200/_snapshot/my_backup/snapshot_2/_status更多详细内容可以到官网查看-官方文档地址。
删除快照
DELETE _snapshot/my_backup/snapshot_2重要的是使用API来删除快照,而不是其他一些机制(如手工删除,或使用自动s3清理工具)。因为快照增量,它是可能的,许多快照依靠old seaments。删除API了解最近仍在使用的数据快照,并将只删除未使用的部分。如果你手动文件删除,但是,你有可能严重破坏你的备份,因为你删除数据仍在使用,如果备份正在后台进行,也可以直接删除来取消此次备份。
监控快照进展
wait_for_completion标志提供了一个基本形式的监控,但没有足够的快照恢复甚至中等大小的集群。
另外两个api会给你更细节的状态的快照。首先你可以执行一个快照ID,就像我们早些时候得到一个特定的快照信息:
GET _snapshot/my_backup/snapshot_3如果当你调用这个快照还在进步,你会看到信息的时候开始,已经运行多长时间,等等。但是请注意,这个API使用相同的threadpool快照机制。如果你是快照非常大的碎片,之间的时间状态更新可以相当大,因为API是争夺相同的threadpool资源。
这时候有个更好的选择_status的api接口:
GET _snapshot/my_backup/snapshot_3/_status_status API立即返回并给出一个更详细的输出的统计:
{快照当前运行将显示IN_PROGRESS作为其状态,这个特定的快照有一个碎片仍然转移(其他四个已经完成)。
"snapshots": [
{
"snapshot": "snapshot_3",
"repository": "my_backup",
"state": "IN_PROGRESS",
"shards_stats": {
"initializing": 0,
"started": 1,
"finalizing": 0,
"done": 4,
"failed": 0,
"total": 5
},
"stats": {
"number_of_files": 5,
"processed_files": 5,
"total_size_in_bytes": 1792,
"processed_size_in_bytes": 1792,
"start_time_in_millis": 1409663054859,
"time_in_millis": 64
},
"indices": {
"index_3": {
"shards_stats": {
"initializing": 0,
"started": 0,
"finalizing": 0,
"done": 5,
"failed": 0,
"total": 5
},
"stats": {
"number_of_files": 5,
"processed_files": 5,
"total_size_in_bytes": 1792,
"processed_size_in_bytes": 1792,
"start_time_in_millis": 1409663054859,
"time_in_millis": 64
},
"shards": {
"0": {
"stage": "DONE",
"stats": {
"number_of_files": 1,
"processed_files": 1,
"total_size_in_bytes": 514,
"processed_size_in_bytes": 514,
"start_time_in_millis": 1409663054862,
"time_in_millis": 22
}
},
...
响应包括总体状况的快照,但还深入每和每个实例统计数据。这给你一个令人难以置信的详细视图快照是如何进展的。碎片可以以不同的方式完成:
INITIALIZING: 集群的碎片是检查状态是否可以快照。这通常是非常快。
STARTED:数据被转移到存储库。
FINALIZING:数据传输完成;碎片现在发送快照的元数据。
DONE:快照完成。
FAILED:在快照过程中错误的出处,这碎片/索引/快照无法完成。检查你的日志以获取更多信息。
恢复
备份好后,恢复就更容易了,恢复snapshot_1里的全部索引:
POST http://127.0.0.1:9200/_snapshot/my_backup/snapshot_1/_restore这个api还有额外的参数:
POST http://127.0.0.1:9200/_snapshot/my_backup/snapshot_1/_restore参数indices 设置只恢复index_1索引,参数rename_pattern 和rename_replacement用来正则匹配要恢复的索引,并且重命名。和备份一样,api会立刻返回值,然后在后台执行恢复,使用wait_for_completion 标记强制同步执行。
{
"indices": "index_1",
"rename_pattern": "index_(.+)",
"rename_replacement": "restored_index_$1"
}
另外可以使用下面两个api查看状态:
GET http://127.0.0.1:9200/_recovery/restored_index_3如果要取消恢复过程(不管是已经恢复完,还是正在恢复),直接删除索引即可:
GET http://127.0.0.1:9200/_recovery/
DELETE http://127.0.0.1:9200/restored_index_3更多内容参考-官方文档。
参考官方文档地址:
1、https://www.elastic.co/guide/en/elasticsearch/guide/current/backing-up-your-cluster.html#_listing_information_about_snapshots
2、https://www.elastic.co/guide/en/elasticsearch/guide/current/_restoring_from_a_snapshot.html
Kafka性能测试
chris 发表了文章 0 个评论 11841 次浏览 2016-10-13 23:24
测试背景
Apache Kafka是一种高吞吐、可扩展的分布式消息队列服务,它最初由LinkedIn公司开发,最后发展为Apache基金会的一个项目。目前kafka已经广泛应用于大数据分析,消息处理等环境,官方文档介绍kafka为提高吞吐率做了很多设计,但是其性能究竟如何呢?本文对kafka在不同参数下的性能进行测试。
测试目标
测试kafka 0.8n的性能(Producer/Consumer性能)。当消息大小、批处理大小、压缩等参数变化时对吞吐率的影响。
测试环境
软件版本:kafka 0.8.1.1
硬件环境:3台多磁盘服务组成的kafka集群。各服务器CPU E5645,内存47G,12快SAS盘,配置千兆网卡,配置如下:
测试方法
使用kafka官方提供的kafa-perf工具做性能测试,在测试时使用ganglia,kafka Web Console来记录服务情况。
测试步骤
一、测试环境准备
1、测试工具kafka-perf编译
kafka官方提供的二进制版本,并不包括性能测试的jar包,会报错找不到ProducerPerformance,要自己重新编译,编译方法如下:
git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka.git
cd kafka.git/
git checkout -b 0.8.1 origin/0.8.1
vim build.gradle #编辑配置
./gradlew -PscalaVersion=2.10.4 perf:jar #生成2.10.4的kafka-perf的jar包,复制到libs目录下
cp perf/build/libs/kafka-perf_2.10-0.8.1.1-SNAPSHOT.jar /usr/local/kafka/libs/
2、启动kafka
cd /usr/local/kafka
vim config/server.properties #内容见下图
./bin/kafka-server-start.sh config/server.properties &
kafka-producer-perf-test.sh中参数说明:
messages 生产者发送走的消息数量bin/kafka-consumer-perf-test.sh中参数说明:
message-size 每条消息的大小
batch-size 每次批量发送消息的数量
topics 生产者发送的topic
threads 生产者
broker-list 安装kafka服务的机器ip:porta列表
producer-num-retries 一个消息失败发送重试次数
request-timeouts-ms 一个消息请求发送超时时间
zookeeper zk配置
messages 消费者消费消息的总数量
topic 消费者需要消费的topic
threads 消费者使用几个线程同时消费
group 消费者组名称
socket-buffer-sizes socket缓存大小
fetch-size 每次想kafka broker请求消费消息大小
consumer.timeout.ms 消费者去kafka broker拿一条消息的超时时间
二、测试生产者吞吐率
此项只测试producer在不同的batch-zie,patition等参数下的吞吐率,也就是数据只被及计划,没有consumer读取数据消费情况。
生成Topic:
生成不同复制因子,partition的topic
bin/kafka-topics.sh --zookeepr 10.x.x.x:2181/kafka/k1001 --create --topic test-pati1-rep1 --partitions 1 --replication-factor 1测试producer吞吐率
bin/kafka-topics.sh --zookeepr 10.x.x.x:2181/kafka/k1001 --create --topic test-pati1-rep2 --partitions 1 --replication-factor 2
bin/kafka-topics.sh --zookeepr 10.x.x.x:2181/kafka/k1001 --create --topic test-pati10-rep1 --partitions 10 --replication-factor 1
bin/kafka-topics.sh --zookeepr 10.x.x.x:2181/kafka/k1001 --create --topic test-pati10-rep2 --partitions 10 --replication-factor 2
bin/kafka-topics.sh --zookeepr 10.x.x.x:2181/kafka/k1001 --create --topic test-pati100-rep1 --partitions 100 --replication-factor 1
bin/kafka-topics.sh --zookeepr 10.x.x.x:2181/kafka/k1001 --create --topic test-pati100-rep2 --partitions 100 --replication-factor 2
调整batch-size,thread,topic,压缩等参数测试producer吞吐率。
示例:
a)批处理为1,线程数为1,partition为1,复制因子为1说明:消息大小统一使用和业务场景中日志大小相近的512Bype,消息数为50w或200w条。
bin/kafka-producer-perf-test.sh --messages 2000000 --message-size 512 --batch-size 1 --topic test-pati1-rep1
--partitions 1 --threads 1 --broker-list host1:9092,host2:9092,host3:9092
b)批处理为10,线程数为1,partition为1,复制因子为2
bin/kafka-producer-perf-test.sh --messages 2000000 --message-size 512 --batch-size 10 --topic test-pati1-rep2
--partitions 1 --threads 1 --broker-list host1:9092,host2:9092,host3:9092
c)批处理为100,线程数为10,partition为10,复制因子为2,不压缩,sync
bin/kafka-producer-perf-test.sh --messages 2000000 --message-size 512 --batch-size 100 --topic test-pati10-rep2 --partitions 10 --threads 10 --compression-codec 0 --sync 1 --broker-list host1:9092,host2:9092,host3:9092
d)批处理为100,线程数为10,partition为10,复制因子为2,gzip压缩,sync
bin/kafka-producer-perf-test.sh --messages 2000000 --message-size 512 --batch-size 100 --topic test-pati10-rep2 --partitions 10 --threads 10 --compression-codec 1 --sync 1 --broker-list host1:9092,host2:9092,host3:9092
三、测试消费者吞吐率
测试consumer吞吐率
调整批处理数,线程数,partition数,复制因子,压缩等进行测试。
示例:
a)批处理为10,线程数为10,partition为1,复制因子为1
./bin/kafka-consumer-perf-test.sh --messages 500000 --batch-size 10 --topic test-pai1-rep2 --partitions 1 --threads 10 --zookeeper zkhost:2181/kafka/k1001
b)批处理为10,线程数为10,partition为10,复制因子为2
./bin/kafka-consumer-perf-test.sh --messages 500000 --batch-size 10 --topic test-pai1-rep2 --partitions 10 --threads 10 --zookeeper zkhost:2181/kafka/k1001
c)批处理为100,线程数为10,partition为1,复制因子为1,不压缩
./bin/kafka-consumer-perf-test.sh --messages 500000 --batch-size 100 --topic test-pai100-rep1 --partitions 1 --threads 10 --compression-codec 0 --zookeeper zkhost:2181/kafka/k1001
d)批处理为100,线程数为10,partition为1,复制因子为1,Snappy压缩
./bin/kafka-consumer-perf-test.sh --messages 500000 --batch-size 100 --topic test-pai100-rep1 --partitions 1 --threads 10 --compression-codec 2 --zookeeper zkhost:2181/kafka/k1001
测试结果及分析
1、生产者测试结果及分析
调整线程数,批处理数,复制因子等参考,对producer吞吐率进行测试。在测试时消息大小为512Byte,消息数为200w,结果如下:
调整sync模式,压缩方式得到吞吐率数据如表3.在本次测试中msg=512Byte,message=2000000,Partition=10,batch_zie=100
结果分析:
1)kafka在批处理,多线程,不适用同步复制的情况下,吞吐率是比较高的,可以达80MB/s,消息数达17w条/s以上。
2)使用批处理或多线程对提升生产者吞吐率效果明显。
3)复制因子会对吞吐率产生较明显影响
使用同步复制时,复制因子会对吞吐率产生较明显的影响。复制因子为2比因子为1(即无复制)时,吞吐率下降40%左右。
4)使用sync方式,性能有明显下降。
使用Sync方式producer吞吐率会有明显下降,表3中async方式最大吞吐率由82.0MB/s,而使用sync方式时吞吐率只有13.33MB/s.
5)压缩与吞吐率
见图3,粉笔使用Gzip及Snappy方式压缩,吞吐率反而有下降,原因待分析。而Snappy方式吞吐率高于gzip方式。
6)分区数与吞吐率
分区数增加生产者吞吐率反而有所下降
2、消费者结果及分析
调整批处理数,分区数,复制因子等参数,对consumer吞吐率进行测试。在测试时消息大小为512Byte,消息数为200结果如下:
调整压缩方式,分区数,批处理数等,测试参数变化时consumer的吞吐率。测试的复制因子为1。
结果分析:1)kafka consumer吞吐率在parition,threads较大的情况下,在测试场景下,最大吞吐率达到了123MB/s,消息数为25w条/s
2)复制因子,影响较小。replication factor并不会影响consumer的吞吐率测试,运维consumer智慧从每个partition的leader读数据,而与replication factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。
3)线程数和partition与吞吐率关系
图5为msg_size=512,batch_zie=100时的测试数据备份。可以看到当分区数较大时,如partion为100时,增加thread数可显著提升consumer的吞吐率。Thread10较thread1提升了10倍左右,而thread为100时叫thread为1提升了近20倍,达到120MB/s.
但要注意在分区较大时线程数不改大于分区数,否则会出现No broker partitions consumed by consumer,对提升吞吐率也没有帮助。图5中partion为10时,thread_10,thread_100吞吐率相近都为35MB/s左右。
4)批处理数对吞吐率影响
图表5中可以看出改变批处理数对吞吐率影响不大
5)压缩与吞吐率
图表6当thread=10,复制因子=1不压缩,Gzip,Snappy时不同parititon时Concumser吞吐率。由上图可以看到此场景下,压缩对吞吐率影响小。
zookeeper客户端连接zookeeper服务器策略是什么?
空心菜 回复了问题 2 人关注 1 个回复 5082 次浏览 2016-10-11 12:29
OVM混合虚拟化设计目标及设计思路
maoliang 发表了文章 0 个评论 3145 次浏览 2016-09-29 09:42
OVM是要实现混合虚拟化,做一个大一统的资源管理和交付平台,纵观虚拟化市场,现在当属开源的KVM和Docker最火,我们工作过去有vmware,现在大量使用kvm,未来一定会考虑docker,但现有市场上的产品要么架构太大,开发难度高,易用性不够强,要么就是单纯的虚拟化,类似vmware等。
新形式下,我们需要的产品要同时具备如下目标:
跨平台,支持KVM、Esxi和Docker容器
因为我们单位过去采用虚拟化技术混杂,上面又遗留很多虚拟机,这就要求我们新开发产品能够兼容不同hypervior,同时还能识别并管理原有hypervior上的虚拟机,能够识别并导入原数据库技术并不难,难的是导入后还能纳入新平台管理,这部分我们又研发了自动捕获技术,而不同混合虚拟化管理技术可以摆脱对底层Hypervisor的依赖,专心于资源的统一管理。
单独的用户自助服务门户
过去采用传统虚拟化时,解决了安装部署的麻烦,但资源的申请和扩容、准备资源和切割资源仍然没有变,还是要让我们做,占据了日常运维工作的大部分工作量,每天确实烦人,所以我们想而这部分工作是可以放给相关的部门Leader或者专员,来减少运维的工作量,所以我们希望新设计的OVM允许用户通过一个单独的Web门户来直接访问自己的资源(云主机、云硬盘、网络等),而且对自己的资源进行管理,而不需要知道资源的具体位置,同时用户的所有云主机都建立在高可用环境之上,也不必担心实际物理硬件故障引起服务瘫痪。
Opscode Chef自动化运维集成
实行自助服务后,有一个问题,就是软件不同版本部署不兼容,这就需要设计能够把操作系统、中间件、数据库、web能统一打包部署,减少自助用户哭天喊地甚至骂我们,我们通过对chef的集成,可以一键更新所有的虚拟机,在指定的虚拟机上安装指定的软件。有了chef工具,为虚拟机打补丁、安装软件,只需要几个简单的命令即可搞定。
可持续性
做运维不要故障处理办法是不行,而且还要自动化的,同时我们有vmware和kvm,就需要新平台能在esxi上(不要vcenter,要付钱的)和kvm同时实现ha功能
可运维性
一个平台再强大,技术再厉害,如果不可运维,那结果可想而知。因此我们希望OVM可以给用户带来一个稳定、可控、安全的生产环境
弹性伸缩
自助服务部门拿到资源后,通过对各项目资源的限额,以及对各虚拟机和容器性能指标的实时监控,可以实现弹性的资源伸缩,达到合理分配利用资源。
资源池化
将数据中心的计算、存储、网络资源全部池化,然后通过OVM虚拟化平台统一对外提供IaaS服务。
2、OVM设计思路
OVM架构选型一
我们觉得架构就是一个产品的灵魂所在,决定着产品日后的发展。
我们团队在产品选型初期,分别调研了目前比较热门的openstack,以及前几年的明星产品convirt、ovirt,这几个产品可谓是典型的代表。
Openstack偏重于公有云,架构设计的很不错,其分布式、插件式的模块化架构,可以有效避免单点故障的发生,从发布之初便备受推崇,但是其存在的问题也同样令人头疼目前使用Openstack的大多是一些有实力的IDC、大型的互联网公司在用。而对于一般的企业来说,没有强大的开发和维护团队,并不敢大规模的采用openstack,初期使用一段时间后我们放弃了OS。
而前几年的convirt,在当时也掀起了一股使用热潮,其简单化的使用体验,足以满足小企业的虚拟化需求,但是他的问题是架构采用了集中式的架构,而且对于上了规模之后,也会带来性能方面的瓶颈,除非是把数据库等一些组件松藕合,解决起来也比较麻烦,所以到了后期也是不温不火,官方也停止了社区的更新和维护。
OVM架构选型二
通过对以上产品的优劣势分析,我们决定采用用分布式、松耦合的模块化插件架构,分布式使其可以规避单点故障,达到业务持续高可用,松耦合的模块化特点让产品在后期的扩展性方面不受任何限制,使其向下可以兼容数据中心所有硬件(通过OVM标准的Rest API接口),向上可以实现插件式的我们即将需要的工作流引擎、计费引擎、报表引擎、桌面云引擎和自动化运维引擎。
而目前阶段我们则专心于实现虚拟化的全部功能,发掘内部对虚拟化的需求,打造一款真正简单、易用、稳定、可运维的一款虚拟化管理软件,并预留向上、向下的接口留作后期发展。(架构图大家可以查看刚才上面发的图片)
虚拟化技术选择
Docker当下很火,其轻量级、灵活、高密度部署是优点,但是大规模使用还未成熟。许多场景还是需要依赖传统的虚拟化技术。所以我们选择传统虚拟化技术KVM+Docker,确保线上业务稳定性、连续性的同时,开发、测试环境又可以利用到Docker的轻量级、高密度和灵活。
另外很多用户的生产环境存在不止一种虚拟化技术,例如KVM+Esxi组合、KVM+XenServer组合、KVM+Hyper-V组合,而目前的虚拟化管理平台,大多都是只支持一种Hypervisor的管理,用户想要维护不同的虚拟化技术的虚拟机,就要反复的在不同环境之间切换。
基于此考虑,我和团队内部和外部一些同行选择兼容(兼容KVM、Esxi,Docker),并自主打造新一代虚拟化管理平台——混合虚拟化。
网络
网络方面,我们对所有Hypervisor的虚拟机使用统一的网络管理(包括Docker容器),这样做的一个好处就是可以减少运维工作量,降低网络复杂性。初期我们只实现2层的虚拟网络管理,为虚拟机和容器提供Vlan隔离、DHCP分配网络,当然也可以手工为虚拟机挑选一个网络,这个可以满足一般的虚拟化需求,后期我们会在此基础上增加虚拟网络防火墙、负载均衡。
存储
存储上面我们采用本地存储+NFS两种方式,对于一般中小企业来说,不希望购买高昂的商业存储,直接使用本地存储虚拟机的性能是最好的,而且我们也提供了存储快照、存储热迁移、虚拟机的无共享热迁移来提升业务安全。此外NFS作为辅助,可以为一些高风险业务提供HA。后期存储方面我们会考虑集成Ceph和GlusterFS存储来提升存储管理。
镜像中心
顾名思义,镜像中心就是用来存放镜像的。传统虚拟化我们使用NFS来作为镜像中心,所有宿主机共享一个镜像中心,这样可以更方便的来统一管理镜像,而针对Docker容器则保留了使用Docker自己的私有仓库,但是我们在WEB UI的镜像中心增加了从Docker私有仓库下载模板这么一个功能,实现了在同一个镜像中心的管理,后期我们会着重打造传统虚拟化镜像与Docker镜像的相互转换,实现两者内容的统一。
HA
OVM 主机HA依然坚持全兼容策略,支持对可管理的所有Hypervisor的HA。
在启用HA的资源池,当检测到一个Hypervisor故障,创建和运行在该Hypervisor共享存储上的虚拟机将在相同资源池下的另外一个主机上重启。
具体工作流程为:
第一次检测到故障会将该故障主机标记;
第二次检测依然故障将启动迁移任务;
迁移任务启动后将在该故障主机所在的资源池寻找合适的主机;
确定合适主机后,会将故障主机上所有的虚拟机自动迁移到合适的主机上面并重新启动;
VM分配策略
负载均衡
PERFORMANCE(性能): 这条策略分配虚拟机到不同的主机上。它挑选一组中可用资源最多的主机来部署虚拟机。如果有多个主机都有相同的资源可用,它使用一个循环算法,每个虚拟机分配到一个不同的主机上。
PROGRESSIVE(逐行扫描): 这条策略意味着,所有虚拟机将被分配在同一个主机上,直到它的资源被用尽。此项策略将一个主机资源使用完,然后再切换到另一个主机。
负载均衡级别
所有资源池: 如果选定此选项,则负载均衡级别策略将适用于所选定数据中心的所有资源池中的主机。
所有主机: 该策略将适用于所选数据中心单个资源池的所有主机。
特定主机: 该策略仅适用于所选的特定主机。
VM设计一
VM是整个虚拟化平台的核心,我们开发了一个单独的模块来负责虚拟机的相关操作。其采用异步通信和独立的并行操作,提升了虚拟机性能、稳定性和扩展性。
可扩展性:独立、并发
可追溯性:错误信息和log、监控控制台、性能
非阻塞操作:稳定性、改进重新配置、改进回滚、标准、统一的hypervisor通信、自动化测试
VM设计二
我们设计基于vApp来部署虚拟机,一个vApp可包含N个虚拟机:
N 个虚拟机配置
N个开机请求
我们希望并行运行N个配置(要求资源允许),并在配置后每个虚拟机请求一个开机。这些操作都是并发和独立的。
平台设计
OVM产品目前由三大组件、七大模块组成。其中三大组件分别为OVM UI、OVM API和OVM数据中心组件。
OVM UI提供WEB自服务界面,OVM API负责UI和数据中心组件之间的交互,OVM数据中心组件分别提供不同的功能。七大模块分别负责不同的功能实现,和三大组件之间分别交互。
VDC资源限额
管理员可以为每个VDC设置资源限额,防止资源的过度浪费。
账户管理
OVM提供存储SDK、备份SDK,以及虚拟防火墙SDK,轻松与第三方实现集成
获得帮助
下载请访问OVM社区官网:51ovm.com
使用过程中遇到什么问题及获得下载密码,加入OVM社区qq官方交流群:22265939
“ 实践是检验真理唯一标准“,OVM社区视您为社区的发展动力,此刻,诚邀您参与我们的调查,共同做出一款真正解决问题、放心的产品,一起推动国内虚拟化的进步和发展。
填写调查问卷!只需1分钟喔!
为国产技术点赞,与OVM社区共同开启用户满意度调研
maoliang 发表了文章 0 个评论 2359 次浏览 2016-09-28 14:52
从今年的6月13日推出的OVM的第一个Beta版本至今,已推出四个版本,OVM社区团队非常关心 ”宝宝们“ 对免费OVM虚拟化管理平台的想法!
“ 实践是检验真理唯一标准“
OVM社区视您为社区的发展动力
此刻,诚邀您参与我们的调查
共同做出一款真正解决问题、放心的产品,
一起推动国内虚拟化的进步和发展。
填写调查问卷!只需1分钟喔!
https://www.wenjuan.com/s/iiEVZv
6月13日--推出了OVM的第一个Beta版本。这个版本主要提供了HA、负载均衡、虚拟机无共享热迁移、PCI直通模式等一些商业级的收费功能,但我们是完全免费的,有了这些免费功能,用户进入虚拟化的成本直接降低到了最低,并且更加的安全。
7月18日---发布了第二个版本,除了对原有功能的优化和改进,又增加了新的虚拟化引擎Docker功能,给更多的用户带去了全新的操作使用体验。
8月18日---发布的OVM-V1.0版本,相对于前两个版本来讲,进行了全新的改变。推出单独的OVM-Node和OVM虚拟化管理平台。
9月18日--推出OVM-V1.1,除了增加了如虚拟机的快照、快照恢复功能及虚拟机备份以外,全面支持导入centos6的虚拟机。重点对产品稳定性进行改善。
10月18日将推出OVM-V1.2,敬请期待..........??????
获得帮助
下载请访问OVM社区官网:51ovm.com
使用过程中遇到什么问题及获得下载密码,加入OVM社区qq官方交流群:22265939
OVM-V1.1版已正式发布,新增虚拟机快照、备份等功能!
maoliang 发表了文章 0 个评论 2720 次浏览 2016-09-28 14:48
OVM-V1.1版本已在9月18日正式发布,此版本全面支持导入centos6的虚拟机,同时,在OVM虚拟化管理平台新增了虚拟机的快照、快照恢复功能及虚拟机的备份功能。OVM-Node丰富了物理机、容器、容器镜像等详细信息,欢迎下载安装(51ovm.com)。
版本功能变动如下:
OVM-Node提供如下功能:
丰富物理机详情信息
丰富容器详细信息
丰富容器镜像详细信息
通过admin账户的图形界面设置网桥
OVM虚拟化管理平台新增功能:
Ø 虚拟机快照
Ø 虚拟机备份
Ø 修复若干重要Bug
详细信息
1. 丰富物理机详情信息
该版本新增了物理机的如下详细信息展现:
Libvirt版本信息;docker版本信息,容器数量,正在运行的容器数量;OS信息(内核版本,内核架构信息,发行版本),cpu信息(型号,厂商,处理器,几个核,速度为多少赫兹,目前的cpu使用率),内存信息(总内存,可用内存,内存使用率),存储使用信息(目前存储的大小,存储挂载详情,存储使用率),网卡信息(IP,网关,掩码,DNS)
2. 丰富容器详细信息
该版本新增了容器的如下信息显示:
ID,名称,创建时间,运行状态,使用的镜像ID,对外开放的端口号,环境变量,挂载的目录信息,创建时运行的脚本,网络信息
3、丰富容器镜像详细信息
该版本新增了容器镜像的如下信息显示:
ID,名称,创建时间,对外开放的端口号,环境变量,创建镜像的作者信息
4、通过admin账户的图形界面设置网桥
该版本我们新增了通过admin用户登录进来的图形界面配置网桥的方法,大大降低了用户配置网桥的复杂度。
5、 虚拟机快照功能
在该版本,OVM虚拟化管理平台新增的虚拟机的快照,以及快照恢复功能,快照可以帮助用户快速的恢复到某个时间点。当虚拟机系统崩溃或系统异常,你可以通过使用恢复到快照来保持的磁盘和系统状态。
6、虚拟机备份功能
在该版本,OVM虚拟化管理平台新增了虚拟机的备份功能,包括全量备份和增量备份。全量备份可以帮助用户快速的将整个虚拟机打包备份;而增量备份则可以帮助用户在第一次全量备份的基础上,只备份产生的差异数据,从而节省磁盘空间
OVM混合虚拟化系统物理服务器的最低配置要求是:
最低要求
推荐配置
CPU
64位Intel@ VT-x或AMD-VTMCPU,支持虚拟化
内存
8G
64G
硬盘
300G
1T或更高
网卡
1个100Mb/s或1Gb/s网口
2个1Gb/s或10Gb/s网口
光驱
如果您准备使用光盘安装OVM,请配置DVD光驱
为了体验高级功能,它至少需要两台服务器来构建一测试系统,推荐三台最佳。