A JMS C++ Topic Publisher

Introduction

This example demonstrates how you can publish messages to a JMS Topic from a C++ application. Asynchronous messaging is very much a peer-to-peer technology that can make it hard to distinguish between client and server usage. Your published messages could be client replies or they could be server publications, it totally depends on your application.

In this example, we're publishing stock quotes using two different message types: a TextMessage and a MapMessage. We use two different message types solely for the purpose of illustrating something other than TextMessages. In a real production application you would probably use one message type and save yourself message type checks and conditional casts. Please also see the corresponding C++ Topic Subscriber example as well as the .NET examples.

The example relies on JunC++ion-generated C++ bindings for the JMS API. In the JMS Courier product, the JMS C++ bindings are bundled into an easy-to-use shared library that does not require you to use the code generator.

The source code

The following block contains the full application except for the init_jvm() procedure, which parses command line arguments and sets up some system properties based on command line input and a configuration file.

#include "java_lang_pkg.h"
#include "java_util_pkg.h"
#include "javax_naming_pkg.h"
#include "javax_jms_pkg.h"
#include "xmog_util.h" // for sleep()
#include <iostream>
#include <time.h>
#if (XMOG_WINDOWS==1)
#    include <conio.h> // for _kbhit()
#endif

using namespace std;


// implemented in the 'init_jvm.cpp' file;
// shared between all examples that involve the JMS API
void    init_jvm( int argc, char * argv[] );


///
/// @brief The quote publisher example
///
/// This is a heavily documented example that illustrates how to
/// publish stock quotes to a topic via the JMS Courier C++ APIs.
///
/// @author    Alexander R Krapf
/// @version 1.0
///
int main( int argc, char * argv[] )
{
    if( argc < 2 )
    {
        cerr << "Usage: " << argv[ 0 ] << " [tickersymbol]+" << endl;
        cerr << "Usage: " << argv[ 0 ] << " CDMSH AAPL MSFT" << endl;
        return 1;
    }

    try
    {
        // set up the Java runtime environment (defined in 'init_jvm.cpp')
        //
        // This function has to at least initialize
        // - the classpath, 
        // - the JVM path (unless you wish to rely on a default JVM we might or might not find),
        // - the security manager
        // - the security policy file
        //
        // We also set up the JMS JNDI properties in this method.
        //
        // You can perform these steps in code or rely on a configuration file,
        // it's up to you.  All these cases are documented in the 'rtconfig' examples.
        //
        init_jvm( argc, argv );

        // the JNDI properties are assumed to be set up correctly at this point;
        // That all happens in the init_jvm() function, WHICH IS EXAMPLE CODE AND
        // DOES NOT MEAN THAT YOU HAVE TO DO THINGS THIS WAY!
        InitialContext            ictx( _use_java_ctor );

        // we're using TopicConnectionFactory::dyna_cast instead of (TopicConnectionFactory)
        // This is a peculiarity of the C++ framework: all casts have to be done
        // via the dyna_cast method.
        //
        // The C++ examples assume that the TopicConnectionFactory lookup name is in the system
        // properties at this point.  That all happens in the init_jvm() function, WHICH IS 
        // EXAMPLE CODE AND DOES NOT MEAN THAT YOU HAVE TO DO THINGS THIS WAY!
        // We look for the property "TCF" and give it the default value "TopicConnectionFactory" 
        // if we can't find it.
        String                    tcfName = 
                                      System::getProperty( "TCF", "TopicConnectionFactory" );
        TopicConnectionFactory    tcf = 
                                      TopicConnectionFactory::dyna_cast( ictx.lookup( tcfName ) );
        
        if ( tcf == null )
        {
            cerr << "TCF not found for lookup name '" << tcfName.to_chars() << "'" << endl;
            return -1;              
        }

        // create a topic connection
        // topic connections are relatively heavy objects, so don't create more than one
        // unless you know that you really have to.  It's better to work with multiple 
        // sessions instead.
        TopicConnection           tc = tcf.createTopicConnection();

        // create a session in which we publish (non-transacted, auto-acknowledging)
        // a session should never be shared by more than one thread. If you have more than
        // one thread you need more than one session.
        TopicSession              ts = tc.createTopicSession( false, Session::AUTO_ACKNOWLEDGE );

        // look up the topic name we want to use (set up in 'init_jvm')
        String                    topicName = System::getProperty( "TOPIC", "testTopic" );
        Topic                     topic = Topic::dyna_cast( ictx.lookup( topicName ) );

        // create the publisher for that topic
        TopicPublisher            tp = ts.createPublisher( topic );

        cout << "Starting to publish quotes to topic '" << topicName.to_chars() << "'" << endl;
        cout << "Hit a key to quit..." << endl;

        // here comes the business logic...

        // seed the random number generator
        srand( (unsigned)time( NULL ) );

        // set up the ticker symbols and quotes
        int                     numSymbols = argc - 1;
        String *                symbols = new String[ numSymbols ];
        double *                values = new double[ numSymbols ];

        for( int i=0; i<numSymbols; i++ )
        {
            // cache the string reference to improve performance
            symbols[ i ] = argv[ i + 1 ];
            // a random starting value
            values[ i ] = ( (double)rand() / (double)RAND_MAX ) * 100;
        }

        tc.start();

        // the publisher will continue until you hit a key
        while( !_kbhit() )
        {
            // we randomly pick whether or not we send MapMessages or TextMessages
            bool    bSendMapMsg = ( rand() < RAND_MAX / 2 );

            if( bSendMapMsg )
            {
                // if we picked map messages, we have the session create one
                MapMessage        msg = ts.createMapMessage();

                // for every stock symbol, calculate a random price 
                for( int i=0; i<numSymbols; i++ )
                {
                    msg.setDouble( symbols[ i ], values[ i ] );
                    values[ i ] += ( (double)rand() / (double)RAND_MAX ) * 2 - 1;
                }

                tp.publish( msg, DeliveryMode::NON_PERSISTENT, 1, 5 );
                cout << "Published map message: " << msg.toString().to_chars() << endl;
            }
            else
            {
                TextMessage        msg = ts.createTextMessage();
                char            buffer[ 16 ];

                for( int i=0; i<numSymbols; i++ )
                {
                    // sets a message property containing the stock symbol
                    // this allows property-based filtering on the receiver side
                    msg.setStringProperty( "SYMBOL", symbols[ i ] );
                    sprintf( buffer, "%3.3f", values[ i ] );
                    values[ i ] += ( (double)rand() / (double)RAND_MAX ) * 2 - 1;
                    msg.setText( buffer );

                    tp.publish( msg, DeliveryMode::NON_PERSISTENT, 1, 5 );
                    cout << "Published text message: " << symbols[ i ].to_charsUtf8() 
                         << "=" << msg.toString().to_chars() << endl;
                }
            }

            // random delay of up to 500ms
            // COMMENT CODE IF YOU WISH TO PUBLISH AS QUICKLY AS POSSIBLY
            xmog_util::sleep( (int)( ( (double)rand() / (double)RAND_MAX ) * 500 ) );
        }

        // orderly shutdown processing
        tc.stop();
        ts.close();
        tc.close();

        cout << "Done publishing!" << endl;

        return 0;
    }
    // you can write hierarchical exception catch blocks, just like in Java
    // and you have full access to the entire exception information and
    // functionality
    catch( NamingException & nex )
    {
        cerr << "JNDI Naming Exception: " << nex.toString().to_chars() << endl;
        
        return -1;
    }
    catch( Exception & e )
    {
        cerr << "Exception: " << e.toString().to_chars() << endl;

        return -1;
    }
    // it's always good to catch the C++ framework exception as well so
    // you can handle problems that don't have a corresponding Java 
    // exception
    catch( xmog_exception & xe )
    {
        char *    xe_msg = xe.get_message_chars();

        if( xe_msg )
        {
            cerr << "A framework exception was caught" << endl;
            cerr << xe_msg << endl;

            // don't forget to clean up the message string
            xmog_java_string::free( xe_msg );
        }

        return -1;
    }

    return 0;
}


Copyright 2006-2011 by Codemesh, Inc., ALL RIGHTS RESERVED

:
jms c++ topic publisher
home products support customers partners newsroom about us contact us