1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
package org.simpleframework.http.socket.table;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import org.simpleframework.http.socket.Session;
import org.simpleframework.http.socket.FrameChannel;
import org.simpleframework.http.socket.WebSocketAnalyzer;
import org.simpleframework.http.socket.service.Service;
import org.simpleframework.transport.trace.TraceAnalyzer;
/**
 * Background thread that periodically sweeps a {@link WebSocketTable} and
 * sends the resulting messages to every subscribed web socket.
 * <p>
 * Sessions are subscribed through {@link #connect(Session)}. Once the
 * thread has been started it wakes up every {@value #SWEEP_INTERVAL}
 * milliseconds, asks the sweeper for the messages to deliver to each
 * subscription and sends them over that subscription's socket. A
 * subscription whose sweep or send fails is removed and its socket is
 * closed. Interrupting the thread terminates the loop.
 */
public class WebSocketTableUpdater extends Thread implements Service {

   /** Pause, in milliseconds, between two passes over the subscriptions. */
   private static final long SWEEP_INTERVAL = 200;

   /**
    * Amount subtracted from a subscription's time stamp before it is handed
    * to the sweeper. NOTE(review): presumably an overlap so that changes made
    * close to the previous pass are not missed - confirm against the sweeper.
    */
   private static final long SWEEP_MARGIN = 1000;

   /** Time, in milliseconds, that {@link #connect(Session)} blocks after subscribing. */
   private static final long CONNECT_DELAY = 1000;

   /** Colour used by the bid column templates in {@link #main(String[])}. */
   private static final String BID_COLOR = "#0000ff";

   /** Colour used by the offer column templates in {@link #main(String[])}. */
   private static final String OFFER_COLOR = "#ff0000";

   private final Set<WebSocketTableSubscription> subscriptions;
   private final WebSocketTableListener listener;
   private final WebSocketTableRowChanger changer;
   private final WebSocketTableSweeper sweeper;
   private final WebSocketTable table;

   // Only ever written (reset to zero in connect) and never read by this
   // class; retained so the observable state of the class is unchanged.
   private final AtomicLong time;

   /**
    * Creates the updater together with the table it publishes.
    *
    * @param key the key passed to the underlying table
    * @param schema the schema passed to the underlying table
    * @param annotator the row annotator passed to the underlying table
    */
   public WebSocketTableUpdater(String key, WebSocketTableSchema schema, WebSocketTableRowAnnotator annotator) {
      this.subscriptions = new CopyOnWriteArraySet<WebSocketTableSubscription>();
      this.table = new WebSocketTable(key, schema, annotator);
      this.sweeper = new WebSocketTableSweeper(table);
      this.changer = new WebSocketTableRowChanger(table);
      this.listener = new WebSocketTableListener(this);
      this.time = new AtomicLong();
   }

   /**
    * Resets to zero the time stamp of every subscription whose socket is the
    * channel of the given session. The next pass of {@link #run()} then
    * sweeps for that subscription starting from the reset time stamp.
    *
    * @param session the session whose subscriptions should be reset
    */
   public void refresh(Session session) {
      FrameChannel other = session.getChannel();

      for(WebSocketTableSubscription subscription : subscriptions) {
         // identity comparison: the subscription holds the channel instance itself
         if(subscription.getSocket() == other) {
            subscription.getTimeStamp().set(0);
         }
      }
   }

   /**
    * Starts the row changer and then publishes updates to all subscriptions
    * every {@value #SWEEP_INTERVAL} milliseconds until the thread is
    * interrupted.
    */
   @Override
   public void run() {
      changer.start();

      while(!Thread.currentThread().isInterrupted()) {
         try {
            Thread.sleep(SWEEP_INTERVAL);
         } catch(InterruptedException e) {
            // an interrupt is a request to stop; keep the flag set and leave
            Thread.currentThread().interrupt();
            return;
         }
         try {
            for(WebSocketTableSubscription subscription : subscriptions) {
               publish(subscription);
            }
         } catch(Exception e) {
            // never let an unexpected failure terminate the update loop
            e.printStackTrace();
         }
      }
   }

   /**
    * Sweeps the table for a single subscription and sends every non-null
    * message, prefixed with the code of its update type. If the sweep or a
    * send fails the subscription is removed and its socket is closed.
    *
    * @param subscription the subscription to publish to
    */
   private void publish(WebSocketTableSubscription subscription) {
      FrameChannel socket = subscription.getSocket();
      AtomicLong timeStamp = subscription.getTimeStamp();
      AtomicLong sendCount = subscription.getSendCount();
      long before = System.currentTimeMillis();
      long time = timeStamp.get();
      long count = sendCount.get();

      try {
         Map<WebSocketTableUpdateType, String> messages = sweeper.sweep(time - SWEEP_MARGIN, count);

         for(Map.Entry<WebSocketTableUpdateType, String> entry : messages.entrySet()) {
            String message = entry.getValue();

            if(message != null) {
               socket.send(entry.getKey().code + message);
            }
         }
      } catch(Exception e) {
         e.printStackTrace();
         subscriptions.remove(subscription);
         close(socket);
      } finally {
         sendCount.getAndIncrement();
         // Only advance the stamp if nobody changed it while we were sweeping;
         // an unconditional set would discard a concurrent refresh().
         timeStamp.compareAndSet(time, before);
      }
   }

   /**
    * Closes the socket, reporting rather than propagating any failure so
    * that one broken socket cannot abort the pass over the remaining
    * subscriptions.
    *
    * @param socket the socket to close
    */
   private void close(FrameChannel socket) {
      try {
         socket.close();
      } catch(Exception e) {
         e.printStackTrace();
      }
   }

   /**
    * Subscribes the channel of the given session: the listener is registered
    * on the channel and a new subscription is added. Failures are reported
    * on standard error and are not propagated to the caller.
    *
    * @param connection the session to subscribe
    */
   public void connect(Session connection) {
      FrameChannel socket = connection.getChannel();

      try {
         WebSocketTableSubscription subscription = new WebSocketTableSubscription(socket);

         socket.register(listener);
         subscriptions.add(subscription);
         time.set(0);
         // NOTE(review): this blocks the calling thread; the original author
         // marked the delay as a hack and its purpose is not visible here.
         Thread.sleep(CONNECT_DELAY);
         time.set(0);
      } catch(InterruptedException e) {
         // preserve the interrupt for the caller instead of swallowing it
         Thread.currentThread().interrupt();
      } catch(Exception e) {
         e.printStackTrace();
      }
   }

   /**
    * Builds the template of a bold, underlined, coloured cell that shows the
    * value of the named column.
    *
    * @param name the column name substituted into the template placeholder
    * @param color the CSS colour of the text
    * @return the HTML template for the cell
    */
   private static String underlined(String name, String color) {
      // the element is closed with </div>; it was previously closed with a stray </a>
      return "<div style='font-weight: bold; color: " + color + "; text-decoration: underline;'>{" + name + "}</div>";
   }

   /**
    * Creates a column style whose template is the coloured, underlined cell
    * built by {@link #underlined(String, String)}.
    *
    * @param name the column name
    * @param title the column title
    * @param color the CSS colour of the text
    * @return the style for the column
    */
   private static WebSocketTableColumnStyle priceColumn(String name, String title, String color) {
      return new WebSocketTableColumnStyle(name, title, underlined(name, color), true, false);
   }

   /**
    * Starts a demo updater keyed on the "product" column and connects a
    * container for it using port 6060.
    *
    * @param list the command line arguments, which are ignored
    * @throws Exception if the application or container cannot be created or connected
    */
   public static void main(String[] list) throws Exception {
      TraceAnalyzer agent = new WebSocketAnalyzer();
      Map<String, WebSocketTableColumnStyle> columns = new LinkedHashMap<String, WebSocketTableColumnStyle>();

      // Populate the columns before handing them to the schema so the result
      // does not depend on whether the schema copies the map or keeps it.
      columns.put("id", new WebSocketTableColumnStyle("id", "Id", "{id}", true, true));
      columns.put("bidOutrightVolume", priceColumn("bidOutrightVolume", "$ B", BID_COLOR));
      columns.put("bidOutright", priceColumn("bidOutright", "Bid", BID_COLOR));
      columns.put("offerOutright", priceColumn("offerOutright", "Offer", OFFER_COLOR));
      columns.put("offerOutrightVolume", priceColumn("offerOutrightVolume", "$ O", OFFER_COLOR));
      columns.put("product", new WebSocketTableColumnStyle("product", "Security", "<div style='font-weight: bold;'>{product}</div>", true, true));
      columns.put("bidEFPVolume", priceColumn("bidEFPVolume", "$ B", BID_COLOR));
      columns.put("bidEFP", priceColumn("bidEFP", "Bid", BID_COLOR));
      columns.put("offerEFP", priceColumn("offerEFP", "Offer", OFFER_COLOR));
      columns.put("offerEFPVolume", priceColumn("offerEFPVolume", "$ O", OFFER_COLOR));
      columns.put("reference", new WebSocketTableColumnStyle("reference", "Ref", "{reference}", true, true));

      WebSocketTableSchema schema = new WebSocketTableSchema(columns);
      WebSocketTableRowAnnotator annotator = new WebSocketTableRowAnnotator(schema);
      WebSocketTableUpdater application = new WebSocketTableUpdater("product", schema, annotator);
      WebSocketTableUpdaterApplication container = new WebSocketTableUpdaterApplication(application, agent, 6060);

      application.start();
      container.connect();
   }
}
|