Concrete Logo
Hamburger button

MapReduce – parte I

  • Blog
  • 27 de Novembro de 2012
Share

Sempre foi um desafio paralelizar aplicações. O modelo clássico de processamento baseado em troca de mensagens para desenvolver aplicações distribuídas resolve bem muitos problemas mas adiciona uma verbosidade e complexidade que pode ser excessiva em alguns casos.

O MapReduce é um modelo de processamento inspirado nas funções de ordem alta da programação funcional, com uma implementação associada para processar e gerar grandes volumes de dados.

Se fundamenta na observação de que muitas tarefas de processamento tem a mesma estrutura básica: um processo de computação é aplicado em um grande número de registros (exemplo: páginas web) para gerar resultados parciais que depois serão agregados de alguma forma.

    O uso consiste em fornecer uma função map que processe pares chave-valor para gerar um conjunto de pares intermediários chave-valor e uma função reduce que processe e junte todos valores intermediários associados com a mesma chave em uma espécie de sumarização dos resultados.

Neste artigo não falaremos neles mas os frameworks baseados em MapReduce, ou seja, Hadoop, Twister, Phoenix (shared-memory implementation), Dryad/Linq e o do Google que é proprietário, proveem um modelo simples de programação, um sistema de arquivos distribuído com gerenciamento de tarefas e gerenciamento do cluster. São excelentes nos casos com muitos dados e intenso processamento batch.

 

Alguns modelos de processamento:

    – Pipelines (Unix pipes): facilita o reuso de primitivas de processamento simplesmente encadeando módulos existentes para criar um novo.

    – Message queues: facilita a sincronização de primitivas de processamento que são consumidoras e/ou produtoras de mensagens. O sistema gerencia o tempo de execução.

De modo similar MapReduce também é um modelo de processamento. Sua grande vantagem é a facilidade de escalar através de múltiplos nós de um cluster.

As primitivas são chamadas de map e reduce.

Decompor um processamento em mappers e reducers não é sempre trivial, mas uma vez feito se torna fácil escalar.

 

História

É impossível falar de MapReduce sem citar o famoso e seminal paper que apresentou ao mundo o conceito de MapReduce:

Foi escrito em 2004 por dois cientistas do Google com currículos realmente impressionantes:

Há um texto sobre eles cujo título é If Xerox PARC Invented the PC, Google Invented the Internet.

No paper eles dizem explicitamente no 2o parágrafo da introdução:

    “As a reaction to this complexity, we designed a new abstraction that allows us to express the simple computations we were trying to perform but hides the messy details of parallelization, fault-tolerance, data distribution and load balancing in a library. Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages.”

O negrito é meu porque o artigo apresentou o conceito tal como o conhecemos hoje mas ele já existia no mundo das linguagens funcionais. Isto é dito nos comentários de Write your first MapReduce program in 20 minutes.

Nos slides em HTML no artigo do site do Google que fala do paper, os autores citam várias referências. Ver o slide 31 em MapReduce: Simplified Data Processing on Large Clusters.

Como quem realmente inventou o conceito não foi quem o popularizou, quase todo mundo cita o paper de 2004 como a base de MapReduce.

A motivação do Google para usar este modelo foi facilitar o processamento de grandes volumes de dados paralelizando o processamento através de centenas ou mesmo milhares de CPUs. Assim obteve um processamento tolerante à falhas, com ferramentas de monitoração e com uma abstração limpa e fácil para os programadores.

 

MapReduce na prática

Poderíamos começar a explicar MapReduce a partir das linguagens funcionais e de exemplos de uso das funções map e fold. Mas para não ter que explicar linguagens funcionais e como funcionam map e fold, vamos direto a um problema clássico que sempre aparece nos textos de MapReduce.

 

Problema: contar número de vezes cada palavra aparece em um conjunto de documentos

Solução em pseudo código:

    [sourcecode language=”cpp”]
    for each document in documentSet {
    T = tokenize (document)
    for each token in T {
    wordCount[token]++
    }
    }
    display(wordCount)[/sourcecode]

O programa varre todos os documentos. Para cada um, extrai as palavras uma por uma por um processo de tokenização. Para cada palavra, um conjunto wordCount é incrementado. No fim display imprime os resultados.

Bem, tudo roda em uma máquina só e na memória.

Se for preciso escalar, a primeira fase poderia ser distribuída em várias máquinas trocando a instrução display(wordCount) por uma chamada do tipo sendTosecondPhase(wordCount)

E o pseudo código da segunda fase poderia ser:

    [sourcecode language=”cpp”]
    for each wordCount received from firstPhase {
    add (totalWordCount, wordCount)
    }[/sourcecode]

Só que isto ainda não resolve.

Se todos os documentos estiverem na mesma máquina, esta pode não dar conta das muitas solicitações simultâneas de I/O e a rede também pode não ter largura de banda para atender a todos. Parece necessário dividir os documentos em várias máquinas.

Pode acontecer também que a segunda fase não possa ser feita assim armazenando todas as palavras na memória e com uma única máquina recebendo wordCount de muitas máquinas.

Bom, com um pouco de engenhosidade seria possível ir particionando os dados, ajeitando, filtrando e transformando aqui e acolá até que fosse possível paralelizar as coisas e escalar a solução. Nem vou fazer esta tentativa. Acho evidente que o programa iria ficar complicado demais e isto sugere a busca por outro algoritmo.

 

MapReduce com um pouco mais de detalhes

Como dissemos no parágrafo anterior, se seguíssemos adiante, acabaríamos tornando muita complexa a solução do nosso problema. O que precisamos é de um padrão que possa ser aplicado em outros casos.

No modelo MapReduce, o programa é executado em duas fases chamadas de mapper e reducer.

Os dados geralmente vêm de um sistema de arquivos distribuído. Chegam em pares chave/valor que são espalhadas pelos vários mappers. Cada mapper tem seu método map chamado uma vez para cada par chave/valor.

O framework MapReduce distribui os pares chave/valor resultados dos mappers, ajeita (sort e outras coisas mais) e os apresenta aos reducers que trata cada par com seu método reduce.

O parágrafo anterior já deixa claro que normalmente se usa um framework de execução (runtime). É ele que lida de forma transparente com todos os aspectos da execução em cluster, do agendamento das tarefas, do tratamento de falhas e de resolver problemas de sorting e shuffling distribuído entre as fases mapper e reducer.

Nesta série ainda pretendemos mostrar o uso do Hadoop e do Amazon EMR (Amazon Elastic MapReduce) mas hoje apenas queremos mostrar um exemplo bem simples.

Detalhando um pouco mais podemos dizer que a fase de mapper pega os dados, processa, filtra e transforma em algo que depois a fase de reducer pode agregar. O padrão de paralelismo é o seguinte:

    – Cada registro de entrada (linhas de arquivos, registros de bancos de dados), é passado a função map do mapper como um par chave/valor (exemplo: nome do arquivo, linha);


    – Na fase de mapper, todos os valores intermediários correspondentes a uma dada output_key são combinados juntos em uma lista.


    – A função map produz uma lista com um ou mais resultados intermediários formada por pares de valores com as output_key, isto é, list (output_key, intermediate_value)


    – A função reduce do reducer pega cada par chave/valor e combina com outros valores que compartilham a mesma chave. Possivelmente filtrando, agregando ou transformando os valores. Durante o processo, a função reduce combina estes valores intermediários em um ou mais valores finais para esta mesma output_key (na prática, usualmente há um único valor final para cada chave).

Uma otimização comum é o uso de combiners. São similares aos reducers exceto que operam diretamente nos resultados dos mappers que estão no mesmo nó do cluster. São como se fossem “mini reducers” que fazem um trabalho de agregação parcial e diminuem o tráfego na rede. Em alguns casos de operações que são associativas e também comutativas, os próprios reducers podem ser usados como combiners.

Um outro componente é o particioner que é responsável por dividir e endereçar aos reducers os pares intermediários resultados dos mappers. Faz isto simplesmente calculando o valor hash de cada chave e fazendo mod com o número de reducers.

A figura abaixo, tirada de Web-Scale Computer Vision using MapReduce for Multimedia Data Mining, ilustra o que descrevemos:

Ilustração do MapReduce

 

Problema resolvido com MapReduce

As primitivas principais de dados são listas e pares chave/valor. Vamos reescrever o fluxo de dados usando estas primitivas:


    1. O input deve ser estruturado como uma lista de pares chave/valor, list (<k1, v1>). Ao processar muitos arquivos como no caso da contagem da frequência das palavras, o input seria list (<String fileName, String fileContent>). Para processar um arguivo grande de log o input poderia ser list (<Integer lineNumber, String logEvent>).

    2. A lista dos pares chave/valor é quebrada e cada par chave/valor <k1, v1> é processado chamando a função map do mapper. Na prática a chave k1 é geralmente ignorada pelo mapper. O mapper transforma cada par <k1, v1> em uma lista de pares <k2, v2>. Os detalhes desta transformação determinam o que faz o programa MapReduce.



      No caso do programa de contagem da frequência das palavras, nosso mapper recebe <String fileName, String fileContent> e logo ignora o fileName. O output poderia ser uma lista <String word, Integer count> mas na verdade será bem mais simples, repetindo <String word, Integer 1> quantas vezes word aparecer e deixar a agregação para depois. Ao invés de ter um único par <“xpto”, 4>, vamos ter no output quatro vezes o par <“xpto”, 1>.

    3. As saídas de todos os mappers estão todas conceitualmente agregadas em uma lista gigante de pares <k2, v2>. Todos os pares compartilhando o mesmo k2 são agrupados em um novo par <k2, list(v2)> (exemplo: <“xpto”, list(1,1,1,1)>). O framework faz com que o reducer processe estes pares agregados individualmente. O função reduce do reducer que processar xpto terá como saída <“xpto”, 5>. O framework MapReduce automaticamente coletará todos os pares <k3, v3> e os escreverá nos arquivos de saída.

Conceitualmente as funções map e reduce recebem e devolvem (ou imprimem como no pseudo código adiante) os seguintes tipos:

  • map     <k1, v1>         –> list (k2, v2)
  • reduce <k2, list(v2)>   –> list (k3, v3) – no caso do nosso exemplo, os tipos de k2 e k3 são os mesmos e v2 e v3 também tem o mesmo tipo.

 

Pseudo código das funções map e reduce para contar as frequências das palavras:

Map

    [sourcecode language=”cpp”]
    map (String fileName, String document) {
    List <String> T = tokenize(document)
    for each token in T {
    emitIntermediate ( token, 1)
    }
    }[/sourcecode]

Reduce

    [sourcecode language=”cpp”]
    reduce (String token, List<Integer> values) {
    Integer sum = 0
    for each value in values {
    sum = sum + value
    }
    emit ( token, sum)
    }[/sourcecode]

 

Conclusão

Na verdade é uma interrupção. Este post faz parte de uma série. O blá, blá, blá teórico será seguido por um post quase que só com código e este contador de frequência de palavras funcionará na prática.