1 module des.flow.sync;
2 
3 import std.traits;
4 
5 import des.flow.base;
6 import des.flow.event;
7 import des.flow.signal;
8 import des.flow.thread;
9 
10 /// FThread communication struct
11 struct Communication
12 {
13     shared SyncList!Command        commands; ///
14     shared SyncList!CtrlSignal     signals;  ///
15     shared SyncList!(FThread.Info) info;     ///
16     shared SyncList!Event          eventbus; ///
17     shared HubOutput!Event         listener; ///
18 
19     /// create all fields
20     void initialize()
21     {
22         commands = new shared SyncList!Command;
23         signals  = new shared SyncList!CtrlSignal;
24         info     = new shared SyncList!(FThread.Info);
25         eventbus = new shared SyncList!Event;
26         listener = new shared HubOutput!Event;
27 
28         info.pushBack( FThread.Info( FThread.State.NONE, FThread.Error.NONE, "" ) );
29     }
30 }
31 
32 ///
33 interface SyncOutput(T) { /++ +/ synchronized void pushBack( in T val ); }
34 
35 ///
36 synchronized class SyncList(T) : SyncOutput!T
37 {
38     ///
39     protected T[] list;
40 
41     ///
42     void pushBack( in T obj )
43     {
44         static if( isBasicType!T ) list ~= obj;
45         else static if( isArray!T ) list ~= obj.dup;
46         else list = list ~ T(obj);
47     }
48 
49     ///
50     void popBack() { if( list.length ) list = list[0..$-1]; }
51 
52     ///
53     T popAndReturnBack()
54     {
55         auto buf = back;
56         popBack();
57         return buf;
58     }
59 
60     ///
61     T[] clearAndReturnAll()
62     {
63         auto r = cast(T[])list.dup;
64         list.length = 0;
65         return r;
66     }
67 
68     @property
69     {
70         ///
71         bool empty() const { return list.length == 0; }
72 
73         ///
74         auto back() const
75         {
76             static if( isBasicType!T ) return list[$-1];
77             else static if( isArray!T ) return list[$-1].idup;
78             else return T(list[$-1]);
79         }
80     }
81 }
82 
83 
84 ///
85 version(unittest) void syncTest(T)( T a, T b )
86 {
87     auto sl = new shared SyncList!T;
88 
89     assert( sl.empty );
90     sl.pushBack( a );
91     assert( !sl.empty );
92     assert( eq( a, sl.back ) );
93     assert( !sl.empty );
94     assert( eq( a, sl.back ) );
95     sl.pushBack( b );
96     assert( eq( b, sl.back ) );
97     sl.popBack();
98     assert( eq( a, sl.back ) );
99     auto val = sl.popAndReturnBack();
100     assert( sl.empty );
101     assert( eq( a, val ) );
102     sl.pushBack( a );
103     sl.pushBack( b );
104     assert( !sl.empty );
105     auto arr = sl.clearAndReturnAll();
106     assert( sl.empty );
107     assert( eq( arr, [ a, b ] ) );
108 }
109 
110 unittest
111 {
112     assert( creationTest( CtrlSignal(0) ) );
113     assert( creationTest( FThread.Info( FThread.State.PAUSE ) ) );
114     assert( creationTest( Event( 0, [1,2] ) ) );
115 
116     syncTest( 1.2, 3.4 );
117     syncTest( "hello", "world" );
118     syncTest( Command.START, Command.PAUSE );
119     syncTest( CtrlSignal(0), CtrlSignal(1) );
120     syncTest( FThread.Info( FThread.State.PAUSE ), 
121               FThread.Info( FThread.State.WORK ));
122     syncTest( Event(0,[1,2]), Event(1,[2,3]) );
123 }
124 
125 ///
126 synchronized class HubOutput(T): SyncOutput!T
127 {
128     ///
129     alias SyncOutput!T LST;
130 
131     ///
132     protected LST[] listeners;
133 
134     private void check( shared LST checked )
135     {
136         auto ll = cast(shared HubOutput)checked;
137         if( ll is null ) return;
138         if( ll is this ) throw new FlowException( "listener found cycle link" );
139         foreach( lll; ll.listeners ) check( lll );
140     }
141 
142     ///
143     void pushBack( in T val )
144     {
145         foreach( listener; listeners )
146             listener.pushBack( val );
147     }
148 
149     ///
150     bool inList( shared LST sync )
151     {
152         foreach( l; listeners )
153             if( l is sync ) return true;
154         return false;
155     }
156 
157     ///
158     void add( shared LST[] syncs... )
159     {
160         foreach( s; syncs )
161         {
162             debug check(s);
163             if( !inList(s) )
164                 listeners ~= s;
165         }
166     }
167 
168     ///
169     void del( shared LST[] syncs... )
170     {
171         typeof(listeners) moved;
172         m: foreach( lst; listeners )
173         {
174             foreach( ds; syncs )
175                 if( ds == lst )
176                     continue m;
177             moved ~= lst;
178         }
179         listeners = moved;
180     }
181 }
182 
183 unittest
184 {
185     auto sl1 = new shared SyncList!int;
186     auto sl2 = new shared SyncList!int;
187 
188     auto hub = new shared HubOutput!int;
189 
190     hub.add( sl1 );
191     // duplicate adding
192     hub.add( sl1 );
193     hub.pushBack( 12 );
194 
195     hub.add( sl2 );
196     hub.pushBack( 23 );
197 
198     assert( eq( sl1.list, [ 12, 23 ] ) );
199     assert( eq( sl2.list, [ 23 ] ) );
200 
201     hub.del( sl1 );
202     hub.pushBack( 34 );
203     assert( eq( sl1.list, [ 12, 23 ] ) );
204     assert( eq( sl2.list, [ 23, 34 ] ) );
205 
206     hub.add( sl1, sl2 );
207     hub.pushBack( 45 );
208     assert( eq( sl1.list, [ 12, 23, 45 ] ) );
209     assert( eq( sl2.list, [ 23, 34, 45 ] ) );
210 }