summaryrefslogtreecommitdiffstats
path: root/simple/simple-http/src/test/java/org/simpleframework/http/socket/table/WebSocketTableUpdater.java
blob: 0c4ec4dee8952a3f4d87bb687836abc69004e984 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package org.simpleframework.http.socket.table;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;

import org.simpleframework.http.socket.Session;
import org.simpleframework.http.socket.FrameChannel;
import org.simpleframework.http.socket.WebSocketAnalyzer;
import org.simpleframework.http.socket.service.Service;
import org.simpleframework.transport.trace.TraceAnalyzer;

public class WebSocketTableUpdater extends Thread implements Service {
   
   private final Set<WebSocketTableSubscription> subscriptions;
   private final WebSocketTableListener listener;
   private final WebSocketTableRowChanger changer;
   private final WebSocketTableSweeper sweeper;   
   private final WebSocketTable table;
   private final AtomicLong time;
   
   public WebSocketTableUpdater(String key, WebSocketTableSchema schema, WebSocketTableRowAnnotator annotator) {
      this.subscriptions = new CopyOnWriteArraySet<WebSocketTableSubscription>();
      this.table = new WebSocketTable(key, schema, annotator);
      this.sweeper = new WebSocketTableSweeper(table);
      this.changer = new WebSocketTableRowChanger(table);
      this.listener = new WebSocketTableListener(this);
      this.time = new AtomicLong();
   }
   
   public void refresh(Session session) {
      for(WebSocketTableSubscription subscription : subscriptions) {
         FrameChannel socket = subscription.getSocket();  
         FrameChannel other = session.getChannel();
         
         if(socket == other) {
            AtomicLong timeStamp = subscription.getTimeStamp();
            timeStamp.set(0);
         }
      }
   }
   
   public void run() {     
      changer.start();
      
      while(true) {
         try {
            Thread.sleep(200);
            
            for(WebSocketTableSubscription subscription : subscriptions) {
               FrameChannel socket = subscription.getSocket();            
               AtomicLong timeStamp = subscription.getTimeStamp();
               AtomicLong sendCount = subscription.getSendCount();
               long before = System.currentTimeMillis();
               long time = timeStamp.get();                
               long count = sendCount.get();
           
               try {                   
                  Map<WebSocketTableUpdateType, String> messages = sweeper.sweep(time - 1000, count);
                  Set<WebSocketTableUpdateType> updates = messages.keySet();
                  
                  for(WebSocketTableUpdateType update : updates) {
                     String message = messages.get(update);

                     if(message != null) {
                        socket.send(update.code + message);
                     }
                  }
               } catch(Exception e) {
                  e.printStackTrace();
                  subscriptions.remove(subscription);
                  socket.close();
               } finally {
                  sendCount.getAndIncrement();
                  timeStamp.set(before);
               }
            }
         } catch(Exception e) {
            e.printStackTrace();
         }
      }
   }

   public void connect(Session connection) {
      FrameChannel socket = connection.getChannel();   
      
      try {
         WebSocketTableSubscription subscription = new WebSocketTableSubscription(socket);
               
         socket.register(listener);
         subscriptions.add(subscription);
         time.set(0);
         Thread.sleep(1000); // crap
         time.set(0);
      } catch(Exception e) {
         e.printStackTrace();
      }
      
   }
   
   public static void main(String[] list) throws Exception {
      TraceAnalyzer agent = new WebSocketAnalyzer();
      Map<String, WebSocketTableColumnStyle> columns = new LinkedHashMap<String, WebSocketTableColumnStyle>();
      
      WebSocketTableSchema schema = new WebSocketTableSchema(columns);      
      columns.put("id", new WebSocketTableColumnStyle("id", "Id", "{id}", true, true));      
      columns.put("bidOutrightVolume", new WebSocketTableColumnStyle("bidOutrightVolume", "$ B", "<div style='font-weight: bold; color: #0000ff; text-decoration: underline;'>{bidOutrightVolume}</a>", true, false));
      columns.put("bidOutright", new WebSocketTableColumnStyle("bidOutright", "Bid", "<div style='font-weight: bold; color: #0000ff; text-decoration: underline;'>{bidOutright}</a>", true, false));      
      columns.put("offerOutright", new WebSocketTableColumnStyle("offerOutright", "Offer", "<div style='font-weight: bold; color: #ff0000; text-decoration: underline;'>{offerOutright}</a>", true, false));
      columns.put("offerOutrightVolume", new WebSocketTableColumnStyle("offerOutrightVolume", "$ O", "<div style='font-weight: bold; color: #ff0000; text-decoration: underline;'>{offerOutrightVolume}</a>", true, false));      
      columns.put("product", new WebSocketTableColumnStyle("product", "Security", "<div style='font-weight: bold;'>{product}</div>", true, true));            
      columns.put("bidEFPVolume", new WebSocketTableColumnStyle("bidEFPVolume", "$ B", "<div style='font-weight: bold; color: #0000ff; text-decoration: underline;'>{bidEFPVolume}</a>", true, false));
      columns.put("bidEFP", new WebSocketTableColumnStyle("bidEFP", "Bid", "<div style='font-weight: bold; color: #0000ff; text-decoration: underline;'>{bidEFP}</a>", true, false));      
      columns.put("offerEFP", new WebSocketTableColumnStyle("offerEFP", "Offer", "<div style='font-weight: bold; color: #ff0000; text-decoration: underline;'>{offerEFP}</a>", true, false));
      columns.put("offerEFPVolume", new WebSocketTableColumnStyle("offerEFPVolume", "$ O", "<div style='font-weight: bold; color: #ff0000; text-decoration: underline;'>{offerEFPVolume}</a>", true, false));
      columns.put("reference", new WebSocketTableColumnStyle("reference", "Ref", "{reference}", true, true));      
      WebSocketTableRowAnnotator annotator = new WebSocketTableRowAnnotator(schema);
      WebSocketTableUpdater application = new WebSocketTableUpdater("product", schema, annotator);
      
      WebSocketTableUpdaterApplication container = new WebSocketTableUpdaterApplication(application, agent, 6060);
      application.start();
      container.connect();      
   }
}