November 6, 2014

Stumbling on Code - Guava EventBus and GOOS

I've been re-reading Growing Object-Oriented Software, Guided by Tests (or GOOS, as it's commonly known) and was itching to try 'Notification Rather Than Logging' in my code. If you haven't heard of or read the book, I highly recommend it. It is divided into three parts:

  • Where the authors explain the methodology and concepts they will be applying.
  • Where the authors code a sample application and explain every decision they take.
  • Where the authors outline some general patterns and 'best practices' to help us write better software.
My favorite parts are 1 & 3. Part 2 is a little too intense, and requires lots of concentration to try to keep some of the code in my head.

I'm trying to apply some of the recommendations made by the authors as I re-read the book, and the opportunity presented itself to apply one of them.

Enough preamble, let's get down to the nitty-gritty.

Notification Rather Than Logging

We might all be familiar with code that is sprinkled with log statements:
//Do some business logic
if(logger.isInfoEnabled()) {
	logger.info("Log something");
}

//Do more business logic
if(logger.isInfoEnabled()) {
	logger.info("Log something else");
}

The log statements will at times blur the intent of the code that surrounds them, and they also make that code harder to test. They also violate the 'Single Responsibility Principle': this code is now responsible for both logging and performing some business transaction. If we want to add or remove log statements, we need to go into this code and change it.

One way to avoid this is by notifying that an event happened, instead of logging:

//Do some business transaction
publishMessage("Something Happened", objThatChanged)

This affords us much more flexibility. By publishing a message, we can add listeners for loggers, and any other part of our application that needs to perform an action based on that event. We have to be careful not to go too crazy with events though. I try to only implement listeners for those parts of the application that are far removed from the business domain where the event originated from.
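As a rough illustration of the idea, here is a minimal sketch in plain Groovy, with no library involved. The names `Notifier`, `AccountService`, and `close` are made up for this example and do not come from GOOS; the point is only that the business code announces what happened, and logging becomes one listener among possibly many.

```groovy
// Hypothetical names throughout: a sketch of the idea, not the book's code.
class Notifier {
    private final List<Closure> listeners = []

    void subscribe(Closure listener) {
        listeners << listener
    }

    void publishMessage(String event, Object subject) {
        // Hand the event to every registered listener, in registration order.
        for (Closure listener : listeners) {
            listener.call(event, subject)
        }
    }
}

class AccountService {
    private final Notifier notifier

    AccountService(Notifier notifier) {
        this.notifier = notifier
    }

    void close(String accountId) {
        // ... business logic would go here ...
        notifier.publishMessage("Account Closed", accountId)
    }
}

def logLines = []
def notifier = new Notifier()

// Logging is now a listener; the business code never mentions a logger.
notifier.subscribe { event, subject -> logLines << "${event}: ${subject}".toString() }

new AccountService(notifier).close("A-42")

assert logLines == ["Account Closed: A-42"]
```

Adding or removing the logging listener no longer requires touching `AccountService` at all.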

Guava EventBus

We need some sort of message bus to try to implement this in our code. I opted for Guava's EventBus because it is lightweight and doesn't require any infrastructure setup. It also comes with an async event bus, which is very handy when we have IO-intensive operations that we want to defer.

We'll need an EventBus, a publisher, and a subscriber. Instantiating an event bus is straightforward:

EventBus eventBus = new EventBus()

Publishing a message:

eventBus.post("A Message")

We can post an object and subscribe to messages of that specific type:

eventBus.post(anObjectInstance)


class Subscriber {
	Subscriber(EventBus eventBus) {
		eventBus.register(this)
	}

	@Subscribe
	void handleMessage(AType anObjectInstance) {
		println "An event was triggered"
	}
}
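To make the type-based matching concrete, here is a small self-contained sketch. It assumes Guava is available on the classpath; the `@Grab` line and the version it pins are assumptions for running it as a standalone Groovy script, and `StringListener` is a made-up name.

```groovy
// Assumption: Guava is resolved via Grape; the version shown is illustrative.
@Grab('com.google.guava:guava:18.0')
import com.google.common.eventbus.EventBus
import com.google.common.eventbus.Subscribe

// Hypothetical listener that only cares about String events.
class StringListener {
    final List<String> received = []

    @Subscribe
    void onString(String message) {
        received << message
    }
}

def eventBus = new EventBus()
def listener = new StringListener()
eventBus.register(listener)

eventBus.post("A Message")   // delivered: the handler's parameter type is String
eventBus.post(42)            // not delivered to onString: an Integer is not a String

assert listener.received == ["A Message"]
```

The bus picks handlers by the declared parameter type of each `@Subscribe` method, which is why wrapping our data in a dedicated message class gives us a natural way to target subscribers.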

We'd want to abstract the act of publishing a message to the event bus or it might be just as bad as having a bunch of log statements lying around.

Publishing a message

We'll write some wrappers for the publisher and the message. In this example we will publish a message every time we do something to an instance of Person. The message is wrapped in a PersonMessage, which contains the 'action' that was executed and the instance on which it was executed.
class PersonMessage {
	private final Person person
	private final ACTION action

	PersonMessage(Person person, PersonMessage.ACTION action) {
		this.person = person
		this.action = action
	}

	Person getPerson() {
		person
	}

	ACTION getAction() {
		action
	}

	enum ACTION {
		CREATE, UPDATE, DELETE
	}
}

interface Publisher {
	// The signature must match the implementation below for @Override to compile
	void notifyChange(PersonMessage message)
}

class PersonPublisher implements Publisher {
	private final EventBus eventBus

	public PersonPublisher(EventBus eventBus) {
		this.eventBus = eventBus
	}

	@Override
	void notifyChange(PersonMessage message) {
		eventBus.post(message)
	}
}

We will then inject the publisher, instead of the eventBus, anywhere we want to publish a message. And publishing a message would look like this:

publisher.notifyChange(new PersonMessage(personInstance, PersonMessage.ACTION.UPDATE))
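To show how the injected publisher might sit inside business code, here is a condensed, self-contained sketch. `PersonService`, `rename`, and `AuditTrail` are hypothetical names invented for this example, `Person` is reduced to a single field, and the `@Grab` line (including its version) is an assumption for running it as a standalone script.

```groovy
// Assumption: Guava is resolved via Grape; the version shown is illustrative.
@Grab('com.google.guava:guava:18.0')
import com.google.common.eventbus.EventBus
import com.google.common.eventbus.Subscribe

class Person {
    String name
}

class PersonMessage {
    enum ACTION { CREATE, UPDATE, DELETE }

    final Person person
    final PersonMessage.ACTION action

    PersonMessage(Person person, PersonMessage.ACTION action) {
        this.person = person
        this.action = action
    }
}

class PersonPublisher {
    private final EventBus eventBus

    PersonPublisher(EventBus eventBus) {
        this.eventBus = eventBus
    }

    void notifyChange(PersonMessage message) {
        eventBus.post(message)
    }
}

// Hypothetical business class: it knows the publisher, not the event bus.
class PersonService {
    private final PersonPublisher publisher

    PersonService(PersonPublisher publisher) {
        this.publisher = publisher
    }

    Person rename(Person person, String newName) {
        person.name = newName
        publisher.notifyChange(new PersonMessage(person, PersonMessage.ACTION.UPDATE))
        person
    }
}

// Hypothetical listener standing in for a logger.
class AuditTrail {
    final List<String> entries = []

    @Subscribe
    void onPersonChanged(PersonMessage message) {
        entries << "${message.action} ${message.person.name}".toString()
    }
}

def eventBus = new EventBus()
def audit = new AuditTrail()
eventBus.register(audit)

def service = new PersonService(new PersonPublisher(eventBus))
service.rename(new Person(name: "Ada"), "Grace")

assert audit.entries == ["UPDATE Grace"]
```

Only the wiring code at the bottom ever touches the `EventBus` directly, so swapping the bus implementation later stays a local change.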
 

Subscribing to a message

We can also wrap our subscribers. We should be careful to delegate what we want to do with a message to another object. This is going to assist us with testing later.
interface Subscriber {
	public void handleMessage(PersonMessage message)
}

class PersonSubscriber implements Subscriber {
	FictionalService fictionalService

	PersonSubscriber(EventBus eventBus, FictionalService fictionalService) {
		eventBus.register(this)
		this.fictionalService = fictionalService
	}

	@Override
	@Subscribe
	void handleMessage(PersonMessage message) {
		fictionalService.process(message)
	}
}

Testing a Publisher

Testing a publisher is very easy. We only need to verify that the publisher calls post on the EventBus:
def "should publish a message to the event bus"(){
	given: "An event bus"
	def eventBus = Mock(EventBus)

	and: "A publisher"
	def publisher = new PersonPublisher(eventBus)

	and: "A Person Message"
	def personMessage = new PersonMessage(new Person(), PersonMessage.ACTION.CREATE)

	when: "A message is published"
	publisher.notifyChange(personMessage)

	then: "The event bus received the message"
	1 * eventBus.post(_)
}

Testing a Subscriber

The subscriber is tested in a very similar manner. We only need to ensure that the 'process' message was sent to our 'fictionalService':
def "message subscriber should delegate message to our fictional service"() {
	given: "An event bus"
	def eventBus = new EventBus()

	and: "A Fake Service"
	def fictionalService = Mock(FictionalService)

	and: "A Subscriber"
	def subscriber = new PersonSubscriber(eventBus, fictionalService)

	and: "A Person Message"
	def personMessage = new PersonMessage(new Person(), PersonMessage.ACTION.CREATE)

	when: "A message is published"
	eventBus.post(personMessage)

	then: "Our subscriber delegates the message to our service"
	1 * fictionalService.process(personMessage)
}

Async Event Bus

Guava also has an Async Event Bus (AsyncEventBus). This is handy when we need to do other time-consuming work and we don't want to wait for it to finish. For example, in a web application, we don't want to wait until a bunch of log statements are written to disk before we send back a response to the client. The same applies if we need to do some more database updates that do not affect the value of the response in any way.

There are three differences when creating an AsyncEventBus compared to EventBus. First, it is a new type: AsyncEventBus. Secondly, we need to instantiate the AsyncEventBus with an executor:

new AsyncEventBus(Executors.newCachedThreadPool())

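The last difference concerns thread safety. By default the bus serializes calls to a given handler, so if we want events delivered to it concurrently we annotate the handler with @AllowConcurrentEvents, which marks it as thread-safe: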

@Subscribe
@AllowConcurrentEvents
void handleMessage(object) {
}
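To see the asynchronous dispatch in action outside of a test framework, here is a small sketch that waits on a CountDownLatch and checks that the handler ran on a pool thread. `ThreadRecordingListener` is a made-up name, and the `@Grab` line (including its version) is an assumption for running it as a standalone script.

```groovy
// Assumption: Guava is resolved via Grape; the version shown is illustrative.
@Grab('com.google.guava:guava:18.0')
import com.google.common.eventbus.AllowConcurrentEvents
import com.google.common.eventbus.AsyncEventBus
import com.google.common.eventbus.Subscribe
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical listener that records which thread handled the event.
class ThreadRecordingListener {
    final CountDownLatch done = new CountDownLatch(1)
    String handlerThread

    @Subscribe
    @AllowConcurrentEvents
    void handle(String message) {
        handlerThread = Thread.currentThread().name
        done.countDown()
    }
}

def executor = Executors.newCachedThreadPool()
def eventBus = new AsyncEventBus(executor)
def listener = new ThreadRecordingListener()
eventBus.register(listener)

eventBus.post("A Message")   // hands the event to the executor and returns

// Wait, with a timeout, for the handler to finish on a pool thread.
def handled = listener.done.await(2, TimeUnit.SECONDS)
def ranOnOtherThread = listener.handlerThread != Thread.currentThread().name
executor.shutdown()

assert handled
assert ranOnOtherThread
```

The explicit wait is the price of going async: anything that needs to observe the handler's effect has to synchronize with it somehow, which is exactly the problem the tests run into.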
 

Testing AsyncEventBus

Testing an async process gets a little tricky because we can't determine when something is going to be executed and completed. The other difficulty I encountered was that when I had a method that was explicitly void, I was not able to stub that method. Considering these two limitations, we can take another approach: we can create a double for our Subscriber and monkey patch our message. This takes care of the second limitation. As for the non-deterministic behavior of async calls, Spock provides a class called BlockingVariables that we can use to make Spock wait, up to a timeout, until a variable has been set before checking our assertions. We can then set a field on the BlockingVariables and check whether it was modified in our assertion:
class DoubleSubscriber implements Subscriber {
	AsyncEventBus asyncEventBus

	DoubleSubscriber(AsyncEventBus asyncEventBus) {
		this.asyncEventBus = asyncEventBus
		// Register so that the @Subscribe handler actually receives events
		asyncEventBus.register(this)
	}

	@Override
	@Subscribe
	@AllowConcurrentEvents
	void handleMessage(PersonMessage message) {
		message.test()
	}
}


class SubscriberSpec extends Specification {
	def "subscriber receives message"() {
		given: "An async event bus"
		AsyncEventBus eventBus = new AsyncEventBus(Executors.newCachedThreadPool())

		and:
		BlockingVariables vars = new BlockingVariables()
		new DoubleSubscriber(eventBus)

		and:
		def personMessage = new PersonMessage(new Person(), PersonMessage.ACTION.CREATE)
		personMessage.metaClass.test = { 
			vars.result = true
		}

		when:
		eventBus.post(personMessage)

		then:
		vars.result

	}
}
This proves that subscribers to 'PersonMessage' will be able to listen to messages of type 'PersonMessage' published from anywhere in our application.

Conclusion

We've seen how we can use 'GOOS-Based' principles like 'Notification Rather Than Logging' by using an event bus, and how this can make our code a little less cluttered by keeping our methods and objects focused. Finally, we used Spock to test an async event bus and tried to work around the nuances of stubbing void methods by using a little bit of metaprogramming and Spock's BlockingVariables.

Errata

Peter Niederwieser and Rob Fletcher were kind enough to point out some errors and wrong assumptions. Rob Fletcher provided a sample test that does work and is much better than what I came up with:
def "subscriber receives message"() {
    given: "An async event bus"
    def eventBus = new AsyncEventBus(Executors.newCachedThreadPool())
 
    and: "A Fake Service"
    def vars = new BlockingVariables()
    def fakeService = Stub(FictionalService) {
      process(_) >> { PersonMessage message -> vars.message = message }
    }
 
    and: "A Subscriber"
    def subscriber = new PersonSubscriber(eventBus, fakeService)
 
    and: "A Person Message"
    def personMessage = new PersonMessage(new Person(), PersonMessage.ACTION.CREATE)
 
    when: "A message is published"
    eventBus.post(personMessage)
 
    then: "Our subscriber delegates the message to our service"
    vars.message == personMessage
}

We don't need to write a double; we can take advantage of Spock's Stub utility instead. Also, it apparently does make a difference if we stub the method using this syntax rather than trying to open the stub again afterwards to add behavior to a method. An additional advantage is that we can explicitly test that the message we published was received.

For those interested, here is a sample github project with the tests for this blog post: https://github.com/uris77/spock-async-tests.

Tags: eventbus guava groovy async event bus goos