Class WeeniePubSubAmqp<SubMsgType, PubMsgType>

The Weenie PubSub Class

NOTE: This assumes a topic exchange, which covers all of the use-cases that any other exchange can cover.

Type Parameters

  • SubMsgType extends {
        key: string;
    }

    is the type of message that will be received by subscribers. This will typically be a union of all possible messages.

  • PubMsgType extends {
        key: string;
    }

    is the type of message that will be published by publishers. This will typically be a union of all of the messages that can be emitted by this particular service (which may or may not be the union of all possible system messages).
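
As a minimal sketch of how these two type parameters might be filled in, the following defines a few hypothetical message types and narrows on the key field. The message names, keys, and the describeMsg helper are illustrative assumptions, not part of this library; only the key: string constraint comes from the signature above.

```typescript
// Hypothetical message shapes. Only the `key: string` constraint is taken from the docs.
type UserCreated = { key: 'users.created'; data: { id: string } };
type UserDeleted = { key: 'users.deleted'; data: { id: string } };
type OrderPlaced = { key: 'orders.placed'; data: { orderId: string; total: number } };

// SubMsgType: the union of everything this service may receive.
type SubMsg = UserCreated | UserDeleted | OrderPlaced;

// PubMsgType: only the messages this particular service emits.
type PubMsg = OrderPlaced;

// Because `key` is a literal type, switching on it narrows `data` to the matching shape.
function describeMsg(msg: SubMsg): string {
  switch (msg.key) {
    case 'users.created':
      return `user ${msg.data.id} created`;
    case 'users.deleted':
      return `user ${msg.data.id} deleted`;
    case 'orders.placed':
      return `order ${msg.data.orderId} placed for ${msg.data.total}`;
  }
}

// A value of the publishable type is also a valid member of the subscribable union.
const outgoing: PubMsg = { key: 'orders.placed', data: { orderId: 'o-1', total: 42 } };
```

Keeping PubMsg narrower than SubMsg, as here, lets the compiler reject attempts to publish a message that this service does not own.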

Constructors

  • Type Parameters

    • SubMsgType extends {
          key: string;
      }

    • PubMsgType extends {
          key: string;
      }

    Parameters

    • publishingConfig: WeeniePublishingConfig
    • config: SimpleAmqpConfig
    • log: SimpleLoggerInterface
    • deps: {
          retry?: Retry;
          amqpConnect?: ((url, socketOptions?) => Promise<SimpleAmqpConnection>);
      }
      • Optional retry?: Retry
      • Optional amqpConnect?: ((url, socketOptions?) => Promise<SimpleAmqpConnection>)
          • (url, socketOptions?): Promise<SimpleAmqpConnection>
          • Parameters

            • url: string | SimpleAmqpConfig
            • Optional socketOptions: unknown

            Returns Promise<SimpleAmqpConnection>

    Returns WeeniePubSubAmqp<SubMsgType, PubMsgType>

Properties

publishingConfig: WeeniePublishingConfig
config: SimpleAmqpConfig
log: SimpleLoggerInterface

Accessors

  • get driver(): SimplePubSubAmqp<PubMsgType>
  • Returns SimplePubSubAmqp<PubMsgType>

  • get waiting(): boolean
  • Returns boolean

Methods

  • subscribeAny

    Subscribes a general handler that must accept ALL messages that the system can receive. This method accepts routing keys with wildcards, but it cannot offer the same type-safety as the subscribe method: handlers passed to it must be ready to receive ANY message.

    Parameters

    • routingKeys: string[]

      is an array of routing keys indicating which messages should arrive in this queue. (This enforces that the messages be on the same exchange that was indicated on initialization.) For example, if you initialized the class with the foo exchange and you want to subscribe to all messages published to that exchange, you would pass ['#'] here. If you want to subscribe to all messages whose keys start with bar or baz, you might pass ['bar.#', 'baz.#'].

    • queueName: string

      is the name of the queue that will be created for this subscription.

    • handler: ((msg, attrs, log) => Promise<boolean>)

      is the function that will be called when the message is received. It will be passed the message itself, the message's attributes, and a logger tagged with the message's id.

        • (msg, attrs, log): Promise<boolean>
        • Parameters

          • msg: SubMsgType
          • attrs: AmqpExtra
          • log: SimpleLoggerInterface

          Returns Promise<boolean>

    • queueOpts: Omit<{
          name: string;
          exclusive?: boolean;
          durable?: boolean;
          autoDelete?: boolean;
      }, "name"> = {}

      are the options for the subscription. These let you set a few properties of the queue (exclusive, durable, autoDelete) if you'd like.

    Returns Promise<void>

    Example

    type Thing1Msg = { key: 'my-domain.did.thing1'; data: { foo: string } };
    type Thing2Msg = { key: 'my-domain.did.thing2'; data: { bar: number } };
    type Thing3Msg = { key: 'my-domain.did.thing3'; data: { baz: boolean } };

    // All the messages that we might receive
    type AllMessages = Thing1Msg | Thing2Msg | Thing3Msg;

    // A handler that must be ready to handle any of these messages
    const handlerForAnyMessage: MessageHandler<AllMessages> = async (msg, attrs, log) => {
      // Do something...
      switch (msg.key) {
        case 'my-domain.did.thing1': {
          // ...
          break;
        }
        // ...
      }
      return true;
    };

    export const subscribe = async (deps: { amqp: WeenieSubscriberInterface<AllMessages> }) => {
      // In a topic exchange, '*' matches exactly one word and '#' matches zero or more,
      // so 'my-domain.#' receives all messages whose key starts with 'my-domain.'
      await deps.amqp.subscribeAny(['my-domain.#'], 'my-queue', handlerForAnyMessage);
    };
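
The wildcard behaviour of routing keys described for routingKeys follows standard AMQP topic-exchange rules: keys are split into dot-separated words, * matches exactly one word, and # matches zero or more words. The sketch below only illustrates those rules. The topicMatches function is a hypothetical helper, not part of this library, and in practice the broker performs the matching.

```typescript
// Hypothetical helper: returns true if `routingKey` matches the topic `pattern`.
// '*' matches exactly one dot-separated word; '#' matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // Recursively match pattern word i against key word j.
  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // Either '#' consumes nothing, or it consumes one word and stays active.
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    // Any other pattern word needs a key word to consume.
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

const hashMatchesDeepKey = topicMatches('my-domain.#', 'my-domain.did.thing1');
const starMatchesDeepKey = topicMatches('my-domain.*', 'my-domain.did.thing1');
const starMatchesOneWord = topicMatches('my-domain.*', 'my-domain.thing1');
```

Under these rules a pattern ending in .# is the one to use for "everything under this prefix", while a pattern ending in .* only matches keys with exactly one further word.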
  • subscribe

    Subscribes a handler for one or more specific messages, using a key or array of keys derived directly from the message type.

    This is used to enforce type-safety in the (common) case in which you are subscribing a specific handler to a specific routing key.

    See also subscribeAny

    Type Parameters

    • MsgTypes extends {
          key: string;
      }

    Parameters

    • key: MsgTypes["key"] | MsgTypes["key"][]
    • queueName: string

      is the name of the queue that will be created for this subscription.

    • handler: MessageHandler<MsgTypes>

      is the function that will be called when the message is received. It will be passed the message itself, the message's attributes, and a logger tagged with the message's id.

    • queueOpts: Omit<{
          name: string;
          exclusive?: boolean;
          durable?: boolean;
          autoDelete?: boolean;
      }, "name"> = {}

      are the options for the subscription. These let you set a few properties of the queue (exclusive, durable, autoDelete) if you'd like.

    Returns Promise<void>

    Example

    type Thing1Msg = { key: 'my-domain.did.thing1'; data: { foo: string } };
    type Thing2Msg = { key: 'my-domain.did.thing2'; data: { bar: number } };
    type Thing3Msg = { key: 'my-domain.did.thing3'; data: { baz: boolean } };

    // All the messages that we might receive
    type AllMessages = Thing1Msg | Thing2Msg | Thing3Msg;

    // A handler that handles two of these messages
    const handlerForSomeMessages: MessageHandler<Thing1Msg | Thing2Msg> = async (msg, attrs, log) => {
      // Do something...
      switch (msg.key) {
        case 'my-domain.did.thing1': {
          // ...
          break;
        }
        // ...
      }
      return true;
    };

    export const subscribe = async (deps: { amqp: WeenieSubscriberInterface<AllMessages> }) => {
      // The keys passed here must be keys of the message types the handler accepts
      await deps.amqp.subscribe(
        ['my-domain.did.thing1', 'my-domain.did.thing2'],
        'my-queue',
        handlerForSomeMessages,
      );
    };
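
To illustrate how a parameter typed as MsgTypes["key"] | MsgTypes["key"][] ties the accepted keys to the handler's message types, here is a small standalone sketch. The toKeyArray helper is hypothetical and is not claimed to be how this library handles the key argument internally; it only mirrors the parameter's type.

```typescript
type Thing1Msg = { key: 'my-domain.did.thing1'; data: { foo: string } };
type Thing2Msg = { key: 'my-domain.did.thing2'; data: { bar: number } };

// Hypothetical helper: accepts a single key or an array of keys, where each key
// must be one of the literal `key` values of the given message types.
function toKeyArray<MsgTypes extends { key: string }>(
  key: MsgTypes['key'] | MsgTypes['key'][],
): MsgTypes['key'][] {
  return Array.isArray(key) ? key : [key];
}

// Both keys belong to the union, so this compiles.
const keys = toKeyArray<Thing1Msg | Thing2Msg>([
  'my-domain.did.thing1',
  'my-domain.did.thing2',
]);

// A single key is accepted as well.
const single = toKeyArray<Thing1Msg>('my-domain.did.thing1');

// The following would be rejected at compile time, because the key does not
// belong to Thing1Msg:
// toKeyArray<Thing1Msg>('my-domain.did.thing3');
```

Because the keys are literal types, a typo in a routing key or a key the handler cannot process shows up as a compile error rather than as a silently empty queue.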
  • Parameters

    • event: "error"
    • listener: ((e) => void)
        • (e): void
        • Parameters

          • e: Error

          Returns void

    Returns this

  • Parameters

    • event: "connect"
    • listener: (() => void)
        • (): void
        • Returns void

    Returns this

  • Parameters

    • event: "disconnect"
    • listener: (() => void)
        • (): void
        • Returns void

    Returns this

  • Parameters

    • event: "error"
    • listener: ((e) => void)
        • (e): void
        • Parameters

          • e: Error

          Returns void

    Returns this

  • Parameters

    • event: "connect"
    • listener: (() => void)
        • (): void
        • Returns void

    Returns this

  • Parameters

    • event: "disconnect"
    • listener: (() => void)
        • (): void
        • Returns void

    Returns this

  • Parameters

    • event: "error" | "connect" | "disconnect"
    • listener: (() => void)
        • (): void
        • Returns void

    Returns this

  • Parameters

    • Optional event: "error" | "connect" | "disconnect"

    Returns this

  • Returns Promise<unknown>
