Hi, I am still having this annoying bug with the adapters. I have a base JMS adapter (minor changes from the demo):
package com.trinitas.ls.adapter.data;
import java.io.File;
import java.util.Enumeration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.naming.NamingException;
import org.apache.log4j.Logger;
import com.lightstreamer.interfaces.data.DataProviderException;
import com.lightstreamer.interfaces.data.FailureException;
import com.lightstreamer.interfaces.data.ItemEventListener;
import com.lightstreamer.interfaces.data.SubscriptionException;
import com.trinitas.common.jms.ConnectionLoop;
import com.trinitas.common.jms.ExtendedMessageListener;
import com.trinitas.common.jms.FeedMessage;
import com.trinitas.common.jms.HeartbeatMessage;
import com.trinitas.common.jms.JMSHandler;
import com.trinitas.common.jms.SubscribedItemAttributes;
public abstract class BaseJMSDataAdapter implements ExtendedMessageListener {
// Name of the log4j logger; read in init() from the optional "loggerName" parameter.
protected String loggerName;
// Logger obtained in init() from loggerName; it is null until init() has assigned it,
// which is why getParam() guards every logging call with a null check.
protected Logger logger;
// JMS access object, created in init() and used to send requests and (re)connect.
protected JMSHandler jmsHandler;
// Lightstreamer listener supplied through setListener(); receives the item updates.
protected ItemEventListener listener;
// Currently subscribed items, keyed by item name.
protected ConcurrentHashMap<String, SubscribedItemAttributes> subscribedItems = new ConcurrentHashMap<String, SubscribedItemAttributes>();
// Counter used by subscribe() to generate the unique id of each subscription.
protected volatile int nextHandleId = 1;
// Item handles received in subscribe(), keyed by the unique id generated there.
protected ConcurrentHashMap<String, Object> handles = new ConcurrentHashMap<String, Object>();
// Value of the optional "msgPoolSize" parameter (default 15); passed to jmsHandler.initQueueSender().
protected int msgPoolSize;
// Value of the optional "recoveryPauseMillis" parameter (default 2000); passed to ConnectionLoopTSQS.
protected int recoveryPause;
// Non-fair read/write lock. The write lock is taken by subscribe/unsubscribe, the connection
// callbacks and the heartbeat handling; the read lock by onMessage() and SenderThread.
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(false);
// Pending textual requests for the feed; drained and sent over JMS by SenderThread.
protected ConcurrentLinkedQueue<String> toSendRequests = new ConcurrentLinkedQueue<String>();
/**
 * Status variables. This adapter has 3 possible states:
 * 1) jmsOk=false and lastHeartbeatRandom=-1: no connection to JMS is available;
 * 2) jmsOk=true and lastHeartbeatRandom=-1: the adapter is connected to JMS but the
 * Generator is not running;
 * 3) jmsOk=true and lastHeartbeatRandom!=-1: connection to JMS is established and
 * the adapter is receiving heartbeats (and/or data) by the Generator.
 */
protected volatile boolean jmsOk = false;
// Random id carried by the last accepted heartbeat or feed message; -1 means no feed is known.
// NOTE(review): this field is not volatile, yet HeartbeatThread reads it in its loop
// condition without holding rwLock - confirm that this is intended.
protected int lastHeartbeatRandom = -1;
// Incremented modulo 1000 by handleHeartbeat() for every repeated heartbeat; HeartbeatThread
// compares successive values to detect that heartbeats have stopped.
protected int heartbeatCount = 0;
// protected HashMap<String, String> inactiveMap = new HashMap<String, String>();
// protected HashMap<String, String> completeInactiveMap = new HashMap<String, String>();
//
// //////////////// DataProvider
/**
 * Reads the adapter configuration, creates the JMS handler and starts the
 * background loop that establishes the JMS connection.
 *
 * @param params configuration parameters; "jmsUrl", "initialContextFactory",
 *               "topicConnectionFactory", "queueConnectionFactory", "topicName"
 *               and "queueName" are mandatory, while "loggerName", "msgPoolSize"
 *               and "recoveryPauseMillis" fall back to defaults
 * @param arg1   not used by this implementation
 * @throws DataProviderException if a mandatory parameter is missing
 */
public void init(Map params, File arg1) throws DataProviderException {
    // Resolve the logger first: this lookup runs while logger is still null,
    // every later getParam() call logs through the logger assigned here.
    this.loggerName = getParam(params, "loggerName", false, "LightstreamerLogger.BaseJMSAdapter");
    this.logger = Logger.getLogger(this.loggerName);

    // Mandatory JNDI/JMS settings, read in the same order as they are logged.
    final String jndiUrl = getParam(params, "jmsUrl", true, null);
    final String contextFactory = getParam(params, "initialContextFactory", true, null);
    final String topicFactory = getParam(params, "topicConnectionFactory", true, null);
    final String queueFactory = getParam(params, "queueConnectionFactory", true, null);
    final String topicName = getParam(params, "topicName", true, null);
    final String queueName = getParam(params, "queueName", true, null);

    // Optional numeric settings with their defaults.
    this.msgPoolSize = getParam(params, "msgPoolSize", false, 15);
    this.recoveryPause = getParam(params, "recoveryPauseMillis", false, 2000);
    this.logger.info("Configuration read.");

    this.jmsHandler = new JMSHandler(this.logger, contextFactory, jndiUrl, queueFactory, queueName,
            topicFactory, topicName);
    this.jmsHandler.setListener(this);

    // The connection is attempted by a separate loop object, not inline here.
    final ConnectionLoopTSQS connector = new ConnectionLoopTSQS(this.jmsHandler, this.recoveryPause, this.logger);
    connector.start();
    this.logger.info("BaseJMSDataAdapter ready.");
}
/**
 * Stores the listener that onMessage() later uses to forward item updates.
 *
 * @param listener the listener to send updates to
 */
public void setListener(ItemEventListener listener) {
this.listener = listener;
}
/**
 * Tells whether this adapter accepts the given item name. subscribe() rejects
 * the request with a SubscriptionException when this returns false.
 *
 * @param itemName the item name received in the subscription request
 * @return true if the item may be subscribed to
 */
protected abstract boolean isValidItem(String itemName);
/**
 * Registers a subscription for an item and, if the feed is currently alive,
 * queues a "subscribe" request for it and starts a SenderThread to deliver it.
 * When no heartbeat has been received yet (lastHeartbeatRandom == -1) the item
 * is only recorded locally; the request is sent later by subscribeAll().
 *
 * @param itemName      name of the item to subscribe to
 * @param itemHandle    handle to be used when forwarding updates for this item
 * @param needsIterator not used by this implementation
 * @throws SubscriptionException if isValidItem() rejects the item name
 * @throws FailureException      declared by the interface; not thrown here
 */
public void subscribe(String itemName, Object itemHandle, boolean needsIterator) throws SubscriptionException, FailureException {
    logger.info("Subscribing to " + itemName);
    if (!isValidItem(itemName)) {
        throw new SubscriptionException("(Subscribing) Unexpected item: " + itemName);
    }
    logger.info("(Subscribing) Valid item: " + itemName);

    final String uniqueId;
    boolean dispatchThread = false;
    rwLock.writeLock().lock();
    try {
        logger.info("------------------>Write LOCK 1");
        // nextHandleId++ is a read-modify-write and volatile does not make it atomic.
        // Generating the id while holding the write lock guarantees that two
        // concurrent subscriptions can never obtain the same id.
        uniqueId = String.valueOf(nextHandleId++);
        final SubscribedItemAttributes itemAttrs = new SubscribedItemAttributes(itemName, uniqueId);
        subscribedItems.put(itemName, itemAttrs);
        handles.put(uniqueId, itemHandle);
        if (lastHeartbeatRandom == -1) {
            // Feed not available: nothing can be requested right now.
            dispatchInactiveFlag(itemAttrs);
        } else {
            toSendRequests.offer("subscribe" + itemName + "_" + uniqueId);
            dispatchThread = true;
        }
        logger.info("------------------>Write UNLOCK 1");
    } finally {
        // Always release the lock, even if something above throws; a leaked
        // write lock would block every other adapter callback forever.
        rwLock.writeLock().unlock();
    }
    logger.info("(Subscribing) Inserted in subscribed items list: " + itemName + " (" + uniqueId + ")");
    if (dispatchThread) {
        // Started outside the lock: SenderThread takes the read lock itself.
        new SenderThread().start();
    }
}
/**
 * Two-argument subscribe variant; intentionally empty. The three-argument
 * overload that also receives the item handle does the actual work.
 *
 * @param itemName      ignored
 * @param needsIterator ignored
 */
public void subscribe(String itemName, boolean needsIterator) throws SubscriptionException, FailureException {
// NEVER CALLED
}
/**
 * Removes an item from the subscribed set and, if the feed is currently
 * alive, queues an "unsubscribe" request and starts a SenderThread for it.
 *
 * @param itemName name of the item to unsubscribe from
 * @throws SubscriptionException if the item is not currently subscribed
 * @throws FailureException      declared by the interface; not thrown here
 */
public void unsubscribe(String itemName) throws SubscriptionException, FailureException {
    logger.info("Unsubscribing from " + itemName);

    final SubscribedItemAttributes item;
    boolean dispatchThread = false;
    rwLock.writeLock().lock();
    try {
        logger.info("------------------>Write LOCK 2");
        // A single remove() replaces the containsKey/get/remove sequence.
        item = subscribedItems.remove(itemName);
        if (item == null) {
            logger.info("------------------>Write UNLOCK 2");
            throw new SubscriptionException("(Unsubscribing) Unexpected item: " + itemName);
        }
        handles.remove(item.handleId);
        if (lastHeartbeatRandom != -1) {
            toSendRequests.offer("unsubscribe" + itemName + "_" + item.handleId);
            dispatchThread = true;
        }
        logger.info("------------------>Write UNLOCK 2");
    } finally {
        // Released on every path, including the exception above.
        rwLock.writeLock().unlock();
    }
    logger.info("(Unsubscribing) removed from subscribed items list:" + itemName + " (" + item.handleId + ")");
    if (dispatchThread) {
        new SenderThread().start();
    }
}
/**
 * Tells whether a snapshot can be provided for the given item; left to the
 * concrete adapter.
 *
 * @param itemName the item name
 * @return true if a snapshot is available for the item
 * @throws SubscriptionException declared for implementations that reject the item
 */
public abstract boolean isSnapshotAvailable(String itemName) throws SubscriptionException;
/**
 * Marks the item's snapshot as sent and logs that the item is inactive.
 * No update is actually forwarded to the listener: default "inactive"
 * values are deliberately not pushed while the feeder is down.
 *
 * @param item attributes of the subscribed item
 */
protected void dispatchInactiveFlag(SubscribedItemAttributes item) {
    // The first notification for an item consumes its snapshot slot.
    if (!item.isSnapshotSent) {
        item.isSnapshotSent = true;
    }

    // The handle is still looked up, but intentionally nothing is sent for it.
    final Object itemHandle = handles.get(item.handleId);
    if (itemHandle != null) {
        // no smartUpdate here: inactive default values are not wanted
    }

    logger.info("Inactive flag dispatched: " + item.itemName + " (" + item.handleId + ")");
}
// /////////MessageListener
// Prefix of the warning logged by onMessage() when a received message cannot be handled.
protected static final String noCompMex = "Message received was not compatible with this adapter.";
/**
 * Callback invoked once the JMS connection has been established (see
 * ConnectionLoopTSQS.onConnectionCall()); records that JMS is available.
 */
public void onConnection() {
    rwLock.writeLock().lock();
    try {
        logger.info("------------------>Write LOCK 3");
        logger.info("JMS is now up");
        jmsOk = true;
        logger.info("------------------>Write UNLOCK 3");
    } finally {
        // Release in finally so the lock can never be left held.
        rwLock.writeLock().unlock();
    }
}
/**
 * Re-announces every current subscription to the feed: discards any pending
 * requests, queues a "reset" followed by one "subscribe" request per
 * subscribed item, and starts a SenderThread to deliver them.
 * Called by handleHeartbeat() when a new feed instance is detected.
 */
public void subscribeAll() {
    logger.info("Subscribing all to the Generator");
    toSendRequests.clear();
    toSendRequests.offer("reset");
    for (final SubscribedItemAttributes sia : subscribedItems.values()) {
        // Same wire format as subscribe(): no separator between the command and
        // the item name. The previous "subscribe " (with a trailing space) made
        // re-subscriptions differ from first-time subscriptions.
        toSendRequests.offer("subscribe" + sia.itemName + "_" + sia.handleId);
    }
    new SenderThread().start();
}
/**
 * Callback for JMS failures: marks the feed and the JMS connection as
 * unavailable, then starts a new connection loop to recover.
 *
 * @param je the exception reported by JMS
 */
public void onException(JMSException je) {
    // Pass the exception itself so the stack trace is not lost.
    logger.error("onException: JMSException -> " + je.getMessage(), je);
    rwLock.writeLock().lock();
    try {
        logger.info("------------------>Write LOCK 4");
        logger.info("JMS is now down");
        this.onFeedDisconnection();
        jmsOk = false;
        logger.info("------------------>Write UNLOCK 4");
    } finally {
        // Without finally, a failure in onFeedDisconnection() would leave the
        // write lock held and the reconnection below would never start.
        rwLock.writeLock().unlock();
    }
    new ConnectionLoopTSQS(jmsHandler, recoveryPause, logger).start();
}
/**
 * Marks the feed as unavailable (lastHeartbeatRandom = -1) and runs
 * dispatchInactiveFlag() for every subscribed item.
 * The visible callers invoke it while holding the write lock.
 */
public void onFeedDisconnection() {
    logger.info("Feed no more available");
    lastHeartbeatRandom = -1;
    for (final SubscribedItemAttributes subscribed : subscribedItems.values()) {
        dispatchInactiveFlag(subscribed);
    }
}
/**
 * Handles a message received from JMS. A HeartbeatMessage only refreshes the
 * heartbeat state. A FeedMessage is first checked against the heartbeat state
 * and, if it belongs to the current feed instance and to a live subscription,
 * forwarded to the Lightstreamer listener. Anything else is logged and dropped.
 *
 * @param message the received JMS message; null is tolerated and ignored
 */
public void onMessage(Message message) {
    if (message == null) {
        logger.warn(noCompMex + " (null)");
        return;
    }
    logger.info("Received message");

    final FeedMessage feedMsg;
    try {
        // Explicit type checks instead of catching ClassCastException; this also
        // rejects a null payload cleanly instead of failing with a NullPointerException.
        if (!(message instanceof ObjectMessage)) {
            logger.warn(noCompMex + "(not a FeedMessage instance)");
            return;
        }
        final Object payload = ((ObjectMessage) message).getObject();
        if (payload instanceof HeartbeatMessage) {
            handleHeartbeat(((HeartbeatMessage) payload).random);
            return;
        }
        if (!(payload instanceof FeedMessage)) {
            logger.warn(noCompMex + "(not a FeedMessage instance)");
            return;
        }
        feedMsg = (FeedMessage) payload;
    } catch (final JMSException jmse) {
        logger.error("BaseJMSDataAdapter.onMessage - JMSException: " + jmse.getMessage(), jmse);
        return;
    }

    // A data message also counts as a heartbeat. If it reveals a new feed
    // instance, handleHeartbeat() triggers a re-subscription and this update is dropped.
    if (!handleHeartbeat(feedMsg.random)) {
        return;
    }
    logger.info("Valid message");

    rwLock.readLock().lock();
    try {
        logger.info("------------------>Read LOCK 5");
        final SubscribedItemAttributes item = subscribedItems.get(feedMsg.itemName);
        if (item == null) {
            logger.info("------------------>Read UNLOCK 5");
            logger.info("Received update for not subscribed item: " + feedMsg.itemName);
            return;
        }
        final Object handle = handles.get(feedMsg.handleId);
        if (handle == null) {
            // The update refers to a subscription that has since been removed.
            logger.info("------------------>Read UNLOCK 5");
            logger.info("Received update for unsubscribed handle: " + feedMsg.itemName + "(" + feedMsg.handleId + ")");
            return;
        }
        boolean isSnapshot = false;
        if (!item.isSnapshotSent) {
            // The first update forwarded for an item is flagged as its snapshot.
            item.isSnapshotSent = true;
            isSnapshot = true;
        }
        logger.info("Received update for item " + feedMsg.itemName);
        listener.smartUpdate(handle, feedMsg.currentValues, isSnapshot);
        logger.info("------------------>Read UNLOCK 5");
    } finally {
        // Released on every path; an exception from the listener or the maps
        // must not leave the read lock held, or all writers would block forever.
        rwLock.readLock().unlock();
    }
}
/**
 * Updates the heartbeat state with the random id carried by a heartbeat or
 * feed message.
 *
 * @param beat the random id identifying the feed instance
 * @return true if the id matches the current feed instance (the heartbeat
 *         counter is advanced); false if it identifies a new feed instance,
 *         in which case all subscriptions are re-sent and a new
 *         HeartbeatThread is started to watch it
 */
protected boolean handleHeartbeat(int beat) {
    rwLock.writeLock().lock();
    try {
        logger.info("------------------>Write LOCK 6");
        logger.info("lastHeartbeatRandom is " + lastHeartbeatRandom);
        if (lastHeartbeatRandom == beat) {
            // Wraps at 1000; HeartbeatThread only checks whether the value changed.
            heartbeatCount = (heartbeatCount + 1) % 1000;
            logger.info("Received heartbeat: " + beat);
            logger.info("------------------>Write UNLOCK 6");
            return true;
        }
        logger.info("Received NEW heartbeat: " + beat + ", feed is now available");
        lastHeartbeatRandom = beat;
        this.subscribeAll();
        heartbeatCount = 0;
        logger.info("------------------>Write UNLOCK 6");
    } finally {
        // Released on every path so a failure cannot leave the lock held.
        rwLock.writeLock().unlock();
    }
    // Started after the lock is released; the watchdog takes the write lock itself.
    new HeartbeatThread(beat).start();
    return false;
}
/**
 * Watchdog for one feed instance. Every 2 seconds it checks whether the
 * heartbeat counter has moved; if it has not, the feed is declared
 * disconnected. The thread ends as soon as a different feed instance
 * (a different random id) becomes current.
 */
protected class HeartbeatThread extends Thread {
    /** Random id of the feed instance this thread watches. */
    protected int random;
    /** Value of heartbeatCount seen at the previous check. */
    protected int count;

    public HeartbeatThread(int random) {
        this.random = random;
    }

    @Override
    public void run() {
        while (this.random == lastHeartbeatRandom) {
            try {
                Thread.sleep(2000);
            } catch (final InterruptedException e) {
                // Do not swallow the interruption: restore the flag and stop
                // watching, otherwise the sleep above would fail immediately
                // on every following iteration.
                Thread.currentThread().interrupt();
                return;
            }
            rwLock.writeLock().lock();
            try {
                logger.info("------------------>Write LOCK 7");
                if (this.random == lastHeartbeatRandom && count == heartbeatCount) {
                    // No heartbeat arrived since the previous check.
                    logger.info("2 Seconds without Heartbeats: " + this.random);
                    onFeedDisconnection();
                    logger.info("------------------>Write UNLOCK 7");
                    return;
                }
                count = heartbeatCount;
                logger.info("------------------>Write UNLOCK 7");
            } finally {
                // Released on every path, including the early return above.
                rwLock.writeLock().unlock();
            }
        }
    }
}
// /////////////// Utils
// Message suffixes appended to the parameter name by the getParam() methods.
// Used in the DataProviderException thrown for a missing mandatory parameter.
protected static String noParam = " is missing.\nProcess exits";
// Logged as a warning when an optional parameter is missing.
protected static String useDefault = " is missing. Using default.";
// Logged as an error when a numeric parameter cannot be parsed.
protected static String isNaN = " must be a number but it isn't. Using default.";
/**
 * Reads an integer configuration parameter.
 *
 * @param params   the configuration map
 * @param toGet    name of the parameter
 * @param required whether a missing parameter is an error
 * @param def      value used when the parameter is missing (and optional) or
 *                 is not a valid integer
 * @return the parsed value, or def
 * @throws DataProviderException if the parameter is required but missing
 */
protected int getParam(Map params, String toGet, boolean required, int def) throws DataProviderException {
    final String raw = (String) params.get(toGet);
    int value = def;

    if (raw != null) {
        try {
            value = Integer.parseInt(raw);
        } catch (final NumberFormatException nfe) {
            // value still holds the default; only report the problem
            if (logger != null) {
                logger.error(toGet + isNaN);
            }
        }
    } else if (required) {
        throw new DataProviderException(toGet + noParam);
    } else if (logger != null) {
        logger.warn(toGet + useDefault);
    }

    // logger is null until init() has assigned it
    if (logger != null) {
        logger.info(toGet + ": " + value);
    }
    return value;
}
/**
 * Reads a string configuration parameter.
 *
 * @param params   the configuration map
 * @param toGet    name of the parameter
 * @param required whether a missing parameter is an error
 * @param def      value returned when the parameter is missing and optional
 * @return the configured value, or def
 * @throws DataProviderException if the parameter is required but missing
 */
protected String getParam(Map params, String toGet, boolean required, String def) throws DataProviderException {
    final String configured = (String) params.get(toGet);
    final String value;

    if (configured != null) {
        value = configured;
    } else if (required) {
        throw new DataProviderException(toGet + noParam);
    } else {
        // logger is null until init() has assigned it
        if (logger != null) {
            logger.warn(toGet + useDefault);
        }
        value = def;
    }

    if (logger != null) {
        logger.info(toGet + ": " + value);
    }
    return value;
}
// ////////////////// ConnectionLoop
/**
 * ConnectionLoop specialization for this adapter: its connection step
 * initializes the topic subscriber and the queue sender, and a successful
 * connection is reported through the adapter's onConnection().
 */
protected class ConnectionLoopTSQS extends ConnectionLoop {
public ConnectionLoopTSQS(JMSHandler jmsHandler, int recoveryPause, Logger logger) {
super(jmsHandler, recoveryPause, logger);
}
// Forwards the notification to the enclosing adapter's onConnection().
@Override
protected void onConnectionCall() {
onConnection();
}
// Sets up both JMS endpoints; msgPoolSize comes from the adapter configuration.
@Override
protected void connectionCall() throws JMSException, NamingException {
jmsHandler.initTopicSubscriber();
jmsHandler.initQueueSender(msgPoolSize);
}
}
public class SenderThread extends Thread {
@Override
public void run() {
String nextRequest = "";
logger.info("Dispatch thread started");
rwLock.readLock().lock();
logger.info("------------------>Read LOCK 8");
while ((nextRequest = toSendRequests.poll()) != null) {
try {
jmsHandler.sendMessage(nextRequest);
logger.info("Message dispatched to JMS: " + nextRequest);
} catch (final JMSException je) {
logger.error("Can't actually dispatch request " + nextRequest + ": JMSException -> " + je.getMessage());
}
}
logger.info("------------------>Read UNLOCK 8");
rwLock.readLock().unlock();
logger.info("Dispatch thread ends");
}
}
}
And I have other classes that extend this adapter:
package com.trinitas.ls.adapter.data;
import com.lightstreamer.interfaces.data.FailureException;
import com.lightstreamer.interfaces.data.SmartDataProvider;
import com.lightstreamer.interfaces.data.SubscriptionException;
/**
 * Data adapter for manual bets. All behavior is inherited from
 * BaseJMSDataAdapter; this class only declares that every item name is valid
 * and that no snapshot is available.
 */
public class ManualBetJMSDataAdapter extends BaseJMSDataAdapter implements SmartDataProvider{
// static {
// inactiveMap.put("item_status","inactive");
// completeInactiveMap.put("item_status","inactive");
// }
/** Always reports that no snapshot is available, whatever the item. */
@Override
public boolean isSnapshotAvailable(String itemName) throws SubscriptionException {
return false;
}
/** Accepts every item name. */
@Override
protected boolean isValidItem(String itemName) {
return true;
}
/** Delegates to the base implementation, which is empty. */
@Override
public void subscribe(String itemName, boolean needsIterator) throws SubscriptionException, FailureException {
super.subscribe(itemName, needsIterator);
}
}
and
package com.trinitas.ls.adapter.data;
import com.lightstreamer.interfaces.data.SmartDataProvider;
import com.lightstreamer.interfaces.data.SubscriptionException;
/**
 * Data adapter for ticks. All behavior is inherited from BaseJMSDataAdapter;
 * this class only declares that every item name is valid and that no
 * snapshot is available.
 */
public class TickJMSDataAdapter extends BaseJMSDataAdapter implements SmartDataProvider {
// static {
// inactiveMap.put("item_status","inactive");
// completeInactiveMap.put("item_status","inactive");
// }
/** Always reports that no snapshot is available, whatever the item. */
@Override
public boolean isSnapshotAvailable(String itemName) throws SubscriptionException {
return false;
}
/** Accepts every item name. */
@Override
protected boolean isValidItem(String itemName) {
return true;
}
}
In my adapters.xml:
<?xml version="1.0"?>
<adapters_conf id="CLAIRE">
<metadata_provider>
<adapter_class>com.trinitas.ls.adapter.meta.LiteralBasedProvider</adapter_class>
</metadata_provider>
<!-- Data adapter for manual bets: listens on topic/manualbetTopic and sends its requests to queue/manualbetconfQueue -->
<data_provider name="MANUALBETSJMSADAPTER">
<adapter_class>com.trinitas.ls.adapter.data.ManualBetJMSDataAdapter</adapter_class>
<!-- Parameters required by MANUALBETSJMSADAPTER -->
<!-- loggerName, msgPoolSize and recoveryPauseMillis are optional; the adapter falls back to defaults when they are missing -->
<param name="loggerName">LightstreamerLogger.ManualBetJMSDataAdapter</param>
<param name="msgPoolSize">15</param>
<param name="recoveryPauseMillis">2000</param>
<!-- JBoss Messaging configuration -->
<!-- The six parameters below are mandatory: the adapter's init() fails if any of them is missing -->
<param name="jmsUrl">jnp://localhost:1099</param>
<param name="initialContextFactory">org.jnp.interfaces.NamingContextFactory</param>
<param name="topicConnectionFactory">ConnectionFactory</param>
<param name="queueConnectionFactory">ConnectionFactory</param>
<param name="topicName">topic/manualbetTopic</param>
<param name="queueName">queue/manualbetconfQueue</param>
</data_provider>
<!-- Data adapter for ticks: same JNDI provider and connection factories as above, but its own topic and queue -->
<data_provider name="TICKSJMSADAPTER">
<adapter_class>com.trinitas.ls.adapter.data.TickJMSDataAdapter</adapter_class>
<!-- Parameters required by TICKSJMSADAPTER -->
<param name="loggerName">LightstreamerLogger.TickJMSDataAdapter</param>
<param name="msgPoolSize">15</param>
<param name="recoveryPauseMillis">2000</param>
<!-- JBoss Messaging configuration -->
<param name="jmsUrl">jnp://localhost:1099</param>
<param name="initialContextFactory">org.jnp.interfaces.NamingContextFactory</param>
<param name="topicConnectionFactory">ConnectionFactory</param>
<param name="queueConnectionFactory">ConnectionFactory</param>
<param name="topicName">topic/tickTopic</param>
<param name="queueName">queue/tickconfQueue</param>
</data_provider>
</adapters_conf>
When I run LS and my feed (which runs in JBoss; the JMS queues/topics are also in JBoss), only the ticks adapter works. The manual bets adapter receives the client's subscription request but does not send the subscription request through JMS to the manualBetsFeeder:
15.Sep.09 15:04:01,932 < WARN> Adapters configuration not found in /home/mnenchev/tools/Lightstreamer/conf/../adapters/StockQuotesJMSAdapter
15.Sep.09 15:04:01,942 < WARN> Adapters configuration not found in /home/mnenchev/tools/Lightstreamer/conf/../adapters/HelloWorld
15.Sep.09 15:04:01,983 < INFO> Loading Metadata Adapter CLAIRE
15.Sep.09 15:04:01,984 < INFO> Loading Data Adapter CLAIRE.TICKSJMSADAPTER
15.Sep.09 15:04:01,986 < INFO> Loading Data Adapter CLAIRE.MANUALBETSJMSADAPTER
15.Sep.09 15:04:01,995 < INFO> jmsUrl: jnp://localhost:1099
15.Sep.09 15:04:01,995 < INFO> initialContextFactory: org.jnp.interfaces.NamingContextFactory
15.Sep.09 15:04:01,995 < INFO> topicConnectionFactory: ConnectionFactory
15.Sep.09 15:04:01,995 < INFO> queueConnectionFactory: ConnectionFactory
15.Sep.09 15:04:01,995 < INFO> Finished loading Metadata Adapter CLAIRE
15.Sep.09 15:04:01,995 < INFO> topicName: topic/tickTopic
15.Sep.09 15:04:01,995 < INFO> jmsUrl: jnp://localhost:1099
15.Sep.09 15:04:01,995 < INFO> queueName: queue/tickconfQueue
15.Sep.09 15:04:01,995 < INFO> initialContextFactory: org.jnp.interfaces.NamingContextFactory
15.Sep.09 15:04:01,996 < INFO> topicConnectionFactory: ConnectionFactory
15.Sep.09 15:04:01,996 < INFO> queueConnectionFactory: ConnectionFactory
15.Sep.09 15:04:01,996 < INFO> topicName: topic/manualbetTopic
15.Sep.09 15:04:01,996 < INFO> queueName: queue/manualbetconfQueue
15.Sep.09 15:04:01,996 < INFO> msgPoolSize: 15
15.Sep.09 15:04:01,996 < INFO> msgPoolSize: 15
15.Sep.09 15:04:01,996 < INFO> recoveryPauseMillis: 2000
15.Sep.09 15:04:01,996 < INFO> recoveryPauseMillis: 2000
15.Sep.09 15:04:01,997 < INFO> Configuration read.
15.Sep.09 15:04:01,996 < INFO> Configuration read.
15.Sep.09 15:04:01,999 < INFO> JMSHandler Ready
15.Sep.09 15:04:01,999 < INFO> JMSHandler Ready
15.Sep.09 15:04:02,000 < INFO> BaseJMSDataAdapter ready.
15.Sep.09 15:04:02,001 < INFO> Finished loading Data Adapter CLAIRE.MANUALBETSJMSADAPTER
15.Sep.09 15:04:02,001 < INFO> BaseJMSDataAdapter ready.
15.Sep.09 15:04:02,001 < INFO> Finished loading Data Adapter CLAIRE.TICKSJMSADAPTER
15.Sep.09 15:04:02,023 < INFO> JNDI Context[{jnp.parsedName=, java.naming.provider.url=localhost:1099, java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory}]...
15.Sep.09 15:04:02,023 < INFO> Looking up topic connection factory [ConnectionFactory]...
15.Sep.09 15:04:02,084 < INFO> Pump pool size set by default at 4
15.Sep.09 15:04:02,210 < INFO> Events pool size set by default at 4
15.Sep.09 15:04:02,220 < INFO> Lightstreamer Server 3.5 build 1428.4 starting...
15.Sep.09 15:04:02,298 < INFO> Server "Lightstreamer HTTP Server" listening to *:8888 ...
15.Sep.09 15:04:02,449 < INFO> Looking up topic [topic/tickTopic]...
15.Sep.09 15:04:03,076 < INFO> Topic connection created
15.Sep.09 15:04:03,345 < INFO> Topic session created
15.Sep.09 15:04:03,449 < INFO> Topic subscriber created
15.Sep.09 15:04:03,465 < INFO> Topic connection started
15.Sep.09 15:04:03,465 < INFO> Looking up queue connection factory [ConnectionFactory]...
15.Sep.09 15:04:03,468 < INFO> Looking up queue [queue/tickconfQueue]...
15.Sep.09 15:04:03,498 < INFO> Queue connection created
15.Sep.09 15:04:03,499 < INFO> Queue session created
15.Sep.09 15:04:03,553 < INFO> Queue sender created
15.Sep.09 15:04:03,557 < INFO> Text message pool created
15.Sep.09 15:04:03,557 < INFO> ------------------>Write LOCK 3
15.Sep.09 15:04:03,557 < INFO> JMS is now up
15.Sep.09 15:04:03,557 < INFO> ------------------>Write UNLOCK 3
15.Sep.09 15:04:03,768 < INFO> Received message
15.Sep.09 15:04:03,770 < INFO> ------------------>Write LOCK 6
15.Sep.09 15:04:03,770 < INFO> lastHeartbeatRandom is -1
15.Sep.09 15:04:03,770 < INFO> Received NEW heartbeat: 3555, feed is now available
15.Sep.09 15:04:03,770 < INFO> Subscribing all to the Generator
15.Sep.09 15:04:03,771 < INFO> ------------------>Write UNLOCK 6
15.Sep.09 15:04:03,771 < INFO> Dispatch thread started
15.Sep.09 15:04:03,772 < INFO> ------------------>Read LOCK 8
15.Sep.09 15:04:03,772 < INFO> Sending message: reset
15.Sep.09 15:04:03,831 < INFO> Message dispatched to JMS: reset
15.Sep.09 15:04:03,831 < INFO> ------------------>Read UNLOCK 8
15.Sep.09 15:04:03,832 < INFO> Dispatch thread ends
15.Sep.09 15:04:04,012 < INFO> Serving request: /lightstreamer/create_session.js?LS_phase=3613&LS_domain=sofiamn&LS_polling=true&LS_polling_millis=0&LS_idle_millis=30000&LS_client_version=4.3&LS_adapter=CLAIRE&LS_old_session=Sdcc0dddd28da58beT5450466& from 192.168.2.105:36571
15.Sep.09 15:04:04,049 < INFO> Starting new session: Saa47f07970cf4707T0404039 from 192.168.2.105:36571
15.Sep.09 15:04:04,118 < INFO> Serving request: /lightstreamer/control.html?LS_session=Saa47f07970cf4707T0404039&LS_window=3&LS_win_phase=8&LS_op=add&LS_req_phase=75&LS_mode1=DISTINCT&LS_id1=manualbet&LS_schema1=message&LS_data_adapter1=MANUALBETSJMSADAPTER&LS_snapshot1=true&LS_unique=3 from 192.168.2.105:36571
15.Sep.09 15:04:04,127 < INFO> Controlling session: Saa47f07970cf4707T0404039 from 192.168.2.105:36571
15.Sep.09 15:04:04,147 < INFO> Serving request: /lightstreamer/STREAMING_IN_PROGRESS?LS_session=Saa47f07970cf4707T0404039&LS_phase=3614&LS_domain=sofiamn& from 192.168.2.105:36572
15.Sep.09 15:04:04,149 < INFO> Attaching session: Saa47f07970cf4707T0404039 from 192.168.2.105:36572
15.Sep.09 15:04:04,169 < INFO> Subscribing to manualbet
15.Sep.09 15:04:04,169 < INFO> (Subscribing) Valid item: manualbet
15.Sep.09 15:04:04,170 < INFO> ------------------>Write LOCK 1
15.Sep.09 15:04:04,170 < INFO> Inactive flag dispatched: manualbet (1)
15.Sep.09 15:04:04,170 < INFO> ------------------>Write UNLOCK 1
15.Sep.09 15:04:04,170 < INFO> (Subscribing) Inserted in subscribed items list: manualbet (1)
15.Sep.09 15:04:04,759 < INFO> Received message
15.Sep.09 15:04:04,760 < INFO> ------------------>Write LOCK 6
.....
What disturbs me is that in my JBoss I start two threads (each one like the demo Generator.java: it maintains the JMS heartbeat and so on), one for every adapter. But in the LS log I receive the heartbeat from only one "generator", i.e.:
In JBoss I have two generators running with heartbeats:
...
15:03:46,737 INFO [SLGenerator] Heartbeat sent: 2940
15:03:47,736 INFO [SLGenerator] Heartbeat sent: 3555
15:03:47,740 INFO [SLGenerator] Heartbeat sent: 2940
.....