Index: pom.xml
===================================================================
--- pom.xml (revision 88)
+++ pom.xml (working copy)
@@ -3,7 +3,7 @@
org.codehaus.stomp
stompconnect
jar
- 1.0
+ 1.0-BT
StompConnect
@@ -33,6 +33,30 @@
+
+ log4j
+ log4j
+ 1.2.15
+
+
+ com.sun.jmx
+ jmxri
+
+
+ com.sun.jdmk
+ jmxtools
+
+
+ javax.jms
+ jms
+
+
+ javax.mail
+ mail
+
+
+
+
commons-logging
commons-logging
Index: src/main/java/org/codehaus/stomp/jms/StompSession.java
===================================================================
--- src/main/java/org/codehaus/stomp/jms/StompSession.java (revision 88)
+++ src/main/java/org/codehaus/stomp/jms/StompSession.java (working copy)
@@ -27,6 +27,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
/**
* Represents a logical session (a parallel unit of work) within a Stomp connection
@@ -37,7 +39,8 @@
private final ProtocolConverter protocolConverter;
private final Session session;
private MessageProducer producer;
- private Map temporaryDestinations = new HashMap();
+ private static Map temporaryDestinations = new HashMap();
+ private List created = new ArrayList();
public StompSession(ProtocolConverter protocolConverter, Session session) {
this.protocolConverter = protocolConverter;
@@ -61,14 +64,20 @@
public void close() throws JMSException {
session.close();
+ synchronized (temporaryDestinations) {
+ Iterator i = created.iterator();
+ while (i.hasNext()) {
+ temporaryDestinations.remove(i.next());
+ }
+ }
}
public void sendToJms(StompFrame command) throws JMSException, ProtocolException {
Map headers = command.getHeaders();
String destinationName = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
Message message = convertFrame(command);
+ Destination destination = convertDestination(destinationName, false);
- Destination destination = convertDestination(destinationName);
int deliveryMode = getDeliveryMode(headers);
int priority = getPriority(headers);
@@ -83,7 +92,7 @@
protocolConverter.sendToStomp(frame);
}
- public Destination convertDestination(String name) throws ProtocolException, JMSException {
+ public Destination convertDestination(String name, boolean forceNew) throws ProtocolException, JMSException {
if (name == null) {
throw new ProtocolException("No destination is specified!");
}
@@ -97,11 +106,22 @@
}
else if (name.startsWith("/temp-queue/")) {
String tempName = name.substring("/temp-queue/".length(), name.length());
- return temporaryDestination(tempName, session.createTemporaryQueue());
+ Destination answer = temporaryDestinations.get(tempName);
+
+ if (forceNew || answer == null) {
+ return temporaryDestination(tempName, session.createTemporaryQueue());
+ } else {
+ return answer;
+ }
}
else if (name.startsWith("/temp-topic/")) {
String tempName = name.substring("/temp-topic/".length(), name.length());
- return temporaryDestination(tempName, session.createTemporaryTopic());
+ Destination answer = temporaryDestinations.get(tempName);
+ if (forceNew || answer == null) {
+ return temporaryDestination(tempName, session.createTemporaryTopic());
+ } else {
+ return answer;
+ }
}
else {
throw new ProtocolException("Illegal destination name: [" + name + "] -- StompConnect destinations " +
@@ -118,6 +138,7 @@
Topic topic = (Topic) d;
if (d instanceof TemporaryTopic) {
buffer.append("/temp-topic/");
+ temporaryDestination(topic.getTopicName(), d);
}
else {
buffer.append("/topic/");
@@ -128,6 +149,7 @@
Queue queue = (Queue) d;
if (d instanceof TemporaryQueue) {
buffer.append("/temp-queue/");
+ temporaryDestination(queue.getQueueName(), d);
}
else {
buffer.append("/queue/");
@@ -139,12 +161,17 @@
protected synchronized Destination temporaryDestination(String tempName, Destination temporaryDestination) {
- Destination answer = temporaryDestinations.get(tempName);
- if (answer == null) {
- temporaryDestinations.put(tempName, temporaryDestination);
- answer = temporaryDestination;
+ synchronized (temporaryDestinations) {
+ temporaryDestinations.put(tempName, temporaryDestination);
+ created.add(tempName);
+ try {
+// System.out.println("Added: " + tempName + " for " + ((Queue)temporaryDestination).getQueueName());
+ //throw new Throwable();
+ } catch (Throwable t) {
+ //t.printStackTrace();
+ }
}
- return answer;
+ return temporaryDestination;
}
protected int getDeliveryMode(Map headers) throws JMSException {
@@ -223,7 +250,7 @@
o = headers.remove(Stomp.Headers.Send.REPLY_TO);
if (o != null) {
- msg.setJMSReplyTo(convertDestination((String) o));
+ msg.setJMSReplyTo(convertDestination((String) o, false));
}
// now the general headers
Index: src/main/java/org/codehaus/stomp/jms/StompSubscription.java
===================================================================
--- src/main/java/org/codehaus/stomp/jms/StompSubscription.java (revision 88)
+++ src/main/java/org/codehaus/stomp/jms/StompSubscription.java (working copy)
@@ -53,7 +53,7 @@
Map headers = frame.getHeaders();
String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
String destinationName = (String) headers.get(Stomp.Headers.Subscribe.DESTINATION);
- destination = session.convertDestination(destinationName);
+ destination = session.convertDestination(destinationName, true);
Session jmsSession = session.getSession();
boolean noLocal = false;
Index: src/main/java/org/codehaus/stomp/tcp/TcpTransport.java
===================================================================
--- src/main/java/org/codehaus/stomp/tcp/TcpTransport.java (revision 88)
+++ src/main/java/org/codehaus/stomp/tcp/TcpTransport.java (working copy)
@@ -42,7 +42,7 @@
import java.util.Map;
/**
- * @version $Revision: $
+ * @version $Revision$
*/
public class TcpTransport extends ServiceSupport implements Runnable, StompHandler {
private static final Log log = LogFactory.getLog(TcpTransport.class);
@@ -391,10 +391,10 @@
}
protected void initializeStreams() throws Exception {
- TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
- this.dataIn = new DataInputStream(buffIn);
- TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
- this.dataOut = new DataOutputStream(buffOut);
+// TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
+ this.dataIn = new DataInputStream(socket.getInputStream());//new DataInputStream(buffIn);
+ // TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+ this.dataOut = new DataOutputStream(socket.getOutputStream());//new DataOutputStream(buffOut);
}
protected void closeStreams() throws IOException {