O Apache Spark é uma das principais ferramentas para a análise e processamento de grandes conjuntos de dados, porém, manipular os dados utilizando os métodos e operações do Spark pode ser complicado para quem não conhece o modelo de programação da ferramenta ou não é um programador experiente, por isso, o Spark oferece uma extensão que possibilita a manipulação dos dados utilizando aStandard Query Language (SQL) o que facilita bastante o uso da ferramenta.

O Apache Spark é uma ferramenta Big Data que tem o objetivo de processar grandes conjuntos de dados de forma paralela e distribuída. Essa ferramenta estende o modelo de programação MapReduce popularizado pelo Apache Hadoop facilitando bastante o desenvolvimento de aplicações de processamento de dados. Além do modelo de programação estendido, o Spark também apresenta uma performance muito superior ao Hadoop, chegando em alguns casos a apresentar um desempenho quase 100x maior.

Um dos principais componentes do Spark é o Spark SQL, que permite a realização de consultas nos dados com a Standard Query Language (SQL) o que facilita muito o uso da ferramenta para quem já está acostumado a manipular dados utilizando essa linguagem, e para programadores que não conhecem o modelo de programação do Spark.

Para demonstrar a utilização do Spark SQL, serão mostrados alguns exemplos de programação utilizando a API para transformação dos dados do Spark que estão alocados na memória em uma estrutura chamada DataFrame, que tem formato de tabelas, e por isso permite consultas utilizando SQL. Será mostrada também a arquitetura básica das aplicações Spark, e como configurar aplicações no Eclipse. Para mostrar as funcionalidades do Spark SQL serão desenvolvidas diversas aplicações que mostram diferentes formas de manipular os dados na ferramenta.

Para conhecer os conceitos básicos do Spark como o modelo de programação básico e os outros componentes do Spark leia o artigo disponível no site.

Arquitetura do Spark

A arquitetura básica do Spark SQL é a mesma do Spark, que é constituída por três partes principais: o Driver Program, que é a aplicação principal que gerencia a criação e quem executará o processamento definido pelo programados. O Cluster Manager responsável por administrar as maquinas que serão utilizadas como workers. Finalmente, os Workers são as maquinas que realmente executarão as tarefas que são enviadas pelo Driver Program. A Figura 1 mostra a arquitetura do Spark e seus principais componentes. No Spark SQL essa estrutura básica é mantida.

Trabalhando com SQL em aplicações Big Data
Figura 1. Arquitetura do Spark

Além da arquitetura é importante conhecer os principais componentes do modelo de programação do Spark. Existem três conceitos fundamentais que serão utilizados em todas as aplicações desenvolvidas, que são:

  • Resilient Distributed Datasets (RDD): abstraem um conjunto de objetos distribuídos no cluster, geralmente executados em memória principal. Ele é o objeto principal do modelo de programação do Spark, pois nesses objetos que serão executados os processamentos dos dados.
  • Operações: representam transformações (como agrupamentos, filtros e mapeamentos entre os dados) ou ações (como contagens e persistências) que são realizados em um RDD;
  • Contexto Spark (Spark Context): o contexto é o objeto que conecta o Spark ao programa que está sendo desenvolvido, ele pode ser acessado como uma variável em um programa que pode ser acessada para utilizar os recursos do Spark.

Além desses três componentes, que são a base do Spark, o Spark SQL adiciona mais uma abstração, chamada DataFrame, que é uma extensão do RDD que organiza os dados em um formato de tabela a partir de uma estrutura de dados, que pode ser um arquivo JSON ou uma classe Java, e que permite a manipulação dos dados de forma parecida com banco de dados relacionais, inclusive utilizando a Standard Query Language (SQL).

Desenvolvendo Aplicações com o Spark SQL

O primeiro passo para o desenvolvimento de uma aplicação do Spark SQL é configurar o projeto, o que é bastante simples, basta adicionar as dependências da ferramenta no arquivo pom.xml do Maven, que são a spark-core_2.10, que é a dependência do Spark Core, e a spark-sql_2.10, que é a dependência do Spark SQL. Para o desenvolvimento das aplicações desse artigo foi utilizado o Maven na IDE Eclipse, porém é possível desenvolver os mesmos exemplos em qualquer IDE e com outros gerenciados de dependências como o Gradle. A Listagem 1 mostra o arquivo pom.xml do projeto desenvolvido com as dependências configuradas.


<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.santana.devmedia</groupId>
<artifactId>spark-examples</artifactId>
<version>0.0.1</version>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.5.2</version>
</dependency>
</dependencies>
</project>
Listagem 1. Configuração do projeto Apache Spark com o Maven

No Spark os RDDs são os principais componentes no modelo de programação, porém, utilizando o Spark SQL são utilizados os DataFrames que são abstrações de mais alto nível que permitem a manipulação dos dados com o SQL. Todos os exemplos que serão mostrados nas próximas listagens de código utilizam os DataFrames.

Todos os exemplos desse artigo utilizarão como dado de entrada um arquivo com as leituras das posições dos ônibus que é disponibilizada pela prefeitura de São Paulo para o acompanhamento do transporte público da cidade. Caso tenha interesse nesses dados, na seção Links há o endereço da API OlhoVivo que é onde esses dados podem ser obtidos. A Listagem 2 mostra um exemplo desse arquivo com algumas das leituras dos dados. Os dados do arquivo são o código do ônibus, o código da linha do ônibus, o nome da linha do ônibus, o horário da leitura da posição do ônibus, e a latitude e longitude que representam a posição do ônibus na hora da leitura.


546 1745 SHOP.CENTER.NORTE 18:40 -23.511788000000003 -46.62516575
33314 1745 VL.NOVA.CACHOEIRINHA 18:40 -23.479581500000002 -46.65016075
673 174M MUSEU.DO.IPIRANGA 18:40 -23.500357 -46.615757
33431 715M JD.MARIA.LUIZA 18:40 -23.534662124999997 -46.62369675
33441 775A JD.ADALGIZA 18:40 -23.5346621253459997 -46.6546369675
33441 174M JD.BRASIL 18:40 -23.534662124999997 -46.64562369675
Listagem 2. Leitura das posições dos ônibus da cidade de São Paulo

No Spark SQL é necessário criar uma classe que representará os dados que serão manipulados, essa classe servirá para que a ferramenta crie a estrutura da tabela em memória do DataFrame, assim, as colunas terão os nomes dos atributos da classe, e com ela será possível fazer as consultas aos dados. A Listagem 3 mostra o código da classe Onibus, que foi criada para esse fim. Essa classe tem os atributos code para representar o código do ônibus, o codigoLinha que armazena o código da linha, o nomeLinha representando o nome da linha, e a lagitude e longitude que armazenam a posição geográfica do ônibus. Além dos atributos, a classe tem também o construtor e os métodos get e set.


package sql;

public class Onibus {

private int code;
private String codigoLinha;
private String nomeLinha;
private double latitude;
private double longitude;

public Onibus(int code, String codigoLinha, String nomeLinha, double latitude, double longitude) {
super();
this.code = code;
this.codigoLinha = codigoLinha;
this.nomeLinha = nomeLinha;
this.latitude = latitude;
this.longitude = longitude;
}

public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getCodigoLinha() {
return codigoLinha;
}
public void setCodigoLinha(String codigoLinha) {
this.codigoLinha = codigoLinha;
}
public String getNomeLinha() {
return nomeLinha;
}
public void setNomeLinha(String nomeLinha) {
this.nomeLinha = nomeLinha;
}
public double getLatitude() {
return latitude;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public double getLongitude() {
return longitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}

}
Listagem 3. Classe Ônibus que representa os dados que serão manipulados

O primeiro passo para usar o Spark SQL é carregar os dados em um RDD, existem várias fontes de dados possíveis, como banco de dados relacionais e NoSQL, arquivos e dados recebidos pela internet, depois com os dados em um RDD é possível criar um DataFrame que terá a estrutura de uma tabela. Nesse programa utilizamos um arquivo com os dados dos ônibus como fonte de dados. A Listagem 4 mostra o código para fazer essa operação.


package sql;


import java.util.Date;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class Exemplo1 {

public static void main(String[] args) {

// Configuração do Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("BusProcessor");
JavaSparkContext ctx = new JavaSparkContext(conf);
SQLContext sctx = new SQLContext(ctx);

// Carrega os dados dos ônibus e os transforma no objeto Onibus
JavaRDD<String> linhas = ctx.textFile("c:/dev/onibus.txt");
JavaRDD<Onibus> onibus = linhas.
map(x -> x.split(" ")).
map(o -> new Onibus(Integer.parseInt(o[0]), o[1], o[2],
Double.parseDouble(o[4]), Double.parseDouble(o[5])));

// Cria o DataFrame
DataFrame onibusDF = sctx.createDataFrame(onibus, Onibus.class);

// Mostra os dados do DataFrame
onibusDF.show();
}

}
Listagem 4. Criando as tabelas ônibus com o Spark SQL

As três primeiras linhas fazem a configuração da aplicação, indicando que ela será executada apenas na máquina local com o método setMaster, e criando o contexto do Spark. Depois, com o método textFile, os dados dos ônibus são carregados em um RDD, porém esses dados ainda estão no formato de uma String só com todos os atributos juntos, então com duas operações map, uma para dividir a String usando o método split, e outro para criar os objetos ônibus utilizando o construtor da classe, os dados ficam prontos para a criação do DataFrame.

O DataFrame é a estrutura que cria uma tabela em memória com os dados, nesse primeiro exemplo apenas para mostrar a estrutura, é chamado o método show que imprime a tabela e todos os seus registros. A Listagem 5 mostra a saída da execução desse código, como é possível observar, todos os registros do arquivo foram carregados no DataFrame chamado onibusDF, e ao executar o comando show, todos esses registros foram exibidos no formato de uma tabela. No DataFrame, as colunas são criadas com os nomes dos atributos da classe.


+-----+-----------+-------------------+---------------+--------------------+
| code|codigoLinha| latitude| longitude| nomeLinha|
+-----+-----------+-------------------+---------------+--------------------+
| 546| 1745|-23.511788000000003| -46.62516575| SHOP.CENTER.NORTE|
|33314| 1745|-23.479581500000002| -46.65016075|VL.NOVA.CACHOEIRINHA|
| 673| 174M| -23.500357| -46.615757| MUSEU.DO.IPIRANGA|
|33431| 715M|-23.534662124999997| -46.62369675| JD.MARIA.LUIZA|
|33441| 775A| -23.534662125346| -46.6546369675| JD.ADALGIZA|
|33441| 174M|-23.534662124999997|-46.64562369675| JD.BRASIL|
+-----+-----------+-------------------+---------------+--------------------+[
Listagem 5. Saída do comando Show

Agora que já temos a tabela criado, é possível fazer várias consultas sobre os dados, o Spark possui um conjunto de métodos no DataFrame onde é possível criar as consultas via código ou também é possível criar as consultas utilizando SQL. Inicialmente, na Listagem 6 é mostrado como fazer as consultas com código. Alguns exemplos de métodos são o select, onde é possível selecionar apenas uma ou um conjunto de colunas do DataFrame, o group by, que agrupa um dado por uma determinada coluna e o filter, que filtra o registro por alguma condição passada como parâmetro.


package sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class Exemplo2 {

public static void main(String[] args) {

// Configuração do Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("BusProcessor");
JavaSparkContext ctx = new JavaSparkContext(conf);
SQLContext sctx = new SQLContext(ctx);

// Carrega os dados dos ônibus e os transforma no objeto Onibus

JavaRDD<String> linhas = ctx.textFile("c:/dev/onibus.txt");
JavaRDD<Onibus> onibus = linhas.
map(x -> x.split(" ")).
map(o -> new Onibus(Integer.parseInt(o[0]), o[1], o[2], Double.parseDouble(o[4]), Double.parseDouble(o[5])));


DataFrame onibusDF = sctx.createDataFrame(onibus, Onibus.class);

// Realiza as operações nos dados utilizando os métodos do Spark
onibusDF.show();

onibusDF.select("codigoLinha").show();

onibusDF.groupBy("codigoLinha").count().show();

onibusDF.orderBy("codigoLinha").show();

onibusDF.filter(onibusDF.col("codigoLinha").equalTo("715M-10")).show();



}

}
Listagem 6. Fazendo consultas simples com os dados

O código da listagem anterior tem diversas saídas, apenas como exemplo, a Listagem 7 mostra a saída do comando groupBy, que agrupa os ônibus pelo código da linha desses ônibus na coluna codigoLinha, e conta a quantidade de ônibus para cada linha na coluna count.


+-----------+-----+
|codigoLinha|count|
+-----------+-----+
| 715M| 1|
| 775A| 1|
| 1745| 2|
| 174M| 2|
+-----------+-----+
Listagem 7. Saída para o comando groupBy

Além de usar os métodos do Spark para fazer as consultas nos dados, também é possível utilizar a linguagem SQL, o que facilita muito a manipulação dos dados para quem já está acostumado com essa linguagem. A Listagem 8 mostra o código utilizando SQL. Inicialmente, utilizando o método registerTempTable, é criado um apelido (alias) para o DataFrame, que será o nome utilizado nas consultas, depois usando o método sql do DataFrame é possível fazer consultas utilizando grande parte dos comandos SQL. Nesse primeiro exemplo são feitas duas consultas simples, apenas utilizando os comandos básicos do SQL como o SELECT, FROM e WHERE.


package sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class Exemplo3 {

public static void main(String[] args) {

// Configuração do Spark
SparkConf conf = new SparkConf().setMaster("local").setAppName("BusProcessor");
JavaSparkContext ctx = new JavaSparkContext(conf);
SQLContext sctx = new SQLContext(ctx);


// Carrega os dados dos ônibus e os transforma no objeto Onibus
JavaRDD<String> linhas = ctx.textFile("c:/dev/onibus.txt");
JavaRDD<Onibus> onibus = linhas.
map(x -> x.split(" ")).
map(o -> new Onibus(Integer.parseInt(o[0]), o[1],
o[2], Double.parseDouble(o[4]), Double.parseDouble(o[5])));
DataFrame onibusDF = sctx.createDataFrame(onibus, Onibus.class);

onibusDF.registerTempTable("onibus");

// Realiza as operações nos dados utilizando SQL
sctx.sql("SELECT code, codigoLinha, nomeLinha FROM onibus").show();

sctx.sql("SELECT * FROM onibus WHERE codigoLinha like "715M"").show();


}

}
Listagem 8. Fazendo consultas utilizando SQL

A Listagem 9 mostra a saída da primeira consulta, que selecionou as colunas code, codigoLinha e nomeLinha da coluna ônibus, criada a partir do DataFrame. Como mostrado no exemplo anterior, é possível utilizar diversas operações da linguagem SQL, o que permite buscas sofisticadas nos dados do Spark.


+-----+-----------+--------------------+
| code|codigoLinha| nomeLinha|
+-----+-----------+--------------------+
| 546| 1745| SHOP.CENTER.NORTE|
|33314| 1745|VL.NOVA.CACHOEIRINHA|
| 673| 174M| MUSEU.DO.IPIRANGA|
|33431| 715M| JD.MARIA.LUIZA|
|33441| 775A| JD.ADALGIZA|
|33441| 174M| JD.BRASIL|
+-----+-----------+--------------------+
Listagem 9. Saída da consulta a todos os ônibus de código 715M-10

Além dos comandos básicos de SELECT, também é possível utilizar alguns comandos mais avançados do SQL como o ORDER BY e o GROUP BY também. A Listagem 10 mostra o código utilizando essas outras possibilidades de consultas.


sctx.sql("SELECT * FROM onibus WHERE codigoLinha like "715M" ORDER BY code").show();

sctx.sql("SELECT code, count(code) FROM onibus GROUP BY code").show();
Listagem 10. Usando outros comandos SQL

Apenas como exemplo, a Listagem 11 mostra a saída do comando com o GROUP BY, como é possível observar, na coluna code é mostrado o código do ônibus, e na coluna _c1 (um nome criado pelo Spark, já que não definimos nome para coluna), é mostrado o número de registros para cada código.


+-----+---+
| code|_c1|
+-----+---+
|33431| 1|
|33441| 2|
| 673| 1|
|33314| 1|
| 546| 1|
+-----+---+
Listagem 11. Saída do comando Group By

Além dos comandos demonstrados aqui, o Spark SQL permite ainda mais opções da linguagem SQL como joins, sorts e comparações utilizando expressões lógicas.

Esse artigo mostrou os conceitos básicos do Apache Spark, que é uma das principais ferramentas Big Data para o processamento de grandes conjuntos de dados e também o uso do componente Spark SQL que permite a manipulação dos dados utilizando a linguagem SQL, o que facilita muito o uso da ferramenta para quem não conhece os conceitos do Spark.