Prototype for Real Time Data Streaming (Data Push) Part 3

This is part 3 of the series. Here are all the parts of the series.

Prototype for Real Time Data Streaming (Data Push) Part 1: maven2 generated Jetty based application

Prototype for Real Time Data Streaming (Data Push) Part 2: multi-channel subscription based web application

Prototype for Real Time Data Streaming (Data Push) Part 3: channel feeder java based application

In the previous posts, I showed how to create a web-based application that lets users subscribe to multiple channels on an embedded Jetty server. However, they still cannot see any data on those channels because no data is being fed into them.

In the following, I will show how to create a java application to feed data onto the channel.

ChannelFeeder.java

The ChannelFeeder is a generic middleman program. It reads lines from its input and immediately sends each one to the designated channel. It requires a channel name as its single input argument (such as /123, /sar, etc.); these channels were defined in part 2.

ChannelFeeder.java

import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.http.HttpBuffers;
import org.eclipse.jetty.util.component.AbstractLifeCycle;

public class ChannelFeeder { 
    private static String CHANNEL;
    private static final ClientSessionChannel.MessageListener DevListener = new DevListener();

    private static class DevListener implements ClientSessionChannel.MessageListener
    {
        public void onMessage(ClientSessionChannel channel, Message message)
        {
            // Here we received a message on the channel
            System.out.println("Sending:"+message.getData().toString());
        }
    }

    public static void main(String[] args) throws Exception {
        try { 

        if(args.length == 1)
        {
            CHANNEL = (args[0].charAt(0) == '/') ? args[0] : '/'+args[0];
        }
        else
        {    
            System.out.println("Enter new channel name");
            System.exit(1);
        } 

        // Create (and eventually setup) Jetty's HttpClient
        <span style="color:#ff0000;">HttpClient httpClient = new HttpClient();</span>

        // Setup Jetty's HttpClient
        <span style="color:#ff0000;">httpClient.start();</span>

        // Prepare the transport    
        Map&lt;String, Object&gt; options = new HashMap&lt;String, Object&gt;();
        ClientTransport transport = LongPollingTransport.create(options, httpClient);

        //ClientSession client = new BayeuxClient("http://localhost:8080/cometd", transport);
        <span style="color:#ff0000;">final BayeuxClient client = new BayeuxClient("http://localhost:8080/DeviceMonitor/cometd", transport);
</span>
        // Setup the BayeuxClient<span style="color:#ff0000;">
        client.getChannel(Channel.META_CONNECT).addListener(new ClientSessionChannel.MessageListener()</span>
        {
            public void onMessage(ClientSessionChannel channel, Message message)
            {
                //connected.set(message.isSuccessful());
            }
        });
<span style="color:#ff0000;">        client.getChannel(Channel.META_HANDSHAKE).addListener(new ClientSessionChannel.MessageListener()</span>
        {
            public void onMessage(ClientSessionChannel channel, Message message)
            {
                //connected.set(false);
            }
        });

<span style="color:#ff0000;">        client.handshake();</span>

        boolean handshaken = client.waitFor(1000, BayeuxClient.State.CONNECTED);
        if (handshaken)
        {
            // subscribe to the channel to normal (broadcast) channels
<span style="color:#ff0000;">            client.getChannel(CHANNEL).subscribe(DevListener);</span>

<span style="color:#ff0000;">            // the follow are commented out because we don't want to publish anything here.</span>
<span style="color:#ff0000;">            // We just want to pipe it from the input</span>
            // publish data to the normal channels
            //Map&lt;String, Object&gt; data = new HashMap&lt;String, Object&gt;();
            // Fill in the data. Publishing data on a channel is an asynchronous operation.
            //client.getChannel(CHANNEL).publish(data);
            //Map&lt;String, Object&gt; data = new HashMap&lt;String, Object&gt;();
            //data.put("name", "\"DevClient\"");

            //client.getChannel(CHANNEL).publish(data);

            System.out.println("Ready... (\"q\" to exit)");
            final BufferedReader inReader = new BufferedReader(new InputStreamReader(System.in));
            do {
                final String userInput = inReader.readLine();
                if (userInput == null || "q".equals(userInput)) {
                    break;
                }

                //connection.write(userInput);

                <span style="color:#ff0000;">client.getChannel(CHANNEL).publish(userInput);</span>
            } while (true);
        }

<span style="color:#ff0000;">        client.disconnect();
        client.waitFor(1000, BayeuxClient.State.DISCONNECTED);</span>

        } catch (IOException e) {
            e.printStackTrace();
        }

    } //end of main
} // end of class

feed.bash

This is a wrapper shell script to make our test easy. The execution is like

./feed.bash 123

This lets you type anything at the terminal, and each line is sent to channel 123. If a user chooses channel 123 in the web browser, the browser subscribes to it and shows whatever you type from then on.

./feed.bash stopwatch activates the Stopwatch.java program. It is just a Java program that counts down second by second. I will provide this program in another blog post.

./feed.bash sar pipes the output of the unix ‘sar -u 2 10000’ command to the ‘sar’ channel, where subscribers can watch the real-time feed from a remote browser. The same applies to the remaining options: iostat, vmstat and ifstat.

All of these are just simulations of devices. Likely applications of these technologies are field devices in transportation systems, appliance devices in home automation, etc.

#!/bin/bash
# Feed one named channel through ChannelFeeder.
# Usage: feed.bash channel_name
#   123                          forwards whatever is typed on the terminal
#   stopwatch                    forwards the output of the Stopwatch Java program
#   sar, iostat, vmstat, ifstat  forward the output of the matching system tool
if [ $# -ne 1 ]
then
    echo "$0 channel_name"
    exit 1
fi

export CHANNEL="$1"

# The jar names are relative, so they resolve against the directory entered below.
export CLASSPATH=bayeux-api-2.4.1.jar:cometd-java-client-2.4.1.jar:cometd-java-common-2.4.1.jar:jetty-client-8.0.1.v20110908.jar:jetty-http-8.0.1.v20110908.jar:jetty-util-8.0.1.v20110908.jar:jetty-io-8.0.1.v20110908.jar:slf4j-api-1.6.4.jar:slf4j-simple-1.6.4.jar:.

# Stop here if the working directory is missing; nothing below would be found.
cd ~/test/mvn/DevClient || exit 1

# Publish standard input onto ${CHANNEL}.
feed() {
    java -cp "${CLASSPATH}" ChannelFeeder "${CHANNEL}"
}

case "${CHANNEL}" in
     123)
           feed
           ;;
     stopwatch)
           java Stopwatch | feed
           ;;
     sar)
           sar -u 2 10000 | feed
           ;;
     iostat)
           iostat -xtc 2 10000 | feed
           ;;
     vmstat)
           vmstat 2 10000 | feed
           ;;
     ifstat)
           ifstat | feed
           ;;
     *)
           echo "Channel not defined."
           exit 1
           ;;
esac
exit 0

Test Drive

The following script opens several tabs in your terminal and feeds all the channels. It can be used to stress test your machine. On my 2GB machine, I can only run it 6 times; after that, the system is totally unresponsive.

#!/bin/bash
# Open one gnome-terminal tab per channel, each running the feed.bash wrapper
# found in the directory given as the only argument.
if [ $# -ne 1 ]
then
    echo "$0 Feed_base_dir"
    exit 1
fi

# The wrapper script is named feed.bash, so that is the file launched in each tab.
FEED="$1/feed.bash"

gnome-terminal \
    --tab --title=123 -e "$FEED 123" \
    --tab --title=stopwatch -e "$FEED stopwatch" \
    --tab --title=sar -e "$FEED sar" \
    --tab --title=iostat -e "$FEED iostat" \
    --tab --title=vmstat -e "$FEED vmstat" \
    --tab --title=ifstat -e "$FEED ifstat"
exit 0

Prototype for Real Time Data Streaming (Data Push) Part 1

Real time data streaming from remote devices (ie. data pushing) has been a fascinating topic. In this series of blogs, I will examine how to implement a prototype to demonstrate the capability of pushing data from devices (java client) via embedded light weight web server JETTY (JETTY 7 / COMETD 2 / Bayeux Protocol) to your web browser (Javascript / JQuery). The series includes three parts:

Prototype for Real Time Data Streaming (Data Push) Part 1: maven2 generated Jetty based application

Prototype for Real Time Data Streaming (Data Push) Part 2: multi-channel subscription based web application

Prototype for Real Time Data Streaming (Data Push) Part 3: channel feeder java based application
The prerequisites for the prototype are MAVEN2, JETTY 7, JAVA SDK and JQUERY. I am working on Ubuntu 11.04, but I believe it can be generalized to any Linux distro. Also, don’t worry too much about the minor versions of the prerequisites, because MAVEN2 will take care of all the software and version dependencies. That’s why we use MVN: we just need to tell it what our goal is, and let it take care of the rest.

Maven 2 Jetty based Web Application

Get Started

I will create a server side web application for Jetty 7 by using maven2.

 mvn archetype:generate -DarchetypeCatalog=http://cometd.org

It will present a few archetypes to choose from. Choose the following:

4: http://cometd.org -> org.cometd.archetypes:cometd-archetype-jquery-jetty7 (2.4.3 - CometD archetype for creating a server-side event-driven web application)

Then provide some parameters like the following:

Define value for property 'groupId': : henry416      
Define value for property 'artifactId': : DeviceMonitor
Define value for property 'version': 1.0-SNAPSHOT: 
Define value for property 'package': DeviceMonitor: 
[INFO] Using property: cometdVersion = 2.4.3
[INFO] Using property: jettyVersion = 7.6.4.v20120524
[INFO] Using property: slf4jVersion = 1.6.4
....

From it, a project called DeviceMonitor is created. We can now test drive this web application. Here is how we start the embedded Jetty server:

mvn install jetty:run
....
2012-09-23 12:18:28.567:INFO:oejs.AbstractConnector:Started SelectChannelConnector@0.0.0.0:8080
[INFO] Started Jetty Server
[INFO] Starting scanner at interval of 10 seconds.

Test drive by http://localhost:8080/ from web browser:

CometD Connection Established
Server Says: Hello, World
This is just like any programming where we always start with HELLO WORLD. Don’t stop here. Let’s explore what was created:

Exploring

In web.xml, it defines two servlets:


<!-- Declares org.cometd.server.CometdServlet under the name "cometd", with load-on-startup order 1. -->
<servlet>
<servlet-name>cometd</servlet-name>
<servlet-class>org.cometd.server.CometdServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
<!-- Routes every request under /cometd/ to the "cometd" servlet. -->
<servlet-mapping>
<servlet-name>cometd</servlet-name>
<url-pattern>/cometd/*</url-pattern>
</servlet-mapping>

<!-- Declares the application's BayeuxInitializer with load-on-startup order 2, i.e. after "cometd"; no URL mapping is given for it in this excerpt. -->
<servlet>
<servlet-name>initializer</servlet-name>
<servlet-class>DevServ.BayeuxInitializer</servlet-class>
<load-on-startup>2</load-on-startup>
</servlet>

In DevServ/BayeuxInitializer.java, it creates new HelloService(bayeux);
In DevServ/HelloService.java, it adds service : addService(“/service/hello”, “processHello”);
In processHello, it reads from input (name), writes to output (“greeting”, “Hello, ” + name), and remote.deliver(getServerSession(), “/hello”, output, null); Here /hello is the channel.

In index.jsp, it obtains context-path contextPath: ‘${pageContext.request.contextPath}’ dynamically, and pass control to application.js
In application.js, it does:

...
...
        $(window).unload(function()
        {
            cometd.disconnect(true);
        });

/// 1. configure URL for cometd protocol
        var cometURL = location.protocol + "//" + location.host + config.contextPath + "/cometd";
        cometd.configure({
            url: cometURL,
            logLevel: 'debug'
        });

/// 2. add meta listener
        cometd.addListener('/meta/handshake', _metaHandshake);
        cometd.addListener('/meta/connect', _metaConnect);

/// 3 handshake
        cometd.handshake();
...

Here is what _metaHandshake does: it subscribes to the /hello channel and publishes { name: ‘World’ } to the channel ‘/service/hello’; when the message is pushed back, it displays $(‘#body’).append(‘<div>Server Says: ‘ + message.data.greeting + ‘</div>’);

...
 // Handshake callback: on a successful handshake, subscribes to '/hello' and
 // publishes a greeting request to '/service/hello', both inside one cometd batch.
 function _metaHandshake(handshake)
 {
     // Anything other than an explicit success is ignored.
     if (handshake.successful !== true)
     {
         return;
     }

     // Appends the greeting carried by a '/hello' message to the page body.
     var showGreeting = function(message)
     {
         $('#body').append('<div>Server Says: ' + message.data.greeting + '</div>');
     };

     cometd.batch(function()
     {
         cometd.subscribe('/hello', showGreeting);
         // Publish on a service channel since the message is for the server only
         cometd.publish('/service/hello', { name: 'World' });
     });
 }
 ...

A program to simulate the Countdown Pedestrian Signals (CPS)

Countdown pedestrian signals (CPS) model is defined in Section 4E.02 of the 2009 Edition of the U.S. FHWA Manual on Uniform Traffic Control Devices with four indications (in fact only three can be used) presented in the following sequence:

  1. A steady WALKING PERSON (symbolizing WALK) signal indication means that a pedestrian facing the signal indication is permitted to start to cross the roadway in the direction of the signal indication, possibly in conflict with turning vehicles. The pedestrian shall yield the right-of-way to vehicles lawfully within the intersection at the time that the WALKING PERSON (symbolizing WALK) signal indication is first shown.
  2. A flashing UPRAISED HAND (symbolizing DONT WALK) signal indication means that a pedestrian shall not start to cross the roadway in the direction of the signal indication, but that any pedestrian who has already started to cross on a steady WALKING PERSON (symbolizing WALK) signal indication shall proceed to the far side of the traveled way of the street or highway, unless otherwise directed by a traffic control device to proceed only to the median of a divided highway or only to some other island or pedestrian refuge area.
  3. A steady UPRAISED HAND (symbolizing DONT WALK) signal indication means that a pedestrian shall not enter the roadway in the direction of the signal indication.
  4. A flashing WALKING PERSON (symbolizing WALK) signal indication has no meaning and shall not be used.

Thumbnail image of Figure 4E-1

This simulation program (output shown below) simplifies the model into a sequence of two main indications (WALK and DON’T WALK). It demonstrates the techniques of rotating image views and implementing a countdown LED clock class in JavaFX. It can easily be changed to include the additional transition indication, giving the full three-indication sequence of the official model.

Output of Pedestrian Signal Light Simulation with Countdown Clock

Output of Pedestrian Signal Light Simulation with Countdown Clock

The program takes the system clock in milliseconds and derives the one-second countdown pulses from it. The model also divides ONE minute into two 30-second intervals: 30 seconds for WALK and another 30 seconds for DONT WALK. It can be further enhanced with pulse sounds if you like.

(Acknowledgement: Part of code of the clock class originates from Oracle’s digital clock sample).

PedestrianSignal.java

/**
 * Purpose: A program to simulate the pedestrian signal light
 *    with a thirty second count-down LED clock.
 * Resources: walk.jpg and dont_walk.jpg (you can download from anywhere on web)
 *
 * Author:  https://henry416.wordpress.com
 *
 * Compile: javac -cp "c:\progra~1\oracle\javafx runtime 2.0\lib\jfxrt.jar" PedestrianSignal.java
 * Execute: java -cp "c:\progra~1\oracle\javafx runtime 2.0\lib\jfxrt.jar";. PedestrianSignal
 */
import javafx.application.Application;
import javafx.geometry.Rectangle2D;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;
import javafx.animation.KeyFrame;
import javafx.animation.Timeline;
import javafx.event.ActionEvent;
import javafx.event.EventHandler;
import javafx.scene.Group;
import javafx.scene.Parent;
import javafx.scene.Scene;
import javafx.scene.effect.Effect;
import javafx.scene.effect.Glow;
import javafx.scene.effect.InnerShadow;
import javafx.scene.image.Image;
import javafx.scene.image.ImageView;
import javafx.scene.paint.Color;
import javafx.scene.shape.Circle;
import javafx.scene.shape.Polygon;
import javafx.scene.transform.Scale;
import javafx.scene.transform.Shear;
import javafx.util.Duration;
import java.util.Calendar;
 public class PedestrianSignal extends Application {
    private Clock clock;
    private Calendar calendar = Calendar.getInstance();
    private Image img_dont_walk = new Image("dont_walk.jpg");
    private Image img_walk = new Image("walk.jpg");
    private ImageView iv = new ImageView();

     @Override public void start(Stage stage) {
         calendar.setTimeInMillis(System.currentTimeMillis());
         int seconds = 60-calendar.get(Calendar.SECOND);
         Group root = new Group();
         Scene scene = new Scene(root);
         scene.setFill(Color.BLACK);
         VBox box = new VBox();

         // load the image
            if (seconds&gt;30)
              { iv.setImage(img_dont_walk);}
            else
              { iv.setImage(img_walk);}

         // resizes the image to have width of 120 while preserving the ratio and using
         // higher quality filtering method; this ImageView is also cached to
         // improve performance
         iv.setImage(img_walk);
         iv.setFitWidth(120);
         iv.setPreserveRatio(true);
         iv.setSmooth(true);
         iv.setCache(true);

         box.getChildren().add(iv);

         // add digital clock
         clock = new Clock(Color.ORANGERED, Color.rgb(50,50,50));
         clock.setLayoutX(45);
         clock.setLayoutY(186);
         clock.getTransforms().add(new Scale(0.83f, 0.83f, 0, 0));
         // add clock to bo
         box.getChildren().add(clock);
         root.getChildren().add(box);

         stage.setTitle("Pedestrian Test");
         stage.setWidth(120);
         stage.setHeight(300);
         stage.setScene(scene);
         stage.sizeToScene();
         stage.show();
     }
    public void play() {
        clock.play();
    }
    @Override public void stop() {
        clock.stop();
    }
    /**
     * Clock made of 6 of the Digit classes for hours, minutes and seconds.
     */
    public class Clock extends Parent {
        private Digit[] digits;
        private Timeline delayTimeline, secondTimeline;
        public Clock(Color onColor, Color offColor) {
            // create effect for on LEDs
            Glow onEffect = new Glow(1.7f);
            onEffect.setInput(new InnerShadow());
            // create effect for on dot LEDs
            Glow onDotEffect = new Glow(1.7f);
            onDotEffect.setInput(new InnerShadow(5,Color.BLACK));
            // create effect for off LEDs
            InnerShadow offEffect = new InnerShadow();
            // create digits
            digits = new Digit[2];
            for (int i = 0; i &lt; 2; i++) {
                Digit digit = new Digit(onColor, offColor, onEffect, offEffect);
                digit.setLayoutX(i * 80 + ((i + 1) % 2) * 20);
                digits[i] = digit;
                getChildren().add(digit);
            }
            // update digits to current time and start timer to update every second
            refreshClocks();
            play();
        }
        private void refreshClocks() {
            calendar.setTimeInMillis(System.currentTimeMillis());
            int seconds = 60-calendar.get(Calendar.SECOND);
            // load the image
            if (seconds&gt;30)
              { iv.setImage(img_dont_walk);}
            else
              { iv.setImage(img_walk);}

            if (seconds &gt;30)
              { seconds = seconds -30; }
            digits[0].showNumber(seconds / 10);
            digits[1].showNumber(seconds % 10);
        }
        public void play() {
            // wait till start of next second then start a timeline to call refreshClocks() every second
            delayTimeline = new Timeline();
            delayTimeline.getKeyFrames().add(
                    new KeyFrame(new Duration(1000 - (System.currentTimeMillis() % 1000)), new EventHandler&lt;ActionEvent&gt;() {
                        @Override public void handle(ActionEvent event) {
                            if (secondTimeline != null) {
                                secondTimeline.stop();
                            }
                            secondTimeline = new Timeline();
                            secondTimeline.setCycleCount(Timeline.INDEFINITE);
                            secondTimeline.getKeyFrames().add(
                                    new KeyFrame(Duration.seconds(1), new EventHandler&lt;ActionEvent&gt;() {
                                        @Override public void handle(ActionEvent event) {
                                            refreshClocks();
                                        }
                                    }));
                            secondTimeline.play();
                        }
                    })
            );
            delayTimeline.play();
        }
        public void stop(){
            delayTimeline.stop();
            if (secondTimeline != null) {
                secondTimeline.stop();
            }
        }
    }
    /**
     * Simple 7 segment LED style digit. It supports the numbers 0 through 9.
     */
    public static class Digit extends Parent {
        private static final boolean[][] DIGIT_COMBINATIONS = new boolean[][]{
                new boolean[]{true, false, true, true, true, true, true},
                new boolean[]{false, false, false, false, true, false, true},
                new boolean[]{true, true, true, false, true, true, false},
                new boolean[]{true, true, true, false, true, false, true},
                new boolean[]{false, true, false, true, true, false, true},
                new boolean[]{true, true, true, true, false, false, true},
                new boolean[]{true, true, true, true, false, true, true},
                new boolean[]{true, false, false, false, true, false, true},
                new boolean[]{true, true, true, true, true, true, true},
                new boolean[]{true, true, true, true, true, false, true}};
        private final Polygon[] polygons = new Polygon[]{
                new Polygon(2, 0, 52, 0, 42, 10, 12, 10),
                new Polygon(12, 49, 42, 49, 52, 54, 42, 59, 12f, 59f, 2f, 54f),
                new Polygon(12, 98, 42, 98, 52, 108, 2, 108),
                new Polygon(0, 2, 10, 12, 10, 47, 0, 52),
                new Polygon(44, 12, 54, 2, 54, 52, 44, 47),
                new Polygon(0, 56, 10, 61, 10, 96, 0, 106),
                new Polygon(44, 61, 54, 56, 54, 106, 44, 96)};
        private final Color onColor;
        private final Color offColor;
        private final Effect onEffect;
        private final Effect offEffect;
        public Digit(Color onColor, Color offColor, Effect onEffect, Effect offEffect) {
            this.onColor = onColor;
            this.offColor = offColor;
            this.onEffect = onEffect;
            this.offEffect = offEffect;
            getChildren().addAll(polygons);
            getTransforms().add(new Shear(-0.1,0));
            showNumber(0);
        }
        public void showNumber(Integer num) {
            if (num &lt; 0 || num &gt; 9) num = 0; // default to 0 for non-valid numbers
            for (int i = 0; i &lt; 7; i++) {
                polygons[i].setFill(DIGIT_COMBINATIONS[num][i] ? onColor : offColor);
                polygons[i].setEffect(DIGIT_COMBINATIONS[num][i] ? onEffect : offEffect);
            }
        }
    }
     public static void main(String[] args) {
         Application.launch(args);
     }
 }

Prototype Light Switch Program

lightSwitch.java

/*
   A small prototype for a switch-lights problem
   (four or more consecutive lights will be turned off)
   Henry Chen
   https://henry416.wordpress.com
*/

import java.io.*;
import javafx.application.*;
import javafx.scene.*;
import javafx.scene.paint.Color;
import javafx.stage.*;
import javafx.animation.*;
import javafx.util.*;
import javafx.scene.control.*;
import javafx.scene.layout.*;
import javafx.geometry.*;
import javafx.scene.text.*;
import javafx.scene.shape.*;
import javafx.animation.*;
import java.lang.*;
import javafx.scene.input.*;
import javafx.event.*;

public class lightSwitch extends Application
{
   int[] array = new int[25];
   Circle[] light = new Circle[25];

   @Override
   public void start (Stage stage)
   {
      init(stage);
      stage.show();
   }

   public void init(Stage stage)
   {
      Group root = new Group();
      Scene scene = new Scene(root, 700, 500, Color.WHITE);
      for (int i = 0; i < 25; i++)
      {
         circleChange(i,root);
      }
      root.getChildren().addAll(light);
      stage.setTitle("Lights Problem");
      stage.setScene(scene);
   }

   public void circleChange(final int i, final Group root)
   {
      light[i] = new Circle(i*25 + 25, 100, 10);
      light[i].setOnMouseClicked(new EventHandler<MouseEvent>()
      {
         public void handle(MouseEvent me)
         {
            if (array[i] == 1)
               array[i] = 0;
            else
               array[i] = 1;

            int counter = 0;
            int place = -1;
            boolean x = false;
            for (int a = 0; a < 25; a++)
            {
               if (array[a] == 1)
               {
                  if (x == false)
                  {
                     place = a;
                     x = true;
                  }
                  counter++;
               }
               else
               {
                  if (counter >= 4 && x == true)
                     a = 26;
                  else
                  {
                     x = false;
                     counter = 0;
                     place = -1;
                  }
               }
            }

            if (counter >= 4 && x == true)
               for (int b = 0; b < counter; b++)
                  array[b+place] = 0;

            paint();
            me.consume();
         }
      });
   }

   public void paint()
   {
      for (int a = 0; a < 25; a++)
      {
         if (array[a] == 0)
            light[a].setFill(Color.BLACK);
         else
            light[a].setFill(Color.RED);
      }
   }

   public static void main (String args[])
   {
      Application.launch(args);
   }
}