MessageBus to the rescue

In this post I am going to explain a simple way of messaging without making use of rxjs within an angular application. The tiny MessageBus class that will be described might be more convenient to folks with an object oriented programming background.

The MessageBase class

This class defines the base implementation of a Message that our MessageBus accepts. It forces the creator of a message to implement the getType() - method where we define a unique key for each message type.

1
2
3
export abstract class MessageBase {
public abstract getType(): string;
}

The IMessageSubscriber interface

This interface allows a particular angular service class to become a subscriber for a message type.

Within the getType() - method we expose the unique key for the message we are interested in.

The onMessage(message: T) - method will be called once the subscriber is receiving a message.

1
2
3
4
export interface IMessageSubscriber<T extends MessageBase> {
onMessage(message: T): void;
getType(): string;
}

The MessageBus

Our simple MessageBus looks like the following and makes use of the above types.

The subscribe<T extends MessageBase>(...) - method allows to register all kind of subscribers.

The publish<T extends MessageBase>(msg: T) - method will be called if we need to publish a message. In that case the MessageBus will look within its private subscriber registry which instances should be notified and calls the onMessage(T) - method.

Simple as that…!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Injectable()
export class MessageBus {
private _subscribers: { [id: string]: IMessageSubscriber<MessageBase>; };

public constructor() {
this._subscribers = {};
}

public subscribe<T extends MessageBase>(subscriber: IMessageSubscriber<T>): void {
this._subscribers[subscriber.getType()] = subscriber;
}

public publish<T extends MessageBase>(msg: T): void {
const subscribersForMessage = this.findSubscribers<T>(msg);

if (subscribersForMessage.length === 0) {
return;
}

subscribersForMessage.forEach((subscriberForMessage: IMessageSubscriber<T>) => {
subscriberForMessage.onMessage(message);
});
}

private findSubscribers<T extends MessageBase>(msg: T): Array<IMessageSubscriber<T>> {
const subscribers = new Array<IMessageSubscriber<T>>();

for (const key in this._subscribers) {
if (this._subscribers.hasOwnProperty(key)) {
const subscriber = this._subscribers[key];
if (subscriber.getType() === msg.getType()) {
subscribers.push(subscriber);
}
}
}

return subscribers;
}
}

The MessageBus in action

Before we are going to examine the MessageBus in action let me shortly explain some rules I follow:

In my applications that I write only real angular services will eventually implement the IMessageSubscriber interface. I don’t do that with components because exchanging data between loosely coupled NgModules require some boundaries I think and for me this is the logic layer where I place all the service related functionality.

So now with that rule in mind let’s see the MessageBus in action while looking at a unit test.

First we define a TestMessage which extends from the MessageBase class. We define a public string property data. This represents our data. In a real world scenario you would probably use a more representative object but for clarification purpose it should do the job.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
export class TestMessage extends MessageBase {
public static KEY = 'TestMessage';
public data: string;

public constructor(data: string) {
super();

this.data = data;
}

public messageKey(): string {
return TestMessage.KEY;
}
}

As a second step we create the TestMessageSubscriber - service that is interested in TestMessage - messages. In this simple service we declare a public string property receivedData as our state that gets assigned when receiving a TestMessage.

1
2
3
4
5
6
7
8
9
10
11
export class TestMessageSubscriber implements IMessageSubscriber<TestMessage> {
public receivedData: string = 'subscriber';

public onMessage(message: TestMessage) {
this.receivedData = message.data;
}

public getMessageKey(): string {
return TestMessage.KEY;
}
}

Now that we have basically everything ready let’s test the MessageBus main functionality: Publishing messages!

In the arrange section we see that we instantiate our subscriber and let the MessageBus know about it in calling the subscribe(...) method.

Then we create an instance of our TestMessage and publish it.

The result should be that our subscriber’s receivedData property will have the value of our TestMessage.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
it('should publish to the subscriber',
inject([MessageBus],
fakeAsync((messageBus: MessageBus) => {
// Arrange
let subscriber = new TestMessageSubscriber();
messageBus.subscribe(subscriber);
let data = 'hello from unit test';
let message = new TestMessage(data);

// Act
messageBus.publish(message);

// Assert
expect(subscriber.receivedData).toBeDefined();
expect(subscriber.receivedData).toBe(data);
}))
);

So now you might wonder yourself how and more importantly where in an angular app you can register the subscribers?

The easiest way is to make a subscriber dependent on the MessageBus and subscribe itself within the constructor body.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
export class TestMessageSubscriber implements IMessageSubscriber<TestMessage> {
public receivedData: string = 'subscriber';

public constructor(
messageBus: MessageBus
) {
messageBus.subscribe(this);
}

public onMessage(message: TestMessage) {
this.receivedData = message.data;
}

public getMessageKey(): string {
return TestMessage.KEY;
}
}

You can use this option if you know for sure that the service acts somewhere as a dependency within your application. If not use option two.

The second option is to register a subscriber in a service’s constructor body that is itself a dependency (I call that the host service).

1
2
3
4
5
6
7
8
9
10
11
12
13
@Injectable()
export class SuperService {
...

public constructor(
testMessageSubscriber: TestMessageSubscriber,
messageBus: MessageBus
) {
messageBus.subscribe(testMessageSubscriber);
}

...
}

The problem you will face from time to time is that some services aren’t always part of angular’s bootstrapping mechanism and won’t therefor being instantiated. That’s why I came up with the second option.

As a side note this option should also be possible with constructor injection in an NgModule.

Benefits

So what - you might think. Here is the main benefit in my opinion.

This simple MessageBus leverages the typescript generics feature and provides a typed message to a subscriber whereas doing it reactive we need to filter for MessageBase - types.

Maybe it is possible doing it with rxjs but I am just not there yet…

Maybe you are?

Let me know… Thomas