Veremos neste artigo como funciona o desenvolvimento de um Crawler que realiza a classificação de conteúdos encontrados em páginas Web. A ideia, apresentada na Figura 1, é navegar por conteúdos Web e enviar esses para classificação no Apache Spark e indexar o documento no Elasticsearch. A arquitetura utiliza ferramentas open-source, em especial:

  • Elasticsearch, para indexação dos conteúdos;
  • Apache Spark, para as atividades de aprendizado de máquina;
  • Crawler4J, para recuperar conteúdos encontrados na Web;
  • Apache Lucene é empregado pontualmente para retirar palavras não importantes;
  • JSoup ajuda durante o parsing dos códigos HTML das páginas analisadas.
Arquitetura do Smart Crawler
Figura 1. Arquitetura do Smart Crawler

Nos últimos anos, o Elasticsearch deixou de ser uma solução desconhecida para conquistar grandes players do mercado de Big Data. O seu objetivo é apoiar o desenvolvimento de aplicações centradas em textos como, por exemplo, redes sociais, sistemas de e-commerce, sites de notícias e canais de educação. O Elasticsearch, por meio de uma interface baseada em comunicação RESTful, ou seja, que utiliza REST/HTTP, permitindo que clientes de distintas linguagens de programação indexem e busquem documentos baseados em JSON. Além disso, outra grande vantagem do Elasticsearch reside na sua arquitetura, projetada para ser escalável e para gerenciar grandes quantidades de dados de forma simples e eficiente.

O Apache Spark é uma ferramenta nova e extremamente interessante, pois, assim como outros novos frameworks para Big Data desenvolvidos pala Fundação Apache (Kafka, Flume e Storm), ele também é uma ferramenta voltada para Fast Data (análise em tempo real e uma tomada de decisões praticamente instantâneas no sistema), compartilhando o foco em baixa latência nas respostas, oferecendo facilidade de uso e o uso de streaming. Como diferencial em relação aos outros frameworks para Fast Data, o Spark possui interessantes ferramentas para processamento de grafos, de análise estatística e aprendizado de máquina pré-implementadas, conforme ilustrado na Figura 2. Nesse artigo o foco será sobre a MLLib, que oferece os algoritmos de aprendizado de máquina tradicionais e necessários para construção de aplicações inteligentes.

Componentes do Apache Spark
Figura 2. Componentes do Apache Spark

O Crawler4j é uma biblioteca Java que simplifica o processo de criação da Web Crawler. Um Web Crawler é um Bot Internet que sistematicamente navega por páginas e outros conteúdos para a análise, indexação e coleta de conjuntos de informação específicos. A abordagem típica é informar primeiramente um conjunto de URLs sementes, de onde partirá a navegação de páginas Web, visitando repetidamente todos seus links. Essa visita pode ser configurada com muitos parâmetros, entre os quais, listas de domínios (que devem ser evitados), número de links que vão ser visitados a partir de cada semente e quantidade máxima de páginas visitadas.

No exemplo desenvolvido nesse artigo, vamos classificar páginas do portal Infomoney (http://www.infomoney.com.br/) de acordo com seu conteúdo. Para desenvolver esse projeto foi utilizado o Eclipse como IDE e o Maven para gerenciar os pacotes. Como todo projeto Maven, devemos atualizar o arquivo pom.xml com as dependências do projeto. Conforme o código da Listagem 1, as dependências e suas versões são: Jsoup 1.7.2, Crawler4J 4.1, Elasticsearch 1.5 e Spark 1.4.0.


<dependencies>
   <dependency>
         <groupId>org.jsoup</groupId>
         <artifactId>jsoup</artifactId>
         <version>1.7.2</version>
   </dependency>
   <dependency>
         <groupId>edu.uci.ics</groupId>
         <artifactId>crawler4j</artifactId>
         <version>4.1</version>
   </dependency>
   <dependency>
         <groupId>org.elasticsearch</groupId>
         <artifactId>elasticsearch</artifactId>
         <version>1.5</version>
   </dependency>
   <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-mllib_2.10</artifactId>
         <version>1.4.0</version>
   </dependency>
</dependencies>
Listagem 1. Dependências do Maven

A primeira classe a ser criada é a IndexingCrawler, que definirá o que é feito quando uma página é visitada. Na Listagem 2 pode-se ver que essa classe IndexingCrawler extende da classe WebCrawler do Crawler4J, cujo método principal visit é executado sempre que uma nova página é encontrada. A IndexingCrawler verifica se uma certa página está descrita em HTML (na linha if (page.getParseData() instanceof HtmlParseData)) e extrai a informação textual da página. Essa classe faz referência as seguintes classes descritas na sequência do artigo: NaiveClassifier, que usa o Spark para classificar os conteúdos; e CrawlerIndexer, que utiliza o Elasticsearch para indexar os conteúdos encontrados.


public class IndexingCrawler extends WebCrawler {
private static CrawlerIndexer indexer = new CrawlerIndexer();
private static NaiveClassifier classifier = new NaiveClassifier();
 
   public void visit(Page page) {
         
       logger.debug("Visiting page {}", page.getWebURL());

       if (page.getParseData() instanceof HtmlParseData) {
              
              String url = page.getWebURL().getURL();
              String domain = page.getWebURL().getDomain();

              HtmlParseData htmlParseData = (HtmlParseData) page.getParseData();
              String text = htmlParseData.getText();
              String html = htmlParseData.getHtml();

              String classification = classifier.getClass(text);

              try {
                     indexer.indexPage(url, text, classification, html);
              } catch (Exception e) {
                     logger.error("Page not indexed",e);
              }
       }
         
       logger.debug("Finishing page visit {}", page.getWebURL());
   }
}
Listagem 2. Classe para visita e indexação de páginas

O próximo passo é criar os códigos para classificação de conteúdos. Para tal, deve-se - ao menos - entender uma visão geral da arquitetura do Spark, ilustrada na Figura 3. Essa arquitetura é bastante simples, pois cada aplicação deverá instanciar um contexto do Spark (Spark Context) que contém as informações necessárias para acessar o cluster onde a mesma será executada. O Spark Context comunica-se com o Cluster Manager, um componente responsável por distribuir a execução de jobs em Workers disponíveis nos nós do cluster. A principal vantagem desta arquitetura é que o processamento dos dados é feito todo em uma memória compartilhada entre os nós, sendo o acesso ao disco evitado sempre que possível.

Apresenta uma visão geral sobre o Apache Spark
Figura 3. Apresenta uma visão geral sobre o Apache Spark

O código para classificação com o Spark deve ser dividido em duas partes: primeiramente, deve-se realizar treinamento com algoritmo de aprendizado de máquina e posteriormente, com o modelo criado por esse treinamento, pode-se executar o código responsável por classificar cada página.

O exemplo apresentado nesse artigo usa o algoritmo Naive Bayes (que aplica o teorema de Bayes com fortes pressupostos de independência entre as características usadas na classificação), que agrupa conteúdos de acordo com a probabilidade de ocorrências de uma certa característica em cada conjunto de treinamento. Por exemplo, como ilustrado na Figura 4, podemos treinar a rede para classificar o texto “Carlos Alberto Torres afirma que o Brasil não inspira confiança em ninguém e critica até mesmo o vestuário dos jogadores, que não usaram terno na chegada ao Brasil” como futebol e o texto “Petrobras anuncia corte de 37% nos investimentos, que atingem menor nível desde 2008: US$ 130,3 bi” como economia.

Naive Bayes
Figura 4. Naive Bayes

As duas próximas listagens (Listagens 3 e 4) apresentam o código para treinamento usando a rede Naive Bayes. Esse código utiliza o Lucene, através do objeto textPreparer, para remover palavras irrelevantes (conhecidas em inglês como Stop Words, pois não agregam valor semântico ao texto). Assim, primeiramente, o código da Listagem 3 apresenta como o Lucene pode ser usado para remover StopWords. A principal providência para que esse código seja efetivo é usar o PortugueseAnalyzer, que contém de forma pré-definida uma série de palavras que são consideradas irrelevantes em Português. Vale ressaltar que a dependência do Lucene não foi incluída explicitamente no pom.xml para evitar um conflito, pois a mesma é parte também do Elasticsearch.


public static String removeStopWords(String textFile) throws Exception {
  CharArraySet stopWords = PortugueseAnalyzer.getDefaultStopSet();
  TokenStream tokenStream = new StandardTokenizer(Version.LUCENE_48, 
  new StringReader(textFile.trim()));
  tokenStream = new StopFilter(Version.LUCENE_48, tokenStream, stopWords);
  StringBuilder sb = new StringBuilder();
  CharTermAttribute charTermAttribute = tokenStream
  .addAttribute(CharTermAttribute.class);
  tokenStream.reset();
  Set<String> tokens=new HashSet<String>();
  while (tokenStream.incrementToken()) {
      String term = charTermAttribute.toString();
      tokens.add(term.toLowerCase());
  }
  for(String term:tokens){
     if(term.length()>5 && term.length()<15) sb.append(term + " ");
  }
  return sb.toString();
}
Listagem 3. Remove Stopwords

Na Listagem 4 apresenta-se o código completo para treinamento de uma rede Naive Bayes. O texto recebido pelo método train deve ser transformado em pontos rotulados (LabeledPoint), ou seja, pontos distribuídos em um espaço, como apresentado anteriormente na Figura 4, para que posteriormente o modelo possa classificar novos elementos usando esse treinamento. A método getPoints (apresentado no final da Listagem 4) irá transformar uma lista de textos de páginas nesses pontos, usando para isso o método transform do objeto HashingTF, que calcula a frequência dos termos em cada página. Para comunicação com o Spark, cria-se um JavaSparkContext, que gerenciará o aplicativo durante a execução no cluster. O JavaSparkContext recebe um objeto do tipo SparkConf que contém o nome da aplicação, o endereço do cluster Spark (que vamos explicar mais adiante) e o caminho para a aplicação empacotada em um Jar (também explicada mais adiante). Com a lista de LabeledPoint, chamada trainList, deve-se executar a função NaiveBayes.train para criação do modelo, o segundo parâmetro dessa função é chamado de lambda e controla a regularização dos valores (para ajustes, como por exemplo, a remoção de valores discrepantes). Após o treinamento, pode-se utilizar os dados de test (presentes na lista testList) para calcular a precisão do modelo criado com a função predictionAndLabel. Ao final, salva-se o modelo em disco no caminho “modelPath” dado na função model.save(sc.sc(), "modelPath").


public void train(List<LabeledPoint> trainList, 
List<LabeledPoint> testList) {
   SparkConf conf = new SparkConf()
         .setAppName("NB Classifier")
         .setMaster("spark://192.168.56.1:7077")
         .setJars(
new String[] {"/smart-crawler/target/smart-crawler-0.0.1-SNAPSHOT.jar" })
         .set("spark.akka.frameSize", "20");
   JavaSparkContext sc = new JavaSparkContext(conf);

   JavaRDD<LabeledPoint> training = sc.parallelize(trainList, 2).cache();
   JavaRDD<LabeledPoint> test = sc.parallelize(testList, 2).cache();

   final NaiveBayesModel model = NaiveBayes.train(training.rdd(), 1.0);

   JavaPairRDD<Double, Double> predictionAndLabel = 
test.mapToPair(new PairFunction<LabeledPoint, Double, Double>() {
                  
   private static final long serialVersionUID = -4498879256866700408L;

   @Override public Tuple2<Double, Double> call(LabeledPoint p) {
return new Tuple2<Double, Double>(model.predict(p.features()), p.label());
   }
});
double accuracy = predictionAndLabel.filter(new 
Function<Tuple2<Double, Double>, Boolean>() {

private static final long serialVersionUID =
8604799362581634343L;

   @Override public Boolean call(Tuple2<Double, Double> pl) {
   return pl._1().equals(pl._2());
}}).count() / (double) test.count();

   logger.info("Model accuracy "+accuracy);

   model.save(sc.sc(), "modelPath");
   sc.close();
}

public List<LabeledPoint> getPoints(String [] pages, 
double label) throws IOException {
      HashingTF tf = new HashingTF();

      List<LabeledPoint> labeledPoints = 
      new ArrayList<LabeledPoint>();

      for(String page:pages){

           String text = removeStopWords(page);

           Vector vector = tf.transform(Arrays
           .asList(text.split(" ")));

           labeledPoints.add(new 
           LabeledPoint(label, vector));
}

return labeledPoints;
}
Listagem 4. Treinamento de uma Rede Naive Bayes

Finalmente, retomando a Listagem 2, o método indexPage não foi ainda explicado. Na Listagem 5 apresenta a indexação dos dados encontrados e classificados no Elasticsearch. A primeira função getClient- apresenta um singleton simples para acessar o cluster Elasticsearch. Aí o passo mais importante é a criação do transportClient em transportClient .addTransportAddress(“localhost” new InetSocketTransportAddress(9300)), que recebe como entrada o endereço de rede (no caso localhost) e a porta (no caso, 9300). Com esse cliente, podemos criar um objeto JSON com o helper XContentBuilder e da indexação usando o código prepareIndex, sendo que, nesse caso, o nome do índice é devmedia e o tipo é page.


public static synchronized Client getClient() throws Exception {

   if (client == null) {
      TransportClient transportClient;
      Settings settings = ImmutableSettings.settingsBuilder()
      .put("cluster.name", utils.getCluster()).build();
      transportClient = new TransportClient(settings);
      client = transportClient
.addTransportAddress(“localhost”
  new InetSocketTransportAddress(9300));
     }
           
     return client;
}

public void indexPage(String url, String text, String classification, String html)
throws ElasticsearchException, Exception {

    if (utils.getIndexBlockedDomains().contains(domain)) return;

    if(domain.contains("infomoney")){
       
       if(!category.equals("") && !pureText.equals("")){
          XContentBuilder newPage = jsonBuilder().startObject()
          .field("title", doc.title())
          .field("url", url)
          .field("text", text)
          .field("classification", classification)
          .field("domain", domain)
          .field("html", html).endObject();
     

IndexResponse response = getClient()
.prepareIndex("devmedia", "page", url) .setSource(newPage)
.execute()
.actionGet();
           }      
}                   
}
Listagem 5. Indexação com o Elasticsearch

Para executar os códigos, é necessário que o Spark e o Elasticsearch estejam instalados. O Apache Spark está em compilado na sua página principal (vide seção Links). Escolha um dos tipos dos pacotes, pois não há uma versão para executar um código que se chama standalone. Assim, você pode escolher qualquer uma das versões, que mesmo assim será possível executar dessa forma. Em seguida, desempacote o arquivo baixado e execute o código a seguir


bin\spark-class.cmd org.apache.spark.deploy.master.Master

Vá em http://localhost:8080/ e veja qual é o endereço do master (geralmente spark://:7077). Você receberá uma página parecida com a da Figura 5.

Painel de controle do Spark
Figura 5. Painel de controle do Spark

Agora execute o seguinte comando:


bin\spark-class.cmd org.apache.spark.deploy.worker.Worker spark://<IP>:7077

Se você voltar em http://localhost:8080/, vai ver que um novo worker foi adicionado. Agora sim é possível executar os comandos que serão apresentados neste artigo.

Entretanto, como dito na Listagem 4, é necessário que o projeto seja empacotado em um arquivo .jar para que possa ser usado pelo cluster durante sua execução. Para fazer isso, pode-se usar o maven-shade-plugin plugin do Maven, conforme a Listagem 6. Assim, com o plugin atualizado no arquivo pom.xml deve-se executar o comando mvn package, que criará a pasta target que deve ser referenciada pela aplicação ao instanciar o SparkContext.


<build>
    <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.2</version>
      <configuration>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
         </filter>
       </filters>
      </configuration>
      <executions>
       <execution>
        <id>job-driver-jar</id>
        <phase>package</phase>
        <goals>
        <goal>shade</goal>
        </goals>
     <configuration>
     <shadedArtifactAttached>true</shadedArtifactAttached>
     <shadedClassifierName>driver</shadedClassifierName>
     <transformers>
        <transformer
          implementation="org.apache.maven.plugins.shade.resource
          .ServicesResourceTransformer" />
        <transformer
          implementation="org.apache.maven.plugins.shade.resource
          .AppendingTransformer">
        <resource>reference.conf</resource>
        </transformer>
      <transformer
        implementation="org.apache.maven.plugins.shade.resource
        .ManifestResourceTransformer">
        <mainClass>Classe principal aqui</mainClass>
      </transformer>
   </transformers>
  </configuration>
  </execution>
  <execution>
     <id>worker-library-jar</id>
     <phase>package</phase>
     <goals>
      <goal>shade</goal>
     </goals>
   <configuration>
 <shadedArtifactAttached>true</shadedArtifactAttached>
 <shadedClassifierName>worker</shadedClassifierName>
   <artifactSet>
     <includes>
      <include>com.fasterxml.jackson.core:*</include>
      <include>com.fasterxml.jackson.datatype:*</include>
      <include>com.fasterxml.jackson.module:*</include>
      <include>org.joda:joda-convert</include>
      <include>joda-time:joda-time</include>
     </includes>
   </artifactSet>
   <transformers>
   <transformer
    implementation="org.apache.maven.plugins.shade.resource
    .ServicesResourceTransformer" />
   </transformers>
  </configuration>
 </execution>
</executions>
</plugin>
</plugins>
</build>
Listagem 6. Plugin maven-shade-plugin

Além disso, devemos instalar o Elasticsearch. Em uma máquina com o Java instalado, devemos baixar a última versão do site do Elasticsearch (vide seção Links), desempacotá-la e executar o seguinte comando:


./bin/elasticsearch

Se tudo ocorreu bem, podemos chamar localhost:9200 em um navegador, conforme ilustrado na Listagem 7, e o Elasticsearch irá retornar uma resposta JSON. Nessa resposta, o parâmetro name provavelmente irá variar para cada leitor, pois é escolhido de forma aleatória (em resumo, não se preocupe se a resposta JSON não for exatamente igual a Listagem 7).


{
 "status" : 200,
 "name" : "Alistaire Stuart",
 "cluster_name" : "elasticsearch",
 "version" : {
   "number" : "1.4.5",
   "build_hash" : "2aaf797f2a571dcb779a3b61180afe8390ab61f9",
   "build_timestamp" : "2015-04-27T08:06:06Z",
   "build_snapshot" : false,
   "lucene_version" : "4.10.4"
 },
 "tagline" : "You Know, for Search"
}
Listagem 7. Resposta do Elasticsearch

Com tudo instalado, pode-se executar o código de treinamento. Após a primeira execução - considerando que tudo correu bem - um modelo será criado no caminho modelPath, conforme ilustrado na Figura 4. Esse caminho irá conter uma pasta com subpastas data e metadata, que será o modelo de classificação criada. No momento de executar uma classificação, deve-se referenciar essa pasta, conforme ilustrado na Listagem 8, e executar o comando predict que retornará a classe (o valor double) a qual essa página pertence.


public double classify(String pageToBeClassified){
  HashingTF tf = new HashingTF();
  Vector testData = tf.transform(Arrays.asList(pageToBeClassified.split(" ")));

  SparkConf conf = new SparkConf()
  .setAppName("NB Classifier")
  .setMaster("spark://192.168.56.1:7077")
  .setJars(
  new String[] {"/smart-crawler/target/smart-crawler-0.0.1-SNAPSHOT.jar" })
  .set("spark.akka.frameSize", "20");
  JavaSparkContext sc = new JavaSparkContext(conf);

  NaiveBayesModel model = NaiveBayesModel.load(sc.sc(), "pageToBeClassified");
                
return model.predict(testData);
Listagem 8. Código para previsão

Assim, esse artigo apresentou um exemplo de como combinar ferramentas open-source em uma aplicação capaz de classificar textos de páginas Web. Esse exemplo poderia ser aplicado com pequenos ajustes também para o problema de classificação de e-mail como spam, classificação de reclamação de consumidores de acordo com a gravidade do assunto, verificar fraude e entre outras possibilidades.