O es-hadoop é um conector entre duas das tecnologias principais segundo o paradigma Big Data. O principal caso de uso desse conector é utilizar a capacidade de busca e análise do Elasticsearch em conjunto com informações que estejam armazenadas no Hadoop para o desenvolvimento de aplicações de tempo real e detecção de anomalias (por exemplo, para descoberta de fraudes).

É importante conhecer alguns conceitos do Elasticsearch antes que um exemplo prático seja apresentado. Para esse artigo, os principais conceitos são: índice e indexação. Um índice é o local onde serão armazenados os documentos que são gerenciados pelo Elasticsearch. O processo de armazenamento de um documento é chamado de indexação, pois diferentemente dos bancos de dados tradicionais, as informações contidas nesse documento normalmente não serão armazenadas de forma crua, ou seja não estão na mesma forma que foram enviadas ao Elasticsearch, mas antes serão analisados e transformados.

Hadoop se tornou o padrão para o desenvolvimento de aplicações baseadas no padrão Map/Reduce. Para esse artigo, os conceitos mais importantes são: HDFS e job. O Hadoop Distributed Filesytem (HDFS) é o sistema de artigos distribuídos do Hadoop, que irá armazenar as entradas e os resultados do seu processamento de seus Jobs, que são a unidade mínima de execução no Hadoop, que geralmente possui fases de Map e de Reduce, ainda que o Reduce possa ser eventualmente omitido.

Inicialmente precisamos instalar o Elasticsearch e o Hadoop. A primeira atividade deve ser instalar o Elasticsearch em um servidor. Em uma máquina com Java instalado, devemos baixar a última versão do site do Elasticsearch, desempacotá-la, e executar o comando ./bin/elasticsearch. Se tudo ocorreu podemos chamar localhost:9200 em um navegador, conforme ilustrado na Listagem 1 o Elasticsearch irá retornar uma resposta JSON. Nessa resposta, o parâmetro name provavelmente irá variar para cada leitor, pois é escolhido de forma aleatória (em resumo, não se preocupe se a resposta JSON não for exatamente igual a Listagem 1).

{
  "status" : 200,
  "name" : "Madeline Joyce",
  "version" : {
    "number" : "1.3.4",
    "build_hash" : "a70f3ccb52200f8f2c87e9c370c6597448eb3e45",
    "build_timestamp" : "2014-09-30T09:07:17Z",
    "build_snapshot" : false,
    "lucene_version" : "4.9"
  },
  "tagline" : "You Know, for Search"
}
Listagem 1. Resposta do cluster Elasticsearch

Com o Elasticsearch executando, podemos criar um índice para nosso exemplo. Nesse artigo vamos criar um crawler para o RSS da Globo.com para o Santos Futebol Clube, que irá recuperar dos dados do endpoint RSS e mapeá-lo para o Hadoop Filesystem (HDFS). Do HDFS vamos utilizar o hadoop-es para enviar esses dados para um índice ES. Posteriormente, vamos fazer o caminho inverso para utilizar as capacidades de análise do ES para buscar no índice os resultados e armazená-los novamente no HDFS. A Listagem 2 apresenta a criação do índice em ES.

PUT /santos
{
 "analysis": {
    "analyzer": {          
       "description_analyzer": {
         "type": "custom",
         "tokenizer": "whitespace",
         "filter": [ "asciifolding", "stem_minimal_pt" ]
       }
    },
    "filter": 
        {
       "stem_minimal_pt": {
          "type": "stemmer",
          "language": "minimal_portuguese"
       }
    }
 }
}
Listagem 2. Criação do índice Elasticsearch

O próximo passo é adicionar um mapeamento para definir os campos, seus tipos e a forma que como cada um desses campos será analisado. A Listagem 3 apresenta o mapeamento com os seguintes campos um post RSS: title, link, description, e category. Todos campos são do tipo string. É importante saber que o Elasticsearch permite mapear campos com outros tipos (por exemplo, long, integer, boolean) e definir mapeamentos mais complexos (como listas e objetos). Os campos title, link e category são armazenados após processados pelo analisador padrão – standard. Para mais detalhes sobre o analisador standard visite a documentação oficial do Elasticsearch. O campo description é analisado pelo description_analizer, apresentada na Listagem 2, que irá remover qualquer tag HTML que esteja nesse campo.

PUT santos/rss/_mapping
{
"rss" : {
   "properties" : {
     "title" : {
        "type" : "string"
     },
     "link" : {
        "type" : "string"
     },
     "description" : {
        "type" : "string", "analyzer": "description_analyzer"
     },
     "category" : {
        "type" : "date"
     }
    }
}
Listagem 3. Mapeamento do Elasticsearch

Antes de continuar, vamos instalar o Hadoop no Linux. Instalar o Hadoop no Windows é possível, porém não é recomendado seu uso em produção. A Tabela 1 lista os passos necessários para essa instalação de modo simplificado, por isso não será comentada a resposta da execução de cada comando.

Descrição do passo Comandos
Instalar o Java sudo apt-get install default-jdk
Adicionar um grupo sudo addgroup hadoop
Adicionar um usuário para o Hadoop sudo adduser --ingroup hadoop hduser
Instalar o SSH sudo apt-get install ssh
Logar como usuário hduser su hduser
Gerar chave pública ssh-keygen -t rsa -P ""
Adicionar chave ao cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys
Testar o SSH ssh localhost
Baixar o Hadoop wget http://mirrors.sonic.net/apache/hadoop/common/hadoop-2.4.1/hadoop-2.4.1.tar.gz
Desempacotar o Hadoop tar xvzf hadoop-2.4.1.tar.gz
Mover o Hadoop e autorizar o usuário hduser sudo mv hadoop-2.4.1 /usr/local/hadoop sudo chown -R hduser:hadoop hadoop
Modificar o ~/.bashrc, adicionando as seguintes linhas. #HADOOP VARIABLES START export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64 export HADOOP_INSTALL=/usr/local/hadoop export PATH=$PATH:$HADOOP_INSTALL/bin export PATH=$PATH:$HADOOP_INSTALL/sbin export HADOOP_MAPRED_HOME=$HADOOP_INSTALL export HADOOP_COMMON_HOME=$HADOOP_INSTALL export HADOOP_HDFS_HOME=$HADOOP_INSTALL export YARN_HOME=$HADOOP_INSTALL export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib" #HADOOP VARIABLES END
Modificar o hadoop-env.sh, adicionando as seguintes linhas. export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
Modificar o core-site.xml <configuration> <property> <name>hadoop.tmp.dir</name> <value>/app/hadoop/tmp</value> <description></description> </property> <property> <name>fs.default.name</name> <value>hdfs://localhost:54310</value> <description></description> </property> </configuration>
Modificar o mapred-site.xml <configuration> <property> <name>mapred.job.tracker</name> <value>localhost:54311</value> <description> </description> </property> </configuration>
Modificar o hdfs-site.xml <configuration> <property> <name>dfs.replication</name> <value>1</value> <description> </description> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:/usr/local/hadoop_store/hdfs/namenode</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:/usr/local/hadoop_store/hdfs/datanode</value> </property> </configuration>
Formatar o Hadoop Filesystem hadoop namenode -format
Iniciar o Hadoop start-all.sh
Tabela 1. Comandos para instalação do Hadoop no Linux

Se o Hadoop estiver instalado corretamente, o comando jps – que lista os processos Java – resultará em algo parecido com a Listagem 4.

hduser@k:/home/knbsp;jps
6139Jps
5484NameNode
5871SecondaryNameNode
5969ResourceManager
6054NodeManager
7610DataNode
Listagem 4. Dependência do es-hadoop

Com ambas tecnologias instaladas, podemos começar um projeto Java. Nesse artigo vamos utilizar um projeto Maven do Eclipse, por facilitar muito o desenvolvimento, porém os passos são os mesmos, independentemente de IDE ou tipo de projeto. Após criar o projeto, adicione a dependência do es-hadoop como apresentado na Listagem 5.

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.5.1</version>
</dependency>
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>2.0.2</version>
</dependency>
Listagem 5. Dependência do es-hadoop

Nesse exemplo, vamos utilurações padrão do Hadoop, por isso se o seu HDFS não estiver acessível localmente e ou na porta padrão isso deve ser alterado no lugar de

public class RSSReader {

      private static Logger logger = Logger.getLogger(RSSReader.class);

      public static void main(String argv[]) {

        try {
            URL url = new URL(
                   "http://globoesporte.globo.com/servico/semantica/editorias
                   /plantao/futebol/times/santos/feed.rss");

            InputStream in = url.openStream();

            DocumentBuilderFactory dbFactory = DocumentBuilderFactory
                   .newInstance();
            DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
            Document doc = dBuilder.parse(in);

            doc.getDocumentElement().normalize();

            NodeList nList = doc.getElementsByTagName("item");

            List<String> linhas = new ArrayList<String>();

            String linha = "";

            for (int temp = 0; temp < nList.getLength(); temp++) {

            Node nNode = nList.item(temp);

            if (nNode.getNodeType() == Node.ELEMENT_NODE) {

                   Element eElement = (Element) nNode;

                   linha += eElement.getElementsByTagName("title").item(0)
                                          .getTextContent()+"|";
                   linha += eElement.getElementsByTagName("description").item(0)
                                          .getTextContent()+"|";
                   linha += eElement.getElementsByTagName("link").item(0)
                                          .getTextContent()+"|";
                   linha += eElement.getElementsByTagName("category").item(0)
                                          .getTextContent();
                   
                   linhas.add(linha);
                               }
                   }
                   
                   write(linhas);
        } catch (Exception e){
                   e.printStackTrace();
        }
      }

      private static void write(List<String> linhas) throws IOException {

                  Path pt=new Path("hdfs://localhost:54310/devmedia/input.txt");
  FileSystem fs = FileSystem.get(new Configuration());
  
  BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
  
                  for(String linha:linhas){                          
                             logger.info(linha);
      br.write(linha);         
                  }
                  
  br.close(); 
      }
}
Listagem 6. Leitura dos dados RSS e escrita no hdfs

Posteriormente, a classe Mapper apresentada na Listagem 7 que irá recuperar os dados e colocá-los num formato capaz de ser lido pelo Hadoop. Nessa etapa, podemos fazer algumas transformações para melhorar o significado dos nossos dados, apenas como exemplos filtraram os valores para apenas escrever os posts que tenham a string “santos” no título.

public class ESReaderMapper extends
  Mapper<LongWritable, Text, NullWritable, MapWritable> {

  private static Logger logger = Logger.getLogger(ESReaderMapper.class);

  @Override
  protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

    logger.debug(value);

    String[] splitValue = value.toString().split("|");

    String title = splitValue[0];

    MapWritable doc = new MapWritable();

    if (title.toLowerCase().contains("santos")) {

       doc.put(new Text("title"), new Text(title));
       doc.put(new Text("description"), new Text(splitValue[1]));
       doc.put(new Text("link"), new Text(splitValue[2]));
       doc.put(new Text("category"), new Text(splitValue[3]));

        context.write(NullWritable.get(), doc);
    }
  }
}
Listagem 7. Classe Mapper da escrita para o ES

Na sequência, podemos escrever o código do job para escrita de dados para o ES. Conforme ilustrado na Listagem 8, primeiramente, devemos adicionar ao objeto de configuração do job códigos que definem onde o es-hadoop irá encontrar nosso Elasticsearch (nesse exemplo, nos limitamos a copiar os parâmetros definidos anteriormente na configuração do ES). Outra definição importante é utilizar o EsOutputFormat (importado do es-hadoop) como o formato de saída (OutputFormatClass) para nosso job. Esse código irá utilizar o ESWriterMapper da listagem anterior para transformar o que foi lido anteriormente para documentos JSON.

public class ESWriterJob {
                  
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
                  conf.set("es.nodes","localhost");    
                  conf.set("es-port","9200");    
                  conf.set("es.resource","santos/rss");
          
          Job job = new Job(conf, "RSS Writer");
          
  job.setJarByClass(ESWriterJob.class);
  
  //es-hadoop configs
  job.setOutputFormatClass(EsOutputFormat.class);
  job.setMapOutputValueClass(MapWritable.class);
  job.setMapperClass(ESWriterMapper.class);        
  job.setSpeculativeExecution(false);

  FileInputFormat.addInputPath(job, new Path(args[0]));
  job.setInputFormatClass(KeyValueTextInputFormat.class);
      
  boolean result = job.waitForCompletion(true);
  System.exit(result ? 0 : 1);
        }         
}
Listagem 8. Classe Mapper da escrita para o ES

Após a execução desse job, os valores que estavam no HDFS serão enviados ao nosso índice do Elasticsearch. Para verificar isso, podemos executar uma busca match_all (que irá retornar todos os valores) no nosso índice como ilustrado na Listagem 9.

curl -XGET 'localhost:9200/santos/rss/_search' -d '
{
    "query" : {
        "match_all" : {}
    }
}
Listagem 9. Verificar se os valores chegaram ao índice

Como já temos dados cadastrados no índice, podemos fazer o exemplo contrário, ou seja, um código que leia dados do índice e envie ao HDFS. O método da Listagem 10 ilustra como podemos recuperar de RSS que estão no HDFS e enviá-los ao índice do Elasticsearch. De forma semelhante, para ler dados de ElasticSearch, vamos configurar o acesso ao ES e utilizar uma classe de format (nesse caso a EsInputFormat) oferecida pelo es-hadoop.Além disso, e mais importante, devemos definir uma consulta para extrair dados do ES.

public class ESReaderJob {
    
  public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
      conf.set("es.nodes","localhost");    
      conf.set("es-port","9200");    
      conf.set("es.resource","santos/rss");
      conf.set("es.query","{ \"match_all\" : { } }");

      Job job = new Job(conf, "RSS Reader");

      job.setJarByClass(ESReaderJob.class);

      job.setInputFormatClass(EsInputFormat.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(MapWritable.class);

  FileOutputFormat.setOutputPath(job, new Path("/devmedia/output.txt"));

      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
}           
}
Listagem 10. Recuperar dados do Elasticsearch

Se tudo ocorreu bem, podemos encontrar o arquivo de saída no diretório que criamos anteriormente, através do seguinte comando (com o usuário hduser): hadoop fs -ls /devmedia.

Em resumo, vamos utilizamos nesse artigo o es-hadoop para comunicação entre o Elaticsearch e o Hadoop. Esse framework fornece um InputFormat e um OutputFormat dedicado a ler e gravar dados do ElasticSearch e uma sequência de parâmetros de configuração que permitirão a um Job do Hadoop encontrar o ES. O es-hadoop não se resume as essas atividades e podemos, por exemplo, indexar diretamente documentos JSON sem aplicar qualquer transformação, utilizar índices de leitura e escrita distintos, e utilizar outros componentes do Hadoop como também com o Hive, Pig e o Spark.

O es-hadoop (Easticsearch for Apache Hadoop) permite que os trabalhos do Hadoop para interagir com ElasticSearch com uma pequena biblioteca e uma configuração fácil, a importância desse framework é combinar ferramentas que desde o nível mais abstrato até a implementação de detalhes de código tem muito para ofertar trabalhando em conjunto. Por exemplo, a escalabilidade, uma das principais vantagens tanto do Elasticsearch quanto do Hadoop, pode ilustrar a semelhança já que um componente crítico é a capacidade de dividir uma tarefa em problemas menores que executam ao mesmo tempo. Esse conceito está presente em ambos: no Hadoop através das suas divisões (o número de partes em que uma fonte ou entrada pode ser dividido) e no ElasticSearch através do uso de shards.