Prototype for Real Time Data Streaming (Data Push) Part 3

This is the part 3 of the series. Here are the other parts of the series.

Prototype for Real Time Data Streaming (Data Push) Part 1: maven2 generated Jetty based application

Prototype for Real Time Data Streaming (Data Push) Part 2: multi-channel subscription based web application

Prototype for Real Time Data Streaming (Data Push) Part 3: channel feeder java based application

In the previous blogs, I have shown how to create a web based application to allow users to subscribe multiple channels on jetty embedded server. However, they still cannot see any data from those channels because there is no data being fed onto them.

In the following, I will show how to create a java application to feed data onto the channel.

ChannelFeeder.java

The ChannelFeeder is the generic middle man program. It reads from input and sends it immediately to the designate channel. It requires a named channel as an input argument (such /123. /sar. etc) which were defined in part 2.

ChannelFeeder.java

import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http.HttpBuffers;
import org.eclipse.jetty.util.component.AbstractLifeCycle;

public class ChannelFeeder { 
    private static String CHANNEL;
    private static final ClientSessionChannel.MessageListener DevListener = new DevListener();

    private static class DevListener implements ClientSessionChannel.MessageListener
    {
        public void onMessage(ClientSessionChannel channel, Message message)
        {
            // Here we received a message on the channel
            System.out.println("Sending:"+message.getData().toString());
        }
    }

    public static void main(String[] args) throws Exception {
        try { 

        if(args.length == 1)
        {
            CHANNEL = (args[0].charAt(0) == '/') ? args[0] : '/'+args[0];
        }
        else
        {    
            System.out.println("Enter new channel name");
            System.exit(1);
        } 

        // Create (and eventually setup) Jetty's HttpClient
        <span style="color:#ff0000;">HttpClient httpClient = new HttpClient();</span>

        // Setup Jetty's HttpClient
        <span style="color:#ff0000;">httpClient.start();</span>

        // Prepare the transport    
        Map&lt;String, Object&gt; options = new HashMap&lt;String, Object&gt;();
        ClientTransport transport = LongPollingTransport.create(options, httpClient);

        //ClientSession client = new BayeuxClient("http://localhost:8080/cometd", transport);
        <span style="color:#ff0000;">final BayeuxClient client = new BayeuxClient("http://localhost:8080/DeviceMonitor/cometd", transport);
</span>
        // Setup the BayeuxClient<span style="color:#ff0000;">
        client.getChannel(Channel.META_CONNECT).addListener(new ClientSessionChannel.MessageListener()</span>
        {
            public void onMessage(ClientSessionChannel channel, Message message)
            {
                //connected.set(message.isSuccessful());
            }
        });
<span style="color:#ff0000;">        client.getChannel(Channel.META_HANDSHAKE).addListener(new ClientSessionChannel.MessageListener()</span>
        {
            public void onMessage(ClientSessionChannel channel, Message message)
            {
                //connected.set(false);
            }
        });

<span style="color:#ff0000;">        client.handshake();</span>

        boolean handshaken = client.waitFor(1000, BayeuxClient.State.CONNECTED);
        if (handshaken)
        {
            // subscribe to the channel to normal (broadcast) channels
<span style="color:#ff0000;">            client.getChannel(CHANNEL).subscribe(DevListener);</span>

<span style="color:#ff0000;">            // the follow are commented out because we don't want to publish anything here.</span>
<span style="color:#ff0000;">            // We just want to pipe it from the input</span>
            // publish data to the normal channels
            //Map&lt;String, Object&gt; data = new HashMap&lt;String, Object&gt;();
            // Fill in the data. Publishing data on a channel is an asynchronous operation.
            //client.getChannel(CHANNEL).publish(data);
            //Map&lt;String, Object&gt; data = new HashMap&lt;String, Object&gt;();
            //data.put("name", "\"DevClient\"");

            //client.getChannel(CHANNEL).publish(data);

            System.out.println("Ready... (\"q\" to exit)");
            final BufferedReader inReader = new BufferedReader(new InputStreamReader(System.in));
            do {
                final String userInput = inReader.readLine();
                if (userInput == null || "q".equals(userInput)) {
                    break;
                }

                //connection.write(userInput);

                <span style="color:#ff0000;">client.getChannel(CHANNEL).publish(userInput);</span>
            } while (true);
        }

<span style="color:#ff0000;">        client.disconnect();
        client.waitFor(1000, BayeuxClient.State.DISCONNECTED);</span>

        } catch (IOException e) {
            e.printStackTrace();
        }

    } //end of main
} // end of class

feed.bash

This is a wrapper shell script to make our test easy. The execution is like

./feed.bash 123

This will allow you to key in anything from the screen, and the result will be sent to the channel 123. If the user choose channel 123 from the web browser, it will subscribe to it and see whatever you type from that onward.

./feed.bash stopwatch is to activate Stopwatch.java program. It just a count-down second by second java program. I will provide this program in another blog.

./feed.bash sar will pipe unix ‘sar -u 2 10000’ command output to the ‘sar’ for those subscribers where they can watch real time feed from remote browser. This will apply for the rest option like iostat, vmstat and ifstat.

All of these are just the simulation of the devices. The likely application for these technologies are for field devices in transportation systems, the appliance devices in home automation etc.

#! /bin/bash
if [ $# -ne 1 ]
then
    echo "$0 channel_name"
    exit 1
fi

export CHANNEL=$1

export CLASSPATH=bayeux-api-2.4.1.jar:cometd-java-client-2.4.1.jar:cometd-java-common-2.4.1.jar:jetty-client-8.0.1.v20110908.jar:jetty-http-8.0.1.v20110908.jar:jetty-util-8.0.1.v20110908.jar:jetty-io-8.0.1.v20110908.jar:slf4j-api-1.6.4.jar:slf4j-simple-1.6.4.jar:.

cd ~/test/mvn/DevClient

case "${CHANNEL}" in
     123)
           java -cp ${CLASSPATH} ChannelFeeder ${CHANNEL}
           ;;
     stopwatch)
           <span style="color:#ff0000;">java Stopwatch</span> | java -cp ${CLASSPATH} ChannelFeeder ${CHANNEL}
           ;;
     sar)
           <span style="color:#ff0000;">sar -u 2 10000</span> | java -cp ${CLASSPATH} ChannelFeeder ${CHANNEL}
           ;;
     iostat)
           <span style="color:#ff0000;">iostat -xtc 2</span> 10000 | java -cp ${CLASSPATH} ChannelFeeder ${CHANNEL}
           ;;
     vmstat)
           <span style="color:#ff0000;">vmstat 2 10000</span> | java -cp ${CLASSPATH} ChannelFeeder ${CHANNEL}
           ;;
     ifstat)
           <span style="color:#ff0000;">ifstat</span> | java -cp ${CLASSPATH} ChannelFeeder ${CHANNEL}
           ;;
     *)
           echo "Channel not defined."
           exit
           ;;
esac
exit 0

Test Drive

The following script should open many tabs in your terminal and feed all the channels. This can use to stress test your machine. For my 2GB machine, I only can run these 6 times, and the system is totally unresponsive after it.

#!/bin/bash
if [ $# -ne 1 ]
then
    echo "$0 Feed_base_dir"
    exit 1
fi
gnome-terminal --tab --title=123 -e "$1/feed 123"  --tab --title=stopwatch -e "$1/feed stopwatch" --tab --title=sar -e "$1/feed sar" --tab --title=iostat -e "$1/feed iostat" --tab --title=vmstat -e "$1/feed vmstat" --tab --title=ifstat -e "$1/feed ifstat"
exit 0

About henry416
I am a computer technology explorer and an university student based on Toronto. If you have any question, please feel free to discuss and comment here

One Response to Prototype for Real Time Data Streaming (Data Push) Part 3

  1. Pingback: Prototype for Real Time Data Streaming (Data Push) Part 2 « Henry Chen

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s