Druid 使用 HDFS 作为 deep storage

Druid 是大数据中提供快捷查询的组件,可以认为是业务层和持久层之间的一个 cache,除了可以单独使用之外,更多的使用场景是和 hdfs/s3 等持久化系统搭配,本文就以作者自身的体会搭建一下 druid 使用 HDFS 作为 deep storage。

  1. 安装

    因为 druid 要调用 hadoop 的 mapreduce 任务进行数据 index,而调用的方式是使用 hadoop client 的客户端调用的,这个客户端的特性是你用什么用户(whoami)调用的,那么就以什么用户执行任务,所以,我们不能用 root 用户执行,我这里问了方便,创建了一个 hdfs 的用户,并且将 druid 安装在了 /home/hdfs 目录下。

    当我们安装好(其实就是解压)druid 之后,可以在目录中看到两份配置文件,分别是:

    • conf
    • conf-quickstart

      从名字中就可以看出这两份文件的作用,而本文讲以 conf-quickstart 这份配置为例进行讲解。在 conf-quickstart 这个目录下,我们可以发现一个目录路径:conf-quickstart/druid/_common/,里面有两个文件,我们要修改的是:common.runtime.properties

  2. 配置 conf-quickstart/druid/_common/common.runtime.properties

    对于这个文件,我们要修改的地方有几个:

     druid.extensions.loadList=["postgresql-metadata-storage", "druid-hdfs-storage"]
     # 这里是注释了两行
     # druid.storage.type=local
    # druid.storage.storageDirectory=var/druid/segments
     druid.storage.type=hdfs
    druid.storage.storageDirectory=hdfs://hdp-master:8020///tmp/druid/
  3. 拷贝 hadoop 配置文件

    第二个非常重要的步骤就是要讲 hadoop 的配置文件拷贝到目录:conf-quickstart/druid/_common/,需要拷贝的文件有:

    • core-site.xml
    • hdfs-site.xml
    • mapred-site.xml
    • yarn-site.xml
  4. 修改任务配置文件

    因为我们是以 quickstart 为例进行配置的,所以我们修改的是:quickstart/wikiticker-index.json,有一个地方是要修改的:

     "jobProperties" : {
         "mapreduce.job.classloader": "true",
         "mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.",
         "mapreduce.map.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8",
         "mapreduce.reduce.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8"
    }
  5. 索引数据

    做了这么多工作,是时候进行数据操作了,也很简单,我们在 druid 的 home 目录上执行这个命令:

    [hdfs@druid druid]$ curl -X 'POST' -H 'Content-Type:application/json' -d @quickstart/wikiticker-index.json localhost:8090/druid/indexer/v1/task

    然后登陆:http://localhost:8090/console.html,我们应该会看到一个成功的任务:

    然后我们打开 http://localhost:50070/explorer.html#/tmp/druid,应该可以看到数据已经存储在 hdfs 里面了:

一些 BUG 解决方法

  1. Input path does not exist

    Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://hdp-master:8020/user/hdfs/quickstart/wikiticker-2015-09-12-sampled.json
     at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285) ~[?:?]
     at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340) ~[?:?]
     at org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:115) ~[?:?]
     at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:493) ~[?:?]
     at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:510) ~[?:?]
     at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:394) ~[?:?]
     at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285) ~[?:?]
     at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282) ~[?:?]
     at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_60]
     at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_60]
     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) ~[?:?]
     at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282) ~[?:?]
     at io.druid.indexer.DetermineHashedPartitionsJob.run(DetermineHashedPartitionsJob.java:116) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexer.JobHelper.runJobs(JobHelper.java:349) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexer.HadoopDruidDetermineConfigurationJob.run(HadoopDruidDetermineConfigurationJob.java:91) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessing.runTask(HadoopIndexTask.java:306) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_60]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_60]
     at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_60]
     at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:208) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     ... 7 more

    这个是因为我们的 quickstart/wikiticker-index.json 配置的是:

    "spec" : {
     "ioConfig" : {
       "type" : "hadoop",
       "inputSpec" : {
         "type" : "static",
         "paths" : "quickstart/wikiticker-2015-09-12-sampled.json"
       }
     }

    所以我们需要将 quickstart/wikiticker-2015-09-12-sampled.json 这个文件放到 hdfs://hdp-master:8020/user/hdfs/quickstart/wikiticker-2015-09-12-sampled.json 中就可以了。

  2. Container exited with a non-zero exit code 143

     2017-05-23T06:18:11,164 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495452772384_0008_m_000000_0, Status : FAILED
    Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object;
    Container killed by the ApplicationMaster.
    Container killed on request. Exit code is 143
    Container exited with a non-zero exit code 143
    2017-05-23T06:18:17,221 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495452772384_0008_m_000000_1, Status : FAILED
    Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object;
    Container killed by the ApplicationMaster.
    Container killed on request. Exit code is 143
    Container exited with a non-zero exit code 143
    2017-05-23T06:18:23,317 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1495452772384_0008_m_000000_2, Status : FAILED
    Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object;
    Container killed by the ApplicationMaster.
    Container killed on request. Exit code is 143
    Container exited with a non-zero exit code 143

    这个问题比较棘手,定位了好久,但是,仔细看一下官方文档,发现还是有一个地方没有注意的,那就是这个文档:Working with different versions of Hadoop

    解决方法也比较简单,我们修改一下 quickstart/wikiticker-index.json 文件,然后配置如下:

     "jobProperties" : {
         "mapreduce.job.classloader": "true",
         "mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.",
         "mapreduce.map.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8",
         "mapreduce.reduce.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8"
       }

    其实如果你按照我上面的步骤做下来的话应该不会有这个问题。

  3. No buckets?? seems there is no data to index.

     java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
     at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[guava-16.0.1.jar:?]
     at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:211) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     at io.druid.indexing.common.task.HadoopIndexTask.run(HadoopIndexTask.java:223) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:436) [druid-indexing-service-0.10.0.jar:0.10.0]
     at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:408) [druid-indexing-service-0.10.0.jar:0.10.0]
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_60]
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_60]
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_60]
     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
    Caused by: java.lang.reflect.InvocationTargetException
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_60]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_60]
     at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_60]
     at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:208) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     ... 7 more
    Caused by: java.lang.RuntimeException: java.lang.RuntimeException: No buckets?? seems there is no data to index.
     at io.druid.indexer.IndexGeneratorJob.run(IndexGeneratorJob.java:215) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexer.JobHelper.runJobs(JobHelper.java:349) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:95) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:276) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_60]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_60]
     at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_60]
     at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:208) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     ... 7 more
    Caused by: java.lang.RuntimeException: No buckets?? seems there is no data to index.
     at io.druid.indexer.IndexGeneratorJob.run(IndexGeneratorJob.java:176) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexer.JobHelper.runJobs(JobHelper.java:349) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexer.HadoopDruidIndexerJob.run(HadoopDruidIndexerJob.java:95) ~[druid-indexing-hadoop-0.10.0.jar:0.10.0]
     at io.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessing.runTask(HadoopIndexTask.java:276) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_60]
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_60]
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_60]
     at java.lang.reflect.Method.invoke(Method.java:497) ~[?:1.8.0_60]
     at io.druid.indexing.common.task.HadoopTask.invokeForeignLoader(HadoopTask.java:208) ~[druid-indexing-service-0.10.0.jar:0.10.0]
     ... 7 more

    这个问题是因为 timezone 不对导致的,所以需要解决的话,还是修改 quickstart/wikiticker-index.json ,然后配置为:

     "jobProperties" : {
         "mapreduce.job.classloader": "true",
         "mapreduce.job.classloader.system.classes": "-javax.validation.,java.,javax.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.",
         "mapreduce.map.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8",
         "mapreduce.reduce.java.opts":"-Duser.timezone=UTC -Dfile.encoding=UTF-8"
       }

    这个配置其实和 2 中的配置一样,因为我已经修改了,所以也就没有做区分了,如果从 2 下来的,也不会出现这个错误了。

  4. 更多的错误

    当然还会有很多我没遇到的错误,但是,我们给一些解决错误的思路:

    1. 第一步肯定是打开 http://localhost:8090/console.html 查看任务的日志,看下有什么错误
    2. 根据错误查找原因,如果不能直接的发现错误,那么就需要打开:http://localhost:19888/jobhistory/app 查看一下 hadoop 中的 mapreduce 任务的日志了,这里面会有详细得执行错误可以帮助定位问题
    3. mapreduce 任务没提交,druid 任务一直处于 runing 状态,这个情况很有可能是 druid 卡住了,这个一般在单机环境中比较容易出现,所以重启一下吧,我有好几次问题都是出在这。

supervisor 配置

因为要管理日志,又要经常重启之内的,所以我将所有的组件都设置为 supervisor 管理的进程,具体配置我放在 gist 上了,有兴趣可以拿去用:

supervisor 配置

Reference

  1. Deep Storage
  2. Druid with HDFS
  3. Working with different versions of Hadoop
  4. No buckets?? seems there is no data to index

· EOF ·

最近爬虫很是凶猛,标注下文章的原文地址: https://liuliqiang.info/post/druid-using-hdfs-as-deep-storage/