O Hadoop MapReduce é uma ferramenta muito interessante para o processamento de dados massivos. O que sabemos é que nem sempre esses dados terão a mesma estrutura, então é preciso que o sistema seja capaz de mesclá-los de alguma forma. O Data Join no MapReduce funciona mais ou menos como em bancos de dados: dois arquivos com estruturas diferentes (tabelas) podem ser unidos através de um elemento em comum. Isso abre um mar de possibilidades, e permite que conjuntos de dados heterogêneos possam ser processados pelo mesmo programa MapReduce.
Utilizando o programa MapReduce base: boletins de ocorrência em rodovias federais
No último artigo sobre o Hadoop Mapreduce, foi criado um programa MapReduce base. Conforme foi comentado, não existe a necessidade de criar um novo programa cada vez que utilizamos o MapReduce. Simplesmente alteramos o nosso programa base de acordo com a necessidade de momento.
Para esse artigo serão utilizados dados do Sistema BR-Brasil, que contém boletins de ocorrência em rodovias federais. Os dados utilizados são de ocorrências entre 2007 e 2013. Esses dados são de domínio público e estão disponíveis no site oficial. O conjunto de dados completo contém cerca de 933 MB, o que é pequeno o suficiente para ser processado em modo standalone.
Antes de começarmos o desenvolvimento, é preciso conhecimento sobre os dados. O conjunto de dados de boletins de ocorrência da Polícia Rodoviária Federal contém uma série de pastas, separadas por semestre, cada uma contendo arquivos .csv (Comma-Separated Values, ou valores separados por vírgula), onde a primeira linha descreve as colunas e as demais contém um conjunto de dados. São quatro arquivos por pasta, um contendo o registro de ocorrências confirmadas, um cadastro de ocorrências envolvendo veículos, um cadastro de pessoas envolvidas nos acidentes e um cadastro das pessoas envolvidas na ocorrência. Além disso, há duas pastas, domínios e veículos, que contém as informações referentes aos códigos apresentados nos demais arquivos. A Figura 1 mostra as primeiras linhas de um arquivo de ocorrências confirmadas. Vale ressaltar o prefixo “oco” antes do nome das colunas, que é uma referência ao arquivo das ocorrências. Os demais possuem outros prefixos.

Figura 1. Ocorrências confirmadas no 1º semestre de 2007
Tendo em vista os dados em questão, o objetivo desse exemplo é obter o número de ocorrências confirmadas em um determinado município. Para isso, o programa irá ler todos os arquivos de ocorrências confirmadas (de 2007 a 2013) e irá gerar como saída os municípios e o número de ocorrências em cada um deles. Como os nomes dos municípios estão em um arquivo diferente, será realizada uma junção dos conjuntos de dados.
Contagem de ocorrências por município
Conforme foi colocado, o objetivo aqui é trazer a contagem de todas as ocorrências que ocorreram em cada município nos dados coletados. Para isso, existe a necessidade de contar o número de vezes que cada código do município aparece, simplesmente. Como se trata de um código bastante simples, a partir do programa base não há muito a ser feito. A questão mais complexa é justamente a que diz respeito ao join dos arquivos, trazendo os resultados de uma forma mais amigável. Afinal, é impossível para qualquer pessoa saber a qual município determinado código se refere. Pensando nisso, é interessante colocar o nome do município no resultado final.
Porém, inicialmente será feita a contagem simples das ocorrências por município, sem preocupação com arquivos com diferentes estruturas. Para isso, é preciso ter atenção com a forma como os dados estão dispostos no mesmo. Como foi visto, o código do município compõe a quarta coluna de dados no arquivo de ocorrências. Logo, são os valores dessa coluna que precisam ser lidos e interpretados pelo programa MapReduce.
Como se trata de um programa simples, a partir do template não há muito a ser feito. É necessário ter atenção com os tipos de dados, entretanto. Como já é sabido, o MapReduce trabalha com pares key/value. Primeiramente, é interessante focar no par de entrada da função Mapper. Como a chave desse par não será utilizada para nenhum tipo de processamento, é interessante que ela seja do tipo Object, para ficar o mais genérica possível. Já o valor terá o tipo Text, pois conterá as linhas dos arquivos que serão processados. A partir daí, a linha é processada para que seja obtido o código dos municípios, que será a chave do par de saída da função map (e, como sabemos, entrada da função reduce). A partir do fato de que será realizada uma contagem, o valor do par de saída é um IntWritable de valor 1 (um), o que irá facilitar a contagem durante a execução da função reduce(). A Listagem 1 mostra a classe MapClass do contador de ocorrências. É importante observar que foi utilizada a classe OutputCollector para coletar a saída do mesmo. É uma simples questão de escolha, com relação à classe Context. Porém, quando a classe OutputCollector é utilizada, é necessário que haja um objeto Reporter, pois, ao contrário do objeto Context, o coletor de saída faz apenas isso, a coleta do par chave/valor de saída.
Listagem 1. Classe MapClass
public static class MapClass extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text cod = new Text();
public void map(Object key, Text value, OutputCollector<Text,
IntWritable> output, Reporter report) throws IOException {
String line = value.toString();
int i = 0;
StringTokenizer aux = new StringTokenizer(line, ";");
while ((aux.hasMoreTokens()) && (i < 4)){
cod.set(aux.nextToken());
i++;
}
output.collect(cod, one);
}
}Já a classe ReducerClass possui dois pares chave/valor de tipo Text/IntWritable. Como foi visto, o par de entrada de dados consiste no código do município e no valor 1. O par de saída da função deve consistir no código do município juntamente com a soma de todas as ocorrências que foram registradas no mesmo. Isso é possível porque a função reduce recebe todos os valores associados à mesma chave em uma mesma função. Entre a função map e a função reduce, é realizado o que é chamado de shuffle, etapa em que essa junção é realizada. Para tanto, percorre-se a lista de valores que foi recebida e vai-se somando-os, até que todos os valores para determinada chave tenham sido contados. A Listagem 2 mostra como isso é feito. Com relação ao OutputCollector, o mesmo vale aqui.
Listagem 2. Classe ReducerClass
public static class ReducerClass extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter report)
throws IOException {
int sum = 0;
while (values.hasNext()){
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}Por fim, é importante relembrar que o código para a função run() já está praticamente pronto. Ele foi criado juntamente com o template e não há muito a ser feito para modifica-lo. Um ponto importante é o acréscimo de um nome para o job, neste caso “Ocorrências”. O mesmo é válido para a função main(), que será a mesma para qualquer programa MapReduce onde é implementada a função Tool.run().
Refinando a saída de dados
Como foi definido, desejamos que a saída de dados contenha os nomes dos municípios, ao invés de seus códigos. Os nomes, de acordo com os códigos, estão guardados em outro arquivo, chamado “municípios.csv”. O MapReduce pode aceitar vários arquivos, todos eles como entrada na função map. Porém, para que o funcionamento do programa ocorra da forma como o desenvolvedor deseja, é necessário que todos esses arquivos tenham o mesmo formato básico, ou seja, pertençam a um mesmo dataset, o que não é o caso aqui. A solução nesse caso é realizar a junção de arquivos, ou join. O conceito é bem semelhante ao observado em bancos de dados relacionais, porém a prática é mais complicada. Existem vários caminhos que podem ser tomados, como o reduce-side joining. Não se trata da técnica mais eficiente, porém é a mais genérica, além de formar a base de outras técnicas mais avançadas. Além disso, para o caso desse exemplo, ele garante o resultado desejado. Nele, o papel do Mapper é empacotar o registro juntamente com uma etiqueta para que vá para o mesmo Reducer de outros com a mesma chave. O Reducer então irá realizar a junção dos dados. O objetivo é obter uma saída que seja um arquivo com o nome da cidade, seu estado e seu código, apenas.
O reduce-side joining introduz novos conceitos e terminologias: data source, tag e group key. Um data source é análogo a uma tabela em um banco de dados relacional, e nesse caso, existem dois: municípios e ocorrências. A tag é utilizada para realizar a ligação entre cada linha de dados com sua fonte (data source). Isso é muito importante para a junção dos dados, uma vez que o paradigma do MapReduce é processar uma informação por vez sem manter informações a respeito de sua fonte. E, quando etiquetamos o registro, a etiqueta (tag) irá sempre com o mesmo, fazendo a ligação entre o registro e sua fonte. Por fim, mas não menos importante, há o conceito de group key. Essa chave funciona como o atributo que liga as tabelas em um join relacional (geralmente uma chave primária/estrangeira), e neste exemplo será o código do município. Porém, vale ressaltar que no caso do reduce-side joining, a chave de grupo pode ser qualquer função definida pelo usuário, permitindo uma maior liberdade com relação ao que é visto em bancos de dados relacionais. O reduce-side joining é um pacote de contribuição do Hadoop, chamado datajoin, que funciona como um framework genérico para realizar a ligação de dados no Hadoop.
Para realizar a implementação desse join, utiliza-se o pacote datajoin do Hadoop. Esse pacote possui três classes abstratas: DataJoinMapperBase, DataJoinReducerBase e TaggedMapOutput. A classe Mapper irá herdar da primeira, enquanto a classe Reducer, da segunda, como o próprio nome sugere. Já a terceira classe, TaggedMapOutput, é um tipo de dados para que os registros recebam a sua etiqueta do tipo Text. Trivialmente, ele implementa os métodos getTag() e setTag(Text tag), além de também especificar um método abstrato getData(). Como ela será saída de um Mapper, é necessário que seja do tipo Writable. Logo, uma subclasse, TaggedWritable será criada simplesmente para lidar com qualquer subtipo da classe Writable. Para isso, é necessário que a classe implemente os métodos readFields() e write(), conforme a Listagem 3.
Listagem 3. Classe TaggedWritable
public static class TaggedWritable extends TaggedMapOutput {
private Writable data;
public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}
public Writable getData() {
return data;
}
public void write(DataOutput out) throws IOException {
this.tag.write(out);
this.data.write(out);
}
public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
this.data.readFields(in);
}
}Já o fluxo de dados para função Mapper é simplesmente empacotar os registros de forma que eles vão para o mesmo Reducer que outros registros com a mesma chave de grupo. A classe básica, DataJoinMapperBase, realiza esse empacotamento por padrão. Porém, há três métodos abstratos que precisam ser implementados: generateInputTag(), generateTaggedMapOutput() e generateGroupKey(). O primeiro é chamado no começo de uma tarefa de mapeamento para que a etiqueta daquela tarefa que processará os registros daquele datasource seja definida. Essa tag é definida como sendo do tipo Text. Já o segundo seta essa tag para os registros. Por fim, como o nome sugere, o último é responsável pela geração da chave de grupo que será utilizada para o joining. Isso pode ser visto em mais detalhes na Listagem 4. É interessante levar-se em conta que o método map já é implementado pela classe base, conforme já foi discutido, e por isso ele não aparece no código.
Conforme é possível ver-se no método generateGroupKey(TaggedMapOutput aRecord), há uma condição para o local de onde a chave de grupo é tirada. Isso ocorre pois os arquivos que serão juntados possuem uma diferença na posição do código do município. Enquanto no arquivo municípios.csv eles estão na primeira coluna, no arquivo de ocorrências estão na quarta coluna. Essa lógica criada dessa forma devido ao fato que o primeiro tem apenas três colunas, o que faz com que o tamanho do vetor criado seja igual a 3.
Listagem 4. Classe MapClass Data Join
public static class MapClass extends DataJoinMapperBase {
protected Text generateInputTag(String inputFile) {
String datasource = inputFile.substring(0,1) + "~";
return new Text(datasource);
}
protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(";");
String groupKey;
if (tokens.length > 3) {
groupKey = tokens[3];
} else {
groupKey = tokens[0];
}
return new Text(groupKey);
}
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}
}Bem como a função base do mapeamento, DataJoinReducerBase também simplifica a programação realizando um join completo. A subclasse de redução apenas tem que implementar o método combine() para realizar a filtração de combinações indesejadas. A função combine(Object[] tags, Object[] values) recebe uma combinação de registros cruzados, etiquetados, com a mesma chave. Neste caso, ele receberá uma linha do arquivo de municípios e outra do arquivo de ocorrência, e irá realizar o join entre as duas. Por fim, o método espera um retorno do tipo TaggedMapOutput. Não se sabe exatamente porque, uma vez que DataJoinReducerBase o ignora completamente. Portanto, aqui o valor de retorno não tem a menor importância, conforme observa-se na Listagem 5.
Como é possível observar-se, o método combine() recebe como argumentos dois vetores de objetos, um deles contendo as tags e o outro contendo os valores. O tamanho desses dois vetores é, de forma garantida, o mesmo, uma vez que todos os registros estarão etiquetados. A Listagem 5 também mostra como essas tags podem ser utilizadas. Como vimos, a etiqueta é criada como as duas primeiras letras do nome do arquivo, ou seja, “mu” e “oc”. Essa noção é utilizada aqui, para que seja possível sabermos o tipo de dados com que se está lidando. Caso seja do arquivo de municípios, a String resultante receberá o nome e o estado da cidade; caso contrário, apenas o seu código. Além disso, novamente o método principal da classe foi omitido, o reduce(), uma vez que ele já é implementado por DataJoinReducerBase.
Listagem 5. Classe Reduce Data Join
public static class Reduce extends DataJoinReducerBase {
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
if (tags.length < 2) return null;
String joinedStr = "";
for (int i=0; i<values.length; i++) {
if (i > 0) joinedStr += ";";
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens;
if (tags[i].substring(0,1) == "mu"){
tokens = line.split(";", 2);
joinedStr += tokens[1];
} else{
tokens = line.split(";");
joinedStr += tokens[3];
}
}
TaggedWritable retv = new TaggedWritable
(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}Por fim, os métodos run() e main() seguem o padrão mostrado no template. O que pode ser ressaltado é a utilização de um separador no texto de saída. O separador utilizado foi o ponto-e-vírgula (“;”). Esse separador pode ser definido através da seguinte chamada no objeto JobConf: job.set(“mapred.textoutputformat.separator”, “;”).
Utilizando esse conceito simples foi possível alterarmos a saída de dados da aplicação MapReduce. É interessante traçar um paralelo com o que se vê em bases de dados relacionais, onde para que um join possa ser realizado, é necessário que haja uma relação entre as tabelas. No caso do Hadoop, isso também é verdade, e como pudemos observar no exemplo acima, a relação é bastante explícita. Em uma base de dados relacionais, algo muito parecido com o que foi realizado aqui seria como o representado na Listagem 6, para uma tabela com municípios e outra com ocorrências.
Listagem 6. Join SQL Municípios x Ocorrências
Select a.nome, a.estado, b.id from municipios a, ocorrencias b
where a.id = b.codmunicipioÉ importante notar que o Hadoop MapReduce é uma solução para qualquer tipo de dados massivos, independente de como os dados estão organizados, ou se são homogêneos ou heterogêneos. Utilizando o MapReduce, as possibilidades são praticamente infinitas, e o desenvolvedor é totalmente capaz de fazer o que quiser com os dados que tem em mãos. Isso faz com que essa tecnologia esteja atraindo muitas empresas, pequenas e grandes, para as possibilidades que o Big Data traz.