This is part 3 of the series. Here are all the parts:
Prototype for Real Time Data Streaming (Data Push) Part 1: maven2 generated Jetty based application
Prototype for Real Time Data Streaming (Data Push) Part 2: multi-channel subscription based web application
Prototype for Real Time Data Streaming (Data Push) Part 3: channel feeder java based application
In the previous posts, I showed how to create a web application, running on an embedded Jetty server, that lets users subscribe to multiple channels. However, subscribers still cannot see any data on those channels because nothing is feeding data into them.
In this post, I will show how to create a Java application that feeds data into a channel.
ChannelFeeder.java
ChannelFeeder is a generic middleman program. It reads lines from standard input and immediately publishes each one to the designated channel. It requires a channel name as its single command-line argument (such as /123, /sar, etc.); these channels were defined in part 2.
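The feeder accepts the channel name with or without its leading slash. Here is a minimal sketch of that normalization, pulled out into a helper so it can be tried on its own. The class name ChannelNames and the empty-argument check are my assumptions for illustration; they are not part of the listing.

```java
// Hypothetical helper; ChannelFeeder performs the same check inline in main().
final class ChannelNames {

    // Prefixes a slash when the argument does not already start with one.
    // The empty-argument check is an added assumption: calling charAt(0)
    // on an empty string would otherwise throw.
    static String normalize(String arg) {
        if (arg == null || arg.isEmpty()) {
            throw new IllegalArgumentException("channel name must not be empty");
        }
        return arg.charAt(0) == '/' ? arg : "/" + arg;
    }

    public static void main(String[] args) {
        System.out.println(normalize("123"));
        System.out.println(normalize("/sar"));
    }
}
```

With this in place, both 123 and /123 on the command line would address the same channel.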
ChannelFeeder.java
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http.HttpBuffers;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
public class ChannelFeeder {
private static String CHANNEL;
private static final ClientSessionChannel.MessageListener DevListener = new DevListener();
private static class DevListener implements ClientSessionChannel.MessageListener
{
public void onMessage(ClientSessionChannel channel, Message message)
{
// Here we received a message on the channel
System.out.println("Sending:"+message.getData().toString());
}
}
public static void main(String[] args) throws Exception {
try {
if(args.length == 1)
{
CHANNEL = (args[0].charAt(0) == '/') ? args[0] : '/'+args[0];
}
else
{
System.out.println("Enter new channel name");
System.exit(1);
}
// Create Jetty's HttpClient (configure it here if needed)
HttpClient httpClient = new HttpClient();
// Start Jetty's HttpClient
httpClient.start();
// Prepare the transport
Map<String, Object> options = new HashMap<String, Object>();
ClientTransport transport = LongPollingTransport.create(options, httpClient);
//ClientSession client = new BayeuxClient("http://localhost:8080/cometd", transport);
final BayeuxClient client = new BayeuxClient("http://localhost:8080/DeviceMonitor/cometd", transport);
// Set up the BayeuxClient
client.getChannel(Channel.META_CONNECT).addListener(new ClientSessionChannel.MessageListener()
{
public void onMessage(ClientSessionChannel channel, Message message)
{
//connected.set(message.isSuccessful());
}
});
client.getChannel(Channel.META_HANDSHAKE).addListener(new ClientSessionChannel.MessageListener()
{
public void onMessage(ClientSessionChannel channel, Message message)
{
//connected.set(false);
}
});
client.handshake();
boolean handshaken = client.waitFor(1000, BayeuxClient.State.CONNECTED);
if (handshaken)
{
// Subscribe to the normal (broadcast) channel so that published messages are echoed back
client.getChannel(CHANNEL).subscribe(DevListener);
// The following is commented out because we don't want to publish anything of our own here.
// We just want to pipe whatever arrives on the input.
// publish data to the normal channels
//Map<String, Object> data = new HashMap<String, Object>();
// Fill in the data. Publishing data on a channel is an asynchronous operation.
//client.getChannel(CHANNEL).publish(data);
//Map<String, Object> data = new HashMap<String, Object>();
//data.put("name", "\"DevClient\"");
//client.getChannel(CHANNEL).publish(data);
System.out.println("Ready... (\"q\" to exit)");
final BufferedReader inReader = new BufferedReader(new InputStreamReader(System.in));
do {
final String userInput = inReader.readLine();
if (userInput == null || "q".equals(userInput)) {
break;
}
//connection.write(userInput);
client.getChannel(CHANNEL).publish(userInput);
} while (true);
}
else
{
// Report the failure instead of exiting silently
System.err.println("Handshake with the server failed; nothing was sent.");
}
client.disconnect();
client.waitFor(1000, BayeuxClient.State.DISCONNECTED);
} catch (IOException e) {
e.printStackTrace();
}
} //end of main
} // end of class
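Stripped of the CometD calls, the heart of ChannelFeeder is a read-and-forward loop. The following is a minimal sketch of that loop on its own. The class name LinePump is hypothetical, and the Consumer stands in for the publish call. It uses java.util.function.Consumer, so it assumes Java 8 or later, which is newer than the libraries used in this series.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Hypothetical stand-alone version of the read loop in ChannelFeeder.main().
final class LinePump {

    // Forwards each line from 'in' to 'sink' until end of input or a line
    // equal to "q". Returns the number of lines forwarded.
    static int pump(BufferedReader in, Consumer<String> sink) {
        int forwarded = 0;
        try {
            String line;
            while ((line = in.readLine()) != null && !"q".equals(line)) {
                sink.accept(line);   // in ChannelFeeder this is the publish() call
                forwarded++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Canned input instead of System.in, so the sketch runs unattended.
        BufferedReader in = new BufferedReader(new StringReader("first\nsecond\nq\nignored\n"));
        int n = pump(in, line -> System.out.println("Sending:" + line));
        System.out.println(n + " lines forwarded");
    }
}
```

Because the loop only depends on a reader, it behaves the same whether the lines come from the keyboard or from another program's output through a pipe.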
feed.bash
This is a wrapper shell script that makes testing easier. It is run like this:
./feed.bash 123
This lets you type anything at the terminal, and each line is sent to channel 123. A user who chooses channel 123 in the web browser subscribes to it and sees whatever you type from then on.
./feed.bash stopwatch runs the Stopwatch.java program, which is just a Java program that counts down second by second. I will provide this program in another post.
./feed.bash sar pipes the output of the Unix command ‘sar -u 2 10000’ to the ‘sar’ channel, so subscribers can watch the real-time feed from a remote browser. The remaining options (iostat, vmstat and ifstat) work the same way.
All of these merely simulate devices. Likely applications for this technology include field devices in transportation systems, appliances in home automation, etc.
#! /bin/bash
if [ $# -ne 1 ]
then
echo "$0 channel_name"
exit 1
fi
export CHANNEL=$1
export CLASSPATH=bayeux-api-2.4.1.jar:cometd-java-client-2.4.1.jar:cometd-java-common-2.4.1.jar:jetty-client-8.0.1.v20110908.jar:jetty-http-8.0.1.v20110908.jar:jetty-util-8.0.1.v20110908.jar:jetty-io-8.0.1.v20110908.jar:slf4j-api-1.6.4.jar:slf4j-simple-1.6.4.jar:.
cd ~/test/mvn/DevClient
case "${CHANNEL}" in
123)
java -cp "${CLASSPATH}" ChannelFeeder "${CHANNEL}"
;;
stopwatch)
java Stopwatch | java -cp "${CLASSPATH}" ChannelFeeder "${CHANNEL}"
;;
sar)
sar -u 2 10000 | java -cp "${CLASSPATH}" ChannelFeeder "${CHANNEL}"
;;
iostat)
iostat -xtc 2 10000 | java -cp "${CLASSPATH}" ChannelFeeder "${CHANNEL}"
;;
vmstat)
vmstat 2 10000 | java -cp "${CLASSPATH}" ChannelFeeder "${CHANNEL}"
;;
ifstat)
ifstat | java -cp "${CLASSPATH}" ChannelFeeder "${CHANNEL}"
;;
*)
echo "Channel not defined."
exit 1
;;
esac
exit 0
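The stopwatch branch pipes the standard output of one Java program into ChannelFeeder. The real Stopwatch.java is not shown in this post, so the following is only a stand-in under my own assumptions: the class name CountdownSketch, the starting value and the output format are all hypothetical. It prints one line per tick, which is the only thing ChannelFeeder needs from an upstream program.

```java
import java.io.PrintStream;

// Hypothetical stand-in for a line-per-second producer; not the author's Stopwatch.java.
final class CountdownSketch {

    // Prints from, from-1, ..., 0 to 'out', one value per line,
    // sleeping intervalMillis between lines.
    static void countDown(int from, long intervalMillis, PrintStream out) {
        for (int i = from; i >= 0; i--) {
            out.println(i);
            out.flush();                       // hand each line to the pipe right away
            if (i > 0) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // restore the flag and stop early
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        int from = args.length > 0 ? Integer.parseInt(args[0]) : 3;  // default is an assumption
        countDown(from, 1000L, System.out);
    }
}
```

Compiled next to ChannelFeeder, it could be tried with something like java CountdownSketch 30 | java -cp ${CLASSPATH} ChannelFeeder stopwatch. When the countdown ends, the feeder sees end of input and disconnects.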
Test Drive
The following script should open several tabs in your terminal and feed all the channels. It can also be used to stress test your machine. On my 2GB machine, I could run it only 6 times before the system became totally unresponsive.
#!/bin/bash
if [ $# -ne 1 ]
then
echo "$0 Feed_base_dir"
exit 1
fi
gnome-terminal \
--tab --title=123 -e "$1/feed.bash 123" \
--tab --title=stopwatch -e "$1/feed.bash stopwatch" \
--tab --title=sar -e "$1/feed.bash sar" \
--tab --title=iostat -e "$1/feed.bash iostat" \
--tab --title=vmstat -e "$1/feed.bash vmstat" \
--tab --title=ifstat -e "$1/feed.bash ifstat"
exit 0