algoan/pubsub

PubSub

This is a generic PubSub factory exposing a listen and an emit method.

NOTE: For now, only Google Cloud PubSub is supported.

Installation

npm install --save @algoan/pubsub

Usage

Google Cloud PubSub

Run tests

To run the tests or to try the PubSubFactory class, you need a Google account and the gcloud SDK installed.

Then, to install the Google PubSub emulator, run:

gcloud components install pubsub-emulator
gcloud components install beta
gcloud components update

Then run the tests:

npm test

This launches a Google PubSub emulator through the google-pubsub-emulator library.

Example

To create a PubSub instance using Google Cloud:

import { EmittedMessage, GCPubSub, PubSubFactory, Transport } from '@algoan/pubsub';

const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'test',
    // And all other Google PubSub properties
  }
});
const topicName: string = 'some_topic';

await pubsub.listen(topicName, {
  autoAck: true,
  onMessage: (data: EmittedMessage<{foo: string}>) => {
    console.log(data.parsedData); // { foo: 'bar', time: {Date.now}, _eventName: 'some_topic' }
    // do whatever you want. The message has already been acknowledged
  },
  onError: (error: Error) => {
    // Handle error as you wish
  }
});

await pubsub.emit(topicName, { foo: 'bar' });

Contribution

Thank you for your future contribution! Please follow these instructions before opening a pull request.

API

PubSubFactory.create({ transport, options })

The only static method of the PubSubFactory class. It creates a new PubSub instance for the given transport. By default, it connects to Google Cloud PubSub.

  • transport: PubSub technology to use. Only GOOGLE_PUBSUB is available for now.
  • options: Options related to the transport.
    • If transport === Transport.GOOGLE_PUBSUB, then have a look at the Google Cloud PubSub config client.
    • debug: Display logs if it is set to true. It uses a pino logger and pino-pretty if NODE_ENV is not equal to production.
    • pinoOptions: If debug is set to true, set the pino logger options. Defaults to level: debug and prettyPrint: true if NODE_ENV is not equal to production.
    • topicsPrefix: Add a prefix to all created topics. Example: with topicsPrefix: 'my-topic', all topics will begin with my-topic+{your topic name}.
    • topicsSeparator: Customize the separator between topicsPrefix and the topic name. Example: with topicsSeparator: '-', all topics will be named {topic prefix}-{your topic name} (defaults to '+').
    • subscriptionsPrefix: Add a prefix to all created subscriptions. Example: with subscriptionsPrefix: 'my-sub', all subscriptions will begin with my-sub%{your topic name}.
    • subscriptionsSeparator: Customize the separator between subscriptionsPrefix and the topic name. Example: with subscriptionsSeparator: '-', all subscriptions will be named {subscription prefix}-{your topic name} (defaults to '%').
    • namespace: Add a namespace property to Message attributes when publishing on a topic.
    • environment: Add an environment property to Message attributes when publishing on a topic.
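The prefix and separator rules above can be sketched as a small pure function. This is only an illustration of the documented naming pattern: withPrefix is a hypothetical helper, not something exported by @algoan/pubsub, and the library's internal implementation may differ.

```typescript
// Hypothetical helper (not part of @algoan/pubsub): joins an optional prefix
// and a name with a separator, following the naming rule described above.
function withPrefix(name: string, prefix?: string, separator: string = '+'): string {
  return prefix === undefined ? name : `${prefix}${separator}${name}`;
}

// Topics use '+' as the documented default separator.
const prefixedTopic: string = withPrefix('some_topic', 'my-topic');
// A custom separator replaces the default.
const dashedTopic: string = withPrefix('some_topic', 'my-topic', '-');
// Subscriptions use '%' as the documented default separator.
const prefixedSubscription: string = withPrefix('some_topic', 'my-sub', '%');

console.log(prefixedTopic, dashedTopic, prefixedSubscription);
```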

pubsub.listen(event, opts)

Listen to a specific event.

NOTE: It only uses the Google Cloud subscription pull delivery for now.

  • event: Name of the event.
  • opts: Options related to the Listener method
    • onMessage: Method called when receiving a message
    • onError: Method called when an error occurs
    • options: Options related to the chosen transport

If the chosen transport is Google Cloud PubSub, options accepts:

  • autoAck: Automatically ACK an event as soon as it is received (defaults to true)
  • subscriptionOptions: Options related to the created Subscription:
    • name: Custom name for the subscription. Default: event (also equal to the topic name)
    • get: Options applied to the getSubscription method (have a look at Subscription options)
    • sub: Options applied to the subscription instance (see also setOptions method)
    • create: Options applied to the createSubscription method (have a look at Create Subscription options)
    • deadLetterTopicName: Per-subscription override for the dead-letter topic name (see Dead Letter Topics)
  • topicOptions: Options applied to the created topic (have a look at Topic options)
  • topicName: Set the topic name. By default, it uses the default name with a prefix.
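To make the autoAck behavior concrete, here is a minimal in-memory sketch. FakeMessage, createFakeMessage and dispatch are hypothetical stand-ins written for this illustration; they are not the library's types or its dispatch logic. The sketch assumes that when autoAck is false, the handler is responsible for acknowledging the message itself.

```typescript
// Hypothetical stand-in for a received message; not the library's EmittedMessage type.
interface FakeMessage {
  payload: string;
  acked: boolean;
  ack(): void;
}

function createFakeMessage(payload: string): FakeMessage {
  const message: FakeMessage = {
    payload,
    acked: false,
    ack: (): void => {
      message.acked = true;
    },
  };
  return message;
}

// Acknowledge first when autoAck is true, then hand the message to the handler.
function dispatch(message: FakeMessage, onMessage: (m: FakeMessage) => void, autoAck: boolean = true): void {
  if (autoAck) {
    message.ack();
  }
  onMessage(message);
}

// Records whether each handler saw an already-acknowledged message.
const ackedOnArrival: boolean[] = [];

const autoAcked: FakeMessage = createFakeMessage('a');
dispatch(autoAcked, (m: FakeMessage) => {
  ackedOnArrival.push(m.acked);
});

const manuallyAcked: FakeMessage = createFakeMessage('b');
dispatch(
  manuallyAcked,
  (m: FakeMessage) => {
    ackedOnArrival.push(m.acked);
    m.ack(); // with autoAck disabled, the handler acknowledges explicitly
  },
  false,
);
```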

pubsub.emit(event, payload, opts)

Emit a specific event with a payload. It adds attributes to the message if you set a namespace or an environment when creating the PubSub instance. It also adds an _eventName and a time property to the emitted payload.

  • event: Name of the event to emit.
  • payload: Payload to send. It will be buffered by Google, and then parsed by the listen method.
  • opts: Options related to the Emit method
    • metadata: Custom metadata added to the message
    • options: Options related to the chosen transport

If the chosen transport is Google Cloud PubSub, options accepts:

  • topicOptions: Options applied to the created topic (have a look at Topic options)
  • publishOptions: Publish options set to the topic after its creation. Refer to Publish Options
  • messageOptions: Additional message options added to the message. Refer to Message Options
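The enrichment described above (an _eventName and a time property added to the payload, which is then serialized and parsed again on the listening side) can be pictured with the following sketch. enrichPayload and the Enriched interface are hypothetical names used only for illustration, and JSON text stands in for the bytes carried by the transport; the library performs the real work internally.

```typescript
// Hypothetical shape of the properties added at emit time.
interface Enriched {
  _eventName: string;
  time: number;
}

// Hypothetical helper: returns a copy of the payload with the two extra properties.
function enrichPayload<T extends object>(event: string, payload: T, now: number = Date.now()): T & Enriched {
  return { ...payload, _eventName: event, time: now };
}

// Emit side: enrich, then serialize (a fixed timestamp keeps the example deterministic).
const wire: string = JSON.stringify(enrichPayload('some_topic', { foo: 'bar' }, 1700000000000));

// Listen side: parse the serialized payload back into an object.
const parsedData = JSON.parse(wire) as { foo: string } & Enriched;
console.log(parsedData);
```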

pubsub.unsubscribe(event)

Stop the server connection for a given subscription.

  • event: Name of the event to stop listening for.

Dead Letter Topics

When deadLetterOptions is set in the constructor options, the library automatically:

  1. Creates the dead-letter topic (if it does not exist)
  2. Creates a drain subscription on the dead-letter topic (to prevent GCP from discarding undelivered messages)
  3. Grants roles/pubsub.publisher on the dead-letter topic to the GCP Pub/Sub service account
  4. Grants roles/pubsub.subscriber on the source subscription to the GCP Pub/Sub service account
  5. Applies the deadLetterPolicy to the created subscription

This setup only happens once per new subscription; reconnecting to an already-existing subscription incurs no setup overhead.

Mode 1: Per-subscription isolated dead-letter topics (recommended for multi-consumer systems)

Omit deadLetterTopicName. Each subscription automatically gets its own {subscription name}-deadletter topic, so failed messages from different consumers are isolated and can be replayed independently.

const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
    deadLetterOptions: {
      maxDeliveryAttempts: 10, // optional, defaults to 5
    },
  },
});

// Each listen() call creates its own isolated dead-letter topic:
// - "my-orders-sub-deadletter" for the first subscription
// - "my-payments-sub-deadletter" for the second
await pubsub.listen('my-orders-sub');
await pubsub.listen('my-payments-sub');

Mode 2: Shared dead-letter topic (single DLT for all subscriptions)

Provide deadLetterTopicName to route all failed messages to one shared topic. The topic must already exist in GCP.

const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
    deadLetterOptions: {
      deadLetterTopicName: 'projects/my-project/topics/my-dead-letter', // fully-qualified or short name
      maxDeliveryAttempts: 5,
    },
  },
});

await pubsub.listen('my-orders-sub'); // routes to "my-dead-letter"
await pubsub.listen('my-payments-sub'); // routes to "my-dead-letter"
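Since deadLetterTopicName accepts either a fully-qualified or a short name, a normalization step along these lines is presumably involved before the dead-letter policy is applied. The sketch below is an assumption about that behavior, not the library's code: toTopicPath and buildDeadLetterPolicy are hypothetical helpers. The deadLetterTopic and maxDeliveryAttempts field names follow Google Cloud Pub/Sub's dead-letter policy, and the default of 5 is the one documented above.

```typescript
// Field names follow Google Cloud Pub/Sub's dead-letter policy.
interface DeadLetterPolicy {
  deadLetterTopic: string;
  maxDeliveryAttempts: number;
}

// Hypothetical helper: expands a short topic name to projects/{project}/topics/{name}
// and leaves an already fully-qualified name untouched.
function toTopicPath(projectId: string, name: string): string {
  return /^projects\//.test(name) ? name : `projects/${projectId}/topics/${name}`;
}

// Hypothetical helper: builds the policy object, defaulting to 5 delivery attempts.
function buildDeadLetterPolicy(
  projectId: string,
  deadLetterTopicName: string,
  maxDeliveryAttempts: number = 5,
): DeadLetterPolicy {
  return {
    deadLetterTopic: toTopicPath(projectId, deadLetterTopicName),
    maxDeliveryAttempts,
  };
}

const fromShortName: DeadLetterPolicy = buildDeadLetterPolicy('my-project', 'my-dead-letter');
const fromFullName: DeadLetterPolicy = buildDeadLetterPolicy(
  'my-project',
  'projects/my-project/topics/my-dead-letter',
  10,
);
console.log(fromShortName, fromFullName);
```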

Mode 3: Per-subscription override

Override the dead-letter topic for a specific subscription via subscriptionOptions.deadLetterTopicName. Falls back to the instance-level deadLetterTopicName, then auto-derives if neither is set.

await pubsub.listen('my-special-sub', {
  options: {
    subscriptionOptions: {
      deadLetterTopicName: 'projects/my-project/topics/special-dead-letter',
    },
  },
});
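The fallback order across the three modes (per-subscription override, then instance-level name, then an auto-derived name) can be summarized in one function. resolveDeadLetterTopic is a hypothetical helper written for this illustration; it mirrors the order described above and the -deadletter suffix from Mode 1, not the library's actual code.

```typescript
// Hypothetical helper: picks the dead-letter topic name for a subscription.
// Order: per-subscription override, then instance-level name, then auto-derived name.
function resolveDeadLetterTopic(
  subscriptionName: string,
  instanceLevelName?: string,
  perSubscriptionName?: string,
): string {
  return perSubscriptionName ?? instanceLevelName ?? `${subscriptionName}-deadletter`;
}

// Mode 1: nothing configured, so the name is derived from the subscription.
const isolated: string = resolveDeadLetterTopic('my-orders-sub');
// Mode 2: the instance-level name is shared by all subscriptions.
const shared: string = resolveDeadLetterTopic('my-orders-sub', 'my-dead-letter');
// Mode 3: the per-subscription override wins over the instance-level name.
const overridden: string = resolveDeadLetterTopic('my-special-sub', 'my-dead-letter', 'special-dead-letter');

console.log(isolated, shared, overridden);
```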

No dead-letter topic

If deadLetterOptions is not set, no dead-letter resources are created and no IAM calls are made -- fully backward compatible.

const pubsub: GCPubSub = PubSubFactory.create({
  transport: Transport.GOOGLE_PUBSUB,
  options: {
    projectId: 'my-project',
    // no deadLetterOptions -- existing behavior unchanged
  },
});
