How to interrupt async message receiver thread on connection shutdown

ktsai
In my application, I registered a message listener with a Session to receive messages asynchronously. When a message is received, it is pushed to a blocking queue to be consumed by another object. The receiver thread could therefore be blocked at the time the JMS connection is closed.

When my application closes the JMS connection, the calling thread blocks on the receiver.close() call. How do I force an interrupt on the receiver thread pool when closing the receiver connection?

I have provided a code sample that demonstrates the logic.
I am currently using SwiftMQ 9.7.x.

    // Buffer to be consumed by something else; capacity 1, so put() blocks when full.
    // Declared final so the anonymous listener below can capture it.
    final LinkedBlockingQueue<Message> buffer = new LinkedBlockingQueue<Message>(1);

    // Create connection, session & receiver
    QueueConnection connection = connectionFactory.createQueueConnection();
    QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    QueueReceiver receiver = session.createReceiver(queue);
    receiver.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message msg) {
            System.out.println("message received.");
            try {
                // Blocks indefinitely once the buffer is full and nobody consumes it
                buffer.put(msg);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    // Start the connection
    connection.start();

    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    // Close resources
    System.out.println("receiver closing..");
    receiver.close();
    System.out.println("receiver closed.");
    session.close();
    System.out.println("session closed.");
    connection.close();
    System.out.println("connection closed.");


When running this code, the following is printed:

message received.
message received.
receiver closing..

It blocks at the receiver.close() line.

Thanks in advance.

Re: How to interrupt async message receiver thread on connection shutdown

IIT Software
Administrator
As per the JMS spec, receiver.close() has to wait for onMessage to return.

The only way to interrupt the current put operation would be to store a reference to Thread.currentThread() in onMessage and interrupt that thread before calling close. However, there is a potential gap in which onMessage is called again before close is called. Another way is to use offer(msg, timeout, unit) instead of put.
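
The offer-with-timeout suggestion could look roughly like the sketch below. It is only an illustration under some assumptions: BoundedHandoff, deliver, beginShutdown and the retry interval are hypothetical names invented here, not part of JMS or SwiftMQ. The idea is that onMessage calls deliver(msg) instead of buffer.put(msg), and the application calls beginShutdown() just before receiver.close(), so the listener returns within one retry interval and close() no longer waits forever. Because it relies on a flag rather than on interrupting a stored thread, it avoids the gap mentioned above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not a JMS or SwiftMQ class): a bounded buffer whose
// producer side gives up once shutdown has been requested.
final class BoundedHandoff<T> {
    private final BlockingQueue<T> buffer;
    private final long retryMillis;
    // Set by the thread that is about to close the receiver.
    private volatile boolean closing = false;

    BoundedHandoff(int capacity, long retryMillis) {
        this.buffer = new LinkedBlockingQueue<>(capacity);
        this.retryMillis = retryMillis;
    }

    // Intended to be called from MessageListener.onMessage.
    // Returns true if the message was queued, false if it was dropped
    // because shutdown began or the calling thread was interrupted.
    boolean deliver(T msg) {
        try {
            while (!closing) {
                // Timed offer: wakes up every retryMillis to re-check the flag.
                if (buffer.offer(msg, retryMillis, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            }
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Call this right before receiver.close().
    void beginShutdown() {
        closing = true;
    }

    // Consumer side: returns the next message, or null if the buffer is empty.
    T poll() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BoundedHandoff<String> handoff = new BoundedHandoff<>(1, 100);
        System.out.println(handoff.deliver("first"));  // buffer has room
        handoff.beginShutdown();
        System.out.println(handoff.deliver("second")); // dropped after shutdown
    }
}
```

Note that a message for which deliver returns false never reaches the buffer. With Session.AUTO_ACKNOWLEDGE it is still acknowledged once onMessage returns normally, so if losing it matters, a transacted session or CLIENT_ACKNOWLEDGE may be worth considering.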