Criando um data source personalizado para o Apache Spark
O Spark é uma ferramenta de processamento para grandes volumes de dados que permite o uso de diversas fontes de dados. É uma plataforma muito flexível, onde conseguimos ler e escrever arquivos em diversos formatos (CSV, JSON, Parquet, etc.), bancos de dados (MySQL, SQL Server, DynamoDB, etc.) e até ferramentas de streaming / mensageria (Kafka, etc.).
Esses providers podem ser adicionados como bibliotecas externas ao Spark e, caso você não encontre nenhum que atenda os seus requisitos, o Spark permite a criação dos seus próprios data sources implementando a API DataSourceV2, introduzida na versão 2.3 do Spark.
É com a ideia de exercitar esse conceito que eu resolvi criar um data source para escrever em uma fila da AWS (SQS). O código fonte pode ser encontrado em https://github.com/fabiogouw/spark-aws-messaging e o componente foi disponibilizado como uma biblioteca pública no repositório central do Maven.
Criando seu próprio data source
O diagrama abaixo mostra as classes que foram necessárias a implementação para se criar o data source personalizado. São várias classes que formam toda a estrutura necessária para o funcionamento de um data source personalizado, pois temos classes responsáveis pela configuração do data source, definição das suas capacidades (ou seja, o que o Spark pode contar que ele faça), controle da operação de gravação que pode ser executada em várias máquinas (lembrando que o Spark é uma ferramenta de processamento distribuído, dividindo o trabalho em partições), a própria gravação dos dados de cada partição na fila SQS, etc.
Vamos começar com a classe SQSSkinProvider, que herda da classe TableProvider, que representa o novo data source. É essa classe que vamos referenciar no código para que os dados armazenados em um dataframe sejam enviados para uma fila SQS. Aqui um dos métodos que é implementado é o inferSchema, responsável por dizer ao Spark qual é o schema esperado para que este data source funcione. No caso, esperamos que o dataframe possua uma coluna chamada value, coluna esta que será usada como corpo da mensagem gerada no SQS.
Um dos métodos que precisamos implementar da classe TableProvider é o getTable, responsável por gerar uma "tabela". Essa tabela é uma representação lógica de um conjunto de dados que é manipulado pelo data source.
public class SQSSinkProvider implements TableProvider, DataSourceRegister {
@Override
public StructType inferSchema(CaseInsensitiveStringMap options) {
StructField valueField = new StructField("value", DataTypes.StringType, true, Metadata.empty());
return new StructType(new StructField[]{ valueField });
}
@Override
public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
return new SQSSinkTable(schema);
}
@Override
public boolean supportsExternalMetadata() { return true; };
@Override
public String shortName() {
return "sqs";
}
}
Essa "tabela" é representada pela classe SQSSinkTable, que implementa a interface SupportsWrite. Como nosso data source é apenas de gravação, não estamos implementando a interface SupportsRead, que expõe métodos para que o Spark leia dados. Aqui eu destaco o método capabilities, onde eu especifico que esse data source terá a capacidade de escrita em batch (o enumerador TableCapability lista todas as opções disponíveis); e o método newWriteBuilder, que retorna uma instância do objeto responsável por configurar o processo de escrita.
public class SQSSinkTable implements SupportsWrite {
private Set<TableCapability> capabilities;
private StructType schema;
public SQSSinkTable(StructType schema) {
this.schema = schema;
}
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
return new SQSSinkWriteBuilder(info);
}
@Override
public String name() {
return "AWS-SQS";
}
@Override
public StructType schema() {
return this.schema;
}
@Override
public Set<TableCapability> capabilities() {
if (capabilities == null) {
this.capabilities = new HashSet<>();
capabilities.add(TableCapability.BATCH_WRITE);
}
return capabilities;
}
}
Este objeto responsável pela configuração do processo de escrita é o SQSSinkWriteBuilder (herda de WriteBuilder). Aqui vamos implementar o método buildForBatch apenas (outra opção seria implementar o buildForStreaming, mas como esse data source não foi criado para trabalhar com streaming, pelo menos por hora, vamos deixá-lo de fora). Note que todo o código deste método irá retornar um outro objeto que gerencia o processo de gravação, e em buildForBatch estamos pegando as opções que o desenvolvedor passa na hora de chamar o método write do dataframe, por exemplo, o nome da fila SQS e a região da AWS que essa fila se encontra.
public class SQSSinkWriteBuilder implements WriteBuilder {
private LogicalWriteInfo info;
private static final String messageAttributesColumnName = "msg_attributes";
private static final String valueColumnName = "value";
public SQSSinkWriteBuilder(LogicalWriteInfo info) {
this.info = info;
}
@Override
public BatchWrite buildForBatch() {
int batchSize = Integer.parseInt(info.options().getOrDefault("batchSize", "1000"));
final StructType schema = info.schema();
SQSSinkOptions.Service service = SQSSinkOptions.Service.valueOf(
info.options().getOrDefault("service", "SQS")
.toUpperCase().trim());
SQSSinkOptions options = new SQSSinkOptions(
info.options().get("region"),
info.options().get("endpoint"),
info.options().get("queueName"),
batchSize,
service,
schema.fieldIndex(valueColumnName),
schema.getFieldIndex(messageAttributesColumnName).isEmpty() ? -1 : schema.fieldIndex(messageAttributesColumnName)
);
return new SQSSinkBatchWrite(options);
}
}
A classe SQSSinkBatchWrite (herda de BatchWrite), retornada pela classe que explicamos anteriormente, é responsável por gerenciar a escrita como um todo. Veja que pela característica de processamento distribuído do Spark, o trabalho de gravação é dividido em várias tasks que são espalhadas pelo cluster. Então vamos precisar de alguém que olhe o processo como um todo, até mesmo pra ligar com questões de controle transacional (se algum dos processos de escrita deu erro, o data source pode querer abortar o processo, desfazendo qualquer outra escrita com sucesso).
No código de exemplo, eu não me preocupei com esse controle transacional, por isso os métodos commit e abort estão vazio. O trecho importante aqui é o retorno de um factory de data writers, que serão as instâncias que efetivamente serão usadas para a escrita das partições.
public class SQSSinkBatchWrite implements BatchWrite {
private SQSSinkOptions options;
public SQSSinkBatchWrite(SQSSinkOptions options) {
this.options = options;
}
@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
return new SQSSinkDataWriterFactory(options);
}
@Override
public void commit(WriterCommitMessage[] messages) {
}
@Override
public void abort(WriterCommitMessage[] messages) {
}
}
SQSSinkDataWriterFactory (herda de DataWriterFactory) é um objeto criado que é enviado para cada um dos nós do cluster do Spark (por isso a necessidade dele ser serializável), cuja responsabilidade é, dentro de cada um dos executores do Spark, gerar um data writer por partição (ou seja, para cada partição, o Spark irá chamar o método createWriter). Aqui então, para a criação desse data writer, eu monto o cliente do SQS a partir das informações de configurações que foram disponibilizadas (nome da fila, região, etc.).
public class SQSSinkDataWriterFactory implements DataWriterFactory {
private SQSSinkOptions options;
public SQSSinkDataWriterFactory(SQSSinkOptions options) {
this.options = options;
}
@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
AmazonSQSClientBuilder clientBuilder = AmazonSQSClientBuilder.standard();
if(!options.getEndpoint().isEmpty()) {
AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(
options.getEndpoint(), options.getRegion());
clientBuilder.withEndpointConfiguration(endpointConfiguration);
}
else {
clientBuilder.withRegion(options.getRegion());
}
AmazonSQS sqs = clientBuilder.build();
return new SQSSinkDataWriter(partitionId,
taskId,
sqs,
options.getBatchSize(),
options.getQueueName(),
options.getValueColumnIndex(),
options.getMsgAttributesColumnIndex());
}
}
E finalmente temos a classe SQSSinkDataWriter (herda de DataWriter) que é quem efetivamente vai pegar os dados da partição do dataframe e mandá-los para a fila SQS.
public class SQSSinkDataWriter implements DataWriter<InternalRow> {
private final int partitionId;
private final long taskId;
private final AmazonSQS sqs;
private final List<SendMessageBatchRequestEntry> messages = new ArrayList<SendMessageBatchRequestEntry>();
private final int batchMaxSize;
private final String queueUrl;
private final int valueColumnIndex;
private final int msgAttributesColumnIndex;
public SQSSinkDataWriter(int partitionId,
long taskId,
AmazonSQS sqs,
int batchMaxSize,
String queueName,
int valueColumnIndex,
int msgAttributesColumnIndex) {
this.partitionId = partitionId;
this.taskId = taskId;
this.batchMaxSize = batchMaxSize;
this.sqs = sqs;
queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
this.valueColumnIndex = valueColumnIndex;
this.msgAttributesColumnIndex = msgAttributesColumnIndex;
}
@Override
public void write(InternalRow record) throws IOException {
SendMessageBatchRequestEntry msg = new SendMessageBatchRequestEntry()
.withMessageBody(record.getString(valueColumnIndex))
.withId(UUID.randomUUID().toString());
messages.add(msg);
if(messages.size() >= batchMaxSize) {
sendMessages();
}
}
@Override
public WriterCommitMessage commit() throws IOException {
try {
if(messages.size() > 0) {
sendMessages();
}
} catch (Exception e) {
e.printStackTrace();
}
return new SQSSinkWriterCommitMessage(partitionId, taskId);
}
@Override
public void abort() throws IOException {
}
@Override
public void close() throws IOException {
}
private void sendMessages() {
SendMessageBatchRequest batch = new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(messages);
sqs.sendMessageBatch(batch);
messages.clear();
}
}
O método write dessa classe é chamado para cada linha da partição do dataframe que está sendo gravado e, quando a partição termina, é chamado o método commit. O interessante aqui é que esse método commit vai gerar um objeto WriterCommitMessage, que será enviado de volta pra instância da classe SQSSinkWriteBatch, exatamente para ela saber o que ocorreu no processamento dessa partição.
Como utilizar o novo data source?
Depois de todo esse trabalho, fica a pergunta: como utilizar esse novo data source no Spark? A resposta é a mais simples possível: quando chamamos o método write do dataframe, nós especificamos qual é o formato usado para gravação. Como parâmetro, nós podemos passar o nome completo da classe SQSSinkProvider que o Spark irá encontrá-la e, no momento da gravação do dataframe, irá utilizá-la. Podemos ver isso no exemplo de código abaixo (desta vez em Python).
df.write.format("com.fabiogouw.spark.awsmessaging.sqs.SQSSinkProvider").mode("append")\
.option("queueName", "test")\
.option("batchSize", "10") \
.option("region", "us-east-2") \
.save()
Para executar, como não se trata de uma biblioteca nativa do Spark, é necessário passar o nome do package junto da linha de execução do spark-submit, de forma que os binários possam ser baixados para execução.
spark-submit \
--packages com.fabiogouw:spark-aws-messaging:0.3.1,com.amazonaws:aws-java-sdk-sqs:1.12.13 \
test.py sample.txt
Para mais detalhes de execução, exemplos e o código fonte mais atualizado, vocês podem olhar no meu Github. Sugestões de melhorias são bem-vindas!
Abraços e até a próxima
Comentários
Postar um comentário