The Adapter Remoting Infrastructure makes it possible to run Data and/or Metadata Adapters as separate processes, by defining and supporting a communication protocol based on sockets.
The supplied Proxy Data and Metadata Adapters take on the burden of adapting this protocol to the Lightstreamer Adapter interface.
However, they provide no recovery mechanism for the case in which the remote adapter process crashes or the communication is interrupted: in such cases, they just cause Lightstreamer Server to exit (so that it can be restarted later) and the Client to reconnect (in a cluster, another Server instance would be immediately available).
This is because the Server/Adapters interface contract is quite stringent. For instance, a Data Adapter should guarantee that, after subscription, no updates can be lost; ensuring this even when the connection to the remote adapter is lost is, when feasible, a strongly application-dependent task that only a custom Proxy Adapter could perform.
[>>> ADDED ON 14/11/2008:
Something changed with ARI 1.2 <<<]
In order to reuse the protocol handling work done by the supplied Proxy Adapters, it is possible to write custom Proxy Adapters which build on them. Unfortunately, the supplied Proxy Adapters don't expose a proper interface for extension. The following are guidelines on how they can be reused:
- The supplied Proxy Adapters are included in "ls-proxy-adapters.jar"; the socket-based versions can be addressed as:
  - com.lightstreamer.adapters.remote.metadata.NetworkedMetadataProvider (let's call it NMP)
  - com.lightstreamer.adapters.remote.data.NetworkedDataProvider (let's call it NDP)
  Note that NDP implements the SmartDataProvider interface.
- The supplied Proxy Adapters can only manage one connection throughout their life.
In order to reopen the connection, a custom Proxy Adapter should wrap an NMP or NDP instance and replace it with a new instance each time a new connection is needed.
The newly created instance, however, should inherit the state from the previous instance and forward it to the new remote adapter as well.
- For the NMP case, the notifyUser, notifyNewSession and notifySessionClose methods affect internal NMP caches. Hence, upon initialization of the recovered NMP, a playback of notifyUser and notifyNewSession calls for the currently active sessions should be submitted to the recovered NMP; one call to notifyUser for each distinct user would be enough.
However, other methods (like notifyNewTables, notifyTablesClose and notifyUserMessage, for instance) may affect the remote adapter state. All those cases should be managed in a similar way.
- For the NDP case, upon initialization of the recovered NDP, a playback of subscribe calls for all the currently subscribed items should be submitted to the recovered NDP.
The NDP always sends back a snapshot, although it may be empty if the remote adapter cannot provide one. The snapshot should not be forwarded to the Server, though it may be used to fill the update hole.
In both cases, the inner Proxy Adapter cannot be queried for the list of active users/sessions or subscriptions, so the custom Proxy Adapter has to keep track of them itself.
- The supplied Proxy Adapters, upon a socket disconnection, automatically try to close the whole Server.
This behaviour should be suppressed and replaced with a graceful local closure. This has to be implemented by subclassing the NMP and NDP classes and using these subclasses as the real inner Proxy Adapters.
The code below shows the Data Adapter case:
[SYNTAX=JAVA]import java.util.concurrent.atomic.AtomicBoolean;
import com.lightstreamer.adapters.remote.data.NetworkedDataProvider;
import com.lightstreamer.interfaces.data.FailureException;
import com.lightstreamer.interfaces.data.SubscriptionException;

public class RecoverableNDP extends NetworkedDataProvider {
    private AtomicBoolean closed = new AtomicBoolean(false);

    public void subscribe(String itemName, Object handle, boolean needsIterator)
            throws SubscriptionException, FailureException {
        if (closed.get()) {
            // a closure notification is pending
            return;
        }
        try {
            super.subscribe(itemName, handle, needsIterator);
        } catch (FailureException e) {
            // the Proxy Adapter would like to fail
            onException(e);
        }
    }

    public void unsubscribe(String itemName)
            throws SubscriptionException, FailureException {
        if (closed.get()) {
            // a closure notification is pending
            return;
        }
        try {
            super.unsubscribe(itemName);
        } catch (FailureException e) {
            // the Proxy Adapter would like to fail
            onException(e);
        }
    }

    public void onException(Throwable t) {
        if (closed.compareAndSet(false, true)) {
            super.stop(); // this releases all resources
            // LOG the exception here
            // we need to forward the closure notification to the owner
            // in some way; we would need to extend the listener;
            // to make it short, let's agree with the owner to use
            // failure with a null exception
            super._listener.failure(null);
        }
    }
}
[/SYNTAX]
The class shown can also be used directly as the Data Adapter (after commenting out the final call to "failure"). In this case, when the connection is lost, no recovery is performed and no more updates will come for the subscribed items; however, this behaviour may be preferred to Lightstreamer Server closure in order to preserve other Data Adapters currently running.
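For the Metadata Adapter case, the guidelines above require the custom Proxy Adapter to remember the active users and sessions itself, so that notifyUser and notifyNewSession can be replayed on a recovered NMP. The following is only a minimal sketch of that bookkeeping. The SessionRegistry class and its ReplayTarget interface are hypothetical names introduced for illustration, and the two-argument method signatures are simplified assumptions, not the actual MetadataProvider interface.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: remembers what must be replayed on a recovered NMP.
class SessionRegistry {

    // Hypothetical stand-in for the calls to be replayed;
    // the real adapter methods may take further arguments.
    interface ReplayTarget {
        void notifyUser(String user, String password);
        void notifyNewSession(String user, String sessionId);
    }

    private final Map<String, String> passwordByUser = new LinkedHashMap<>();
    private final Map<String, String> userBySession = new LinkedHashMap<>();

    // To be called when the wrapper forwards notifyUser.
    synchronized void onNotifyUser(String user, String password) {
        passwordByUser.put(user, password);
    }

    // To be called when the wrapper forwards notifyNewSession.
    synchronized void onNewSession(String user, String sessionId) {
        userBySession.put(sessionId, user);
    }

    // To be called when the wrapper forwards notifySessionClose.
    synchronized void onSessionClose(String sessionId) {
        String user = userBySession.remove(sessionId);
        if (!userBySession.containsValue(user)) {
            // no session left for this user: nothing to replay for it
            passwordByUser.remove(user);
        }
    }

    // Replays one notifyUser per distinct user with active sessions,
    // then one notifyNewSession per active session.
    synchronized void replayTo(ReplayTarget target) {
        Set<String> users = new LinkedHashSet<>(userBySession.values());
        for (String user : users) {
            target.notifyUser(user, passwordByUser.get(user));
        }
        for (Map.Entry<String, String> e : userBySession.entrySet()) {
            target.notifyNewSession(e.getValue(), e.getKey());
        }
    }
}
```

A wrapping Metadata Adapter would update the registry in its own notifyUser, notifyNewSession and notifySessionClose implementations, and call replayTo with an adapter around the new NMP instance once it has been initialized. Calls such as notifyNewTables that affect the remote adapter state would need similar tracking, which this sketch leaves out.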
The code below shows a working example of a recovery-capable Adapter for the Data Adapter case.
All the recovery logic is delegated to a custom ItemFlowManager class. In this simple case, the update flow is just resumed, with no attempt to fill the update hole.
[SYNTAX=JAVA]import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.lightstreamer.interfaces.data.DataProviderException;
import com.lightstreamer.interfaces.data.FailureException;
import com.lightstreamer.interfaces.data.IndexedItemEvent;
import com.lightstreamer.interfaces.data.ItemEvent;
import com.lightstreamer.interfaces.data.ItemEventListener;
import com.lightstreamer.interfaces.data.OldItemEvent;
import com.lightstreamer.interfaces.data.SmartDataProvider;
import com.lightstreamer.interfaces.data.SubscriptionException;

public class RecoveringNDP implements SmartDataProvider {
    private HashMap<String, ItemFlowManager> delegates = new HashMap<String, ItemFlowManager>();
    private RecoverableNDP undProxy;
    private volatile ItemEventListener serverSideListener;
    private volatile Map initialParams;
    private volatile File initialConfigDir;

    public void init(Map params, File configDir)
            throws DataProviderException {
        initialParams = params;
        initialConfigDir = configDir;
        // to generalize the recovery case,
        // let's allow the Adapter to start before getting a connection
        new Thread() {
            public void run() {
                connect();
            }
        }.start();
    }

    public void setListener(ItemEventListener listener) {
        serverSideListener = listener;
    }

    private void connect() {
        RecoverableNDP p = new RecoverableNDP();
        try {
            p.init(initialParams, initialConfigDir);
            // this may take a lot of time
        } catch (DataProviderException e) {
            // how should we handle the case? should we retry?
            // LOG the exception here
            return;
        }
        p.setListener(new ProxyListener(p));
        synchronized (this) {
            assert(undProxy == null);
            undProxy = p;
            for (ItemFlowManager d : delegates.values()) {
                d.onNewProxy(undProxy, d);
            }
        }
    }

    public boolean isSnapshotAvailable(String itemName)
            throws SubscriptionException {
        return true;
    }

    public void subscribe(String itemName, Object serverSideHandle, boolean needsIterator)
            throws SubscriptionException, FailureException {
        // needsIterator is not used by undProxy
        ItemFlowManager d = new ItemFlowManager(itemName, serverSideHandle, serverSideListener);
        synchronized (this) {
            assert(delegates.get(itemName) == null);
            delegates.put(itemName, d);
            if (undProxy != null)
                d.onNewProxy(undProxy, d);
        }
        d.start();
    }

    public void subscribe(String itemName, boolean needsIterator)
            throws SubscriptionException, FailureException {
        assert(false); // not called on a SmartDataProvider
    }

    public void unsubscribe(String itemName)
            throws SubscriptionException, FailureException {
        ItemFlowManager d;
        synchronized (this) {
            d = delegates.remove(itemName);
            assert(d != null);
        }
        d.close();
    }

    private void onDisconnect() {
        synchronized (this) {
            assert(undProxy != null);
            for (ItemFlowManager d : delegates.values()) {
                d.onLostProxy();
            }
            undProxy = null;
        }
        new Thread() {
            public void run() {
                connect();
            }
        }.start();
    }

    private void onFailure(Throwable t) {
        serverSideListener.failure(t);
    }

    private class ProxyListener implements ItemEventListener {
        final RecoverableNDP refProxy;

        public ProxyListener(RecoverableNDP refProxy) {
            this.refProxy = refProxy;
        }

        public void update(String itemName, ItemEvent event, boolean isSnapshot) {
            assert(false); // not used by the Proxy Adapter
        }
        public void update(String itemName, OldItemEvent event, boolean isSnapshot) {
            assert(false); // not used by the Proxy Adapter
        }
        public void update(String itemName, Map event, boolean isSnapshot) {
            assert(false); // not used by the Proxy Adapter
        }
        public void update(String itemName, IndexedItemEvent event, boolean isSnapshot) {
            assert(false); // not used by the Proxy Adapter
        }
        public void smartUpdate(Object proxySideHandle, ItemEvent event, boolean isSnapshot) {
            assert(false); // no longer supported by the Remote Protocol
        }
        public void smartUpdate(Object proxySideHandle, OldItemEvent event, boolean isSnapshot) {
            assert(false); // no longer supported by the Remote Protocol
        }
        public void smartUpdate(Object proxySideHandle, Map event, boolean isSnapshot) {
            ItemFlowManager d = (ItemFlowManager) proxySideHandle;
            d.onUpdate(event, isSnapshot, refProxy);
        }
        public void smartUpdate(Object proxySideHandle, IndexedItemEvent event, boolean isSnapshot) {
            assert(false); // no longer supported by the Remote Protocol
        }
        public void endOfSnapshot(String itemName) {
            assert(false); // not used by the Proxy Adapter
        }
        public void smartEndOfSnapshot(Object proxySideHandle) {
            ItemFlowManager d = (ItemFlowManager) proxySideHandle;
            d.onEndOfSnapshot(refProxy);
        }
        public void failure(Throwable t) {
            if (t == null) {
                // simple convention for a recoverable connection loss
                RecoveringNDP.this.onDisconnect();
            } else {
                // failure invoked by the remote adapter;
                // we might have tried to recover the case as well
                RecoveringNDP.this.onFailure(t);
            }
        }
    }
}

class ItemFlowManager {
    private final String name;
    private final Object serverSideHandle;
    private final ItemEventListener serverSideListener;
    private boolean started;
    private boolean resumed;
    private boolean closed;
    private RecoverableNDP currProxy;
    private Object proxySideHandle;

    public ItemFlowManager(String name, Object serverSideHandle, ItemEventListener serverSideListener) {
        this.name = name;
        this.serverSideHandle = serverSideHandle;
        this.serverSideListener = serverSideListener;
    }

    public synchronized void onNewProxy(RecoverableNDP proxy, Object handle) {
        assert(currProxy == null);
        currProxy = proxy;
        proxySideHandle = handle;
        if (started && ! closed) {
            resumed = true;
            subscribeToProxy(name);
            // but you might resume the state in a proper way
            // for instance, by using a different item name;
            // if such operation were blocking, it should be done asynchronously
        }
    }

    public synchronized void onLostProxy() {
        assert(currProxy != null);
        // unsubscriptions to currProxy are not needed
        currProxy = null;
        if (started && ! closed) {
            // you may try to keep the state in a proper way
            // if such operation were blocking, it should be done asynchronously
        }
    }

    public synchronized void start() {
        assert(! started && ! closed);
        if (currProxy == null) {
            // we have to send a snapshot, but haven't got one
            serverSideListener.smartEndOfSnapshot(serverSideHandle);
            // but you might get snapshot information in a proper way
            // if such operation were blocking, it should be done asynchronously
        } else {
            subscribeToProxy(name);
        }
        started = true;
    }

    public synchronized void close() {
        assert(started && ! closed);
        if (currProxy != null) {
            if (! resumed) {
                unsubscribeToProxy(name);
            } else {
                unsubscribeToProxy(name);
                // but in this case you might have used a different item name
            }
        }
        closed = true;
    }

    public synchronized void onUpdate(Map event, boolean isSnapshot, RecoverableNDP source) {
        // note: the Proxy Adapter calls the listener with no locks held
        if (source != currProxy || closed) {
            // old trailing event
            return;
        }
        if (isSnapshot && resumed) {
            // the event may be used to recover the state;
            // and eventually smartUpdate (with isSnapshot set as false) could be called
            // if this operation is blocking, it should be done asynchronously
        } else {
            serverSideListener.smartUpdate(serverSideHandle, event, isSnapshot);
        }
    }

    public synchronized void onEndOfSnapshot(RecoverableNDP source) {
        // note: the Proxy Adapter calls the listener with no locks held
        if (source != currProxy || closed) {
            // old trailing event
            return;
        }
        if (resumed) {
            // the notification may be used to recover the state
            // if this operation is blocking, it should be done asynchronously
        } else {
            serverSideListener.smartEndOfSnapshot(serverSideHandle);
        }
    }

    private void subscribeToProxy(String subscrName) {
        // the Proxy Adapter allows subscribe to be called with locks held
        try {
            currProxy.subscribe(subscrName, proxySideHandle, true);
        } catch (SubscriptionException e) {
            assert(false); // not used by the Proxy Adapter
        } catch (FailureException e) {
            assert(false); // intercepted by RecoverableNDP
        }
    }

    private void unsubscribeToProxy(String subscrName) {
        // the Proxy Adapter allows unsubscribe to be called with locks held
        try {
            currProxy.unsubscribe(subscrName);
        } catch (SubscriptionException e) {
            assert(false); // not used by the Proxy Adapter
        } catch (FailureException e) {
            assert(false); // intercepted by RecoverableNDP
        }
    }
}
[/SYNTAX]
Of course, the purpose of this code is to document what can and cannot be done while interacting with the supplied Proxy Adapter; it is not a recommendation on how to write a recovery-capable Adapter.
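As one possible direction for the update hole that the example leaves open, the snapshot received after a reconnection could be compared with the last state forwarded to the Server, so that only the fields that changed in the meantime are sent as a normal (non-snapshot) update. The sketch below assumes an item handled in MERGE mode, with events expressed as maps of field names to String values; the UpdateHoleFiller class and its method names are hypothetical. Note that it can only restore the final state: intermediate updates lost during the disconnection cannot be recovered this way.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: one instance per subscribed item (MERGE mode assumed).
class UpdateHoleFiller {
    // last value forwarded to the Server for each field
    private final Map<String, String> lastState = new HashMap<>();

    // To be called for every event actually forwarded to the Server.
    synchronized void onForwarded(Map<String, String> event) {
        lastState.putAll(event);
    }

    // Compares a post-recovery snapshot with the last forwarded state and
    // returns only the fields that are new or whose value has changed.
    // The returned fields are also recorded as the new state.
    synchronized Map<String, String> diffAgainst(Map<String, String> snapshot) {
        Map<String, String> delta = new HashMap<>();
        for (Map.Entry<String, String> e : snapshot.entrySet()) {
            String field = e.getKey();
            boolean known = lastState.containsKey(field);
            if (!known || !Objects.equals(lastState.get(field), e.getValue())) {
                delta.put(field, e.getValue());
            }
        }
        lastState.putAll(delta);
        return delta;
    }
}
```

In the ItemFlowManager above, such a helper would be fed from the branch that forwards updates and consulted in the "isSnapshot && resumed" branch of onUpdate; a non-empty result could then be passed to smartUpdate with isSnapshot set to false.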