如何为Kafka集群选择合适的主题和分区数量

这是许多Kafka用户提出的常见问题。 这篇文章的目的是解释几个重要的决定因素,并提供一些简单的公式。   更多的分区提供更高的吞吐量 首先要理解的是,主题分区是Kafka中并行性的单位。 在生产者和代理端,对不同分区的写入可以完全并行完成。 因...
继续阅读 »
kafka.png

这是许多Kafka用户提出的常见问题。 这篇文章的目的是解释几个重要的决定因素,并提供一些简单的公式。
 


更多的分区提供更高的吞吐量


首先要理解的是,主题分区是Kafka中并行性的单位。 在生产者和代理端,对不同分区的写入可以完全并行完成。 因此,昂贵的操作(例如压缩)可以利用更多的硬件资源。 在消费者方面,Kafka总是将一个分区的数据提供给一个消费者线程。 因此,消费者(在消费者组内)的并行度受所消耗的分区的数量的限制。 因此,一般来说,Kafka集群中的分区越多,吞吐量就越高。

用于选择分区数量的粗略公式基于吞吐量。 你衡量整个,你可以在生产单个分区(称之为P)和消费实现(称为C)。 比方说,你的目标吞吐量吨 。 然后,你需要至少有MAX(T / P,T / C)分区。 人们可以在生产者上实现的每分区吞吐量取决于诸如批处理大小,压缩编解码器,确认类型,复制因子等等的配置。然而,一般来说,可以在10秒的MB /如在此所示单个分区的基准 。 消费者吞吐量通常是与应用相关的,因为它对应于消费者逻辑可以如何快速地处理每个消息。 所以,你真的需要测量它。

虽然可以随着时间增加分区的数量,但是如果使用密钥生成消息,则必须小心。 当发布带密钥的消息时,Kafka基于密钥的散列将消息确定性地映射到分区。 这提供了保证具有相同密钥的消息总是路由到同一分区。 此保证对于某些应用可能是重要的,因为分区内的消息总是按顺序递送给消费者。 如果分区的数量改变,则这样的保证可能不再保持。 为了避免这种情况,一种常见的做法是过度分区。 基本上,您可以根据未来的目标吞吐量确定分区数,比如一两年后。 最初,您可以根据您的当前吞吐量只有一个小的Kafka集群。 随着时间的推移,您可以向集群添加更多代理,并按比例将现有分区的一部分移动到新代理(可以在线完成)。 这样,当使用密钥时,您可以跟上吞吐量增长,而不会破坏应用程序中的语义。

除了吞吐量,还有一些其他因素,在选择分区数时值得考虑。 正如你将看到的,在某些情况下,分区太多也可能产生负面影响。
 


更多分区需要打开更多的文件句柄


每个分区映射到broker中文件系统中的目录。 在该日志目录中,每个日志段将有两个文件(一个用于索引,另一个用于实际数据)。 目前,在Kafka中,每个broker打开每个日志段的索引和数据文件的文件句柄。 因此,分区越多,底层操作系统中需要配置打开文件句柄限制的分区越多。 这大多只是一个配置问题。 我们已经看到生产Kafka集群中,运行的每个broker打开的文件句柄数超过30000。
 


更多分区可能是不可用性增加


Kafka支持群集内复制 ,它提供更高的可用性和耐用性。 分区可以有多个副本,每个副本存储在不同的代理上。 其中一个副本被指定为领导者,其余的副本是追随者。 在内部,Kafka自动管理所有这些副本,并确保它们保持同步。 生产者和消费者对分区的请求都在领导副本上提供。 当代理失败时,该代理上具有leader的分区变得暂时不可用。 Kafka将自动将那些不可用分区的领导者移动到其他一些副本以继续服务客户端请求。 这个过程由指定为控制器的Kafka代理之一完成。 它涉及在ZooKeeper中为每个受影响的分区读取和写入一些元数据。 目前,ZooKeeper的操作是在控制器中串行完成的。

在通常情况下,当代理被干净地关闭时,控制器将主动地将领导者一次一个地关闭关闭代理。 单个领导者的移动只需要几毫秒。 因此,从客户的角度来看,在干净的代理关闭期间只有一小窗口的不可用性。

然而,当代理不干净地关闭(例如,kill -9)时,所观察到的不可用性可能与分区的数量成比例。 假设代理具有总共2000个分区,每个具有2个副本。 大致来说,这个代理将是大约1000个分区的领导者。 当这个代理不干净失败时,所有这1000个分区完全不能同时使用。 假设为单个分区选择新的领导需要5毫秒。 对于所有1000个分区,选择新的领导将需要5秒钟。 因此,对于一些分区,它们的观察到的不可用性可以是5秒加上检测故障所花费的时间。

如果一个是不幸的,失败的代理可能是控制器。 在这种情况下,选举新领导者的过程将不会开始,直到控制器故障转移到新的代理。 控制器故障转移自动发生,但需要新控制器在初始化期间从ZooKeeper读取每个分区的一些元数据。 例如,如果Kafka集群中有10,000个分区,并且从ZooKeeper初始化元数据每个分区需要2ms,则可以向不可用性窗口添加20多秒。

一般来说,不洁的故障是罕见的。 但是,如果在这种极少数情况下关心可用性,则可能最好将每个代理的分区数限制为两个到四千个,集群中的分区总数限制到几万个。
 


更多分区可能会增加端到端延迟


Kafka中的端到端延迟由从生产者发布消息到消费者读取消息的时间来定义。 Kafka仅在提交后向消费者公开消息,即,当消息被复制到所有同步副本时。 因此,提交消息的时间可以是端到端等待时间的显着部分。 默认情况下,对于在两个代理之间共享副本的所有分区,Kafka代理仅使用单个线程从另一个代理复制数据。 我们的实验表明,将1000个分区从一个代理复制到另一个代理可以添加大约20毫秒的延迟,这意味着端到端延迟至少为20毫秒。 这对于一些实时应用来说可能太高。

请注意,此问题在较大的群集上减轻。 例如,假设代理上有1000个分区引导,并且在同一个Kafka集群中有10个其他代理。 剩余的10个代理中的每个仅需要从第一代理平均获取100个分区。 因此,由于提交消息而增加的延迟将仅为几毫秒,而不是几十毫秒。

作为一个经验法则,如果你关心的延迟,它可能是一个好主意,每个经纪人的分区数量限制为100 * b * r,其中b是经纪人在kafka集群的数量和r是复制因子。
 


更多分区可能需要更多的内存在客户端


在最新的0.8.2版本,我们汇合到我们平台1.0,我们已经开发出一种更有效的Java生产商。 新生产者的一个好的功能是它允许用户设置用于缓冲传入消息的内存量的上限。 在内部,生产者缓冲每个分区的消息。 在累积了足够的数据或已经过去足够的时间之后,累积的消息从缓冲器中移除并发送到代理。

如果增加分区的数量,消息将在生产者中的更多分区中累积。 所使用的内存总量现在可能超过配置的内存限制。 当这种情况发生时,生产者必须阻止或丢弃任何新的消息,这两者都不是理想的。 为了防止这种情况发生,需要重新配置具有较大内存大小的生产者。

作为经验法则,为了实现良好的吞吐量,应该在生成器中分配至少几十KB的每个分区,并且如果分区的数量显着增加,则调整存储器的总量。

消费者也存在类似的问题。 消费者每个分区获取一批消息。 消费者消耗的分区越多,需要的内存就越多。 然而,这通常只是不是实时的消费者的问题。
 


总结


通常,Kafka集群中的更多分区导致更高的吞吐量。 然而,人们不得不意识到在总体上或每个代理中具有过多分区对可用性和等待时间的潜在影响。 在未来,我们计划改进一些限制,使Kafka在分区数方面更具可扩展性。
 


翻译原文:https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/  
作者:饶俊


收起阅读 »

云主机IO性能测试报告

不同厂商的云主机在性能、设计、操作方面都存在很大的差异化。   吐槽云商 1、阿里云 -- 地主黑势力 阿里的包月包年捆绑式销售,创建完成后你不能及时删除释放主机,只能默默的等到期了,记得我第一次使用阿里云的时候,一口气开了10台云主机,后面我发现我不需要...
继续阅读 »
不同厂商的云主机在性能、设计、操作方面都存在很大的差异化。
 


吐槽云商


1、阿里云 -- 地主黑势力
阿里的包月包年捆绑式销售,创建完成后你不能及时删除释放主机,只能默默的等到期了,记得我第一次使用阿里云的时候,一口气开了10台云主机,后面我发现我不需要这么多,预算出错,但是阿里告诉我的是你人生中只有一次反悔的机会,我只能删掉一台主机。
 
其他的主机我只能等他默认到期了,如果实在没有多大作用我只能浪费我的银两,然后阿里坐收消费,我就当打赏王坚一个小红包了。
 
2、Ucloud -- 清廉百姓官
Ucloud的业务是我最喜欢的,尤其是喜欢他们的服务和UI设计。在国内做市场,我想你最应该的是调研国内技术人员的操作习惯,然后把云产品的使用达到极简、易使用,而不是黑灯找芝麻。在我的心理Ucloud是国内在云主机的操作和体验上给我的感受是最佳的。 还有就是他们的服务,他们可以一对一的帮你解决问题,耐心的解答,没有阿里那财大气粗的蛮横,不会绑架你。
 
3、Azure -- 没长齐毛的鸟
为什么我说Azure是没有长齐毛的鸟,因为他的产品设计还没有完善,操作困难,可能Azure还没有大力投入做中国市场吧,好多功能都依赖于PowerShell去完成操作,比如挂盘、弹性公网ip的绑定申请等,我想说的是我是个运维,脑子里面记了一大堆命令,我使用你们产品,挂个盘你还得让我去记一大堆命令和规则,我疯了。so.......
 
我使用过Ucloud、阿里云、Azure、Google云、AWS等诸多厂商的云产品服务,具体的其他功能我就不一一对比,下面我就介绍一下,我最近做的IO测试几家厂商的报告。
 


IO测试


测试命令如下:
fio -filename=/data/test.out1 -direct=1 -rw=read -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test1

fio -filename=/data/test.out2 -direct=1 -rw=write -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test2

fio -filename=/data/test.out3 -direct=1 -rw=randread -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test3

fio -filename=/data/test.out4 -direct=1 -rw=randwrite -bs=4k -size=10g -numjobs=4 -runtime=1200 -group_reporting -name=test4

测试结果如下:
Ucloud.png

ali.png

Azure.png

GoogleCloud.png

上面分别是Ucloud、阿里、Azure、谷歌云的IO测试结果。
 


速率测试


测试命令如下:
dd iflag=direct /dev/null bs=16k count=102400
dd oflag=direct /data/test2 bs=16k count=102400

测试结果如下:
mancloud.png

 


总结


废话不多说,以上的测试结果仅供大家参考,新时代的云运维者,必须要有语言基础了,没有语言基础,就算Ucloud的操作再怎么人性化,你也不能自动化,所以从云时代的趋势来看作为运维者掌握一门语言还是很有必要的了。云平台有很多坑,在没有充分了解厂商的情况下,你会发现云时代的运维更难。 收起阅读 »

关于Elasticsearch性能优化几个点

Elasticsearch简述 ElasticSearch是现在技术前沿的大数据引擎,常见的组合有ES+Logstash+Kibana作为一套成熟的日志系统,其中Logstash是ETL工具,Kibana是数据分析展示平台。Elasticsearch让人惊...
继续阅读 »


Elasticsearch简述


ElasticSearch是现在技术前沿的大数据引擎,常见的组合有ES+Logstash+Kibana作为一套成熟的日志系统,其中Logstash是ETL工具,Kibana是数据分析展示平台。Elasticsearch让人惊艳的是他强大的搜索相关能力和灾备策略,Elastcisearch开放了一些接口供开发者研发自己的插件,Elasticsearch结合中文分词的插件会给Elasticsearch的搜索和分析起到很大的推动作用。ElasticSearch是使用开源全文检索库Apache Lucene进行索引和搜索的,所以Elasticsearch底层是依赖的Lucene。
 
关于Lucene:
Apache Lucene将写入索引的所有信息组织成一种倒排索引(Inverted Index)的结构之中,该结构是种将词项映射到文档的数据结构。其工作方式与传统的关系数据库不同,大致来说倒排索引是面向词项而不是面向文档的。且Lucene索引之中还存储了很多其他的信息,如词向量等等,每个Lucene都是由多个段构成的,每个段只会被创建一次但会被查询多次,段一旦创建就不会再被修改。多个段会在段合并的阶段合并在一起,何时合并由Lucene的内在机制决定,段合并后数量会变少,但是相应的段本身会变大。段合并的过程是非常消耗I/O的,且与之同时会有些不再使用的信息被清理掉。在Lucene中,将数据转化为倒排索引,将完整串转化为可用于搜索的词项的过程叫做分析。文本分析由分析器(Analyzer)来执行,分析其由分词器(Tokenizer),过滤器(Filter)和字符映射器(Character Mapper)组成,其各个功能显而易见。除此之外,Lucene有自己的一套完整的查询语言来帮助我们进行搜索和读写。
 
*注:Elasticsearch中的索引指的是查询/寻址时URI中的一个字段如:[host]:[port(9200)]/[index]/[type]/[ID]?[option],而Lucene中的索引更多地和ES中的分片的概念相对应。
 
Elasticsearch架构设计理念特性如下:
  1. 合理的默认配置:只需修改节点中的Yaml配置文件,就可以快速配置。这和Spring4中对配置的简化有相似的地方。
  2. 分布式工作模式:Elasticsearch强大的Zen发现机制不仅支持组广播也支持点单播,且有“知一点即知天下”之妙。
  3. 对等架构:节点之间自动备份分片,且使分片本身和样本之间尽量”远离“,可以避免单点故障。且Master节点和Data节点几乎完全等价。
  4. 易于向集群扩充新节点:大大简化研发或运维将新节点加入集群所需的工作。
  5. 不对索引中的数据结构增加任何限制:ES支持在一个索引之中存在多种数据类型。
  6. 准实时:搜索和版本同步,由于ES是分布式应用,一个重大的挑战就是一致性问题,无论索引还是文档数据,然而事实证明ES表现优秀。

 


分片策略


选择合适的分片数和副本数:
ES的分片分为两种,主分片(Primary Shard)和副本(Replicas)。默认情况下,ES会为每个索引创建5个分片,即使是在单机环境下,这种冗余被称作过度分配(Over Allocation),目前看来这么做完全没有必要,仅在散布文档到分片和处理查询的过程中就增加了更多的复杂性,好在ES的优秀性能掩盖了这一点。假设一个索引由一个分片构成,那么当索引的大小超过单个节点的容量的时候,ES不能将索引分割成多份,因此必须在创建索引的时候就指定好需要的分片数量。此时我们所能做的就是创建一个新的索引,并在初始设定之中指定这个索引拥有更多的分片。反之如果过度分配,就增大了Lucene在合并分片查询结果时的复杂度,从而增大了耗时,所以我们得到了以下结论: 我们应该使用最少的分片!
 
主分片,副本和节点最大数之间数量存在以下关系:
节点数 <= 主分片数 *(副本数+1)    NodeNum <= PNum * ( Rnum + 1 )
这个关系,其实就是保持最好一个数据节点,最好保存一个索引的一个分片(不管主副)。
 
控制分片分配行为:
以上是在创建每个索引的时候需要考虑的优化方法,然而在索引已创建好的前提下,是否就是没有办法从分片的角度提高了性能了呢?当然不是,首先能做的是调整分片分配器的类型,具体是在elasticsearch.yml中设置cluster.routing.allocation.type属性,共有两种分片器even_shard,balanced(默认)。even_shard是尽量保证每个节点都具有相同数量的分片,balanced是基于可控制的权重进行分配,相对于前一个分配器,它更暴漏了一些参数而引入调整分配过程的能力。
 
每次ES的分片调整都是在ES上的数据分布发生了变化的时候进行的,最有代表性的就是有新的数据节点加入了集群的时候。当然调整分片的时机并不是由某个阈值触发的,ES内置十一个裁决者来决定是否触发分片调整,这里暂不赘述。另外,这些分配部署策略都是可以在运行时更新的,更多配置分片的属性也请大家自行Google。
 


路由优化


ES中所谓的路由和IP网络不同,是一个类似于Tag的东西。在创建文档的时候,可以通过字段为文档增加一个路由属性的Tag。ES内在机制决定了拥有相同路由属性的文档,一定会被分配到同一个分片上,无论是主分片还是副本。那么,在查询的过程中,一旦指定了感兴趣的路由属性,ES就可以直接到相应的分片所在的机器上进行搜索,而避免了复杂的分布式协同的一些工作,从而提升了ES的性能。于此同时,假设机器1上存有路由属性A的文档,机器2上存有路由属性为B的文档,那么我在查询的时候一旦指定目标路由属性为A,即使机器2故障瘫痪,对机器1构不成很大影响,所以这么做对灾况下的查询也提出了解决方案。所谓的路由,本质上是一个分桶(Bucketing)操作。当然,查询中也可以指定多个路由属性,机制大同小异。
 


Elasticsearch GC调优


ElasticSearch本质上是个Java程序,所以配置JVM垃圾回收器本身也是一个很有意义的工作。我们使用JVM的Xms和Xmx参数来提供指定内存大小,本质上提供的是JVM的堆空间大小,当JVM的堆空间不足的时候就会触发致命的OutOfMemoryException。这意味着要么内存不足,要么出现了内存泄露。处理GC问题,首先要确定问题的源头,一般有三种方案:
  1. 开启ElasticSearch上的GC日志
  2.  使用jstat命令
  3. 生成内存Dump

第一条,在ES的配置文件elasticsearch.yml中有相关的属性可以配置,关于每个属性的用途这里当然说不完。
第二条,jstat命令可以帮助我们查看JVM堆中各个区的使用情况和GC的耗时情况。
第三条,最后的办法就是将JVM的堆空间转储到文件中去,实质上是对JVM堆空间的一个快照。

想了解更多关于JVM本身GC调优方法请参考:http://www.oracle.com/technetwork/java/javase/gc-tuning-6-140523.html 

另外,通过修改ES节点的启动参数,也可以调整GC的方式,但是实质上和上述方法是等同的。
 


避免内存交换


这一点很简单,由于操作系统的虚拟内存页交换机制,会给性能带来障碍,如数据写满内存会写入Linux中的Swap分区。

可以通过在elasticsearch.yml文件中的bootstrap.mlockall设置为true来实现,但是需要管理员权限,需要修改操作系统的相关配置文件。
 


控制索引合并


上文提到过,ES中的分片和副本本质上都是Lucene索引,而Lucene索引又基于多个索引段构建(至少一个),索引文件中的绝大多数都是只被写一次,读多次,在Lucene内在机制控制下,当满足某种条件的时候多个索引段会被合并到一个更大的索引段,而那些旧的索引段会被抛弃并移除磁盘,这个操作叫做段合并。 

Lucene要执行段合并的理由很简单充分:索引段粒度越小,查询性能越低且耗费的内存越多。频繁的文档更改操作会导致大量的小索引段,从而导致文件句柄打开过多的问题,如修改系统配置,增大系统允许的最大文件打开数。总的来讲,当索引段由多一个合并为一个的时候,会减少索引段的数量从而提高ES性能。对于研发者来讲,我们所能做的就是选择合适的合并策略,尽管段合并完全是Lucene的任务,但随着Lucene开放更多配置借口,新版本的ES还是提供了三种合并的策略tiered,log_byte_size,log_doc。另外,ES也提供了两种Lucene索引段合并的调度器:concurrent和serial。其中各者具体区别,这里暂不赘述,只是抛砖引玉。


分享阅读原文:http://www.cnblogs.com/guguli/p/5218297.html


收起阅读 »

CDH Hadoop + HBase HA 部署详解

CDH 的部署和 Apache Hadoop 的部署是没有任何区别的。这里着重的是 HA的部署,需要特殊说明的是NameNode HA 需要依赖 Zookeeper。 准备Hosts文件配置: cat > /etc/hosts << _HOS...
继续阅读 »

CDH 的部署和 Apache Hadoop 的部署是没有任何区别的。这里着重的是 HA的部署,需要特殊说明的是NameNode HA 需要依赖 Zookeeper。


准备

Hosts文件配置:


cat > /etc/hosts << _HOSTS_
127.0.0.1 localhost
10.0.2.59 cdh-m1
10.0.2.60 cdh-m2
10.0.2.61 cdh-s1
_HOSTS_

各个节点服务情况


cdh-m1 Zookeeper JournalNode NameNode DFSZKFailoverController HMaster
cdh-m2 Zookeeper JournalNode NameNode DFSZKFailoverController HMaster
cdh-s1 Zookeeper JournalNode DataNode HRegionServer

对几个新服务说明下:


  • JournalNode 用于同步 NameNode 元数据,和 Zookeeper 一样需要 2N+1个节点存活集群才可用;
  • DFSZKFailoverController(ZKFC) 用于主备切换,类似 Keepalived 所扮演的角色。

NTP 服务

设置时区


rm -f /etc/localtime
ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

配置NTP Server


yum install -y ntp
cat > /etc/ntp.conf << _NTP_
driftfile /var/lib/ntp/drift

restrict default nomodify
restrict -6 default nomodify

server cn.ntp.org.cn prefer
server news.neu.edu.cn iburst
server dns.sjtu.edu.cn iburst
server 127.127.1.1 iburst

tinker dispersion 100
tinker step 1800
tinker stepout 3600
includefile /etc/ntp/crypto/pw

keys /etc/ntp/keys
_NTP_

# NTP启动时立即同步
cat >> /etc/ntp/step-tickers << _NTP_
server cn.ntp.org.cn prefer
server news.neu.edu.cn iburst
server dns.sjtu.edu.cn iburst
_NTP_

# 同步硬件时钟
cat >> /etc/sysconfig/ntpd << _NTPHW_
SYNC_HWCLOCK=yes
_NTPHW_

启动并设置开机自启动


/etc/init.d/ntpd start
chkconfig ntpd on

配置 NTP Client


yum install -y ntp
# 注意修改内网NTP Server地址
cat > /etc/ntp.conf << _NTP_
driftfile /var/lib/ntp/drift

restrict default nomodify
restrict -6 default nomodify

restrict 127.0.0.1
restrict -6 ::1

server 10.0.2.59 prefer

tinker dispersion 100
tinker step 1800
tinker stepout 3600
includefile /etc/ntp/crypto/pw

keys /etc/ntp/keys
_NTP_

# NTP启动时立即同步
cat >> /etc/ntp/step-tickers << _NTP_
server 10.0.2.59 prefer
_NTP_

# 同步硬件时钟
cat >> /etc/sysconfig/ntpd << _NTPHW_
SYNC_HWCLOCK=yes
_NTPHW_

启动并设置开机自启动


/etc/init.d/ntpd start
chkconfig ntpd on

检查 NTP 同步


ntpq -p

# 结果
remote refid st t when poll reach delay offset jitter
==============================================================================
*time7.aliyun.co 10.137.38.86 2 u 17 64 3 44.995 5.178 0.177
news.neu.edu.cn .INIT. 16 u - 64 0 0.000 0.000 0.000
202.120.2.90 .INIT. 16 u - 64 0 0.000 0.000 0.000

JDK配置

创建目录


mkdir -p /data/{install,app,logs,pid,appData}
mkdir /data/appData/tmp

cd /data/install
wget -c http://oracle.com/jdk-7u51-linux-x64.gz
tar xf jdk-7u51-linux-x64.gz -C /data/app
cd /data/app
ln -s jdk1.7.0_51 jdk1.7
cat >> /etc/profile << _PATH_
export JAVA_HOME=/data/app/jdk1.7
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
export PATH=\$JAVA_HOME/bin:\$PATH
_PATH_
source /etc/profile

创建运行账户


useradd -u 600 run

下载 安装包

# http://archive.cloudera.com/cdh5/cdh/5/
cd /data/install
wget -c http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.4.5.tar.gz
wget -c http://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz
wget -c http://archive.cloudera.com/cdh5/cdh/5/hbase-1.0.0-cdh5.4.5.tar.gz

安装 Zookeeper

cd /data/install
tar xf zookeeper-3.4.5.tar.gz -C /data/app
cd /data/app
ln -s zookeeper-3.4.5 zookeeper

设置环境变量


sed -i '/^export PATH=/i\export ZOOKEEPER_HOME=/data/app/zookeeper' /etc/profile
sed -i 's#export PATH=#&\$ZOOKEEPER_HOME/bin:#' /etc/profile
source /etc/profile

删除无用文件


cd $ZOOKEEPER_HOME
rm -rf *xml *txt zookeeper-3.4.5.jar.* src recipes docs dist-maven contrib
rm -f $ZOOKEEPER_HOME/bin/*.cmd $ZOOKEEPER_HOME/bin/*.txt
rm -f $ZOOKEEPER_HOME/conf/zoo_sample.cfg

创建数据目录


mkdir -p /data/appData/zookeeper/{data,logs}

配置


cat > $ZOOKEEPER_HOME/conf/zoo.cfg << _ZOO_
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/data/appData/zookeeper/data
dataLogDir=/data/appData/zookeeper/logs
server.1=cdh-m1:2888:3888
server.2=cdh-m2:2888:3888
server.3=cdh-s1:2888:3888
_ZOO_

修改Zookeeper的日志打印方式,与日志路径设置, 编辑


$ZOOKEEPER_HOME/bin/zkEnv.sh

在27行后加入两个变量


ZOO_LOG_DIR=/data/logs/zookeeper
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

创建 myid文件


# 注意myid与配置文件保持一致
echo 1 >/data/appData/zookeeper/data/myid

设置目录权限


chown -R run.run /data/{app,appData,logs}

启动、停止


# 启动
runuser - run -c 'zkServer.sh start'
# 停止
runuser - run -c 'zkServer.sh stop'

安装 Hadoop

tar xf hadoop-2.6.0-cdh5.4.5.tar.gz -C /data/app
cd /data/app
ln -s hadoop-2.6.0-cdh5.4.5 hadoop

设置环境变量


sed -i '/^export PATH=/i\export HADOOP_HOME=/data/app/hadoop' /etc/profile
sed -i 's#export PATH=#&\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin:#' /etc/profile
source /etc/profile

删除无用文件


cd $HADOOP_HOME
rm -rf *txt share/doc src examples* include bin-mapreduce1 cloudera
find . -name "*.cmd"|xargs rm -f

新建数据目录


mkdir -p /data/appData/hdfs/{name,edits,data,jn,tmp}

配置

切换到配置文件目录


cd $HADOOP_HOME/etc/hadoop

编辑 core-site.xml


<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- HDFS 集群名称,可指定端口 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hdfs-cdh</value>
</property>

<!-- 临时文件目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/data/appData/hdfs/tmp</value>
</property>

<!-- 回收站设置,0不启用回收站,1440 表示1440分钟后删除 -->
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>

<!-- SequenceFiles在读写中可以使用的缓存大小,单位 bytes 默认 4096 -->
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>

<!-- 可用压缩算法,启用在hdfs-site.xml中,需要编译动态链接库才能用 -->
<property>
<name>io.compression.codecs</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
</configuration>

编辑 hdfs-site.xml


<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- 指定hdfs 集群名称,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>hdfs-cdh</value>
</property>

<!-- 指定 Zookeeper 用于NameNode HA,默认官方配置在core-site.xml中,为了查看清晰配置到hdfs-site.xml也是可用的 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>cdh-m1:2181,cdh-m2:2181,cdh-s1:2181</value>
</property>

<!-- hdfs-cdh 下有两个NameNode,分别为 nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.hdfs-cdh</name>
<value>nn1,nn2</value>
</property>

<!-- nn1 RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.hdfs-cdh.nn1</name>
<value>cdh-m1:9000</value>
</property>

<!-- nn1 HTTP通信地址 -->
<property>
<name>dfs.namenode.http-address.hdfs-cdh.nn1</name>
<value>cdh-m1:50070</value>
</property>

<!-- nn2 RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.hdfs-cdh.nn2</name>
<value>cdh-m2:9000</value>
</property>

<!-- nn2 HTTP通信地址 -->
<property>
<name>dfs.namenode.http-address.hdfs-cdh.nn2</name>
<value>cdh-m2:50070</value>
</property>

<!-- 指定NameNode元数据在JournalNode上的存储路径 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://cdh-m1:8485;cdh-m2:8485;cdh-s1:8485;/hdfs-cdh</value>
</property>

<!-- 开启NameNode失败自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>

<!-- 配置主备切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.hdfs-cdh</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- 配置主备切换方法,每个方法一行-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>

<!-- 指定运行用户的秘钥,需要NameNode双向免密码登录,用于主备自动切换 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/run/.ssh/id_rsa</value>
</property>

<!-- 配置sshfence 超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>50000</value>
</property>

<!-- NameNode 数据本地存储路径 -->
<property>
<name>dfs.namenode.name.dir</name>
<value>/data/appData/hdfs/name</value>
</property>

<!-- DataNode 数据本地存储路径 -->
<property>
<name>dfs.datanode.data.dir</name>
<value>/data/appData/hdfs/data</value>
</property>

<!-- JournalNode 数据本地存储路径 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data/appData/hdfs/jn</value>
</property>

<!-- 修改文件存储到edits,定期同步到DataNode -->
<property>
<name>dfs.namenode.edits.noeditlogchannelflush</name>
<value>true</value>
</property>

<!-- edits 数据本地存储路径 -->
<property>
<name>dfs.namenode.edits.dir</name>
<value>/data/appData/hdfs/edits</value>
</property>

<!-- 开启Block Location metadata允许impala知道数据块在哪块磁盘上 默认关闭 -->
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>

<!-- 权限检查 默认开启 -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

<!-- block 大小设置 -->
<property>
<name>dfs.blocksize</name>
<value>64m</value>
</property>
</configuration>

小于5个DataNode建议添加如下配置


<!-- 数据副本数量,不能超过DataNode数量,大集群建议使用默认值 默认 3 -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>

<!-- 当副本写入失败时不分配新节点,小集群适用 -->
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>

在 hadoop-env.sh 中添加如下变量


export JAVA_HOME=/data/app/jdk1.7
export HADOOP_LOG_DIR=/data/logs/hadoop
export HADOOP_PID_DIR=/data/pid
# SSH端口 可选
export HADOOP_SSH_OPTS="-p 22"

Heap 设置,单位 MB


export HADOOP_HEAPSIZE=1024

权限设置


chown -R run.run /data/{app,appData,logs}
chmod 777 /data/pid

格式化

格式化只需要执行一次,格式化之前启动Zookeeper


切换用户


su - run

启动所有 JournalNode


hadoop-daemon.sh start journalnode

格式化 Zookeeper(为 ZKFC 创建znode)


hdfs zkfc -formatZK

NameNode 主节点格式化并启动


hdfs namenode -format
hadoop-daemon.sh start namenode

NameNode 备节点同步数据并启动


hdfs namenode -bootstrapStandby
hadoop-daemon.sh start namenode

启动 ZKFC


hadoop-daemon.sh start zkfc

启动 DataNode


hadoop-daemon.sh start datanode

启动与停止

切换用户


su - run

集群批量启动
需要配置运行用户ssh-key免密码登录,与$HADOOP_HOME/etc/hadoop/slaves


# 启动
start-dfs.sh
# 停止
stop-dfs.sh

单服务启动停止
启动HDFS

hadoop-daemon.sh start journalnode
hadoop-daemon.sh start namenode
hadoop-daemon.sh start zkfc
hadoop-daemon.sh start datanode

停止HDFS


hadoop-daemon.sh stop datanode
hadoop-daemon.sh stop namenode
hadoop-daemon.sh stop journalnode
hadoop-daemon.sh stop zkfc

测试

HDFS HA 测试
打开 NameNode 状态页:
http://cdh-m1:50010
http://cdh-m2:50010


在 Overview 后面能看见 active 或 standby,active 为当前 Master,停止 active 上的 NameNode,检查 standby是否为 active。


HDFS 测试


hadoop fs -mkdir /test
hadoop fs -put /etc/hosts /test
hadoop fs -ls /test

结果:


-rw-r--r--   2 java supergroup         89 2016-06-15 10:30 /test/hosts
# 其中权限后面的列(这里的2)代表文件总数,即副本数量。

HDFS 管理命令


# 动态加载 hdfs-site.xml
hadoop dfsadmin -refreshNodes

HBase安装配置

cd /data/install
tar xf hbase-1.0.0-cdh5.4.5.tar.gz -C /data/app
cd /data/app
ln -s hbase-1.0.0-cdh5.4.5 hbase

设置环境变量


sed -i '/^export PATH=/i\export HBASE_HOME=/data/app/hbase' /etc/profile
sed -i 's#export PATH=#&\$HBASE_HOME/bin:#' /etc/profile
source /etc/profile

删除无用文件


cd $HBASE_HOME
rm -rf *.txt pom.xml src docs cloudera dev-support hbase-annotations hbase-assembly hbase-checkstyle hbase-client hbase-common hbase-examples hbase-hadoop2-compat hbase-hadoop-compat hbase-it hbase-prefix-tree hbase-protocol hbase-rest hbase-server hbase-shell hbase-testing-util hbase-thrift
find . -name "*.cmd"|xargs rm -f

配置
进入配置文件目录

cd $HBASE_HOME/conf

编辑 hbase-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<!-- HBase 数据存储路径 -->
<property>
<name>hbase.rootdir</name>
<value>hdfs://hdfs-cdh/hbase</value>
</property>

<!-- 完全分布式模式 -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>

<!-- HMaster 节点 -->
<property>
<name>hbase.master</name>
<value>cdh-m1:60000,cdh-m2:60000</value>
</property>

<!-- Zookeeper 节点 -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>cdh-m1:2181,cdh-m2:2181,cdh-s1:2181</value>
</property>

<!-- znode 路径,Zookeeper集群中有多个HBase集群需要设置不同znode -->
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase</value>
</property>

<!-- HBase 协处理器 -->
<property>
<name>hbase.coprocessor.user.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
</configuration>

在 hbase-env.sh 中添加如下变量


export JAVA_HOME=/data/app/jdk1.7
export HBASE_LOG_DIR=/data/logs/hbase
export HBASE_PID_DIR=/data/pid
export HBASE_MANAGES_ZK=false
# SSH 默认端口 可选
export HBASE_SSH_OPTS="-o ConnectTimeout=1 -p 36000"

Heap 设置,单位 MB


export HBASE_HEAPSIZE=1024

可选设置 regionservers 中添加所有RegionServer主机名,用于集群批量启动、停止。


启动与停止
切换用户

su - run

集群批量启动
需要配置运行用户ssh-key免密码登录,与$HBASE_HOME/conf/regionservers


# 启动
start-hbase.sh
# 停止
stop-hbase.sh

单服务启动停止
HMaster

# 启动
hbase-daemon.sh start master
# 停止
hbase-daemon.sh stop master

HRegionServer


# 启动
hbase-daemon.sh start regionserver
# 停止
hbase-daemon.sh stop regionserver

测试
HBase HA 测试
浏览器打开两个HMaster状态页:
http://cdh-m1:60010
http://cdh-m2:60010


可以在Master后面看见其中一个主机名,Backup Masters中看见另一个。
停止当前Master,刷新另一个HMaster状态页会发现Master后面已经切换,HA成功。

HBase 测试
进入hbase shell 执行:

create 'users','user_id','address','info'
list
put 'users','anton','info:age','24'
get 'users','anton'

# 最终结果
COLUMN CELL
info:age timestamp=1465972035945, value=24
1 row(s) in 0.0170 seconds

清除测试数据:


disable 'users'
drop 'users'

到这里安装就全部完成。

收起阅读 »

Elasticsearch中常用的API接口整理

Elasticsearch中常用API分类 elasticsearch中常用的API分类如下: 文档API: 提供对文档的增删改查操作搜索API: 提供对文档进行某个字段的查询索引API: 提供对索引进行操作,查看索引信息等查看API: 按照更直...
继续阅读 »
elasticsearch.png


Elasticsearch中常用API分类


elasticsearch中常用的API分类如下:
  • 文档API: 提供对文档的增删改查操作
  • 搜索API: 提供对文档进行某个字段的查询
  • 索引API: 提供对索引进行操作,查看索引信息等
  • 查看API: 按照更直观的形式返回数据,更适用于控制台请求展示
  • 集群API: 对集群进行查看和操作的API
下面简单的一一介绍记录一下。 

文档类API

Index API: 创建并建立索引
PUT twitter/tweet/1{    "user" : "kimchy",    "post_date" : "2009-11-15T14:12:12",    "message" : "trying out Elasticsearch"}
官方文档参考:Index API 。 Get API: 获取文档
curl -XGET 'http://localhost:9200/twitter/tweet/1'
官方文档参考:Get API 。 DELETE API: 删除文档
$ curl -XDELETE 'http://localhost:9200/twitter/tweet/1'
官方文档参考:Delete API 。 UPDATE API: 更新文档
PUT test/type1/1{    "counter" : 1,    "tags" : ["red"]}
官方文档参考:Update API 。 Multi Get API: 一次批量获取文档
curl 'localhost:9200/_mget' -d '{    "docs" : [        {            "_index" : "test",            "_type" : "type",            "_id" : "1"        },        {            "_index" : "test",            "_type" : "type",            "_id" : "2"        }    ]}'
官方文档参考:Multi Get API 。 Bulk API: 批量操作,批量操作中可以执行增删改查
$ curl -s -XPOST localhost:9200/_bulk --data-binary "@requests"; echo{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"type1","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
官方文档参考:Bulk API 。 DELETE By Query API: 根据查询删除
POST twitter/_delete_by_query{  "query": {     "match": {      "message": "some message"    }  }}
官方文档参考:Delete By Query API 。 Update By Query API: 根据查询更新
POST twitter/_update_by_query?conflicts=proceed
官方文档参考:Update By Query API 。 Reindex API:重建索引
POST _reindex{  "source": {    "index": "twitter"  },  "dest": {    "index": "new_twitter"  }}
官方文档参考:Reindex API 。 Term Vectors: 词组分析,只能针对一个文档
curl -XGET 'http://localhost:9200/twitter/tweet/1/_termvectors?pretty=true'
官方文档参考:Term Vectors 。 Multi termvectors API: 多个文档的词组分析
curl 'localhost:9200/_mtermvectors' -d '{   "docs": [      {         "_index": "testidx",         "_type": "test",         "_id": "2",         "term_statistics": true      },      {         "_index": "testidx",         "_type": "test",         "_id": "1",         "fields": [            "text"         ]      }   ]}'
官方文档参考:Multi termvectors API 。 更多关于文档类API请参考:Document APIs 。 

搜索类API

URI Search:url中传参
GET twitter/tweet/_search?q=user:kimchy
官方文档参考:URI Search 。 Request Body搜索接口: 搜索的条件在请求的body中
GET /twitter/tweet/_search{    "query" : {        "term" : { "user" : "kimchy" }    }}
官方文档参考:Request Body Search 。   
  • 搜索模版设置接口: 可以设置搜索的模版,模版的功能是可以根据不同的传入参数,进行不同的实际搜索
  • 搜索分片查询接口: 查询这个搜索会使用到哪个索引和分片
  • Suggest接口: 搜索建议接口,输入一个词,根据某个字段,返回搜索建议。
  • 批量搜索接口: 把批量请求放在一个文件中,批量搜索接口读取这个文件,进行搜索查询
  • Count接口: 只返回符合搜索的文档个数
  • 文档存在接口: 判断是否有符合搜索的文档存在
  • 验证接口: 判断某个搜索请求是否合法,不合法返回错误信息
  • 解释接口: 使用这个接口能返回某个文档是否符合某个查询,为什么符合等信息
  • 抽出器接口: 简单来说,可以用这个接口指定某个文档符合某个搜索,事先未文档建立对应搜索
官方文档参考:Search APIS 。 

索引类API

  • 创建索引接口(POST my_index)
  • 删除索引接口(DELETE my_index)
  • 获取索引信息接口(GET my_index)
  • 索引是否存在接口(HEAD my_index)
  • 打开/关闭索引接口(my_index/_close, my_index/_open)
  • 设置索引映射接口(PUT my_index/_mapping)
  • 获取索引映射接口(GET my_index/_mapping)
  • 获取字段映射接口(GET my_index/_mapping/field/my_field)
  • 类型是否存在接口(HEAD my_index/my_type)
  • 删除映射接口(DELTE my_index/_mapping/my_type)
  • 索引别名接口(_aliases)
  • 更新索引设置接口(PUT my_index/_settings)
  • 获取索引设置接口(GET my_index/_settings)
  • 分析接口(_analyze): 分析某个字段是如何建立索引的
  • 建立索引模版接口(_template): 为索引建立模版,以后新创建的索引都可以按照这个模版进行初始化
  • 预热接口(_warmer): 某些查询可以事先预热,这样预热后的数据存放在内存中,增加后续查询效率
  • 状态接口(_status): 索引状态
  • 批量索引状态接口(_stats): 批量查询索引状态
  • 分片信息接口(_segments): 提供分片信息级别的信息
  • 索引恢复接口(_recovery): 进行索引恢复操作
  • 清除缓存接口(_cache/clear): 清除所有的缓存
  • 输出接口(_flush)
  • 刷新接口(_refresh)
  • 优化接口(_optimize): 对索引进行优化
  • 升级接口(_upgrade): 这里的升级指的是把索引升级到lucence的最新格式
官方文档参考:Indices APIS 。 

查看类API

  • 查看别名接口(_cat/aliases): 查看索引别名
  • 查看分配资源接口(_cat/allocation)
  • 查看文档个数接口(_cat/count)
  • 查看字段分配情况接口(_cat/fielddata)
  • 查看健康状态接口(_cat/health)
  • 查看索引信息接口(_cat/indices)
  • 查看master信息接口(_cat/master)
  • 查看nodes信息接口(_cat/nodes)
  • 查看正在挂起的任务接口(_cat/pending_tasks)
  • 查看插件接口(_cat/plugins)
  • 查看修复状态接口(_cat/recovery)
  • 查看线城池接口(_cat/thread_pool)
  • 查看分片信息接口(_cat/shards)
  • 查看lucence的段信息接口(_cat/segments)
官方文档参考:Cat APIS 。 

集群类API

  • 查看集群健康状态接口(_cluster/health)
  • 查看集群状况接口(_cluster/state)
  • 查看集群统计信息接口(_cluster/stats)
  • 查看集群挂起的任务接口(_cluster/pending_tasks)
  • 集群重新路由操作(_cluster/reroute)
  • 更新集群设置(_cluster/settings)
  • 节点状态(_nodes/stats)
  • 节点信息(_nodes)
  • 节点的热线程(_nodes/hot_threads)
  • 关闭节点(/nodes/_master/_shutdown)

官方文档参考:Cluster APIS 。  尽在:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html 收起阅读 »

Kafka topic 常用命令介绍

本文主要记录平时kafka topic命令常使用的命令集,包括listTopic,createTopic,deleteTopic和describeTopic和alertTopic等,我这里是基于kafka 0.8.1.1版本,具体情况如下所示。   ...
继续阅读 »
kafka.png

本文主要记录平时kafka topic命令常使用的命令集,包括listTopic,createTopic,deleteTopic和describeTopic和alertTopic等,我这里是基于kafka 0.8.1.1版本,具体情况如下所示。
 
一、 describe topic 显示topic详细信息
# ./kafka-topics.sh --describe --zookeeper localhost:2181
Topic:mobTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: mobTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: mobTopic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: mobTopic Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: mobTopic Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic:serverTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: serverTopic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: serverjsTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: serverjsTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: serverjsTopic Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic:bugTopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: bugTopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: bugTopic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: bugTopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: bugTopic Partition: 3 Leader: 1 Replicas: 1 Isr: 1
1. 如上面可见,如果指定了--topic就是只显示给定topic的信息,否则显示所有topic的详细信息。
2. 如果指定了under-replicated-partitions,那么就显示那些副本数量不足的分区(ISR size < AR.size)
3. 如果指定了unavailable-partitions,那么就显示那些leader副本已不可用的分区
4. 从zookeeper上获取当前所有可用的broker
5. 遍历每个要describe的topic,
6. 获取这个topic的分区副本分配信息,若该信息不存在说明topic不存在
7. 否则将分配信息按照分区号进行排序
10. 如果没有指定步骤2中的参数也没有指定步骤3中的参数,那么显示分区数信息、副本系数信息以及配置信息
11. 默认情况下还会显示各个分区的信息
12. 从zookeeper中获取每个分区的ISR、Leader、AR信息并显示
 
二、create topic 创建topic
# kafka-topics.sh --zookeeper localhost:2181 --create --topic mobTopic --replication-factor 1  --partitions 4

  1.  从命令行中获取要创建的topic名称
  2. 解析命令行指定的topic配置(如果存在的话),配置都是x=a的格式
  3. 若指定了replica-assignment参数表明用户想要自己分配分区副本与broker的映射——通常都不这么做,如果不提供该参数Kafka帮你做这件事情
  4. 检查必要的参数是否已指定,包括:zookeeper, replication-factor,partition和topic
  5. 获取/brokers/ids下所有broker并按照broker id进行升序排序
  6. 在broker上分配各个分区的副本映射 (没有指定replica-assignment参数,这也是默认的情况)
  7. 检查topic名字合法性、自定义配置的合法性,并且要保证每个分区都必须有相同的副本数
  8. 若zookeeper上已有对应的路径存在,直接抛出异常表示该topic已经存在
  9. 确保某个分区的多个副本不会被分配到同一个broker
  10. 若提供了自定义的配置,更新zookeeper的/config/topics/[topic]节点的数据
  11. 创建/brokers/topics/[topic]节点,并将分区副本分配映射数据写入该节点

  
三、delete topic 删除topic
# ./kafka-topics.sh --zookeeper locahost:2181 --delete --topic mobTopic

  1. 获取待删除的topic,如果没有指定--topic就是删除所有的topic
  2. 对于每个要删除的topic,在zookeeper上的/admin/delete_topics下创建对应的子节点。kafka目前的删除topic逻辑只是在Zookeeper上标记而已,会有专门的线程负责监听该路径下的变更并负责更新zookeeper上其他节点上的数据,但底层的日志文件目前还是需要手动删除。

   
四、alert 修改topic的partion
# ./kafka-topics.sh --zookeeper localhost:2181 --alter --topic mobTopic --partitions 10
减少目前kakfa应该是不支持. 收起阅读 »

Druid任务分配策略配置详解

在说任务配置策略之前,先给大家看一下druid任务处理的大概架构图 如上图可以看出overlord节点如何将任务分配到middlemanager节点进行处理,如果在架构中有多个middlemanager节点,那任务将怎么分配呢,分配的测试是...
继续阅读 »
Druid.png

在说任务配置策略之前,先给大家看一下druid任务处理的大概架构图
DruidWorkFlow.png

如上图可以看出overlord节点如何将任务分配到middlemanager节点进行处理,如果在架构中有多个middlemanager节点,那任务将怎么分配呢,分配的测试是什么?
 
默认策略是fillCapacity, 意思是当一个MiddleManager的worker capacity满了的时候,再有任务到来时,才会分配给另外的MiddleManager节点。
 
补充: middlemanager的capacity意思是,能容纳任务的数量,通过修改middleManager节点下的 runtime.properties配置文件里的druid.worker.capacity属性配置。 
capacity.png

那么,除了这个策略,还有其他策略吗?另外,这个策略如何修改呢? 除了这个策略,还有fillCapacityWithAffinity, equalDistribution and javascript策略,那么策略如何修改呢? 
 
通过向Overlord节点发送个一个HTTP请求来修改,实质上是修改保存druid元数据的数据库,即 MetadataStorage,修改步骤如下: 
http://10.1.3.9:8090/druid/indexer/v1/worker(http://: /druid/indexer/v1/worke
X-Druid-Author sdx(修改配置的作者,可以随意写) X-Druid-Comment equal policy(修改配置的注释,可以随意写) Content-Type application/json
postman.png
http://10.1.3.9:8090/druid/indexer/v1/worker(http://: /druid/indexer/v1/worker)
{
"selectStrategy": {
"type": "equalDistribution" }
}
send.png

通过访问http://10.1.3.9:8090/druid/indexer/v1/worker/history查看配置是否成功 
history.png

或者通过查看MetadataStorage的druid_conifg和druid_audit表查看是否配置成功 
 
注意: linux上通过如下指令配置:
curl -XPOST -H 'X-Druid-Author: lucky' -H 'X-Druid-Comment: lucky' -H 'Content-Type: application/json' http://10.1.3.9:8090/druid/indexer/v1/worker -d '{ "selectStrategy": { "type": "equalDistribution" } }'
更多内容请参考官网:http://druid.io/docs/0.9.1.1/configuration/indexing-service.html收起阅读 »

Druid中Segements保留和自动删除规则配置

经测试发现:  DeepStorage里所有的segements都需要在Historical节点中有一份。其实这样说是不严格的,有时候我们需要DeepStorage里所有的segements(或者某类datasource)在Historical节点中...
继续阅读 »
Druid.png

经测试发现: 
DeepStorage里所有的segements都需要在Historical节点中有一份。其实这样说是不严格的,有时候我们需要DeepStorage里所有的segements(或者某类datasource)在Historical节点中有一份或者n份。这样做的好处是,提高数据查询效率,那么这个n在哪里配置呢?
 
原来是在druid_rules表里面配置,默认情况下,druid_rules表里面只有一条数据,其中payload字段默认值如下:
[{"tieredReplicants":{"_default_tier":2},"type":"loadForever"}]
意思是 保证deepstorage里面的数据,在Historical节点集群存在两份,即副本为2,这两份数据一定保存在不同的Historical服务器。
 
如果只有一台Historical服务器,那么则只会有一份数据,如果你添加一台Historical服务器,则就会在新的节点复制一份数据。
 
如果想修改默认的副本数,不需要数据备份,进行如下操作就好:
update druid_rules set payload='[{"tieredReplicants":{"_default_tier":1},"type":"loadForever"}]' where id="_default_2016-09-23T08:50:09.457Z";
只需把_default_tier的值改为1即可,id得看druid_rules表中的具体值。
 
segment执行过程如下:
  1. 聚合任务生成segment
  2. 将segment push到Deep Storage 
  3. Historical节点从 DeepStorage加载segment
  4. segment加载成功后,调用回调方法结束任务 

 
所以,如果Historical节点硬盘上缓存的segment占满磁盘空间,任务会一直挂起, 最后任务数量达到MiddleManager节点的容量,导致任务排队。 
 
那么现实业务中,如果DeepStorage里所有的segments 都需要在Historical节点中有一份,会非常 浪费空间,浪费空间就是浪费金钱。 
 
很不能理解这种方式,并且我们对DeepStorage节点和Historical节点之间的关系一直都是这样理解的,当查询的数据不在Historical节点的时候,才会从DeepStorage加载。但是,现实是残酷的,现实不是这样的。 
 
如何解决这个问题呢?这时我们就需要用到druid的数据保留和自动删除规则配置。 通过这个配置,我们可以为每个datasource配置一个据保留和自动删除规则。 
 
这个配置可以通过druid提供的HTTP接口配置,也可以通过Coordinator界面配置,如下: 
coordinate.png

此配置的意思是: 我们为agentToic-1m设置了两个rule,第一个rule的意思是(Load-Period-P30D)保 留最近30天的数据。第二个rule的意思是(Drop-Forever)删除所有的数据。另外还需要填写,修改 配置的作者和注释。最后Save all rules。 通过如下界面查看,配置是否成功,或者通过查看MetadataStorage的druid_rules表查看配置是否成功。
rules.png

最后,结合下图,观察左侧segment列表是否会发生变化(shareds的数量和intergvals的数量)。 
datasource.png

经过验证,左侧列表只会展示最近30天的数据,通过查看MetadataStorage的druid_segments表, 发现30天以前的数据都被假删了,即used字段设置为了0,表示配置成功。 
 
注意: 
如果druid_segments表中的某条数据used字段为0,即此条数据对应的segment不再支持可查,同 时会再Historical节点删除。 如果上面的P30D改为P1M,意思是1个月,这个月不是自然月,而是最近30天的意思。 如果按照如上的方式设置了,再修改第一条配置规则,改为P50D,那么数据是不能恢复的,即还 是只会保留最近30天的数据。 一个笨的恢复数据方法是,可以通过修改MetadataStorage的druid_segments表中的used字段来恢复数据。 
 
论坛:
If you configure a per datasource rule that drops data for the current month, and there i
s a default rule where everything is loaded, then yes, data for the current month is dropp ed and all older data is loaded. If you instead configure a load rule for the current month followed by a drop rule for everything else, then the current month of data is kept, and
all older data is dropped.
具体内容,参考官网:http://druid.io/docs/latest/operations/rule-configuration.html
任务分配策略,会让Geek小A  明天写! 收起阅读 »

Python的数据序列化「Json & Pickle」

在介绍Python的数据序列化模块「Json & Pickle」之前,我们先来看看为什么需要数据序列化,什么是数据序列化。   为什么需要数据序列化,我认为有如下两种原因: 一个原因是将对象(一切皆对象)的状态保持在存储媒介(硬盘、网盘......)中...
继续阅读 »
jason.jpg
在介绍Python的数据序列化模块「Json & Pickle」之前,我们先来看看为什么需要数据序列化,什么是数据序列化。
 
为什么需要数据序列化,我认为有如下两种原因:
一个原因是将对象(一切皆对象)的状态保持在存储媒介(硬盘、网盘......)中,以便可以在以后重新创建精确的副本,相当于镜像的概念,比如我们平时利用VMware虚拟机中的挂起功能,这个挂起功能就是利用数据的序列化,把虚拟机当前的状态序列化保存在本地磁盘的文件中,然后恢复的时候只需反序列化,把状态恢复即可。
 
另一个原因是通过值将对象从一个应用程序域发送到另一个应用程序域中。例如,你利用Python监控采集程序采集到的数据想传送给Zabbix处理。当两个进程在进行远程通信时,彼此可以发送各种类型的数据。无论是何种类型的数据,都会以二进制序列的形式在网络上传送。发送方需要把这个对象转换为字节序列,才能在网络上传送;接收方则需要把字节序列再恢复为对象。
 
序列化和反序列化:
  • 序列化: 将数据结构或对象转换成二进制串的过程。
  • 反序列化:将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程。
序列化的目的就是为了跨进程传递格式化数据和保存某个时刻的状态。 什么是数据序列化:数据序列化就是将对象或者数据结构转化成特定的格式,使其可在网络中传输,或者可存储在内存或者文件中。反序列化则是相反的操作,将对象从序列化数据中还原出来。而对象序列化后的数据格式可以是二进制,可以是XML,也可以是JSON等任何格式。对象/数据序列化的重点在于数据的交换和传输,例如在远程调用技术(如EJB,XML-RPC, Web Service),或者在GUI控件开发(JavaBean)等等。 清楚了数据格式化的必要和简单认识了什么是数据格式化之后,我们就来看看Python中两个数据格式化模块的使用。 

Json Module

Json:用于字符串和 python数据类型间进行转换;Json模块提供了四个功能:dumps、dump、loads、load[list=1]
  • dumps把数据类型转换成字符串 
  • dump把数据类型转换成字符串并存储在文件中 
  • loads把字符串转换成数据类型  
  • load把文件打开从字符串转换成数据类型
  • 实例如下:
    #!/usr/bin/env python3# _*_coding:utf-8_*_# Author: Lucky.chenimport jsoninfo = {'1MinLoad': 5, 'MemUse': '5G', 'DiskUse': '80G'}print('dumps 操作之前数据类型: %s' % type(info))JsonInfo = json.dumps(info)print(JsonInfo)# dumps 将数据通过特殊的形式转换为所有程序语言都识别的字符串print('dumps 操作之后数据类型: %s' % type(JsonInfo))# loads 将字符串通过特殊的形式转为python是数据类型  (将字符串转为字典)NewInfo = json.loads(JsonInfo)print('loads 操作之后数据类型为: %s' % type(NewInfo))print('分割线'.center(50, '-'))# dump 将数据通过特殊的形式转换为所有语言都识别的字符串并写入文件with open('SystemInfo.txt', 'w') as f:    json.dump(info, f)    print('dump file end!!')# load 从文件读取字符串并转换为python的数据类型with open('SystemInfo.txt', 'r') as f:    LoadInfo = json.load(f)    print('load file end, data type is %s' % type(LoadInfo), LoadInfo)
    结果如下:
    dumps 操作之前数据类型: {"MemUse": "5G", "DiskUse": "80G", "1MinLoad": 5}dumps 操作之后数据类型: loads 操作之后数据类型为: -----------------------分割线------------------------dump file end!!load file end, data type is  {'MemUse': '5G', '1MinLoad': 5, 'DiskUse': '80G'}
    一个错误案例如下:
    #!/usr/bin/env python3# _*_coding:utf-8_*_# Author: Lucky.chenimport jsondef test():    print('Test Func')info = {'Name': 'crh', 'age': 18, 'Func': test}json.dumps(info)
    结果:
     raise TypeError(repr(o) + " is not JSON serializable")TypeError:  is not JSON serializable
    如上可知函数不能被json序列化。 

    Pickle Module

    pickle,用于python特有的类型 和 python的数据类型间进行转换Pickle模块同样提供了四个功能:dumps、dump、loads、load[list=1]
  • dumps把数据类型转换成字符串 
  • dump把数据类型转换成字符串并存储在文件中 
  • loads把字符串转换成数据类型  
  • load把文件打开从字符串转换成数据类型


  • Pickle可以序列化一些较复杂的数据,和json的区别在于pickle序列化的时候,存放的是二进制的文件,所以打开一个文件的时候,我们要以二进制的格式打开。
     实例如下:
    #!/usr/bin/env python3
    # _*_coding:utf-8_*_
    # Author: Lucky.chen

    import pickle


    def test(name):
    print('%s write Test Func' % name)

    info = {'Name': 'crh', 'age': 18, 'Func': test}

    print('dumps 之前数据的类型为: %s' % type(info))

    # pickle.dumps 将数据通过特殊的形式转换为只有python语言认识bytes类型(Python2.*中是字符串类型)
    NewInfo = pickle.dumps(info)
    print('dumps result is %s, data type is %s' % (NewInfo, type(NewInfo)))

    # pickle.loads 将bytes通过特殊的形式转为python是数据类型
    LoadInfo = pickle.loads(NewInfo)
    print('loads result is %s, data type is %s' % (LoadInfo, type(LoadInfo)))
    LoadInfo['Func']('crh')

    print('分割线'.center(50, '-'))

    # pickle.dump 将数据通过特殊的形式转换为只有python语言认识的字符串,并写入文件
    with open('pickle.rb', 'wb') as f:
    pickle.dump(info, f)

    # pickle.load 从文件读取只有python语言认识的字符串并转换为python的数据类型
    with open('pickle.rb', 'rb') as f:
    Info = pickle.load(f)

    print(Info, 'type is %s' % type(Info))
    结果如下:
    dumps 之前数据的类型为: 
    dumps result is b'\x80\x03}q\x00(X\x03\x00\x00\x00ageq\x01K\x12X\x04\x00\x00\x00Nameq\x02X\x03\x00\x00\x00crhq\x03X\x04\x00\x00\x00Funcq\x04c__main__\ntest\nq\x05u.', data type is
    loads result is {'age': 18, 'Name': 'crh', 'Func': }, data type is
    crh write Test Func
    -----------------------分割线------------------------
    {'age': 18, 'Name': 'crh', 'Func': } type is

     


    总结


    很多情况下不同的程序之间传送数据我们一般通过文件的方式,但是这个方法是最原始的,而dumps可以直接让数据格式化传送给对方,但是不是所有的程序都是python的,所以只利用pickle是不现实的,比如一个python的程序需要发送一段数据给一个java程序开发的应用,这时候很多内存数据的交换,就得用json了。
     
    并且josn能dump的结果更可读,那么有人就问了,那还用pickle做什么不直接用josn,是这样的josn只能把常用的数据类型序列化(列表、字典、列表、字符串、数字、),比如日期格式、类对象!josn就不行了。
     
    为什么他不能序列化上面的东西呢?因为josn是跨语言的!注定了它只能规范出一些通用的数据类型的格式,统一标准。
      收起阅读 »

    Elasticsearch应用在数据中心的实时协议分析和安全威胁检测

    数据中心面临的挑战 被DDOS攻击: 网络瘫痪,大面积影响业务植入后门发包: 占用带宽资源,消耗成本运营“黑盒子”: 无法分辨“好人”、“坏人”监控粒度粗: 无法及时响应并定位事件   早期解决方案  Cacti 利用SNMP监控交换 机出入口流量交换机推...
    继续阅读 »


    数据中心面临的挑战


    1. 被DDOS攻击: 网络瘫痪,大面积影响业务
    2. 植入后门发包: 占用带宽资源,消耗成本
    3. 运营“黑盒子”: 无法分辨“好人”、“坏人”
    4. 监控粒度粗: 无法及时响应并定位事件

     
    早期解决方案 
    • Cacti 利用SNMP监控交换 机出入口流量
    • 交换机推送Sflow流量采样 数据,使用Solarwids监控
    • 遇到DDOS时,使用手动 Sniffer抓包分析
     第一期改造后 
    arch.png
     推送Netflow/Sflow 劣势
    • 消耗路由器CPU资源
    • 100-1000:1采样比,监测粒度粗
    • 业务和应用识别依赖端口号,无法识别日新月异 的业务类型 

     
    如何用数据驱动IDC运营 
    nsm.png

     
    NSM架构设计 
    nsmarch.png

     
    第二期改造后 
    erqi.png

     
    10G下的NSM :
    nsm10g.png

     


    实际效果展示 


    NSM架构解析 
    nsmp.png

    实时协议分析:Bro日志类型 
    bro.png

    Flow: 数据格式 
    format.png

    实时安全威胁检测引擎 
    sbro.png

    Suricata Today 
    suricata.png

    实时流量 +ELK + VirusTotal 
    ELK.png

    构建10G+ NSM的几个关键点 
    1、抓包网卡   
    2、内核优化 
    3、驱动与rss 
    4、PF-Ring_zc 
    5、ntop、nprobe、ndpi 
    6、跨数据中心es 
    流量抓包与网卡 
    ll.png

    ELK部分的关键点 
    1、用Logstash Kafka input接收数据
    2、数据量大,处理结构复杂时:
          预设Kafka分区 
          开启多个Logstash实例,分别读取Kafka分区数据 
          分别写入不同es节点
    3、多集群互联 
    跨数据中心es集群 
    zone.png

    10G NSM平台样例 
    10g.png

    万兆 实时 安全大数据架构 
    WZ.png


    作者:张磊@Zooboa 


    收起阅读 »