Publishing domain events from aggregate roots

· June 9, 2017

Starting with the Spring Data Ingalls release, publishing domain events from aggregate roots has become easier. Instead of calling Spring’s ApplicationEventPublisher directly, you can put the @DomainEvents annotation on a method of your aggregate root. Let’s look at an example.

class BankTransfer {
    
    @DomainEvents
    Collection<Object> domainEvents() {
        // … return events you want to get published here
    }

    @AfterDomainEventPublication
    void callbackMethod() {
        // … potentially clean up domain events list
    }
}

The method annotated with @DomainEvents is invoked whenever one of the save() methods of a Spring Data repository is called. It can return a single event instance or a collection of events, and it must not take any arguments. After all events have been published, the method annotated with @AfterDomainEventPublication is called.
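
To make the sequence above concrete, here is a minimal plain-Java sketch of what a repository could do on save. It is not Spring Data's implementation: the two annotations are declared locally as stand-ins so the snippet compiles without Spring, and `EventPublishingSketch`, `publishEventsOf` and `SampleTransfer` are hypothetical names used only for illustration.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Local stand-ins for the Spring Data annotations (assumption: declared here only so the sketch is self-contained).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DomainEvents {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface AfterDomainEventPublication {}

// Hypothetical helper imitating what happens around a repository save() call.
final class EventPublishingSketch {

    static void publishEventsOf(Object aggregate, Consumer<Object> publisher) {
        Method eventsMethod = findAnnotated(aggregate, DomainEvents.class);
        if (eventsMethod == null) {
            return; // nothing to publish for this type
        }
        // Copy the events so the cleanup callback may clear the original list safely.
        List<Object> events = new ArrayList<>(asCollection(invoke(eventsMethod, aggregate)));
        for (Object event : events) {
            publisher.accept(event);
        }
        Method callback = findAnnotated(aggregate, AfterDomainEventPublication.class);
        if (callback != null) {
            invoke(callback, aggregate);
        }
    }

    // The annotated method may return a single event or a collection of events.
    private static Collection<?> asCollection(Object result) {
        if (result == null) {
            return Collections.emptyList();
        }
        if (result instanceof Collection) {
            return (Collection<?>) result;
        }
        return Collections.singletonList(result);
    }

    // Simplification: only methods declared directly on the aggregate's class are inspected.
    private static Method findAnnotated(Object target, Class<? extends Annotation> annotation) {
        for (Method method : target.getClass().getDeclaredMethods()) {
            if (method.isAnnotationPresent(annotation)) {
                method.setAccessible(true);
                return method;
            }
        }
        return null;
    }

    private static Object invoke(Method method, Object target) {
        try {
            return method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Hypothetical aggregate used to exercise the sketch.
final class SampleTransfer {
    private final List<Object> events = new ArrayList<>();

    SampleTransfer complete() {
        events.add("transfer-completed");
        return this;
    }

    @DomainEvents
    Collection<Object> domainEvents() {
        return events;
    }

    @AfterDomainEventPublication
    void clearEvents() {
        events.clear();
    }

    int pendingEvents() {
        return events.size();
    }
}
```

Calling `EventPublishingSketch.publishEventsOf(new SampleTransfer().complete(), System.out::println)` hands the single registered event to the publisher and then runs the cleanup callback, so the aggregate is left without pending events.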

Spring Data Commons provides a convenient base class, AbstractAggregateRoot, that helps register domain events and uses the publication mechanism implied by @DomainEvents and @AfterDomainEventPublication:

public class AbstractAggregateRoot {

	/**
	 * All domain events currently captured by the aggregate.
	 */
	@Getter(onMethod = @__(@DomainEvents)) //
	private transient final List<Object> domainEvents = new ArrayList<Object>();

	/**
	 * Registers the given event object for publication on a call to a Spring Data repository's save method.
	 * 
	 * @param event must not be {@literal null}.
	 * @return
	 */
	protected <T> T registerEvent(T event) {
		Assert.notNull(event, "Domain event must not be null!");
		this.domainEvents.add(event);
		return event;
	}

	/**
	 * Clears all domain events currently held. Usually invoked by the infrastructure in place in Spring Data
	 * repositories.
	 */
	@AfterDomainEventPublication
	public void clearDomainEvents() {
		this.domainEvents.clear();
	}
}
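
For readers who do not use Lombok, the class above behaves roughly like the hand-written version below. This is a sketch under assumptions, not the Spring Data source: the annotations are declared locally as stand-ins so it compiles on its own, `Objects.requireNonNull` replaces Spring's `Assert.notNull`, and `HandWrittenAggregateRoot` and `DemoAggregate` are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Local stand-ins for the Spring Data annotations (assumption: only here to keep the sketch self-contained).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DomainEvents {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface AfterDomainEventPublication {}

class HandWrittenAggregateRoot {

    private final transient List<Object> domainEvents = new ArrayList<>();

    // Roughly what the Lombok @Getter(onMethod = ...) construct produces:
    // an ordinary public getter that carries the @DomainEvents annotation.
    @DomainEvents
    public List<Object> getDomainEvents() {
        return this.domainEvents;
    }

    // Registers an event for publication on the next save.
    protected <T> T registerEvent(T event) {
        Objects.requireNonNull(event, "Domain event must not be null!");
        this.domainEvents.add(event);
        return event;
    }

    // Invoked after publication so the same events are not published twice.
    @AfterDomainEventPublication
    public void clearDomainEvents() {
        this.domainEvents.clear();
    }
}

// Hypothetical aggregate used only to exercise the base class.
class DemoAggregate extends HandWrittenAggregateRoot {

    DemoAggregate touch() {
        registerEvent("touched");
        return this;
    }
}
```

Because the event list lives in the aggregate itself, it can be inspected in a plain unit test, without a repository or an application context.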

The @Getter(onMethod = @__(@DomainEvents)) Lombok construct makes sure that the @DomainEvents annotation is put on the generated getter method. Let’s modify the example to extend the AbstractAggregateRoot base class.

public class BankTransfer extends AbstractAggregateRoot {

   ...

    public BankTransfer complete() {
        id = UUID.randomUUID().toString();
        registerEvent(new BankTransferCompletedEvent(id));
        return this;
    }
    
    ...
}

In this example the BankTransfer aggregate root registers a BankTransferCompletedEvent when its complete() method is called. The client calls this method in a transactional context and saves the BankTransfer through the Spring Data repository abstraction, which triggers publication of the BankTransferCompletedEvent.

@Service
public class BankTransferService {

    ...
    
    @Transactional
    public String completeTransfer(BankTransfer bankTransfer) {
        return repository.save(bankTransfer.complete()).getId();
    }

    ...
}

On the listener side, @TransactionalEventListener binds the event handler to a phase of the transaction that published the event. The typical use case is to handle the event only after the transaction has committed successfully, which is the default for @TransactionalEventListener. The phase attribute lets you choose a different phase.

@Service
public class BankTransferProcessor {

    ...
    
    @Async
    @TransactionalEventListener
    public void handleBankTransferCompletedEvent(BankTransferCompletedEvent event) {
        BankTransfer bankTransfer = repository.findById(event.getBankTransferId());
        bankTransfer.markStarted();
        bankTransfer = repository.save(bankTransfer);

        log.info("Starting to process bank transfer {}.", bankTransfer);

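        try {
            Thread.sleep(5000); // simulate long-running processing
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag before propagating
            throw new RuntimeException(e);
        }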

        if (new Random().nextBoolean()) {
            bankTransfer.markCompleted();
        } else {
            bankTransfer.markFailed();
        }

        repository.save(bankTransfer);

        log.info("Finished processing bank transfer {}.", bankTransfer);
    }

}
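
The idea of binding a listener to the commit phase can be illustrated without Spring. The sketch below is a deliberately simplified, hypothetical model (`SketchTransaction` is not a Spring class): events are only queued while the transaction is open, delivered on commit, and dropped on rollback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, heavily simplified model of a transaction that defers event delivery.
final class SketchTransaction {

    private final List<Object> pendingEvents = new ArrayList<>();
    private final List<Consumer<Object>> afterCommitListeners = new ArrayList<>();

    // Registers a listener that should only see events of committed transactions.
    void onAfterCommit(Consumer<Object> listener) {
        afterCommitListeners.add(listener);
    }

    // Publishing inside the transaction only queues the event.
    void publish(Object event) {
        pendingEvents.add(event);
    }

    // On commit, every queued event is handed to every listener.
    void commit() {
        for (Object event : pendingEvents) {
            for (Consumer<Object> listener : afterCommitListeners) {
                listener.accept(event);
            }
        }
        pendingEvents.clear();
    }

    // On rollback, queued events are discarded and listeners never see them.
    void rollback() {
        pendingEvents.clear();
    }
}
```

In Spring the corresponding choice is made declaratively through the phase attribute, whose values are defined by the TransactionPhase enum (BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION).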

In this example @Async lets the call return to the client immediately, without waiting for the bank transfer to be processed.
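
The effect can be approximated in plain Java with an executor. The following is only a sketch of the behaviour, not of how Spring implements @Async: `AsyncHandlingSketch`, its methods and the status strings are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the caller gets its answer at once, the slow work runs on another thread.
final class AsyncHandlingSketch {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile String status = "STARTED";

    // Submits the processing and returns the id without waiting for it to finish.
    String completeTransfer(String transferId, long processingMillis) {
        executor.submit(() -> {
            try {
                Thread.sleep(processingMillis); // stands in for the real processing
                status = "COMPLETED";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                status = "FAILED";
            } finally {
                finished.countDown();
            }
        });
        return transferId;
    }

    String status() {
        return status;
    }

    // Waits for the background work (up to the timeout) and then shuts the executor down.
    boolean awaitFinished(long timeoutMillis) {
        try {
            return finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }
}
```

A client that calls `completeTransfer` receives the id right away and has to query the status later, which mirrors the two HTTP requests that follow.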

$ echo '{
            "from":"DE89 3704 0044 0532 0130 00",
            "to":"HU42 1177 3016 1111 1018 0000 0000",
            "amount":100.25
      }' | \
      http post :8080/bank-transfers
     
HTTP/1.1 201
Content-Type: application/json;charset=UTF-8
Date: Thu, 04 May 2017 13:54:12 GMT

{
    "bankTransferId": "783b9b13-8424-4004-b59f-eef400d8a52c"
}

Retrieving the details of the bank transfer:

$ http :8080/bank-transfers/783b9b13-8424-4004-b59f-eef400d8a52c

HTTP/1.1 200
Content-Type: application/json;charset=UTF-8
Date: Thu, 04 May 2017 13:55:33 GMT
Transfer-Encoding: chunked

{
    "bankTransferId": "783b9b13-8424-4004-b59f-eef400d8a52c",
    "from": "DE89 3704 0044 0532 0130 00",
    "to": "HU42 1177 3016 1111 1018 0000 0000",  
    "amount": 100.25,
    "status": "COMPLETED"
}
