amqp 1.0 client 'Maximum lock count exceeded

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

amqp 1.0 client 'Maximum lock count exceeded

cadams
I am using the amqp driver against a variety of different brokers with the async Consumer.receiveNoWait.  Occasionally my listener client will throw a Exception in thread "Thread-2" java.lang.Error: Maximum lock count exceeded on one of the threads.  This happens under load and while the listener is idle.  

I implemented the MessageAvailabilityListener.messageAvailable method this way:

 public void messageAvailable(Consumer consumer) {
        AMQPMessage msg = consumer.receive();
        if (msg != null) {
            AmqpValue value = msg.getAmqpValue();
            String messageOut = ((AMQPString) value.getValue()).getValue();
            System.out.println("Received: " + messageOut);
 if (!msg.isSettled()) {
                try {
                    msg.accept();
                } catch (InvalidStateException e) {
                    e.printStackTrace();
                }
            }
        }
    }
 
and simply run it within a thread

new Thread(
                    new Runnable() {
                        public void run() {
                            while (true) {
                                AMQPMessage msg = c.receiveNoWait(listener);
                            }
                        }
                    }).start();

Also I consistently see this client not receiving messages that are on a queue.  It will often pull a number of messages and then stop.  Thread dump shows nothing unusual other than all threads idle.  If I restart the listener it will pull more messages and again stop with messages still on the broker queue.


Reply | Threaded
Open this post in threaded view
|

Re: amqp 1.0 client 'Maximum lock count exceeded

IIT Software
Administrator
Well, your approach seems to be wrong. The thread were you call receiveNoWait doesn't do anything if it receives a message there. That's the reason for the lock exceeded exception because you never call accept on it. Keep in mind that receiveNoWait(listener) is one atomic operation that EITHER returns the next available message OR registers a listener to inform you when a message becomes available.

Here is how it should be used properly (from the docs):

      // Implementation of MessageAvailabilityListener
      // Notifies a Poller thread to call poll()
      public void messageAvailable(Consumer consumer)
      {
        poller.enqueue(this);
      }

      // Called from the Poller thread
      public void poll()
      {
        AMQPMessage msg = c.receiveNoWait(this);
        if (msg != null)
        {
          // process message
         
          // enqueue the next poll
         poller.enqueue(this);
        }
      }