A JMS C++ Topic Subscriber

Introduction

This example demonstrates how a C++ application can asynchronously receive messages that were published to a JMS Topic.

In this example, we're set up to receive stock quotes in two different message types: a TextMessage and a MapMessage. We use two different message types solely for the purpose of illustrating something other than TextMessages. In a real production application you would probably use one message type and save yourself message type checks and conditional casts. Please also see the corresponding C++ Topic Publisher example as well as the .NET examples.

The example relies on JunC++ion-generated C++ bindings for the JMS API. In the JMS Courier product, the JMS C++ bindings are bundled into an easy-to-use shared library that does not require you to use the code generator.

The source code

The following block contains the full application except for the init_jvm() procedure, which parses command line arguments and sets up some system properties based on command line input and a configuration file.

#include "java_lang_pkg.h"
#include "java_util_pkg.h"
#include "javax_naming_pkg.h"
#include "javax_jms_pkg.h"
#include "xmog_util.h"
#include <iostream>
#include <time.h>
#if (XMOG_WINDOWS==1)
#    include <conio.h> // for _kbhit()
#endif

using namespace std;


// implemented in the 'init_jvm.cpp' file;
// shared between all examples that involve the JMS API
void    init_jvm( int argc, char * argv[] );


///
/// @brief An asynchronous message listener implemented in C++.
///
/// This implementation is your blueprint for implementing JMS listeners
/// in C++:
///
/// - extend the special CB type 
/// - provide a method that exactly implements the callback method, 
///   including the xmog_localenv and xmog_flags arguments (which you
///   can ignore if you so wish)
///
class QuoteListener : public javax_jms_MessageListenerCB
{
private:

    int              numSymbols;
    String *         symbols;
    Class            clsMapMessage;
    Class            clsTextMessage;

public:

    QuoteListener( int _numSymbols, String * _symbols ) : javax_jms_MessageListenerCB(),
        numSymbols( _numSymbols ),
        symbols( _symbols )
    {
        clsMapMessage = Class::forName( "javax.jms.MapMessage" );
        clsTextMessage = Class::forName( "javax.jms.TextMessage" );
    }

    ///
    /// @brief  The callback invoked by the JMS implementation when a message is received
    ///         on a topic that we have registered for.
    ///
    void    onMessage( const Message & msg, xmog_localenv * env, xmog_flags flags )
    {
        // we handle two types of messages in this example:  
        //   
        //    1) TextMessages, which contain one quote tagged with a property indicating the symbol
        //
        //    2) MapMessages, which may contain several quotes as symbol/price pairs, with the
        //                    price being a Double.
        //
        // Also notice that we're using the optional 'env' argument here.  This is just
        // a performance enhancement and you don't need to do this in your implementation.
        // Explicitly passing the 'env' pointer (which we know to be non-null in a callback)
        // saves us a thread-local-storage lookup per API call.
        //
        if( clsTextMessage.isInstance( msg, env ) )
        {
            // it's a text message
            
            // we have to cast away the const-ness of the argument (which is required
            // to satisfy the calling contract)
            Message &        rmsg = const_cast<Message&>( msg );

            String    symbol = rmsg.getStringProperty( "SYMBOL" );

            // we "filter" by the symbols we're listening for;  this could of course
            // be done more efficiently by using a selector based on the property value.
            for( int i=0; i<numSymbols; i++ )
                if( symbols[ i ].equals( symbol ) )
                {
                    cout << symbols[ i ].to_charsUtf8( env ) << "=" << rmsg.toString().to_charsUtf8( env ) << endl;
                    break;
                }
        }
        else if( clsMapMessage.isInstance( msg ) )
        {
            // it's a map message

            MapMessage    mmsg = MapMessage::dyna_cast( msg );

            // check whether the map contains the symbols we're interested in
            for( int i=0; i<numSymbols; i++ )
            {
                // get the corresponding value
                Double    val = mmsg.getDouble( symbols[ i ] );

                if( val != null )
                    cout << symbols[ i ].to_charsUtf8( env ) << "=" << val.doubleValue() << endl;
            }
        }
        else
        {
            cerr << "ERROR: unexpected message type!" << endl;
        }
    }
};


///
/// @brief The quote publisher example
///
/// This is a heavily documented example that illustrates how to
/// subscribe to stock quotes to a topic via the JMS Courier C++ APIs.
///
/// @author    Alexander R Krapf
/// @version 1.0
///
int main( int argc, char * argv[] )
{
    if( argc < 2 )
    {
        cerr << "Usage: " << argv[ 0 ] << " [tickersymbol]+" << endl;
        cerr << "Usage: " << argv[ 0 ] << " CDMSH AAPL MSFT" << endl;
        return 1;
    }

    try
    {
        // set up the Java runtime environment (defined in 'init_jvm.cpp')
        //
        // This function has to at least initialize
        // - the classpath, 
        // - the JVM path (unless you wish to rely on a default JVM we might or might not find),
        // - the security manager
        // - the security policy file
        //
        // We also set up the JMS JNDI properties in this method.
        //
        // You can perform these steps in code or rely on a configuration file,
        // it's up to you.  All these cases are documented in the 'rtconfig' examples.
        //
        init_jvm( argc, argv );

        // the JNDI properties are assumed to be set up correctly at this point;
        // That all happens in the init_jvm() function, WHICH IS EXAMPLE CODE AND
        // DOES NOT MEAN THAT YOU HAVE TO DO THINGS THIS WAY!
        InitialContext            ictx( _use_java_ctor );

        // we're using TopicConnectionFactory::dyna_cast instead of (TopicConnectionFactory)
        // This is a peculiarity of the C++ framework: all casts have to be done
        // via the dyna_cast method.
        //
        // The C++ examples assume that the TopicConnectionFactory lookup name is in the system
        // properties at this point.  That all happens in the init_jvm() function, WHICH IS 
        // EXAMPLE CODE AND DOES NOT MEAN THAT YOU HAVE TO DO THINGS THIS WAY!
        // We look for the property "TCF" and give it the default value "TopicConnectionFactory" 
        // if we can't find it.
        String                    tcfName = System::getProperty( "TCF", "TopicConnectionFactory" );
        TopicConnectionFactory    tcf = TopicConnectionFactory::dyna_cast( ictx.lookup( tcfName ) );
        
        if ( tcf == null )
        {
            cerr << "TCF not found for lookup name '" << tcfName.to_chars() << "'" << endl;
            return -1;              
        }

        // create a topic connection
        // topic connections are relatively heavy objects, so don't create more than one
        // unless you know that you really have to.  It's better to work with multiple 
        // sessions instead.
        TopicConnection            tc = tcf.createTopicConnection();

        // create a session in which we publish (non-transacted, auto-acknowledging)
        // a session should never be shared by more than one thread. If you have more than
        // one thread you need more than one session.
        TopicSession            ts = tc.createTopicSession( false, Session::AUTO_ACKNOWLEDGE );

        // look up the topic name we want to use (set up in 'init_jvm')
        String                    topicName = System::getProperty( "TOPIC", "testTopic" );
        Topic                    topic = Topic::dyna_cast( ictx.lookup( topicName ) );

        // create the publisher for that topic
        TopicSubscriber            tsub = ts.createSubscriber( topic );

        cout << "Starting to listen for quotes on topic '" << topicName.to_chars() << "'" << endl;
        cout << "Hit a key to quit..." << endl;

        // here comes the business logic...

        // set up the ticker symbols we're interested in
        int                        numSymbols = argc - 1;
        String *                symbols = new String[ numSymbols ];

        for( int i=0; i<numSymbols; i++ )
        {
            // cache the string reference to improve performance
            symbols[ i ] = argv[ i + 1 ];
        }

        // register our message listener
        // we're keeping the main thread alive for the existence of our interest
        MessageListener    *        ml = new QuoteListener( numSymbols, symbols );

        tsub.setMessageListener( *ml );
        tc.start();

        // keep the application alive and test every 10th of a second for a 
        // keystroke (which then exits the keep-alive loop)
        while( !_kbhit() )
            xmog_util::sleep( 100 );

        // perform an orderly shutdown of the JMS runtime
        tc.stop();
        tsub.setMessageListener( null );
        ts.close();
        tc.close();

        cout << "Done!" << endl;

        return 0;
    }
    // you can write hierarchical exception catch blocks, just like in Java
    // and you have full access to the entire exception information and
    // functionality
    catch( NamingException & nex )
    {
        cerr << "JNDI Naming Exception: " << nex.toString().to_chars() << endl;
        
        return -1;
    }
    catch( Exception & e )
    {
        cerr << "Exception: " << e.toString().to_chars() << endl;

        return -1;
    }
    // it's always good to catch the C++ framework exception as well so
    // you can handle problems that don't have a corresponding Java 
    // exception
    catch( xmog_exception & xe )
    {
        char *    xe_msg = xe.get_message_chars();

        if( xe_msg )
        {
            cerr << "A framework exception was caught" << endl;
            cerr << xe_msg << endl;

            // don't forget to clean up the message string
            xmog_java_string::free( xe_msg );
        }

        return -1;
    }

    return 0;
}


Copyright 2006-2011 by Codemesh, Inc., ALL RIGHTS RESERVED

:
jms c++ topic subscriber
home products support customers partners newsroom about us contact us