storm学习第三天-storm与其他组件交互

storm如何把数据插入到elasticsearch

1 storm提供的例子

https://github.com/apache/storm/tree/master/external/storm-elasticsearch

代码:

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-elasticsearch</artifactId>
<version>1.1.0</version>
</dependency>

EsConfig esConfig = new EsConfig(clusterName, new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);

问题:依赖低版本的Elasticsearch 这个问题没有解决
查看最新代码已经修复了 没提供jar包
需要重新编译storm1.10代码 直接放弃了 采用下面方法

方法2 elasticsearch-hadoop

image.png

疑问?
仅仅支持hadoop吗 storm支持吗我要的是storm?

ES-Hadoop无缝打通了ES和Hadoop两个非常优秀的框架,我们既可以把HDFS的数据导入到ES里面做分析,也可以将es数据导出到HDFS上做备份,归档,其中值得一提的是ES-Hadoop全面的支持了Spark框架,
其中包括Spark(五角星那个上面中间位置)

  • 支持Hive(像蜜蜂的那个下面最左位置)
  • 支持Cascading(有五个竖线那个 上面最右位置)
    Cascading is the proven application development platform for
    building data applications on Hadoop.
  • Storm(闪电的那个)
  • 当然还有标准的MapReduce,
    无论用那一个框架集成ES,都是非常简洁的。
image.png

疑问:
为了使用这个jar 是否引用一系列相关的jar呀

经过验证不需要引入hadoop 但是json和http引入
折腾不起

     <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.1</version>
    </dependency>
       
   <dependency>
    <groupId>commons-codec</groupId>
    <artifactId>commons-codec</artifactId>
    <version>1.10</version>
  </dependency>
       
       
        <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>5.5.1</version>
    </dependency>
    
    <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.3</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.codehaus.jackson/jackson-mapper-asl -->
    <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.8.8</version>
    </dependency>
    
   <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-core-asl</artifactId>
        <version>1.8.8</version>
    </dependency>
    
    <dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-jaxrs</artifactId>
    <version>1.8.8</version>
</dependency>   
    <dependency>
    <groupId>org.codehaus.jackson</groupId>
    <artifactId>jackson-xc</artifactId>
    <version>1.8.8</version>
</dependency>

    
     <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.9.0</version>
    </dependency>
    
                  
      
   <!--
         <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-elasticsearch</artifactId>
          <version>1.1.0</version>
       </dependency>
       
       
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>5.5.1</version>
    </dependency>
    
     <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-storm</artifactId>
            <version>5.5.1</version>
      </dependency>
      -->   
       <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>5.5.1</version>
        </dependency>

代码实现

配置文件:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.5.1</version>
</dependency>

方法3 elasticsearch 官方提供的例子

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-storm</artifactId>
<version>5.5.1</version>
</dependency>

阅读代码:
关键类:TransportClient

Elasticsearch uses standard RESTful APIs and JSON.
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
   .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
SearchResponse sr = client.prepareSearch()
 .setQuery(QueryBuilders.matchQuery("message", "myProduct"))
 .addAggregation(AggregationBuilders.terms("top_10_states")
 .field("state").size(10))
 .execute().actionGet();
client.close();

es Elasticsearch from Storm

image.png

http://blog.csdn.net/sunnyyoona/article/details/52860861

https://www.elastic.co/guide/en/elasticsearch/guide/current/dynamic-mapping.html
https://www.elastic.co/guide/en/elasticsearch/reference/2.4/dynamic-field-mapping.html#date-detection
参考

storm连接kafka

//重点
Storm 如何来封装kafka接口
class:DynamicPartitionConnections

[storm数据怎样输出到elasticsearch]

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,308评论 19 139
  • 我只是很喜欢那时的自己。
    林加加阅读 1,072评论 0 0
  • 你还记得你第一次嚎啕大哭的场景吗? 你若是个女人,就一定哭过。 而且,一定有过撕心裂肺,伤心绝望的痛哭。 无论是站...
    芈九阅读 3,770评论 1 3
  • 1、揉捏风池穴 取穴定位:位于颈后两侧枕骨下方,发际两边大筋外侧的凹陷处。 按摩方法:被按摩者取坐位,按摩者站在被...
    松本静阅读 3,177评论 0 1
  • 简书
    mortyu阅读 1,124评论 0 0