LogoLogo
ChangelogGitHubTwitterGitter
v4.x
v4.x
  • Marble.js
  • Getting started
    • Installation
    • Quick setup
  • HTTP
    • Effects
    • Middlewares
    • Routing
    • Errors
    • Output
    • Context
    • Advanced
      • Logging
      • Validation
      • Server Events
      • Streaming
      • Continuous mode
  • Messaging
    • Core concepts
      • Events
      • Effects
    • Microservices
      • AMQP (RabbitMQ)
      • Redis Pub/Sub
    • CQRS
    • WebSockets
  • Testing
    • HTTP routes testing
  • Other
    • How does it glue together?
    • Migration guides
      • Migration from version 3.x
      • Migration from version 2.x
      • Migration from version 1.x
    • API reference
      • @marblejs/core
        • bindTo
        • bindEagerlyTo
        • createEvent
        • createContextToken
        • operator: matchEvent
        • operator: use
        • operator: act
      • @marblejs/http
        • httpListener
        • r.pipe
        • combineRoutes
        • createServer
      • @marblejs/messaging
        • eventBus
        • messagingClient
        • createMicroservice
        • reply
      • @marblejs/websockets
        • webSocketListener
        • operator: broadcast
        • operator: mapToServer
      • @marblejs/middleware-multipart
      • @marblejs/middleware-cors
      • @marblejs/middleware-io
      • @marblejs/middleware-logger
      • @marblejs/middleware-body
      • @marblejs-contrib/middleware-jwt
        • Token signing
      • @marblejs-contrib/middleware-joi
    • Style Guide
    • FAQ
Powered by GitBook
On this page
  • Processing
  • Error handling
  • Replying
  1. Messaging
  2. Core concepts

Effects

PreviousEventsNextMicroservices

Last updated 4 years ago

Marble.js messaging mental model represents incoming notifications as a stream of Events, that can be emitted either by an independent producer (client) or another consumer.

Processing

The module is built around similar abstractions, like: effect, middleware or output streams. The messaging effect it's just an extension of generic core Effect which is a function that maps a stream of incoming Events to another stream of outgoing Events, thus middleware and output models are a direct equivalent of basic effects.

MsgEffect :: Observable<Event> -> Observable<Event>

MsgOutputEffect :: Observable<Event> -> Observable<Event>

MsgMiddlewareEffect :: Observable<Event> -> Observable<Event>

MsgEffect when compared to their predecessor in form of HttpEffect is much more powerful tool, taking the possibilities to the next level. Once you're inside your Effect, use any Observable patterns you desire as long as any output from the final, returned stream, is an Event.

The marble diagram above can be translated into effect code as follows.

import { matchEvent } from '@marblejs/core';
import { MsgEffect } from '@marblejs/messaging';
import { mergeMap } from 'rxjs/operators';

const userCreated$: MsgEffect = event$ =>
  event$.pipe(
    matchEvent('USER_CREATED'),
    mergeMap(event => [
      { type: 'SEND_WELCOME_SMS_FOR_USER', payload: event.payload.id },
      { type: 'SEND_WELCOME_EMAIL_FOR_USER', payload: event.payload.id },
    ]),
  );

Keep in mind that messaging effects will always try to emit back the outgoing event. Mapping to the same input event will cause an infinite loop. You should always map the input event to different event type, or skip emitted values, eg. by ignoreElements operator, in case you don't want to output anything.

Error handling

Effects are built on top of observable streams provided by RxJS. They are listeners of observable streams that continue until an error or completion occurs. In order for effects to continue running in the event of an error in the observable, or completion of the observable stream, they must be nested within a "flattening" operator, such as mergeMap, concatMap, exhaustMap or other flattening operator. The snippet below shows the example error handling strategy for fetching an author.

import { matchEvent } from '@marblejs/core';
import { MsgEffect } from '@marblejs/messaging';
import { of } from 'rxjs';
import { catchError, map, mergeMap } from 'rxjs/operators';
import { pipe } from 'fp-ts/lib/pipeable';

const getUser$: MsgEffect = event$ =>
  event$.pipe(
    matchEvent('GET_USER'),
    mergeMap(event => pipe(
      getUser(event.payload.id),
      map(payload => ({ type: 'GET_USER_RESULT', payload })),
      catchError(error => of({
        type: 'GET_USER_ERROR',
        error: { name: error.name, message: error.message },
      })),
    )),
  );

Always remember to catch an error in disposable stream. If not, the event stream will be restarted, which means that every already processing event will be lost. Listener will try to resubscribe the stream automatically notifying the developer about the error.

import { act, matchEvent } from '@marblejs/core';
import { MsgEffect } from '@marblejs/messaging';
import { map } from 'rxjs/operators';
import { pipe } from 'fp-ts/lib/pipeable';

const getUser$: MsgEffect = event$ =>
  event$.pipe(
    matchEvent('GET_USER'),
    act(event => pipe(
      getUser(event.payload.id),
      map(payload => ({ type: 'GET_USER_RESULT', payload })),
    )),
  );

Replying

import { act, matchEvent } from '@marblejs/core';
import { reply, MsgEffect } from '@marblejs/messaging';
import { map } from 'rxjs/operators';
import { pipe } from 'fp-ts/lib/pipeable';

const getUser$: MsgEffect = event$ =>
  event$.pipe(
    matchEvent('GET_USER'),
    act(event => pipe(
      getUser(event.payload.id),
      map(payload => ({
        type: 'GET_USER_RESULT',
        payload,
        metadata: e.metadata, // { correlationId, replyTo }
      })),
    )),
  );
import { act, matchEvent } from '@marblejs/core';
import { reply, MsgEffect } from '@marblejs/messaging';
import { map } from 'rxjs/operators';
import { pipe } from 'fp-ts/lib/pipeable';

const getUser$: MsgEffect = event$ =>
  event$.pipe(
    matchEvent('GET_USER'),
    act(event => pipe(
      getUser(event.payload.id),
      map(payload => reply(event)({ type: 'GET_USER_RESULT', payload })),
    )),
  );

When replying to an incoming event, remember to define a different event type than the origin one. In some specific use cases there is a high risk that an emitted command (non RPC message) will produce an infinite loop.

Effects can define the matching against one or more events. Marble.js exposes a operator that is just an abstraction over RxJS filter operator, with additional functionalities and type-safety underneath.

To learn more about matchEvent operator please visit core .

Marble.js defines an operator that reduces the required boilerplate to minimum applying the error handling underneath the operator.

To learn more about act operator please visit core .

The previous example shows an effect that produces GET_USER_RESULT event without replying to producer. As mentioned in , event interface defines two additional metadata attributes: replyTo and correlationId. Typically when dealing with RPC (request-response) messaging, the event handler (effect) has to reply in case of success or error. In order to do that, effect has to combine the mapped outgoing event with incoming event metadata, passing the information about the correlation id that the producer will look for and the reply channel where the outgoing event should be routed to.

Again, in order to make the event processing more descriptive and easier to understand, Marble.js defines a function that makes the mapping more explicit.

To learn more about reply function please visit messaging

matchEvent
API documentation
act
API documentation
previous chapter
reply
API documentation.