Return to table of contents

II. Core Messaging

2.5 Message Endpoints

As mentioned in the overview, Message Endpoints are responsible for connecting the various messaging components to channels. Over the next several chapters, you will see a number of different components that consume Messages. Some of these are also capable of sending reply Messages. Sending Messages is quite straightforward. As shown above in Message Channels, it’s easy to send a Message to a Message Channel. Whenever you create message component it’s wrapped with one of the consumer types. And there are two types of consumers: Polling Consumers and Event Driven Consumers.

2.5.1 Message Handler

MessageHandler interface is implemented by many of the components within the framework.
In other words, this is not part of the public API, and a developer would not typically implement MessageHandler directly.
Nevertheless, it is used by a Message Consumer for actually handling the consumed Messages, and so being aware of this strategy interface does help in terms of understanding the overall role of a consumer.

interface MessageHandler
{
    public function handle(Message $message) : void;
}

Despite its simplicity, this provides the foundation for most of the components that will be covered in the following chapters (Routers, Transformers, Splitters, Aggregators, Service Activators, etc).
Those components each perform very different functionality with the Messages they handle, but the requirements for actually receiving a Message are the same, and the choice between polling and event-driven behavior is also the same.
There are two endpoint implementations that host these callback-based handlers and allow them to be connected to Message Channels.

2.5.2 Event Driven Consumer

You may recall that the SubscribableChannel provides a subscribe() method and that the method accepts a MessageHandler parameter

    public function subscribe(MessageHandler $messageHandler) : void;

2.5.3 Pollable Consumer

Is currently in developing progress. As will involve spawning new PHP process to start polling messages from channel.

General idea is to not care about infrastructure details. So the final solution, will provide you with list of
consumers that are require to be run based on your configuration.
It will allow for generating configuration during Continues Integration for example.

2.5.4 Messaging Gateways

The primary purpose of a Gateway is to hide the messaging API provided by Integration Messaging. It allows your application’s business logic to be completely unaware of the Spring Integration API and using a generic Gateway, your code interacts instead with a simple interface, only. As you need to start messaging flow and values that comes from outside are not messages, you need to convert it. That’s the role of Gateway.

Gateway Proxy

As mentioned above, it would be great to have no dependency on the Integration Messaging API at all - including the gateway class.
Integration Messaging solution to it, is to generate proxy classes based on interface.

    /**
    * @MessageEndpointAnnotation()
    */
    interface OrderGateway
    {
        /**
         * @GatewayAnnotation(requestChannel="requestChannel", parameterConverters={
         *      @ParameterToPayloadAnnotation(parameterName="orderId")
         * })
         */
        public function buy(string $orderId) : void;
    }

Above interface will auto-register in your Dependency Injection Container under interface class name.
This will allow you to inject it anywhere as normal service (auto-wire will work out of the box).
If you need to register gateway under specific name you can add referenceName to MessageEndpointAnnotation

    @MessageEndpointAnnotation(referenceName="someReferenceName")

As you can see there is parameter called parameterConverters. This is part responsible conversion of outside parameters to message.

Parameter To Message Converter

There are three types of converters available for Gateway.

@ParameterToPayloadAnnotation(parameterName="orderId")
@ParameterToHeaderAnnotation(parameterName="userName", headerName="user")
@ParameterToStaticHeaderAnnotation(headerName="isAdmin", headerValue=true)
    @ParameterToPayloadAnnotation(parameterName="orderId")

Value that will be passed to parameter under orderId will be used as message’s payload.

    @ParameterToHeaderAnnotation(parameterName="userName", headerName="user")

Value that will be passed to parameter under userName, will be add to message under header name user

    ParameterToStaticHeaderAnnotation(headerName="isAdmin", headerValue=true)

Is different that converters above, because do not use parameter. It takes value under headerValue key and assign it to headerName. So in above result will be isAdmin header containing true.

The default message converter, when there is only one parameter is @ParameterToPayloadAnnotation

Request Channel

After conversion the message need to be send somewhere to begin the flow.
The message under requestChannel will be used as channel to send.

Void Return Type

    /**
    * @MessageEndpointAnnotation()
    */
    interface OrderGateway
    {
        /**
         * @GatewayAnnotation(requestChannel="requestChannel", parameterConverters={
         *      @ParameterToPayloadAnnotation(parameterName="orderId")
         * })
         */
        public function buy(string $orderId) : void;
    }

With Return Type

If return type if void, then message will goes through configured endpoints like transformers, service activators connected by message channels
and will not return for reply.

     /**
     * @MessageEndpointAnnotation()
     */
     interface OrderGateway
     {
         /**
          * @GatewayAnnotation(requestChannel="requestChannel", parameterConverters={
          *      @ParameterToPayloadAnnotation(parameterName="orderId")
          * })
          */
         public function getOrder(string $orderId) : Order;
     }

A Gateway will auto-create a temporary, anonymous reply channel in message headers, where it will listen for the reply.
This is taken from user, and thanks to that OrderGateway what happens during flow. It just knows, that last messaging component that will no have outputChannel defined, will use MessageHeaders::REPLY_CHANNEL, which is in fact reply channel defined by gateway.

In the above example payload of reply message will require to be Order, otherwise exception will be thrown.
If interface doesn’t return null by using question mark type hint ?Order, then exception will be thrown if there will be no reply.

Error Channel

          @GatewayAnnotation(
               errorChannel="errorChannel"
          )

If gateway method throws exception it will be rethrown by gateway, if no error channel is defined.
Otherwise the exception will be converted to ErrorMessage and send to defined channel. This will result in gateway method returning null, so it has to allow for nullable return values.

Transaction Factories

          @GatewayAnnotation(
               transactionFactories={"transactionService"}
          )

As gateway it starting point for the message flow, you may want to start transactions at that time.
To allow for that you need to register within your container Transaction service implementing TransactionFactory and implement Transaction for that factory.

    interface TransactionFactory
    {
        /**
         * Create a new transaction and associate it with current process
         *
         * @return Transaction
         */
        public function begin() : Transaction;
    }
    
    interface Transaction
    {
        // methods to implement
    }