Index: pom.xml =================================================================== --- pom.xml (revision 88) +++ pom.xml (working copy) @@ -3,7 +3,7 @@ org.codehaus.stomp stompconnect jar - 1.0 + 1.0-BT StompConnect @@ -33,6 +33,30 @@ + + log4j + log4j + 1.2.15 + + + com.sun.jmx + jmxri + + + com.sun.jdmk + jmxtools + + + javax.jms + jms + + + javax.mail + mail + + + + commons-logging commons-logging Index: src/main/java/org/codehaus/stomp/jms/StompSession.java =================================================================== --- src/main/java/org/codehaus/stomp/jms/StompSession.java (revision 88) +++ src/main/java/org/codehaus/stomp/jms/StompSession.java (working copy) @@ -27,6 +27,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.List; +import java.util.ArrayList; /** * Represents a logical session (a parallel unit of work) within a Stomp connection @@ -37,7 +39,8 @@ private final ProtocolConverter protocolConverter; private final Session session; private MessageProducer producer; - private Map temporaryDestinations = new HashMap(); + private static Map temporaryDestinations = new HashMap(); + private List created = new ArrayList(); public StompSession(ProtocolConverter protocolConverter, Session session) { this.protocolConverter = protocolConverter; @@ -61,14 +64,20 @@ public void close() throws JMSException { session.close(); + synchronized (temporaryDestinations) { + Iterator i = created.iterator(); + while (i.hasNext()) { + temporaryDestinations.remove(i.next()); + } + } } public void sendToJms(StompFrame command) throws JMSException, ProtocolException { Map headers = command.getHeaders(); String destinationName = (String) headers.remove(Stomp.Headers.Send.DESTINATION); Message message = convertFrame(command); + Destination destination = convertDestination(destinationName, false); - Destination destination = convertDestination(destinationName); int deliveryMode = getDeliveryMode(headers); int priority = getPriority(headers); @@ -83,7 +92,7 @@ protocolConverter.sendToStomp(frame); } - public Destination convertDestination(String name) throws ProtocolException, JMSException { + public Destination convertDestination(String name, boolean forceNew) throws ProtocolException, JMSException { if (name == null) { throw new ProtocolException("No destination is specified!"); } @@ -97,11 +106,22 @@ } else if (name.startsWith("/temp-queue/")) { String tempName = name.substring("/temp-queue/".length(), name.length()); - return temporaryDestination(tempName, session.createTemporaryQueue()); + Destination answer = temporaryDestinations.get(tempName); + + if (forceNew || answer == null) { + return temporaryDestination(tempName, session.createTemporaryQueue()); + } else { + return answer; + } } else if (name.startsWith("/temp-topic/")) { String tempName = name.substring("/temp-topic/".length(), name.length()); - return temporaryDestination(tempName, session.createTemporaryTopic()); + Destination answer = temporaryDestinations.get(tempName); + if (forceNew || answer == null) { + return temporaryDestination(tempName, session.createTemporaryTopic()); + } else { + return answer; + } } else { throw new ProtocolException("Illegal destination name: [" + name + "] -- StompConnect destinations " + @@ -118,6 +138,7 @@ Topic topic = (Topic) d; if (d instanceof TemporaryTopic) { buffer.append("/temp-topic/"); + temporaryDestination(topic.getTopicName(), d); } else { buffer.append("/topic/"); @@ -128,6 +149,7 @@ Queue queue = (Queue) d; if (d instanceof TemporaryQueue) { buffer.append("/temp-queue/"); + temporaryDestination(queue.getQueueName(), d); } else { buffer.append("/queue/"); @@ -139,12 +161,17 @@ protected synchronized Destination temporaryDestination(String tempName, Destination temporaryDestination) { - Destination answer = temporaryDestinations.get(tempName); - if (answer == null) { - temporaryDestinations.put(tempName, temporaryDestination); - answer = temporaryDestination; + synchronized (temporaryDestinations) { + temporaryDestinations.put(tempName, temporaryDestination); + created.add(tempName); + try { +// System.out.println("Added: " + tempName + " for " + ((Queue)temporaryDestination).getQueueName()); + //throw new Throwable(); + } catch (Throwable t) { + //t.printStackTrace(); + } } - return answer; + return temporaryDestination; } protected int getDeliveryMode(Map headers) throws JMSException { @@ -223,7 +250,7 @@ o = headers.remove(Stomp.Headers.Send.REPLY_TO); if (o != null) { - msg.setJMSReplyTo(convertDestination((String) o)); + msg.setJMSReplyTo(convertDestination((String) o, false)); } // now the general headers Index: src/main/java/org/codehaus/stomp/jms/StompSubscription.java =================================================================== --- src/main/java/org/codehaus/stomp/jms/StompSubscription.java (revision 88) +++ src/main/java/org/codehaus/stomp/jms/StompSubscription.java (working copy) @@ -53,7 +53,7 @@ Map headers = frame.getHeaders(); String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR); String destinationName = (String) headers.get(Stomp.Headers.Subscribe.DESTINATION); - destination = session.convertDestination(destinationName); + destination = session.convertDestination(destinationName, true); Session jmsSession = session.getSession(); boolean noLocal = false; Index: src/main/java/org/codehaus/stomp/tcp/TcpTransport.java =================================================================== --- src/main/java/org/codehaus/stomp/tcp/TcpTransport.java (revision 88) +++ src/main/java/org/codehaus/stomp/tcp/TcpTransport.java (working copy) @@ -42,7 +42,7 @@ import java.util.Map; /** - * @version $Revision: $ + * @version $Revision$ */ public class TcpTransport extends ServiceSupport implements Runnable, StompHandler { private static final Log log = LogFactory.getLog(TcpTransport.class); @@ -391,10 +391,10 @@ } protected void initializeStreams() throws Exception { - TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); - this.dataIn = new DataInputStream(buffIn); - TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); - this.dataOut = new DataOutputStream(buffOut); +// TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize); + this.dataIn = new DataInputStream(socket.getInputStream());//new DataInputStream(buffIn); + // TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); + this.dataOut = new DataOutputStream(socket.getOutputStream());//new DataOutputStream(buffOut); } protected void closeStreams() throws IOException {