A JMS C++ Topic Subscriber
Introduction
This example demonstrates how a C++ application can asynchronously receive messages that were published to a JMS Topic.
In this example, we receive stock quotes as two different message types: a TextMessage and a MapMessage. We use two message types solely to illustrate how to handle something other than TextMessages. In a real production application, you would probably use a single message type and save yourself the message type checks and conditional casts. See also the corresponding C++ Topic Publisher example and the .NET examples.
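To make the cost of mixing message types concrete, here is a minimal sketch of the check-then-cast pattern in plain C++. The Message, TextMessage, and MapMessage structs and the describe() function are hypothetical stand-ins invented for this illustration; they are not the generated JMS binding classes, and the real listener in the source code uses Class::isInstance and dyna_cast rather than dynamic_cast.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Hypothetical stand-ins for the JMS message types; these are NOT the
// generated binding classes, just plain C++ types for illustration.
struct Message {
    virtual ~Message() = default;
};

struct TextMessage : Message {
    std::string symbol;  // plays the role of the "SYMBOL" property
    std::string text;    // the quote as text
};

struct MapMessage : Message {
    std::map<std::string, double> quotes;  // symbol/price pairs
};

// Check the runtime type and cast conditionally, once per supported type.
std::string describe(const Message& msg) {
    std::ostringstream out;
    if (const auto* textMsg = dynamic_cast<const TextMessage*>(&msg)) {
        out << "text:" << textMsg->symbol << "=" << textMsg->text;
    } else if (const auto* mapMsg = dynamic_cast<const MapMessage*>(&msg)) {
        out << "map:";
        for (const auto& entry : mapMsg->quotes) {
            out << entry.first << "=" << entry.second << ";";
        }
    } else {
        out << "unexpected message type";
    }
    return out.str();
}
```

With a single message type, the if/else chain collapses to one cast, which is the simplification suggested above for production code.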
The example relies on JunC++ion-generated C++ bindings for the JMS API. In the JMS Courier product, the JMS C++ bindings are bundled into an easy-to-use shared library that does not require you to use the code generator.
The source code
The following block contains the full application except for the init_jvm() procedure, which parses command line arguments and sets up some system properties based on command line input and a configuration file.
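Since init_jvm() itself is not shown, the following is only a rough sketch of the general idea of loading properties from a configuration file and looking them up with a default. Nearly everything in it is an assumption made for illustration: the name=value file format, the Properties alias, and the parseProperties() and getProperty() helpers are hypothetical and are not part of the example's actual init_jvm.cpp. Only the property names TCF and TOPIC and their default values are taken from the example code.

```cpp
#include <cassert>
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Hypothetical property store, standing in for Java system properties.
using Properties = std::map<std::string, std::string>;

// Read "name=value" lines; blank lines and lines starting with '#' are
// skipped. This file format is an assumption made for this sketch.
Properties parseProperties(std::istream& in) {
    Properties props;
    std::string line;
    while (std::getline(in, line)) {
        if (line.empty() || line[0] == '#') continue;
        const std::string::size_type eq = line.find('=');
        if (eq == std::string::npos || eq == 0) continue;  // not a name=value pair
        props[line.substr(0, eq)] = line.substr(eq + 1);
    }
    return props;
}

// Look up a property, falling back to a default when it is not set.
std::string getProperty(const Properties& props,
                        const std::string& name,
                        const std::string& fallback) {
    const auto it = props.find(name);
    return it == props.end() ? fallback : it->second;
}
```

The lookup-with-default shape mirrors how the example later reads the TCF and TOPIC properties via System::getProperty with a fallback value.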
#include "java_lang_pkg.h"
#include "java_util_pkg.h"
#include "javax_naming_pkg.h"
#include "javax_jms_pkg.h"
#include "xmog_util.h"

#include <iostream>
#include <time.h>

#if (XMOG_WINDOWS==1)
# include <conio.h> // for _kbhit()
#endif

using namespace std;

// implemented in the 'init_jvm.cpp' file;
// shared between all examples that involve the JMS API
void init_jvm( int argc, char * argv[] );

///
/// @brief An asynchronous message listener implemented in C++.
///
/// This implementation is your blueprint for implementing JMS listeners
/// in C++:
///
/// - extend the special CB type
/// - provide a method that exactly implements the callback method,
///   including the xmog_localenv and xmog_flags arguments (which you
///   can ignore if you so wish)
///
class QuoteListener : public javax_jms_MessageListenerCB
{
private:
    int      numSymbols;
    String * symbols;
    Class    clsMapMessage;
    Class    clsTextMessage;

public:
    QuoteListener( int _numSymbols, String * _symbols ) :
        javax_jms_MessageListenerCB(),
        numSymbols( _numSymbols ),
        symbols( _symbols )
    {
        clsMapMessage = Class::forName( "javax.jms.MapMessage" );
        clsTextMessage = Class::forName( "javax.jms.TextMessage" );
    }

    ///
    /// @brief The callback invoked by the JMS implementation when a message is received
    /// on a topic that we have registered for.
    ///
    void onMessage( const Message & msg, xmog_localenv * env, xmog_flags flags )
    {
        // we handle two types of messages in this example:
        //
        // 1) TextMessages, which contain one quote tagged with a property indicating the symbol
        //
        // 2) MapMessages, which may contain several quotes as symbol/price pairs, with the
        //    price being a Double.
        //
        // Also notice that we're using the optional 'env' argument here. This is just
        // a performance enhancement and you don't need to do this in your implementation.
        // Explicitly passing the 'env' pointer (which we know to be non-null in a callback)
        // saves us a thread-local-storage lookup per API call.
        //
        if( clsTextMessage.isInstance( msg, env ) )
        {
            // it's a text message

            // we have to cast away the const-ness of the argument (which is required
            // to satisfy the calling contract)
            Message & rmsg = const_cast<Message&>( msg );
            String symbol = rmsg.getStringProperty( "SYMBOL" );

            // we "filter" by the symbols we're listening for; this could of course
            // be done more efficiently by using a selector based on the property value.
            for( int i=0; i<numSymbols; i++ )
                if( symbols[ i ].equals( symbol ) )
                {
                    cout << symbols[ i ].to_charsUtf8( env ) << "=" << rmsg.toString().to_charsUtf8( env ) << endl;
                    break;
                }
        }
        else if( clsMapMessage.isInstance( msg ) )
        {
            // it's a map message
            MapMessage mmsg = MapMessage::dyna_cast( msg );

            // check whether the map contains the symbols we're interested in
            for( int i=0; i<numSymbols; i++ )
            {
                // get the corresponding value
                Double val = mmsg.getDouble( symbols[ i ] );
                if( val != null )
                    cout << symbols[ i ].to_charsUtf8( env ) << "=" << val.doubleValue() << endl;
            }
        }
        else
        {
            cerr << "ERROR: unexpected message type!" << endl;
        }
    }
};

///
/// @brief The quote subscriber example
///
/// This is a heavily documented example that illustrates how to
/// subscribe to stock quotes on a topic via the JMS Courier C++ APIs.
///
/// @author Alexander R Krapf
/// @version 1.0
///
int main( int argc, char * argv[] )
{
    if( argc < 2 )
    {
        cerr << "Usage: " << argv[ 0 ] << " [tickersymbol]+" << endl;
        cerr << "Usage: " << argv[ 0 ] << " CDMSH AAPL MSFT" << endl;
        return 1;
    }

    try
    {
        // set up the Java runtime environment (defined in 'init_jvm.cpp')
        //
        // This function has to at least initialize
        // - the classpath,
        // - the JVM path (unless you wish to rely on a default JVM we might or might not find),
        // - the security manager
        // - the security policy file
        //
        // We also set up the JMS JNDI properties in this method.
        //
        // You can perform these steps in code or rely on a configuration file,
        // it's up to you. All these cases are documented in the 'rtconfig' examples.
        //
        init_jvm( argc, argv );

        // the JNDI properties are assumed to be set up correctly at this point;
        // That all happens in the init_jvm() function, WHICH IS EXAMPLE CODE AND
        // DOES NOT MEAN THAT YOU HAVE TO DO THINGS THIS WAY!
        InitialContext ictx( _use_java_ctor );

        // we're using TopicConnectionFactory::dyna_cast instead of (TopicConnectionFactory)
        // This is a peculiarity of the C++ framework: all casts have to be done
        // via the dyna_cast method.
        //
        // The C++ examples assume that the TopicConnectionFactory lookup name is in the system
        // properties at this point. That all happens in the init_jvm() function, WHICH IS
        // EXAMPLE CODE AND DOES NOT MEAN THAT YOU HAVE TO DO THINGS THIS WAY!
        // We look for the property "TCF" and give it the default value "TopicConnectionFactory"
        // if we can't find it.
        String tcfName = System::getProperty( "TCF", "TopicConnectionFactory" );
        TopicConnectionFactory tcf = TopicConnectionFactory::dyna_cast( ictx.lookup( tcfName ) );

        if ( tcf == null )
        {
            cerr << "TCF not found for lookup name '" << tcfName.to_chars() << "'" << endl;
            return -1;
        }

        // create a topic connection
        // topic connections are relatively heavy objects, so don't create more than one
        // unless you know that you really have to. It's better to work with multiple
        // sessions instead.
        TopicConnection tc = tcf.createTopicConnection();

        // create a session in which we subscribe (non-transacted, auto-acknowledging)
        // a session should never be shared by more than one thread. If you have more than
        // one thread you need more than one session.
        TopicSession ts = tc.createTopicSession( false, Session::AUTO_ACKNOWLEDGE );

        // look up the topic name we want to use (set up in 'init_jvm')
        String topicName = System::getProperty( "TOPIC", "testTopic" );
        Topic topic = Topic::dyna_cast( ictx.lookup( topicName ) );

        // create the subscriber for that topic
        TopicSubscriber tsub = ts.createSubscriber( topic );

        cout << "Starting to listen for quotes on topic '" << topicName.to_chars() << "'" << endl;
        cout << "Hit a key to quit..." << endl;

        // here comes the business logic...

        // set up the ticker symbols we're interested in
        int numSymbols = argc - 1;
        String * symbols = new String[ numSymbols ];

        for( int i=0; i<numSymbols; i++ )
        {
            // cache the string reference to improve performance
            symbols[ i ] = argv[ i + 1 ];
        }

        // register our message listener
        // we're keeping the main thread alive for the existence of our interest
        MessageListener * ml = new QuoteListener( numSymbols, symbols );
        tsub.setMessageListener( *ml );
        tc.start();

        // keep the application alive and test every 10th of a second for a
        // keystroke (which then exits the keep-alive loop)
        while( !_kbhit() )
            xmog_util::sleep( 100 );

        // perform an orderly shutdown of the JMS runtime
        tc.stop();
        tsub.setMessageListener( null );
        ts.close();
        tc.close();

        cout << "Done!" << endl;
        return 0;
    }
    // you can write hierarchical exception catch blocks, just like in Java
    // and you have full access to the entire exception information and
    // functionality
    catch( NamingException & nex )
    {
        cerr << "JNDI Naming Exception: " << nex.toString().to_chars() << endl;
        return -1;
    }
    catch( Exception & e )
    {
        cerr << "Exception: " << e.toString().to_chars() << endl;
        return -1;
    }
    // it's always good to catch the C++ framework exception as well so
    // you can handle problems that don't have a corresponding Java
    // exception
    catch( xmog_exception & xe )
    {
        char * xe_msg = xe.get_message_chars();
        if( xe_msg )
        {
            cerr << "A framework exception was caught" << endl;
            cerr << xe_msg << endl;
            // don't forget to clean up the message string
            xmog_java_string::free( xe_msg );
        }
        return -1;
    }

    return 0;
}
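A comment in onMessage() notes that the symbol filter could be handled more efficiently by a message selector. As a hedged sketch of that idea, the helper below builds a selector string in standard JMS selector syntax (an SQL-92 subset) from the list of ticker symbols. The function names quoteLiteral() and buildSymbolSelector() are hypothetical and introduced only for this illustration. In the Java API the resulting string would be passed to TopicSession.createSubscriber(topic, messageSelector, noLocal); whether the generated C++ bindings expose that overload in the same form is an assumption you should check against your bindings.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Wrap a value in single quotes and double any embedded single quotes,
// which is how string literals are written in a JMS selector.
std::string quoteLiteral(const std::string& value) {
    std::string out = "'";
    for (char c : value) {
        if (c == '\'') out += '\'';
        out += c;
    }
    out += '\'';
    return out;
}

// Build "SYMBOL IN ('A', 'B', ...)". An empty list yields an empty string,
// which JMS treats as "no selector" (all messages are delivered).
std::string buildSymbolSelector(const std::vector<std::string>& symbols) {
    if (symbols.empty()) return "";
    std::string selector = "SYMBOL IN (";
    for (std::size_t i = 0; i < symbols.size(); ++i) {
        if (i > 0) selector += ", ";
        selector += quoteLiteral(symbols[i]);
    }
    selector += ")";
    return selector;
}
```

Keep in mind that selectors only see message headers and properties, not the message body. In this example only the TextMessages carry the SYMBOL property, so a selector like this would not deliver MapMessages unless the publisher tags them with that property as well.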

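The catch blocks in main() are ordered from the most specific exception type to the most general one. The following plain C++ sketch shows why that order matters: the first handler whose type matches wins, so a base-class handler placed first would swallow the derived exception. The Exception and NamingException structs and the classify() function are hypothetical stand-ins written for this illustration, not the binding classes, and the catch-all handler merely plays the role that the xmog_exception handler plays in the example.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-ins for the exception hierarchy; in Java,
// javax.naming.NamingException derives from java.lang.Exception.
struct Exception : std::runtime_error {
    explicit Exception(const std::string& what) : std::runtime_error(what) {}
};

struct NamingException : Exception {
    explicit NamingException(const std::string& what) : Exception(what) {}
};

// Run an action and report which handler caught its exception.
template <typename Action>
std::string classify(Action action) {
    try {
        action();
        return "ok";
    } catch (const NamingException& nex) {  // most derived type first
        return std::string("naming: ") + nex.what();
    } catch (const Exception& e) {          // then the base type
        return std::string("exception: ") + e.what();
    } catch (...) {                         // anything outside the hierarchy
        return "unknown";
    }
}
```

Swapping the first two handlers would still compile, but the NamingException handler would then never run, which is why the example lists NamingException before Exception.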