O es-hadoop é um conector entre duas das tecnologias principais segundo o paradigma Big Data. O principal caso de uso desse conector é utilizar a capacidade de busca e análise do Elasticsearch em conjunto com informações que estejam armazenadas no Hadoop para o desenvolvimento de aplicações de tempo real e detecção de anomalias (por exemplo, para descoberta de fraudes).

É importante conhecer alguns conceitos do Elasticsearch antes que um exemplo prático seja apresentado. Para esse artigo, os principais conceitos são: índice e indexação. Um índice é o local onde serão armazenados os documentos que são gerenciados pelo Elasticsearch. O processo de armazenamento de um documento é chamado de indexação, pois diferentemente dos bancos de dados tradicionais, as informações contidas nesse documento normalmente não serão armazenadas de forma crua, ou seja não estão na mesma forma que foram enviadas ao Elasticsearch, mas antes serão analisados e transformados.

Hadoop se tornou o padrão para o desenvolvimento de aplicações baseadas no padrão Map/Reduce. Para esse artigo, os conceitos mais importantes são: HDFS e job. O Hadoop Distributed Filesytem (HDFS) é o sistema de artigos distribuídos do Hadoop, que irá armazenar as entradas e os resultados do seu processamento de seus Jobs, que são a unidade mínima de execução no Hadoop, que geralmente possui fases de Map e de Reduce, ainda que o Reduce possa ser eventualmente omitido.

Inicialmente precisamos instalar o Elasticsearch e o Hadoop. A primeira atividade deve ser instalar o Elasticsearch em um servidor. Em uma máquina com Java instalado, devemos baixar a última versão do site do Elasticsearch, desempacotá-la, e executar o comando ./bin/elasticsearch. Se tudo ocorreu podemos chamar localhost:9200 em um navegador, conforme ilustrado na Listagem 1 o Elasticsearch irá retornar uma resposta JSON. Nessa resposta, o parâmetro name provavelmente irá variar para cada leitor, pois é escolhido de forma aleatória (em resumo, não se preocupe se a resposta JSON não for exatamente igual a Listagem 1).

Listagem 1. Resposta do cluster Elasticsearch

  {
    "status" : 200,
    "name" : "Madeline Joyce",
    "version" : {
      "number" : "1.3.4",
      "build_hash" : "a70f3ccb52200f8f2c87e9c370c6597448eb3e45",
      "build_timestamp" : "2014-09-30T09:07:17Z",
      "build_snapshot" : false,
      "lucene_version" : "4.9"
    },
    "tagline" : "You Know, for Search"
  }

Com o Elasticsearch executando, podemos criar um índice para nosso exemplo. Nesse artigo vamos criar um crawler para o RSS da Globo.com para o Santos Futebol Clube, que irá recuperar dos dados do endpoint RSS e mapeá-lo para o Hadoop Filesystem (HDFS). Do HDFS vamos utilizar o hadoop-es para enviar esses dados para um índice ES. Posteriormente, vamos fazer o caminho inverso para utilizar as capacidades de análise do ES para buscar no índice os resultados e armazená-los novamente no HDFS. A Listagem 2 apresenta a criação do índice em ES.

Listagem 2. Criação do índice Elasticsearch

  PUT /santos
  {
     "analysis": {
        "analyzer": {          
           "description_analyzer": {
             "type": "custom",
             "tokenizer": "whitespace",
             "filter": [ "asciifolding", "stem_minimal_pt" ]
           }
        },
        "filter": 
            {
           "stem_minimal_pt": {
              "type": "stemmer",
              "language": "minimal_portuguese"
           }
        }
     }
  }

O próximo passo é adicionar um mapeamento para definir os campos, seus tipos e a forma que como cada um desses campos será analisado. A Listagem 3 apresenta o mapeamento com os seguintes campos um post RSS: title, link, description, e category. Todos campos são do tipo string. É importante saber que o Elasticsearch permite mapear campos com outros tipos (por exemplo, long, integer, boolean) e definir mapeamentos mais complexos (como listas e objetos). Os campos title, link e category são armazenados após processados pelo analisador padrão – standard. Para mais detalhes sobre o analisador standard visite a documentação oficial do Elasticsearch. O campo description é analisado pelo description_analizer, apresentada na Listagem 2, que irá remover qualquer tag HTML que esteja nesse campo.

Listagem 3. Mapeamento do Elasticsearch

  PUT santos/rss/_mapping
  {
      "rss" : {
         "properties" : {
           "title" : {
              "type" : "string"
           },
           "link" : {
              "type" : "string"
           },
           "description" : {
              "type" : "string", "analyzer": "description_analyzer"
           },
           "category" : {
              "type" : "date"
           }
          }
 }

Antes de continuar, vamos instalar o Hadoop no Linux. Instalar o Hadoop no Windows é possível, porém não é recomendado seu uso em produção. A Tabela 1 lista os passos necessários para essa instalação de modo simplificado, por isso não será comentada a resposta da execução de cada comando.

Descrição do passo

Comandos

Instalar o Java

sudo apt-get install   default-jdk

Adicionar um grupo

sudo addgroup hadoop

Adicionar um usuário para o Hadoop

sudo   adduser --ingroup hadoop hduser

Instalar o SSH

sudo apt-get install   ssh

Logar como usuário hduser

su   hduser

Gerar chave pública

ssh-keygen -t rsa -P   ""

Adicionar chave ao

cat   $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

Testar o SSH

ssh   localhost

Baixar o Hadoop

wget http://mirrors.sonic.net/apache/hadoop/common/hadoop-2.4.1/hadoop-2.4.1.tar.gz

Desempacotar o Hadoop

tar xvzf   hadoop-2.4.1.tar.gz

Mover o Hadoop e autorizar o usuário hduser

sudo   mv hadoop-2.4.1 /usr/local/hadoop
   sudo chown -R hduser:hadoop   hadoop

Modificar o ~/.bashrc, adicionando as seguintes linhas.

#HADOOP VARIABLES   START
   export   JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
   export   HADOOP_INSTALL=/usr/local/hadoop
   export PATH=$PATH:$HADOOP_INSTALL/bin
   export   PATH=$PATH:$HADOOP_INSTALL/sbin
   export   HADOOP_MAPRED_HOME=$HADOOP_INSTALL
   export   HADOOP_COMMON_HOME=$HADOOP_INSTALL
   export   HADOOP_HDFS_HOME=$HADOOP_INSTALL
   export   YARN_HOME=$HADOOP_INSTALL
   export   HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
   export   HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
   #HADOOP VARIABLES END

Modificar o hadoop-env.sh, adicionando as seguintes linhas.

export   JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64

Modificar o core-site.xml

<configuration>
    <property>
     <name>hadoop.tmp.dir</name>
     <value>/app/hadoop/tmp</value>
     <description></description>
    </property>
    <property>
     <name>fs.default.name</name>
     <value>hdfs://localhost:54310</value>
     <description></description>
    </property>
   </configuration>

Modificar o mapred-site.xml

<configuration>
    <property>
     <name>mapred.job.tracker</name>
     <value>localhost:54311</value>
     <description>
     </description>
    </property>
   </configuration>

Modificar o hdfs-site.xml

   <configuration>
    <property>
     <name>dfs.replication</name>
     <value>1</value>
     <description>
     </description>
    </property>
    <property>
      <name>dfs.namenode.name.dir</name>
      <value>file:/usr/local/hadoop_store/hdfs/namenode</value>
    </property>
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>file:/usr/local/hadoop_store/hdfs/datanode</value>
    </property>
   </configuration>

Formatar o Hadoop Filesystem

hadoop namenode -format

Iniciar o Hadoop

start-all.sh

Tabela 1. Comandos para instalação do Hadoop no Linux.

Se o Hadoop estiver instalado corretamente, o comando jps – que lista os processos Java – resultará em algo parecido com a Listagem 4.

Listagem 4. Dependência do es-hadoop

  hduser@k:/home/k
array21
nbsp;jps
6139Jps
5484NameNode
5871SecondaryNameNode
5969ResourceManager
6054NodeManager
7610DataNode

Com ambas tecnologias instaladas, podemos começar um projeto Java. Nesse artigo vamos utilizar um projeto Maven do Eclipse, por facilitar muito o desenvolvimento, porém os passos são os mesmos, independentemente de IDE ou tipo de projeto. Após criar o projeto, adicione a dependência do es-hadoop como apresentado na Listagem 5.

Listagem 5. Dependência do es-hadoop.

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.5.1</version>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.0.2</version>
  </dependency>

Nesse exemplo, vamos utilizar o es-hadoop com suas funções de Map/Reduce, porém poderíamos integrá-lo também com o Hive, Pig e o Spark, componentes do Hadoop. Devemos primeiramente recuperar os dados do feed RSS. Para tal, criamos a classe RSSReader da Listagem 6 que recupera os dados do feeder e os escreve para o HDFS. Antes de executar essa classe vamos criar um diretório no HDFS com o seguinte comando (com o usuário hduser): hadoop fs -mkdir /devmedia. Nessa classe o mais importante é lembrar que estamos utilizando as configurações padrão do Hadoop, por isso se o seu HDFS não estiver acessível localmente e ou na porta padrão isso deve ser alterado no lugar de

Listagem 6. Leitura dos dados RSS e escrita no HDFS

  public class RSSReader {
   
              private static Logger logger = Logger.getLogger(RSSReader.class);
   
              public static void main(String argv[]) {
   
                          try {
                                     URL url = new URL(
                                                             "http://globoesporte.globo.com/servico/semantica/editorias/plantao/futebol/times/santos/feed.rss");
   
                                     InputStream in = url.openStream();
   
                                     DocumentBuilderFactory dbFactory = DocumentBuilderFactory
                                                             .newInstance();
                                     DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
                                     Document doc = dBuilder.parse(in);
   
                                     doc.getDocumentElement().normalize();
   
                                     NodeList nList = doc.getElementsByTagName("item");
   
                                     List<String> linhas = new ArrayList<String>();
   
                                     String linha = "";
   
                                     for (int temp = 0; temp < nList.getLength(); temp++) {
   
                                                 Node nNode = nList.item(temp);
   
                                                 if (nNode.getNodeType() == Node.ELEMENT_NODE) {
   
                                                             Element eElement = (Element) nNode;
   
                                                             linha += eElement.getElementsByTagName("title").item(0)
                                                                                    .getTextContent()+"|";
                                                             linha += eElement.getElementsByTagName("description").item(0)
                                                                                    .getTextContent()+"|";
                                                             linha += eElement.getElementsByTagName("link").item(0)
                                                                                    .getTextContent()+"|";
                                                             linha += eElement.getElementsByTagName("category").item(0)
                                                                                    .getTextContent();
                                                             
                                                             linhas.add(linha);
                                                 }
                                     }
                                     
                                     write(linhas);
                          } catch (Exception e){
                                     e.printStackTrace();
                          }
              }
   
              private static void write(List<String> linhas) throws IOException {
   
                          Path pt=new Path("hdfs://localhost:54310/devmedia/input.txt");
          FileSystem fs = FileSystem.get(new Configuration());
          
          BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
          
                          for(String linha:linhas){                          
                                     logger.info(linha);
              br.write(linha);         
                          }
                          
          br.close(); 
              }
  }

Posteriormente, a classe Mapper apresentada na Listagem 7 que irá recuperar os dados e colocá-los num formato capaz de ser lido pelo Hadoop. Nessa etapa, podemos fazer algumas transformações para melhorar o significado dos nossos dados, apenas como exemplos filtraram os valores para apenas escrever os posts que tenham a string “santos” no título.

Listagem 7. Classe Mapper da escrita para o ES

  public class ESReaderMapper extends
                          Mapper<LongWritable, Text, NullWritable, MapWritable> {
   
              private static Logger logger = Logger.getLogger(ESReaderMapper.class);
   
              @Override
              protected void map(LongWritable key, Text value, Context context)
                                     throws IOException, InterruptedException {
   
                          logger.debug(value);
   
                          String[] splitValue = value.toString().split("|");
   
                          String title = splitValue[0];
   
                          MapWritable doc = new MapWritable();
   
                          if (title.toLowerCase().contains("santos")) {
   
                                     doc.put(new Text("title"), new Text(title));
                                     doc.put(new Text("description"), new Text(splitValue[1]));
                                     doc.put(new Text("link"), new Text(splitValue[2]));
                                     doc.put(new Text("category"), new Text(splitValue[3]));
   
                                      context.write(NullWritable.get(), doc);
                          }
              }
  }

Na sequência, podemos escrever o código do job para escrita de dados para o ES. Conforme ilustrado na Listagem 8, primeiramente, devemos adicionar ao objeto de configuração do job códigos que definem onde o es-hadoop irá encontrar nosso Elasticsearch (nesse exemplo, nos limitamos a copiar os parâmetros definidos anteriormente na configuração do ES). Outra definição importante é utilizar o EsOutputFormat (importado do es-hadoop) como o formato de saída (OutputFormatClass) para nosso job. Esse código irá utilizar o ESWriterMapper da listagem anterior para transformar o que foi lido anteriormente para documentos JSON.

Listagem 8. Classe Mapper da escrita para o ES

  public class ESWriterJob {
                          
              public static void main(String[] args) throws Exception {
                  Configuration conf = new Configuration();
                          conf.set("es.nodes","localhost");    
                          conf.set("es-port","9200");    
                          conf.set("es.resource","santos/rss");
                  
                  Job job = new Job(conf, "RSS Writer");
                  
          job.setJarByClass(ESWriterJob.class);
          
          //es-hadoop configs
          job.setOutputFormatClass(EsOutputFormat.class);
          job.setMapOutputValueClass(MapWritable.class);
          job.setMapperClass(ESWriterMapper.class);        
          job.setSpeculativeExecution(false);
   
          FileInputFormat.addInputPath(job, new Path(args[0]));
          job.setInputFormatClass(KeyValueTextInputFormat.class);
              
          boolean result = job.waitForCompletion(true);
          System.exit(result ? 0 : 1);
                }         
  }

Após a execução desse job, os valores que estavam no HDFS serão enviados ao nosso índice do Elasticsearch. Para verificar isso, podemos executar uma busca match_all (que irá retornar todos os valores) no nosso índice como ilustrado na Listagem 9.

Listagem 9. Verificar se os valores chegaram ao índice

  curl -XGET 'localhost:9200/santos/rss/_search' -d '
  {
      "query" : {
          "match_all" : {}
      }
  }

Como já temos dados cadastrados no índice, podemos fazer o exemplo contrário, ou seja, um código que leia dados do índice e envie ao HDFS. O método da Listagem 10 ilustra como podemos recuperar de RSS que estão no HDFS e enviá-los ao índice do Elasticsearch. De forma semelhante, para ler dados de ElasticSearch, vamos configurar o acesso ao ES e utilizar uma classe de format (nesse caso a EsInputFormat) oferecida pelo es-hadoop.Além disso, e mais importante, devemos definir uma consulta para extrair dados do ES.

Listagem 10. Recuperar dados do Elasticsearch

  public class ESReaderJob {
                          
              public static void main(String[] args) throws Exception {
                  Configuration conf = new Configuration();
                          conf.set("es.nodes","localhost");    
                          conf.set("es-port","9200");    
                          conf.set("es.resource","santos/rss");
                          conf.set("es.query","{ \"match_all\" : { } }");
                  
                          Job job = new Job(conf, "RSS Reader");
                  
                          job.setJarByClass(ESReaderJob.class);
          
                          job.setInputFormatClass(EsInputFormat.class);
                          job.setMapOutputKeyClass(Text.class);
                          job.setMapOutputValueClass(MapWritable.class);
   
              FileOutputFormat.setOutputPath(job, new Path("/devmedia/output.txt"));
   
                          boolean result = job.waitForCompletion(true);
                          System.exit(result ? 0 : 1);
              }           
  }

Se tudo ocorreu bem, podemos encontrar o arquivo de saída no diretório que criamos anteriormente, através do seguinte comando (com o usuário hduser): hadoop fs -ls /devmedia.

Em resumo, vamos utilizamos nesse artigo o es-hadoop para comunicação entre o Elaticsearch e o Hadoop. Esse framework fornece um InputFormat e um OutputFormat dedicado a ler e gravar dados do ElasticSearch e uma sequência de parâmetros de configuração que permitirão a um Job do Hadoop encontrar o ES. O es-hadoop não se resume as essas atividades e podemos, por exemplo, indexar diretamente documentos JSON sem aplicar qualquer transformação, utilizar índices de leitura e escrita distintos, e utilizar outros componentes do Hadoop como também com o Hive, Pig e o Spark.

O es-hadoop (Easticsearch for Apache Hadoop) permite que os trabalhos do Hadoop para interagir com ElasticSearch com uma pequena biblioteca e uma configuração fácil, a importância desse framework é combinar ferramentas que desde o nível mais abstrato até a implementação de detalhes de código tem muito para ofertar trabalhando em conjunto. Por exemplo, a escalabilidade, uma das principais vantagens tanto do Elasticsearch quanto do Hadoop, pode ilustrar a semelhança já que um componente crítico é a capacidade de dividir uma tarefa em problemas menores que executam ao mesmo tempo. Esse conceito está presente em ambos: no Hadoop através das suas divisões (o número de partes em que uma fonte ou entrada pode ser dividido) e no ElasticSearch através do uso de shards.