Criando um data source personalizado para o Apache Spark

O Spark é uma ferramenta de processamento para grandes volumes de dados que permite o uso de diversas fontes de dados. É uma plataforma muito flexível, onde conseguimos ler e escrever arquivos em diversos formatos (CSV, JSON, Parquet, etc.), bancos de dados (MySQL, SQL Server, DynamoDB, etc.) e até ferramentas de streaming / mensageria (Kafka, etc.).

Esses providers podem ser adicionados como bibliotecas externas ao Spark e, caso você não encontre nenhum que atenda os seus requisitos, o Spark permite a criação dos seus próprios data sources implementando a API DataSourceV2, introduzida na versão 2.3 do Spark.

É com a ideia de exercitar esse conceito que eu resolvi criar um data source para escrever em uma fila da AWS (SQS). O código fonte pode ser encontrado em https://github.com/fabiogouw/spark-aws-messaging e o componente foi disponibilizado como uma biblioteca pública no repositório central do Maven.

Criando seu próprio data source

O diagrama abaixo mostra as classes que foram necessárias a implementação para se criar o data source personalizado. São várias classes que formam toda a estrutura necessária para o funcionamento de um data source personalizado, pois temos classes responsáveis pela configuração do data source, definição das suas capacidades (ou seja, o que o Spark pode contar que ele faça), controle da operação de gravação que pode ser executada em várias máquinas (lembrando que o Spark é uma ferramenta de processamento distribuído, dividindo o trabalho em partições), a própria gravação dos dados de cada partição na fila SQS, etc.

Vamos começar com a classe SQSSkinProvider, que herda da classe TableProvider, que representa o novo data source. É essa classe que vamos referenciar no código para que os dados armazenados em um dataframe sejam enviados para uma fila SQS. Aqui um dos métodos que é implementado é o inferSchema, responsável por dizer ao Spark qual é o schema esperado para que este data source funcione. No caso, esperamos que o dataframe possua uma coluna chamada value, coluna esta que será usada como corpo da mensagem gerada no SQS.


Um dos métodos que precisamos implementar da classe TableProvider é o getTable, responsável por gerar uma "tabela". Essa tabela é uma representação lógica de um conjunto de dados que é manipulado pelo data source.

public class SQSSinkProvider implements TableProvider, DataSourceRegister {
    @Override
    public StructType inferSchema(CaseInsensitiveStringMap options) {
        StructField valueField = new StructField("value", DataTypes.StringType, true, Metadata.empty());
        return new StructType(new StructField[]{ valueField });
    }

    @Override
    public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
        return new SQSSinkTable(schema);
    }

    @Override
    public boolean supportsExternalMetadata() { return true; };

    @Override
    public String shortName() {
        return "sqs";
    }
}

Essa "tabela" é representada pela classe SQSSinkTable, que implementa a interface SupportsWrite. Como nosso data source é apenas de gravação, não estamos implementando a interface SupportsRead, que expõe métodos para que o Spark leia dados. Aqui eu destaco o método capabilities, onde eu especifico que esse data source terá a capacidade de escrita em batch (o enumerador TableCapability lista todas as opções disponíveis); e o método newWriteBuilder, que retorna uma instância do objeto responsável por configurar o processo de escrita.

public class SQSSinkTable implements SupportsWrite {

    private Set<TableCapability> capabilities;
    private StructType schema;

    public SQSSinkTable(StructType schema) {
        this.schema = schema;
    }

    @Override
    public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
        return new SQSSinkWriteBuilder(info);
    }

    @Override
    public String name() {
        return "AWS-SQS";
    }

    @Override
    public StructType schema() {
        return this.schema;
    }

    @Override
    public Set<TableCapability> capabilities() {
        if (capabilities == null) {
            this.capabilities = new HashSet<>();
            capabilities.add(TableCapability.BATCH_WRITE);
        }
        return capabilities;
    }
}

Este objeto responsável pela configuração do processo de escrita é o SQSSinkWriteBuilder (herda de WriteBuilder). Aqui vamos implementar o método buildForBatch apenas (outra opção seria implementar o buildForStreaming, mas como esse data source não foi criado para trabalhar com streaming, pelo menos por hora, vamos deixá-lo de fora). Note que todo o código deste método irá retornar um outro objeto que gerencia o processo de gravação, e em buildForBatch estamos pegando as opções que o desenvolvedor passa na hora de chamar o método write do dataframe, por exemplo, o nome da fila SQS e a região da AWS que essa fila se encontra.

public class SQSSinkWriteBuilder implements WriteBuilder {

    private LogicalWriteInfo info;
    private static final String messageAttributesColumnName = "msg_attributes";
    private static final String valueColumnName = "value";

    public SQSSinkWriteBuilder(LogicalWriteInfo info) {
        this.info = info;
    }

    @Override
    public BatchWrite buildForBatch() {
        int batchSize = Integer.parseInt(info.options().getOrDefault("batchSize", "1000"));
        final StructType schema = info.schema();
        SQSSinkOptions.Service service = SQSSinkOptions.Service.valueOf(
                info.options().getOrDefault("service", "SQS")
                .toUpperCase().trim());
        SQSSinkOptions options = new SQSSinkOptions(
                info.options().get("region"),
                info.options().get("endpoint"),
                info.options().get("queueName"),
                batchSize,
                service,
                schema.fieldIndex(valueColumnName),
                schema.getFieldIndex(messageAttributesColumnName).isEmpty() ? -1 : schema.fieldIndex(messageAttributesColumnName)
                );
        return new SQSSinkBatchWrite(options);
    }
}

A classe SQSSinkBatchWrite (herda de BatchWrite), retornada pela classe que explicamos anteriormente, é responsável por gerenciar a escrita como um todo. Veja que pela característica de processamento distribuído do Spark, o trabalho de gravação é dividido em várias tasks que são espalhadas pelo cluster. Então vamos precisar de alguém que olhe o processo como um todo, até mesmo pra ligar com questões de controle transacional (se algum dos processos de escrita deu erro, o data source pode querer abortar o processo, desfazendo qualquer outra escrita com sucesso).

No código de exemplo, eu não me preocupei com esse controle transacional, por isso os métodos commit e abort estão vazio. O trecho importante aqui é o retorno de um factory de data writers, que serão as instâncias que efetivamente serão usadas para a escrita das partições.

public class SQSSinkBatchWrite implements BatchWrite {

    private SQSSinkOptions options;

    public SQSSinkBatchWrite(SQSSinkOptions options) {
        this.options = options;
    }

    @Override
    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
        return new SQSSinkDataWriterFactory(options);
    }

    @Override
    public void commit(WriterCommitMessage[] messages) {

    }

    @Override
    public void abort(WriterCommitMessage[] messages) {

    }
}

SQSSinkDataWriterFactory (herda de DataWriterFactory) é um objeto criado que é enviado para cada um dos nós do cluster do Spark (por isso a necessidade dele ser serializável), cuja responsabilidade é, dentro de cada um dos executores do Spark, gerar um data writer por partição (ou seja, para cada partição, o Spark irá chamar o método createWriter). Aqui então, para a criação desse data writer, eu monto o cliente do SQS a partir das informações de configurações que foram disponibilizadas (nome da fila, região, etc.).

public class SQSSinkDataWriterFactory implements DataWriterFactory {

    private SQSSinkOptions options;

    public SQSSinkDataWriterFactory(SQSSinkOptions options) {
        this.options = options;
    }

    @Override
    public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {

        AmazonSQSClientBuilder clientBuilder = AmazonSQSClientBuilder.standard();
        if(!options.getEndpoint().isEmpty()) {
            AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(
                    options.getEndpoint(), options.getRegion());
            clientBuilder.withEndpointConfiguration(endpointConfiguration);
        }
        else {
            clientBuilder.withRegion(options.getRegion());
        }
        AmazonSQS sqs = clientBuilder.build();
        return new SQSSinkDataWriter(partitionId,
                taskId,
                sqs,
                options.getBatchSize(),
                options.getQueueName(),
                options.getValueColumnIndex(),
                options.getMsgAttributesColumnIndex());
    }
}

E finalmente temos a classe SQSSinkDataWriter (herda de DataWriter) que é quem efetivamente vai pegar os dados da partição do dataframe e mandá-los para a fila SQS.

public class SQSSinkDataWriter implements DataWriter<InternalRow> {

    private final int partitionId;
    private final long taskId;
    private final AmazonSQS sqs;
    private final List<SendMessageBatchRequestEntry> messages = new ArrayList<SendMessageBatchRequestEntry>();
    private final int batchMaxSize;
    private final String queueUrl;
    private final int valueColumnIndex;
    private final int msgAttributesColumnIndex;

    public SQSSinkDataWriter(int partitionId,
                             long taskId,
                             AmazonSQS sqs,
                             int batchMaxSize,
                             String queueName,
                             int valueColumnIndex,
                             int msgAttributesColumnIndex) {
        this.partitionId = partitionId;
        this.taskId = taskId;
        this.batchMaxSize = batchMaxSize;
        this.sqs = sqs;
        queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
        this.valueColumnIndex = valueColumnIndex;
        this.msgAttributesColumnIndex = msgAttributesColumnIndex;
    }

    @Override
    public void write(InternalRow record) throws IOException {
        SendMessageBatchRequestEntry msg = new SendMessageBatchRequestEntry()
                .withMessageBody(record.getString(valueColumnIndex))
                .withId(UUID.randomUUID().toString());
        messages.add(msg);
        if(messages.size() >= batchMaxSize) {
            sendMessages();
        }
    }

    @Override
    public WriterCommitMessage commit() throws IOException {
        try {
            if(messages.size() > 0) {
                sendMessages();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new SQSSinkWriterCommitMessage(partitionId, taskId);
    }

    @Override
    public void abort() throws IOException {

    }

    @Override
    public void close() throws IOException {

    }

    private void sendMessages() {
        SendMessageBatchRequest batch = new SendMessageBatchRequest()
                .withQueueUrl(queueUrl)
                .withEntries(messages);
        sqs.sendMessageBatch(batch);
        messages.clear();
    }
}

O método write dessa classe é chamado para cada linha da partição do dataframe que está sendo gravado e, quando a partição termina, é chamado o método commit. O interessante aqui é que esse método commit vai gerar um objeto WriterCommitMessage, que será enviado de volta pra instância da classe SQSSinkWriteBatch, exatamente para ela saber o que ocorreu no processamento dessa partição.

Como utilizar o novo data source?

Depois de todo esse trabalho, fica a pergunta: como utilizar esse novo data source no Spark? A resposta é a mais simples possível: quando chamamos o método write do dataframe, nós especificamos qual é o formato usado para gravação. Como parâmetro, nós podemos passar o nome completo da classe SQSSinkProvider que o Spark irá encontrá-la e, no momento da gravação do dataframe, irá utilizá-la. Podemos ver isso no exemplo de código abaixo (desta vez em Python).

df.write.format("com.fabiogouw.spark.awsmessaging.sqs.SQSSinkProvider").mode("append")\
	.option("queueName", "test")\
    .option("batchSize", "10") \
    .option("region", "us-east-2") \
    .save()

Para executar, como não se trata de uma biblioteca nativa do Spark, é necessário passar o nome do package junto da linha de execução do spark-submit, de forma que os binários possam ser baixados para execução.

spark-submit \
--packages com.fabiogouw:spark-aws-messaging:0.3.1,com.amazonaws:aws-java-sdk-sqs:1.12.13 \
test.py sample.txt

Para mais detalhes de execução, exemplos e o código fonte mais atualizado, vocês podem olhar no meu Github. Sugestões de melhorias são bem-vindas!

Abraços e até a próxima

Comentários

Postagens mais visitadas deste blog

Trocando configurações padrão do Live TIM

Testes automatizados em sistemas autenticados com certificados digitais, usando Selenium e PhantomJS

ORA-01843: not a valid month