Lidando com Idempotência dentro dos Atores

No post anterior, eu comecei a construção de uma aplicação que une os conceitos do Clean Architecture e o Actor Model. Entretanto, ela ainda não tratava vários aspectos essenciais para uma aplicação ser considerada resiliente.

Agora nesse segundo artigo, eu vou mostrar uma proposta para tratar a idempotência na aplicação, de forma com que possamos lidar com cenários de reprocessamento de mensagens com segurança. O código resultante está disponível em https://github.com/fabiogouw/OrleansBank/tree/3283d13eea84da34b8a7131c63cdc1ad8d186f63

O problema

Um problema que meu exemplo no post anterior trazia era que, caso houvesse um erro no processamento do débito e no crédito da transferência bancária, simplesmente as contas ficariam com seu estado inconsistente. Por exemplo, a conta debitada teria o dinheiro retirado, mas a conta que era para ser creditada nunca receberia esse valor. Isso numa aplicação no mundo real é inconcebível.

Uma possibilidade para corrigir esse comportamento aqui seria implementarmos um controle na execução das operações de crédito e débito, fazendo com que caso haja erro, uma nova tentativa de execução ocorra. Nesse caso, precisamos introduzir uma nova responsabilidade na classe de use case, que acumularia o papel de controlador do que já foi executado ou não.

Mas isso traz outro problema de consistência. O que acontece se eu efetuar o débito da conta e acontecer um erro bem no momento que eu marco que esse passo foi executado com sucesso? Se isso acontecer e a rotina entrar em modo de retentativa, esse débito vai ocorrer novamente, pois essas duas operações (debitar na conta e marcar o débito como ok no use case) não são tratadas de forma atômica (ou tudo ou nada). Tentamos resolver nosso problema de consistência introduzindo uma solução de retry, mas essa solução trouxe um novo problema de possibilidade de duplicação de operações. Como resolver isso?

Diagrama de sequência mostrando a interação entre o caso de uso de transferência de dinheiro e os dois atores (um que recebe o débito e outro que recebe o crédito). Aí vem o problema: o que ocorre em caso de erros, visto que essas operações não são transacionais?

A solução

Nesse caso a gente precisa abraçar essa característica de que um serviço pode receber a mesma execução repetidas vezes. Isso é inerente ao desafio de trabalhar com arquiteturas distribuídas. Nesse caso, uma operação precisa saber se ela já foi executada, de forma que uma eventual reexecução não impacte a consistência da aplicação. Operações que podem ser executadas inúmeras vezes com os mesmos parâmetros e mesmo assim não geram nenhum efeito colateral são chamadas de idempotentes.

Alguns tipos de operações já são idempotentes por padrão. Por exemplo, uma exclusão em banco de dados. Não importa quantas vezes seja executado o comando de DELETE, o estado final será sempre o mesmo, com o registro não existindo mais no banco de dados. Já operações que criam registros, como as do exemplo que criam novos registros de transações, não possuem essa característica nativamente. Precisamos então de alguma forma de marcar que determinada operação faz parte de um contexto de negócio onde ela deve ser encarada como única.

Quem define uma operação como única tem que ser o cliente, ou seja, quem está chamando a operação. Dessa forma, ele pode marcar essa operação com um código que identifica unicamente a operação de crédito e de débito que está sendo feita. Vamos chamar essa chave de "chave de idempotência" (nossa implementação da classe de domínio já considerava receber uma chave para identificar unicamente a transação, então vamos reutilizá-la aqui para tratar a idempotência).

Nesse sentido, para que essa característica seja considerada na nossa aplicação de exemplo, eu incluí duas abordagens complementares. A ideia é que todos os atores (grains) mantenham em seu estado (na memória) quais chaves de idempotência já foram aplicadas, de forma que se eles receberem uma operação repetida (ou seja, com a mesma chave), essa ação possa ser ignorada e tratada devidamente para não gerar uma duplicação de funcionalidade. Mas isso traz um problema: se ocorrerem muitas operações, teríamos que armazenar todas as chaves em memória, o que traria um maior consumo de recursos. A solução aqui foi guardarmos as chaves na memória até determinado limite e, conforme novas chaves forem chegando, as mais velhas sendo descartadas para dar lugar às novas.

Mas o que acontece se uma operação mais antiga, cuja chave já tenha sido descartada, eventualmente seja reexecutada? Nesse caso, o ator irá executar a funcionalidade toda, alterando o saldo e incluindo a transação, só que para garantirmos que não haja duplicação de registros, vamos também gravar no banco de dados a chave de idempotência em uma tabela separada, tudo de forma transacional. E caso essa chave já exista, quer dizer que podemos descartar o processamento e recuperar da base o estado atual da aplicação. Como a chance de uma chave antiga ser reprocessada é menor, teremos poucas ocorrências desse tipo e eu acredito que isso acabe compensando a não-validação da idempotência em memória.

Por isso a ideia do funil, onde temos que os atores conseguem lidar com a maior parte dos casos de processamentos repetidos (com a premissa que a chance de repetição de uma operação recente é maior). Caso eles não consigam, o banco de dados acaba garantindo a consistência.

Essa é a primeira vez que estamos considerando o armazenamento de estado dos atores em disco, para que seja possível recuperá-lo no futuro. Isso é feito utilizando a funcionalidade de storage do Orleans

Diagrama de classes mostrando as funcionalidades de storage do Orleans implementadas no exemplo do código

Primeira camada - chaves de idempotência no estado do atores

Na implementação, introduzimos uma nova classe chamada de IdempotencyShield: ela será responsável por armazenar as chaves de idempotência utilizadas e validar todas as operações, para verificar se elas já não foram executadas anteriormente.

namespace OrleansBank.Adapters.Storage
{
    public class IdempotencyShield
    {
        private readonly Queue<string> _idempotencyKeys = new Queue<string>();
        private readonly string _entityKey;
        private readonly int _maxSize = 50;
        private DateTime _updatedAt;

        public string EntityKey
        {
            get { return _entityKey; }
        }

        public string LastIdempotencyKey
        {
            get { return _idempotencyKeys.LastOrDefault(); }
        }

        public DateTime UpdatedAt
        {
            get { return _updatedAt; }
        }

        public IdempotencyShield(string entityKey, IEnumerable<string> idempotencyKeys, int maxSize)
        {
            _entityKey = entityKey;
            _idempotencyKeys = new Queue<string>(idempotencyKeys.Take(maxSize));
            _maxSize = maxSize;
            _updatedAt = DateTime.Now;
        }
        public bool CheckCommitedAction(string idempotencyKey)
        {
            return _idempotencyKeys.Contains(idempotencyKey);
        }

        public void CommitAction(string idempotencyKey)
        {
            _idempotencyKeys.Enqueue(idempotencyKey);
            _updatedAt = DateTime.Now;
            if (_idempotencyKeys.Count > _maxSize)
            {
                _idempotencyKeys.Dequeue();
            }
        }
    }
}
Não há muita complexidade nessa classe, pois ela representa uma lista contendo as chaves utilizadas, junto de métodos que validam a existência de alguma chave nessa lista. A única coisa mais diferente é que temos uma implementação na inclusão da chave, para descartar da lista o elemento mais antigo, caso tenhamos ultrapassado o limite de chaves (que está como padrão em 50 itens).

Esse objeto IdempotencyShield precisa fazer parte do estado do ator (grain), então precisamos alterar a classe AccountGrain.
using Orleans;
using Orleans.Providers;
using Orleans.Runtime;
using OrleansBank.Adapters.Storage;
using OrleansBank.Domain;

namespace OrleansBank.Adapters.Grain
{
    [StorageProvider(ProviderName = "Accounts")]
    public class AccountGrain : Grain<(IdempotencyShield shield, Account account)>, IAccountActor
    {
        public async Task<bool> MakeCredit(string uniqueId, double amount)
        {
            return await ExecuteIdempotently(uniqueId, () => State.account.MakeCredit(uniqueId, amount));
        }

        public async Task<bool> MakeDebit(string uniqueId, double amount)
        {
            return await ExecuteIdempotently(uniqueId, () => State.account.MakeDebit(uniqueId, amount));
        }

        private async Task<bool> ExecuteIdempotently(string idempotentyKey, Func<Task<bool>> func)
        {
            if(State.shield.CheckCommitedAction(idempotentyKey))
            {
                return true;
            }
            var result = await func();
            State.shield.CommitAction(idempotentyKey);
            try
            {
                await WriteStateAsync();
            }
            catch (OrleansException ex) when (ex.InnerException is IdempotencyFailureException)
            {
                // sync the state again, since the storage was the one that realised that the call was duplicated
                await ReadStateAsync();
                return true;
            }
            return result;
        }

        public Task<double> GetBalance()
        {
            return State.account.GetBalance();
        }
    }
}
Percebam que o estado do grain agora não é apenas de Account, mas sim de uma tupla que contém dois objetos: o IdempotencyShield e o Account. O estado do grain está mais complexo. Outro ponto importante é que agora os métodos MakeCredit e MakeDebit estão chamando um método privado que faz uso do "escudo de idempotência": eles validam se a operação está na lista:
  • Se não estiver, então executa a lógica de negócio; e estando tudo certo, o novo estado é persistido em disco.
  • Se já estiver, então trata-se de um caso de reprocessamento de algo que já foi persistido, e portando podemos retornar ok.

Aqui vale um comentário: muitas implementações de controle de idempotência consideram que, caso seja identificada a duplicidade, temos que retornar o mesmo valor que havia sido processado na primeira execução, e por isso há a guarda desse valor em todo retorno. Isso não é exatamente necessário, você pode deixar explicito que tratou-se de um reprocessamento.

E novamente, estamos persistindo o estado sempre que ele é atualizado em disco. Isso quer dizer que aumentamos a latência das operações, mas é um mal necessário. Não podemos correr o risco de perder essas operações caso a máquina onde esses grains estão seja perdida. Apesar disso, essa gravação é apenas uma ida ao banco de dados, pois não temos que buscar o estado armazenado lá para fazer o processamento. Isso por si só já é uma vantagem que as arquiteturas stateful possuem.

Segunda camada - tabela de idempotência

Se a nossa primeira camada de segurança de idempotência "falhar", ainda temos a segunda que irá garantir que nenhuma operação seja duplicada. Aqui essa implementação está muito próxima da funcionalidade de storage do Orleans. Aqui eu parti para a criação de um storage personalizado, com as tabelas abaixo fazendo parte do modelo.

Três tabelas (tb_account, tb_transactions e tb_idempotency_keys)

Eu vou deixar para detalhar como se cria um storage personalizado no Orleans para outro post, mas o importante para saber aqui é que se trata de uma classe que é utilizada pelo framework Orleans, no momento que é solicitado uma ativação de um grain (onde seu estado é recuperado da base) através do método ReadStateAsync e também nas gravações do estado, quando chamamos o método WriteStateAsync. Estou usando uma base de dados MySQL, e a string de conexão é configurada nas propriedades do Orleans.

São três tabelas aqui:

  • tb_account, que guarda o saldo atual da conta e quando foi a última vez que ele foi atualizado.
  • tb_transactions, que guardam as transações dessa conta, que vão formando o saldo dela.
  • tb_idempotency_keys, que guardam as chaves de idempotência das operações que foram aplicadas nessa conta.
Toda a gravação é feita de forma transacional, para não termos situação de inconsistência no banco de dados. Aliás, para mim esse é o melhor ponto para garantirmos a idempotência: o mais próximo possível da gravação das próprias informações da entidade, pois temos certeza que podemos lidar com o contexto transacional corretamente. Caso identifiquemos que já existe um mesmo registro na tabela tb_idempotency_keys, quer dizer que é um processamento de operação já executada e que o grain deve ignorar. Para isso, lançamos uma exceção do tipo IdempotencyFailureException (que é capturada lá na classe AccountGrain).

Abaixo está o trecho da classe IdempotentMySqlAccountStorage que contém o código de gravação de estado.

using MySql.Data.MySqlClient;
using Orleans;
using Orleans.Runtime;
using Orleans.Storage;
using OrleansBank.Domain;

namespace OrleansBank.Adapters.Storage
{
    public class IdempotentMySqlAccountStorage : IGrainStorage
    {

        // ...

        public Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            var state = ((IdempotencyShield shield, Account account)) grainState.State;
            var connection = new MySqlConnection(_options.ConnectionString);
            string newEtag = CalculareNewEtag(grainState.ETag);

            var commands = new MySqlCommand[] 
                {
                    GetIdempotencyKeyWriteCommand(connection, state.shield),
                    GetAccountWriteCommand(connection, state.account, grainState.ETag, newEtag)
                }
                .Union(GetTransactionWriteCommands(connection, state.account.Transactions))
                ;
            connection.Open();
            try
            {
                var transaction = connection.BeginTransaction();
                try
                {
                    foreach (var command in commands)
                    {
                        command.Transaction = transaction;
                        int rowsAffected = command.ExecuteNonQuery();
                        // TODO: every command should affect at least one row, but is this the right exception type to use?
                        if (rowsAffected == 0)
                        {
                            throw new InconsistentStateException(@$"Version conflict (WriteState): 
                                ProviderName={_storageName} GrainType={grainType} GrainReference={grainReference.ToKeyString()}.");
                        }
                    }
                    transaction.Commit();
                    RecreateState(grainState, state.account, newEtag);
                }
                catch (MySqlException ex)
                {
                    transaction.Rollback();
                    throw CheckIdempotencyException(ex);
                }
            }
            finally
            {
                connection?.Close();
            }
            return Task.CompletedTask;
        }

        private Exception CheckIdempotencyException(MySqlException ex)
        {
            if (ex.Number == (int)MySqlErrorCode.DuplicateKeyEntry && ex.Message.Contains("tb_idempotency_keys.PRIMARY"))
            {
                return new IdempotencyFailureException("account", ex);
            }
            return ex;
        }

        private static void RecreateState(IGrainState grainState, Account account, string newEtag)
        {
            grainState.ETag = newEtag;
            var state = ((IdempotencyShield shield, Account account))grainState.State;
            // after the update, we ignore the saved transactions and recreate the state with the new balance
            state.account = new Account(account.Id, account.Balance, new List<Transaction>());
        }

        private static string CalculareNewEtag(string etag)
        {
            return (int.Parse(etag) + 1).ToString();
        }

        private MySqlCommand GetAccountWriteCommand(MySqlConnection connection, Account account, string currentEtag, string newEtag)
        {
            var cmd = new MySqlCommand(@"UPDATE tb_account
                                            SET balance = @balance,
                                                updated_at = @updated_at,
                                                etag = @newEtag
                                            WHERE account_id = @account_id 
                                            AND etag = @currentEtag", connection);
            cmd.Parameters.AddWithValue("@account_id", account.Id);
            cmd.Parameters.AddWithValue("@balance", account.Balance);
            cmd.Parameters.AddWithValue("@updated_at", account.UpdatedAt);
            cmd.Parameters.AddWithValue("@currentEtag", int.Parse(currentEtag));
            cmd.Parameters.AddWithValue("@newEtag", int.Parse(newEtag));
            return cmd;
        }

        private IEnumerable<MySqlCommand> GetTransactionWriteCommands(MySqlConnection connection, IEnumerable<Transaction> transactions)
        {
            foreach (var transaction in transactions)
            {
                var cmd = new MySqlCommand(@"INSERT INTO tb_transactions
                                            (transaction_id, account_id, amount, created_at)
                                            VALUES (@transaction_id, @account_id, @amount, @created_at)", connection);
                cmd.Parameters.AddWithValue("@transaction_id", transaction.Id);
                cmd.Parameters.AddWithValue("@account_id", transaction.Account.Id);
                cmd.Parameters.AddWithValue("@amount", transaction.Amount);
                cmd.Parameters.AddWithValue("@created_at", transaction.CreatedAt);
                yield return cmd;
            }
        }

        private MySqlCommand GetIdempotencyKeyWriteCommand(MySqlConnection connection, IdempotencyShield idempotencyShield)
        {
            var cmd = new MySqlCommand(@"INSERT INTO tb_idempotency_keys
                                            (idempotency_key, account_id, created_at)
                                            VALUES (@idempotency_key, @account_id, @created_at)", connection);
            cmd.Parameters.AddWithValue("@idempotency_key", idempotencyShield.LastIdempotencyKey);
            cmd.Parameters.AddWithValue("@account_id", idempotencyShield.EntityKey);
            cmd.Parameters.AddWithValue("@created_at", idempotencyShield.UpdatedAt);
            return cmd;
        }
    }
}

Conclusões

O que ficou bom?

  • Mantivemos praticamente intactas as classes do nosso domínio, o que corrobora para um bom uso do Clean Architecture, visto que nossas mudanças se concentraram em aspectos técnicos e não de negócio. O que eu mudei no domínio foram alterações funcionais mesmo, para melhorar o exemplo.
  • Demos um grau maior de resiliência na aplicação, pois onde já se viu guardar dinheiro das pessoas apenas em memória? Esse tipo de funcionalidade precisa sim de uma persistência em disco!
  • Essa abordagem em separar a validação da idempotencia em camadas faz sentido se tivermos bastante ocorrência de reprocessamento. Se isso não ocorrer, ou for raro, não valeria a pena manter as chaves em memória, pois o impacto na aplicação seria irrelevante. Aqui vale monitorar a aplicação pra entender o seu comportamento e alinhar a estratégia de controle.

O que não ficou legal?

  • Dá um bom trabalho criar um storage personalizado dentro do Orleans, e eu me pergunto como poderíamos diminuir essa complexidade de código para deixar mais simples para que os times façam um uso recorrente dessa funcionalidade.

Pensamentos para próximas evoluções do exemplo

Com essa implementação, agora fica possível pensarmos em rotinas de tratamento de erros que consideram a reexecução das operações de crédito e débito sem acarretar em duplicidades. Esse ponto vamos deixar para um próximo post, bem como a explicação em detalhes de como criar um storage personalizado no Orleans (e quem sabe com uma evolução para se usar sharding no MySQL?)

Novamente deixo aberto o repositório do GitHub para comentários e discussões a respeito desses experimentos relacionados ao Orleans e ao conceito de actor model.

[]'s

Comentários

Postagens mais visitadas deste blog

Trocando configurações padrão do Live TIM

Uma proposta de Clean Architecure com Modelo de Atores

Testes automatizados em sistemas autenticados com certificados digitais, usando Selenium e PhantomJS