Criando um data source personalizado para o Apache Spark

O Spark é uma ferramenta de processamento para grandes volumes de dados que permite o uso de diversas fontes de dados. É uma plataforma muito flexível, onde conseguimos ler e escrever arquivos em diversos formatos (CSV, JSON, Parquet, etc.), bancos de dados (MySQL, SQL Server, DynamoDB, etc.) e até ferramentas de streaming / mensageria (Kafka, etc.).

Esses providers podem ser adicionados como bibliotecas externas ao Spark e, caso você não encontre nenhum que atenda os seus requisitos, o Spark permite a criação dos seus próprios data sources implementando a API DataSourceV2, introduzida na versão 2.3 do Spark.

É com a ideia de exercitar esse conceito que eu resolvi criar um data source para escrever em uma fila da AWS (SQS). O código fonte pode ser encontrado em https://github.com/fabiogouw/spark-aws-messaging e o componente foi disponibilizado como uma biblioteca pública no repositório central do Maven.

Criando seu próprio data source

O diagrama abaixo mostra as classes que foram necessárias a implementação para se criar o data source personalizado. São várias classes que formam toda a estrutura necessária para o funcionamento de um data source personalizado, pois temos classes responsáveis pela configuração do data source, definição das suas capacidades (ou seja, o que o Spark pode contar que ele faça), controle da operação de gravação que pode ser executada em várias máquinas (lembrando que o Spark é uma ferramenta de processamento distribuído, dividindo o trabalho em partições), a própria gravação dos dados de cada partição na fila SQS, etc.

Vamos começar com a classe SQSSkinProvider, que herda da classe TableProvider, que representa o novo data source. É essa classe que vamos referenciar no código para que os dados armazenados em um dataframe sejam enviados para uma fila SQS. Aqui um dos métodos que é implementado é o inferSchema, responsável por dizer ao Spark qual é o schema esperado para que este data source funcione. No caso, esperamos que o dataframe possua uma coluna chamada value, coluna esta que será usada como corpo da mensagem gerada no SQS.


Um dos métodos que precisamos implementar da classe TableProvider é o getTable, responsável por gerar uma "tabela". Essa tabela é uma representação lógica de um conjunto de dados que é manipulado pelo data source.

public class SQSSinkProvider implements TableProvider, DataSourceRegister {
    @Override
    public StructType inferSchema(CaseInsensitiveStringMap options) {
        StructField valueField = new StructField("value", DataTypes.StringType, true, Metadata.empty());
        return new StructType(new StructField[]{ valueField });
    }

    @Override
    public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
        return new SQSSinkTable(schema);
    }

    @Override
    public boolean supportsExternalMetadata() { return true; };

    @Override
    public String shortName() {
        return "sqs";
    }
}

Essa "tabela" é representada pela classe SQSSinkTable, que implementa a interface SupportsWrite. Como nosso data source é apenas de gravação, não estamos implementando a interface SupportsRead, que expõe métodos para que o Spark leia dados. Aqui eu destaco o método capabilities, onde eu especifico que esse data source terá a capacidade de escrita em batch (o enumerador TableCapability lista todas as opções disponíveis); e o método newWriteBuilder, que retorna uma instância do objeto responsável por configurar o processo de escrita.

public class SQSSinkTable implements SupportsWrite {

    private Set<TableCapability> capabilities;
    private StructType schema;

    public SQSSinkTable(StructType schema) {
        this.schema = schema;
    }

    @Override
    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
        return new SQSSinkWriteBuilder(info);
    }

    @Override
    public String name() {
        return "AWS-SQS";
    }

    @Override
    public StructType schema() {
        return this.schema;
    }

    @Override
    public Set<TableCapability> capabilities() {
        if (capabilities == null) {
            this.capabilities = new HashSet<>();
            capabilities.add(TableCapability.BATCH_WRITE);
        }
        return capabilities;
    }
}

Este objeto responsável pela configuração do processo de escrita é o SQSSinkWriteBuilder (herda de WriteBuilder). Aqui vamos implementar o método buildForBatch apenas (outra opção seria implementar o buildForStreaming, mas como esse data source não foi criado para trabalhar com streaming, pelo menos por hora, vamos deixá-lo de fora). Note que todo o código deste método irá retornar um outro objeto que gerencia o processo de gravação, e em buildForBatch estamos pegando as opções que o desenvolvedor passa na hora de chamar o método write do dataframe, por exemplo, o nome da fila SQS e a região da AWS que essa fila se encontra.

public class SQSSinkWriteBuilder implements WriteBuilder {

    private LogicalWriteInfo info;
    private static final String messageAttributesColumnName = "msg_attributes";
    private static final String valueColumnName = "value";

    public SQSSinkWriteBuilder(LogicalWriteInfo info) {
        this.info = info;
    }

    @Override
    public BatchWrite buildForBatch() {
        int batchSize = Integer.parseInt(info.options().getOrDefault("batchSize", "1000"));
        final StructType schema = info.schema();
        SQSSinkOptions.Service service = SQSSinkOptions.Service.valueOf(
                info.options().getOrDefault("service", "SQS")
                .toUpperCase().trim());
        SQSSinkOptions options = new SQSSinkOptions(
                info.options().get("region"),
                info.options().get("endpoint"),
                info.options().get("queueName"),
                batchSize,
                service,
                schema.fieldIndex(valueColumnName),
                schema.getFieldIndex(messageAttributesColumnName).isEmpty() ? -1 : schema.fieldIndex(messageAttributesColumnName)
                );
        return new SQSSinkBatchWrite(options);
    }
}

A classe SQSSinkBatchWrite (herda de BatchWrite), retornada pela classe que explicamos anteriormente, é responsável por gerenciar a escrita como um todo. Veja que pela característica de processamento distribuído do Spark, o trabalho de gravação é dividido em várias tasks que são espalhadas pelo cluster. Então vamos precisar de alguém que olhe o processo como um todo, até mesmo pra ligar com questões de controle transacional (se algum dos processos de escrita deu erro, o data source pode querer abortar o processo, desfazendo qualquer outra escrita com sucesso).

No código de exemplo, eu não me preocupei com esse controle transacional, por isso os métodos commit e abort estão vazio. O trecho importante aqui é o retorno de um factory de data writers, que serão as instâncias que efetivamente serão usadas para a escrita das partições.

public class SQSSinkBatchWrite implements BatchWrite {

    private SQSSinkOptions options;

    public SQSSinkBatchWrite(SQSSinkOptions options) {
        this.options = options;
    }

    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
        return new SQSSinkDataWriterFactory(options);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {

    }

    @Override
    public void abort(WriterCommitMessage[] messages) {

    }
}

SQSSinkDataWriterFactory (herda de DataWriterFactory) é um objeto criado que é enviado para cada um dos nós do cluster do Spark (por isso a necessidade dele ser serializável), cuja responsabilidade é, dentro de cada um dos executores do Spark, gerar um data writer por partição (ou seja, para cada partição, o Spark irá chamar o método createWriter). Aqui então, para a criação desse data writer, eu monto o cliente do SQS a partir das informações de configurações que foram disponibilizadas (nome da fila, região, etc.).

public class SQSSinkDataWriterFactory implements DataWriterFactory {

    private SQSSinkOptions options;

    public SQSSinkDataWriterFactory(SQSSinkOptions options) {
        this.options = options;
    }

    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {

        AmazonSQSClientBuilder clientBuilder = AmazonSQSClientBuilder.standard();
        if(!options.getEndpoint().isEmpty()) {
            AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(
                    options.getEndpoint(), options.getRegion());
            clientBuilder.withEndpointConfiguration(endpointConfiguration);
        }
        else {
            clientBuilder.withRegion(options.getRegion());
        }
        AmazonSQS sqs = clientBuilder.build();
        return new SQSSinkDataWriter(partitionId,
                taskId,
                sqs,
                options.getBatchSize(),
                options.getQueueName(),
                options.getValueColumnIndex(),
                options.getMsgAttributesColumnIndex());
    }
}

E finalmente temos a classe SQSSinkDataWriter (herda de DataWriter) que é quem efetivamente vai pegar os dados da partição do dataframe e mandá-los para a fila SQS.

public class SQSSinkDataWriter implements DataWriter<InternalRow> {

    private final int partitionId;
    private final long taskId;
    private final AmazonSQS sqs;
    private final List<SendMessageBatchRequestEntry> messages = new ArrayList<SendMessageBatchRequestEntry>();
    private final int batchMaxSize;
    private final String queueUrl;
    private final int valueColumnIndex;
    private final int msgAttributesColumnIndex;

    public SQSSinkDataWriter(int partitionId,
                             long taskId,
                             AmazonSQS sqs,
                             int batchMaxSize,
                             String queueName,
                             int valueColumnIndex,
                             int msgAttributesColumnIndex) {
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.batchMaxSize = batchMaxSize;
        this.sqs = sqs;
        queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
        this.valueColumnIndex = valueColumnIndex;
        this.msgAttributesColumnIndex = msgAttributesColumnIndex;
    }

    @Override
    public void write(InternalRow record) throws IOException {
        SendMessageBatchRequestEntry msg = new SendMessageBatchRequestEntry()
                .withMessageBody(record.getString(valueColumnIndex))
                .withId(UUID.randomUUID().toString());
        messages.add(msg);
        if(messages.size() >= batchMaxSize) {
            sendMessages();
        }
    }

    @Override
    public WriterCommitMessage commit() throws IOException {
        try {
            if(messages.size() > 0) {
                sendMessages();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new SQSSinkWriterCommitMessage(partitionId, taskId);
    }

    @Override
    public void abort() throws IOException {

    }

    @Override
    public void close() throws IOException {

    }

    private void sendMessages() {
        SendMessageBatchRequest batch = new SendMessageBatchRequest()
                .withQueueUrl(queueUrl)
                .withEntries(messages);
        sqs.sendMessageBatch(batch);
        messages.clear();
    }
}

O método write dessa classe é chamado para cada linha da partição do dataframe que está sendo gravado e, quando a partição termina, é chamado o método commit. O interessante aqui é que esse método commit vai gerar um objeto WriterCommitMessage, que será enviado de volta pra instância da classe SQSSinkWriteBatch, exatamente para ela saber o que ocorreu no processamento dessa partição.

Como utilizar o novo data source?

Depois de todo esse trabalho, fica a pergunta: como utilizar esse novo data source no Spark? A resposta é a mais simples possível: quando chamamos o método write do dataframe, nós especificamos qual é o formato usado para gravação. Como parâmetro, nós podemos passar o nome completo da classe SQSSinkProvider que o Spark irá encontrá-la e, no momento da gravação do dataframe, irá utilizá-la. Podemos ver isso no exemplo de código abaixo (desta vez em Python).

df.write.format("com.fabiogouw.spark.awsmessaging.sqs.SQSSinkProvider").mode("append")\
	.option("queueName", "test")\
    .option("batchSize", "10") \
    .option("region", "us-east-2") \
    .save()

Para executar, como não se trata de uma biblioteca nativa do Spark, é necessário passar o nome do package junto da linha de execução do spark-submit, de forma que os binários possam ser baixados para execução.

spark-submit \
--packages com.fabiogouw:spark-aws-messaging:0.3.1,com.amazonaws:aws-java-sdk-sqs:1.12.13 \
test.py sample.txt

Para mais detalhes de execução, exemplos e o código fonte mais atualizado, vocês podem olhar no meu Github. Sugestões de melhorias são bem-vindas!

Abraços e até a próxima

Comentários

Postagens mais visitadas deste blog

Trocando configurações padrão do Live TIM

Uma proposta de Clean Architecure com Modelo de Atores

Testes automatizados em sistemas autenticados com certificados digitais, usando Selenium e PhantomJS