RxJS Explained with Vega Chart visualization

According to the official documentation: “RxJS is a library for composing asynchronous and event-based programs by using observable sequences.” There are plenty of articles and definitions of async programming around the web, but the topic still challenges developers when they start out. In my opinion RxJS is more than a library; it is a way of thinking. The demo in this article is deliberately simple: it gives a full picture of event-based programming and covers a few basic operators.

Demo Description

Let’s suppose we have a chart with 8 categories of data, A to H, and 8 buttons, one for each bar of the chart. Whenever a button is clicked we want to update the chart, so that each bar shows the number of times its button was clicked. This will be our end result:

To implement this we need RxJS, Angular, and Vega, a declarative grammar for creating interactive visualizations.

The first step is to install Vega using

npm install vega

and then include it in index.html in your Angular project. For this project I used the bar chart configuration from the Vega examples. The chart component does two things: it parses the Vega configuration and it updates the data. This is the implementation:

import { Component, Input, OnInit } from '@angular/core';
import { HttpClient } from '@angular/common/http';

// vega is loaded globally through the script tag in index.html
declare var vega: any;

@Component({
  selector: 'app-chart',
  templateUrl: './chart.component.html',
  styleUrls: ['./chart.component.scss']
})
export class ChartComponent implements OnInit {

  view: any; // vega.View instance, created once the spec has loaded

  @Input() set data(values: ChartDataModel[]) {
    if (!this.view) {
      return;
    }
    // Replace the contents of the 'table' data set and re-render
    this.view.remove('table', this.view.data('table')).run();
    this.view.insert('table', values).run();
  }

  constructor(private http: HttpClient) { }

  ngOnInit() {
    this.http.get('../assets/chart-config.json')
      .subscribe(spec => this.vegaInit(spec));
  }

  public vegaInit(spec: any): void {
    this.view = new vega.View(vega.parse(spec))
      .renderer('svg')
      .initialize('#chart')
      .width(400)
      .height(200)
      .hover()
      .run();
  }
}

Every time the user clicks one of the buttons on top, the chart should update the values and increment the corresponding bar by one. The button clicks will be a sequence of values over time. At this moment we understand that we will need a subscription to manage the values.

this.subscription$ = fromEvent(document, 'click').subscribe(e => console.log('E:', e));

According to the RxJS documentation, a Subscription is an object that represents a disposable resource, usually the execution of an Observable. In our case the subscription represents the stream of clicks on the document: every time we click anywhere in the document, the message is printed in the console. It is important to call unsubscribe in the ngOnDestroy lifecycle hook, otherwise the listener stays registered and causes a memory leak.
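To make the idea of a disposable resource concrete, here is a minimal framework-free sketch. FakeClickSource and ClickLogger are hypothetical names of mine: the first is only a stand-in for fromEvent(document, 'click'), and the second is shaped like an Angular component without depending on Angular. It is a model of the idea, not how RxJS is implemented.

```typescript
type Listener = (targetId: string) => void;

interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical stand-in for fromEvent(document, 'click').
class FakeClickSource {
  private listeners: Listener[] = [];

  // Registers a listener and returns a handle that releases it again.
  subscribe(listener: Listener): Unsubscribable {
    this.listeners.push(listener);
    return {
      unsubscribe: () => {
        this.listeners = this.listeners.filter(l => l !== listener);
      },
    };
  }

  emit(targetId: string): void {
    this.listeners.forEach(listener => listener(targetId));
  }

  listenerCount(): number {
    return this.listeners.length;
  }
}

// Shaped like an Angular component, but with no Angular dependency.
class ClickLogger {
  seen: string[] = [];
  private subscription: Unsubscribable | undefined;

  constructor(private source: FakeClickSource) {}

  ngOnInit(): void {
    this.subscription = this.source.subscribe(id => {
      this.seen.push(id);
    });
  }

  ngOnDestroy(): void {
    // Release the listener so the source no longer references this object.
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}

const source = new FakeClickSource();
const logger = new ClickLogger(source);
logger.ngOnInit();
source.emit('1');     // recorded
logger.ngOnDestroy();
source.emit('2');     // ignored, the listener is gone
console.log(logger.seen);
```

In the real component the same pattern is a single call to this.subscription$.unsubscribe() inside ngOnDestroy.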

At this point we can track every click, but our requirements only cover clicks on one of the buttons, so we need to narrow the clicks down. I have defined the list of buttons inside the app component as below:

buttons = [ 
{id: 0, title: 'A'}, {id: 1, title: 'B'},
{id: 2, title: 'C'}, {id: 3, title: 'D'},
{id: 4, title: 'E'}, {id: 5, title: 'F'},
{id: 6, title: 'G'}, {id: 7, title: 'H'} ];

We could filter the clicks inside the subscribe function, but to take advantage of the RxJS operators and follow the recommended style we will use pipe. Pipe is useful because it separates the stream operations from the core functionality in subscribe. We want to emit only the events that satisfy our condition, a click on one of the buttons A to H, so the filter operator is what we are looking for. Later on we will add more operators to the stream.

ngOnInit() {
  this.subscription$ = fromEvent(document, 'click')
    .pipe(filter(event => this.isBtnIncrementEvent(event)))
    .subscribe(e => console.log('E:', e));
}

private isBtnIncrementEvent(event: Event): boolean {
  // The id attribute is a string; elements without an id report ''
  const idClicked = (event.target as HTMLElement).id;
  const index = Number(idClicked);
  return idClicked !== '' && Number.isInteger(index) && index >= 0 && index <= 7;
}

Since our buttons have ids ranging from 0 to 7, by filtering the incoming clicks we now have only the clicks we need to update the chart.

The next step is to debounce the clicks: if the user clicks several times in quick succession, we want to update the chart only once, 100 ms after the last click. A click that arrives less than 100 ms after the previous one restarts the timer, so only the last click of a burst is emitted. This is just a case to better demonstrate the RxJS operators. For this we use the debounceTime operator.

ngOnInit() {
  this.subscription$ = fromEvent(document, 'click')
    .pipe(
      filter(event => this.isBtnIncrementEvent(event)),
      debounceTime(100))
    .subscribe(e => console.log('E: ', e));
}

If you run some tests for this case, you will see that the message is printed in the console 100 ms after you last clicked a button.
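To see which clicks survive the debounce, the behaviour can be modelled on plain timestamps. This is a simplified sketch of the semantics, not the RxJS implementation: TimedClick and debounceModel are hypothetical names, and the model ignores the edge case where a click arrives exactly when the timer fires.

```typescript
// A click reduced to what matters here: which button, and when (in ms).
interface TimedClick {
  id: number;
  at: number;
}

// Simplified model of debouncing: a click survives only if no other click
// follows it within quietMs. The final click always survives, because
// nothing arrives afterwards to cancel it.
function debounceModel(clicks: TimedClick[], quietMs: number): TimedClick[] {
  return clicks.filter((click, i) => {
    const next = clicks[i + 1];
    return next === undefined || next.at - click.at > quietMs;
  });
}

const timedClicks: TimedClick[] = [
  { id: 1, at: 0 },
  { id: 1, at: 40 },
  { id: 1, at: 80 },   // last click of the burst
  { id: 3, at: 400 },  // isolated click
];

const emitted = debounceModel(timedClicks, 100);
console.log(emitted.map(c => c.at)); // logs the timestamps 80 and 400
```

Note that the debounce applies to the whole click stream, not to each button separately, so a click on B followed within 100 ms by a click on C emits only the click on C.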

At this point we successfully get the event when a button is clicked. Next step is to update the chart data. Inside the app component I have defined the chart data as below:

// BehaviorSubject holds an initial value and exposes the latest one through .value
chartData$: BehaviorSubject<ChartDataModel[]> = new BehaviorSubject<ChartDataModel[]>([
  {category: 'A', count: 0}, {category: 'B', count: 0},
  {category: 'C', count: 0}, {category: 'D', count: 0},
  {category: 'E', count: 0}, {category: 'F', count: 0},
  {category: 'G', count: 0}, {category: 'H', count: 0}
]);

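I am passing this data to the chart component's data input through a property binding in the app component template, for example [data]="chartData$ | async" on the app-chart element.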

From the incoming click event we are interested only in the id of the clicked button, which corresponds to the index of the chart data entry we will increment. We want to project each incoming event to its target id, and for this we need the map operator.

ngOnInit() {
  this.subscription$ = fromEvent(document, 'click')
    .pipe(
      filter(event => this.isBtnIncrementEvent(event)),
      debounceTime(100),
      // The id attribute is a string, so convert it to a numeric index
      map((event: Event) => Number((event.target as HTMLElement).id)))
    .subscribe(e => console.log('E: ', e));
}

If we check the console, we will notice that the printed message is now a number rather than the event we saw before. Let’s say the user clicked button B: we now get the id 1, which is the index of the chart data entry we have to modify.
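The filter and map stages can also be tried outside the browser by modelling clicks as plain objects and using the array methods of the same name. This is only an analogy under my own assumptions: FakeClick and isButtonId are hypothetical, and the range check is written in the spirit of the filter stage rather than copied from it.

```typescript
// Hypothetical stand-in for a DOM click event: only the target id is modelled.
interface FakeClick {
  targetId: string;
}

// Accepts exactly the ids '0' to '7', which are the ids of the buttons.
function isButtonId(id: string): boolean {
  return /^[0-7]$/.test(id);
}

const fakeClicks: FakeClick[] = [
  { targetId: '1' },      // button B
  { targetId: '' },       // element without an id
  { targetId: 'chart' },  // some other element
  { targetId: '7' },      // button H
];

const indexes = fakeClicks
  .filter(click => isButtonId(click.targetId)) // keep button clicks only
  .map(click => Number(click.targetId));       // project each click to its index

console.log(indexes); // contains 1 and 7
```

The difference from the real stream is that an array is processed all at once, while the observable applies the same two steps to each click as it arrives.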

I will use the tap operator to perform the side effect, which is updating the values we pass to the chart. Tap does not modify the values flowing through the observable, so the console will still show the index produced by the map operator.

ngOnInit() {
  this.subscription$ = fromEvent(document, 'click')
    .pipe(
      filter(event => this.isBtnIncrementEvent(event)),
      debounceTime(100),
      map((event: Event) => Number((event.target as HTMLElement).id)),
      tap((index: number) => this.updateChartData(index)))
    .subscribe(e => console.log('Result: ', e));
}

private updateChartData(index: number): void {
  const values = this.chartData$.value;
  // Copy the clicked entry with its count incremented by one
  const newCount = { ...values[index], count: values[index].count + 1 };
  values[index] = newCount;
  // Emit a new array so that the chart input receives a new reference
  this.chartData$.next([...values]);
}

So this is the function that updates chartData$, which in turn automatically updates the chart component. If you test it yourself, you will see that the chart I showed at the beginning of this article is now correctly implemented: after each button click the corresponding bar grows.
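The update step itself can be isolated as a pure function, which makes it easy to test without Angular or RxJS. This is a minimal sketch: ChartDatum is my assumed shape for ChartDataModel, inferred from the chart data above, and incrementAt is a hypothetical helper, not part of the original component.

```typescript
// Assumed shape of ChartDataModel, inferred from the chart data above.
interface ChartDatum {
  category: string;
  count: number;
}

// Returns a new array in which only the entry at index has been replaced
// by a copy with count + 1; the input array is left untouched.
function incrementAt(values: ChartDatum[], index: number): ChartDatum[] {
  return values.map((value, i) =>
    i === index ? { ...value, count: value.count + 1 } : value
  );
}

const before: ChartDatum[] = [
  { category: 'A', count: 0 },
  { category: 'B', count: 0 },
];
const after = incrementAt(before, 1);

console.log(after[1].count);  // 1
console.log(before[1].count); // 0, the original array was not mutated
```

Assuming chartData$ exposes its current value, as a BehaviorSubject does, the component could then call this.chartData$.next(incrementAt(this.chartData$.value, index)).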

To sum up, when you have to deal with streams of data, RxJS provides a wide range of operators and subjects that help you modify, update, or create new streams from the incoming data in a clean and efficient way.

Originally published at http://agdev.tech on February 16, 2021.
