Lidando com Idempotência dentro dos Atores
No post anterior, eu comecei a construção de uma aplicação que une os conceitos do Clean Architecture e o Actor Model. Entretanto, ela ainda não tratava vários aspectos essenciais para uma aplicação ser considerada resiliente.
Agora nesse segundo artigo, eu vou mostrar uma proposta para tratar a idempotência na aplicação, de forma com que possamos lidar com cenários de reprocessamento de mensagens com segurança. O código resultante está disponível em https://github.com/fabiogouw/OrleansBank/tree/3283d13eea84da34b8a7131c63cdc1ad8d186f63
O problema
Um problema que meu exemplo no post anterior trazia era que, caso houvesse um erro no processamento do débito e no crédito da transferência bancária, simplesmente as contas ficariam com seu estado inconsistente. Por exemplo, a conta debitada teria o dinheiro retirado, mas a conta que era para ser creditada nunca receberia esse valor. Isso numa aplicação no mundo real é inconcebível.
Uma possibilidade para corrigir esse comportamento aqui seria implementarmos um controle na execução das operações de crédito e débito, fazendo com que caso haja erro, uma nova tentativa de execução ocorra. Nesse caso, precisamos introduzir uma nova responsabilidade na classe de use case, que acumularia o papel de controlador do que já foi executado ou não.
Mas isso traz outro problema de consistência. O que acontece se eu efetuar o débito da conta e acontecer um erro bem no momento que eu marco que esse passo foi executado com sucesso? Se isso acontecer e a rotina entrar em modo de retentativa, esse débito vai ocorrer novamente, pois essas duas operações (debitar na conta e marcar o débito como ok no use case) não são tratadas de forma atômica (ou tudo ou nada). Tentamos resolver nosso problema de consistência introduzindo uma solução de retry, mas essa solução trouxe um novo problema de possibilidade de duplicação de operações. Como resolver isso?
A solução
Nesse caso a gente precisa abraçar essa característica de que um serviço pode receber a mesma execução repetidas vezes. Isso é inerente ao desafio de trabalhar com arquiteturas distribuídas. Nesse caso, uma operação precisa saber se ela já foi executada, de forma que uma eventual reexecução não impacte a consistência da aplicação. Operações que podem ser executadas inúmeras vezes com os mesmos parâmetros e mesmo assim não geram nenhum efeito colateral são chamadas de idempotentes.
Alguns tipos de operações já são idempotentes por padrão. Por exemplo, uma exclusão em banco de dados. Não importa quantas vezes seja executado o comando de DELETE, o estado final será sempre o mesmo, com o registro não existindo mais no banco de dados. Já operações que criam registros, como as do exemplo que criam novos registros de transações, não possuem essa característica nativamente. Precisamos então de alguma forma de marcar que determinada operação faz parte de um contexto de negócio onde ela deve ser encarada como única.
Quem define uma operação como única tem que ser o cliente, ou seja, quem está chamando a operação. Dessa forma, ele pode marcar essa operação com um código que identifica unicamente a operação de crédito e de débito que está sendo feita. Vamos chamar essa chave de "chave de idempotência" (nossa implementação da classe de domínio já considerava receber uma chave para identificar unicamente a transação, então vamos reutilizá-la aqui para tratar a idempotência).
Nesse sentido, para que essa característica seja considerada na nossa aplicação de exemplo, eu incluí duas abordagens complementares. A ideia é que todos os atores (grains) mantenham em seu estado (na memória) quais chaves de idempotência já foram aplicadas, de forma que se eles receberem uma operação repetida (ou seja, com a mesma chave), essa ação possa ser ignorada e tratada devidamente para não gerar uma duplicação de funcionalidade. Mas isso traz um problema: se ocorrerem muitas operações, teríamos que armazenar todas as chaves em memória, o que traria um maior consumo de recursos. A solução aqui foi guardarmos as chaves na memória até determinado limite e, conforme novas chaves forem chegando, as mais velhas sendo descartadas para dar lugar às novas.
Mas o que acontece se uma operação mais antiga, cuja chave já tenha sido descartada, eventualmente seja reexecutada? Nesse caso, o ator irá executar a funcionalidade toda, alterando o saldo e incluindo a transação, só que para garantirmos que não haja duplicação de registros, vamos também gravar no banco de dados a chave de idempotência em uma tabela separada, tudo de forma transacional. E caso essa chave já exista, quer dizer que podemos descartar o processamento e recuperar da base o estado atual da aplicação. Como a chance de uma chave antiga ser reprocessada é menor, teremos poucas ocorrências desse tipo e eu acredito que isso acabe compensando a não-validação da idempotência em memória.
Por isso a ideia do funil, onde temos que os atores conseguem lidar com a maior parte dos casos de processamentos repetidos (com a premissa que a chance de repetição de uma operação recente é maior). Caso eles não consigam, o banco de dados acaba garantindo a consistência.
Essa é a primeira vez que estamos considerando o armazenamento de estado dos atores em disco, para que seja possível recuperá-lo no futuro. Isso é feito utilizando a funcionalidade de storage do Orleans.
Primeira camada - chaves de idempotência no estado do atores
Na implementação, introduzimos uma nova classe chamada de IdempotencyShield: ela será responsável por armazenar as chaves de idempotência utilizadas e validar todas as operações, para verificar se elas já não foram executadas anteriormente.
namespace OrleansBank.Adapters.Storage
{
public class IdempotencyShield
{
private readonly Queue<string> _idempotencyKeys = new Queue<string>();
private readonly string _entityKey;
private readonly int _maxSize = 50;
private DateTime _updatedAt;
public string EntityKey
{
get { return _entityKey; }
}
public string LastIdempotencyKey
{
get { return _idempotencyKeys.LastOrDefault(); }
}
public DateTime UpdatedAt
{
get { return _updatedAt; }
}
public IdempotencyShield(string entityKey, IEnumerable<string> idempotencyKeys, int maxSize)
{
_entityKey = entityKey;
_idempotencyKeys = new Queue<string>(idempotencyKeys.Take(maxSize));
_maxSize = maxSize;
_updatedAt = DateTime.Now;
}
public bool CheckCommitedAction(string idempotencyKey)
{
return _idempotencyKeys.Contains(idempotencyKey);
}
public void CommitAction(string idempotencyKey)
{
_idempotencyKeys.Enqueue(idempotencyKey);
_updatedAt = DateTime.Now;
if (_idempotencyKeys.Count > _maxSize)
{
_idempotencyKeys.Dequeue();
}
}
}
}
using Orleans;
using Orleans.Providers;
using Orleans.Runtime;
using OrleansBank.Adapters.Storage;
using OrleansBank.Domain;
namespace OrleansBank.Adapters.Grain
{
[StorageProvider(ProviderName = "Accounts")]
public class AccountGrain : Grain<(IdempotencyShield shield, Account account)>, IAccountActor
{
public async Task<bool> MakeCredit(string uniqueId, double amount)
{
return await ExecuteIdempotently(uniqueId, () => State.account.MakeCredit(uniqueId, amount));
}
public async Task<bool> MakeDebit(string uniqueId, double amount)
{
return await ExecuteIdempotently(uniqueId, () => State.account.MakeDebit(uniqueId, amount));
}
private async Task<bool> ExecuteIdempotently(string idempotentyKey, Func<Task<bool>> func)
{
if(State.shield.CheckCommitedAction(idempotentyKey))
{
return true;
}
var result = await func();
State.shield.CommitAction(idempotentyKey);
try
{
await WriteStateAsync();
}
catch (OrleansException ex) when (ex.InnerException is IdempotencyFailureException)
{
// sync the state again, since the storage was the one that realised that the call was duplicated
await ReadStateAsync();
return true;
}
return result;
}
public Task<double> GetBalance()
{
return State.account.GetBalance();
}
}
}
- Se não estiver, então executa a lógica de negócio; e estando tudo certo, o novo estado é persistido em disco.
- Se já estiver, então trata-se de um caso de reprocessamento de algo que já foi persistido, e portando podemos retornar ok.
Segunda camada - tabela de idempotência
Se a nossa primeira camada de segurança de idempotência "falhar", ainda temos a segunda que irá garantir que nenhuma operação seja duplicada. Aqui essa implementação está muito próxima da funcionalidade de storage do Orleans. Aqui eu parti para a criação de um storage personalizado, com as tabelas abaixo fazendo parte do modelo.
Eu vou deixar para detalhar como se cria um storage personalizado no Orleans para outro post, mas o importante para saber aqui é que se trata de uma classe que é utilizada pelo framework Orleans, no momento que é solicitado uma ativação de um grain (onde seu estado é recuperado da base) através do método ReadStateAsync e também nas gravações do estado, quando chamamos o método WriteStateAsync. Estou usando uma base de dados MySQL, e a string de conexão é configurada nas propriedades do Orleans.
São três tabelas aqui:
- tb_account, que guarda o saldo atual da conta e quando foi a última vez que ele foi atualizado.
- tb_transactions, que guardam as transações dessa conta, que vão formando o saldo dela.
- tb_idempotency_keys, que guardam as chaves de idempotência das operações que foram aplicadas nessa conta.
using MySql.Data.MySqlClient;
using Orleans;
using Orleans.Runtime;
using Orleans.Storage;
using OrleansBank.Domain;
namespace OrleansBank.Adapters.Storage
{
public class IdempotentMySqlAccountStorage : IGrainStorage
{
// ...
public Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
{
var state = ((IdempotencyShield shield, Account account)) grainState.State;
var connection = new MySqlConnection(_options.ConnectionString);
string newEtag = CalculareNewEtag(grainState.ETag);
var commands = new MySqlCommand[]
{
GetIdempotencyKeyWriteCommand(connection, state.shield),
GetAccountWriteCommand(connection, state.account, grainState.ETag, newEtag)
}
.Union(GetTransactionWriteCommands(connection, state.account.Transactions))
;
connection.Open();
try
{
var transaction = connection.BeginTransaction();
try
{
foreach (var command in commands)
{
command.Transaction = transaction;
int rowsAffected = command.ExecuteNonQuery();
// TODO: every command should affect at least one row, but is this the right exception type to use?
if (rowsAffected == 0)
{
throw new InconsistentStateException(@$"Version conflict (WriteState):
ProviderName={_storageName} GrainType={grainType} GrainReference={grainReference.ToKeyString()}.");
}
}
transaction.Commit();
RecreateState(grainState, state.account, newEtag);
}
catch (MySqlException ex)
{
transaction.Rollback();
throw CheckIdempotencyException(ex);
}
}
finally
{
connection?.Close();
}
return Task.CompletedTask;
}
private Exception CheckIdempotencyException(MySqlException ex)
{
if (ex.Number == (int)MySqlErrorCode.DuplicateKeyEntry && ex.Message.Contains("tb_idempotency_keys.PRIMARY"))
{
return new IdempotencyFailureException("account", ex);
}
return ex;
}
private static void RecreateState(IGrainState grainState, Account account, string newEtag)
{
grainState.ETag = newEtag;
var state = ((IdempotencyShield shield, Account account))grainState.State;
// after the update, we ignore the saved transactions and recreate the state with the new balance
state.account = new Account(account.Id, account.Balance, new List<Transaction>());
}
private static string CalculareNewEtag(string etag)
{
return (int.Parse(etag) + 1).ToString();
}
private MySqlCommand GetAccountWriteCommand(MySqlConnection connection, Account account, string currentEtag, string newEtag)
{
var cmd = new MySqlCommand(@"UPDATE tb_account
SET balance = @balance,
updated_at = @updated_at,
etag = @newEtag
WHERE account_id = @account_id
AND etag = @currentEtag", connection);
cmd.Parameters.AddWithValue("@account_id", account.Id);
cmd.Parameters.AddWithValue("@balance", account.Balance);
cmd.Parameters.AddWithValue("@updated_at", account.UpdatedAt);
cmd.Parameters.AddWithValue("@currentEtag", int.Parse(currentEtag));
cmd.Parameters.AddWithValue("@newEtag", int.Parse(newEtag));
return cmd;
}
private IEnumerable<MySqlCommand> GetTransactionWriteCommands(MySqlConnection connection, IEnumerable<Transaction> transactions)
{
foreach (var transaction in transactions)
{
var cmd = new MySqlCommand(@"INSERT INTO tb_transactions
(transaction_id, account_id, amount, created_at)
VALUES (@transaction_id, @account_id, @amount, @created_at)", connection);
cmd.Parameters.AddWithValue("@transaction_id", transaction.Id);
cmd.Parameters.AddWithValue("@account_id", transaction.Account.Id);
cmd.Parameters.AddWithValue("@amount", transaction.Amount);
cmd.Parameters.AddWithValue("@created_at", transaction.CreatedAt);
yield return cmd;
}
}
private MySqlCommand GetIdempotencyKeyWriteCommand(MySqlConnection connection, IdempotencyShield idempotencyShield)
{
var cmd = new MySqlCommand(@"INSERT INTO tb_idempotency_keys
(idempotency_key, account_id, created_at)
VALUES (@idempotency_key, @account_id, @created_at)", connection);
cmd.Parameters.AddWithValue("@idempotency_key", idempotencyShield.LastIdempotencyKey);
cmd.Parameters.AddWithValue("@account_id", idempotencyShield.EntityKey);
cmd.Parameters.AddWithValue("@created_at", idempotencyShield.UpdatedAt);
return cmd;
}
}
}
Conclusões
O que ficou bom?
- Mantivemos praticamente intactas as classes do nosso domínio, o que corrobora para um bom uso do Clean Architecture, visto que nossas mudanças se concentraram em aspectos técnicos e não de negócio. O que eu mudei no domínio foram alterações funcionais mesmo, para melhorar o exemplo.
- Demos um grau maior de resiliência na aplicação, pois onde já se viu guardar dinheiro das pessoas apenas em memória? Esse tipo de funcionalidade precisa sim de uma persistência em disco!
- Essa abordagem em separar a validação da idempotencia em camadas faz sentido se tivermos bastante ocorrência de reprocessamento. Se isso não ocorrer, ou for raro, não valeria a pena manter as chaves em memória, pois o impacto na aplicação seria irrelevante. Aqui vale monitorar a aplicação pra entender o seu comportamento e alinhar a estratégia de controle.
O que não ficou legal?
- Dá um bom trabalho criar um storage personalizado dentro do Orleans, e eu me pergunto como poderíamos diminuir essa complexidade de código para deixar mais simples para que os times façam um uso recorrente dessa funcionalidade.
Comentários
Postar um comentário