module des.flow.sync;

import std.traits;

import des.flow.base;
import des.flow.event;
import des.flow.signal;
import des.flow.thread;

/++ FThread communication struct

    Bundles the synchronized lists and the event hub that `initialize`
    creates. Every field is a class reference, so all of them are `null`
    until `initialize` has been called.
 +/
struct Communication
{
    /// list of `Command` values
    shared SyncList!Command commands;
    /// list of `CtrlSignal` values
    shared SyncList!CtrlSignal signals;
    /// list of `FThread.Info` records; `initialize` seeds it with one entry
    shared SyncList!(FThread.Info) info;
    /// list of `Event` values
    shared SyncList!Event eventbus;
    /// hub that forwards every pushed `Event` to its registered outputs
    shared HubOutput!Event listener;

    /++ create all fields

        Allocates a fresh container for every field and pushes an initial
        `FThread.Info( State.NONE, Error.NONE, "" )` record into `info`,
        so `info` is never empty right after initialization.
     +/
    void initialize()
    {
        commands = new shared SyncList!Command;
        signals  = new shared SyncList!CtrlSignal;
        info     = new shared SyncList!(FThread.Info);
        eventbus = new shared SyncList!Event;
        listener = new shared HubOutput!Event;

        info.pushBack( FThread.Info( FThread.State.NONE, FThread.Error.NONE, "" ) );
    }
}

/// Synchronized sink that accepts values of type `T`
interface SyncOutput(T)
{
    /// pass `val` to the output
    synchronized void pushBack( in T val );
}

/++ Synchronized list of `T` with stack-like access to its last element

    Values are copied on the way in and on the way out
    (see `pushBack` and `back`).
 +/
synchronized class SyncList(T) : SyncOutput!T
{
    /// stored elements, the most recently pushed one is last
    protected T[] list;

    /++ append a copy of `obj` to the end of the list

        Basic types are appended directly, arrays are appended as a `dup`
        copy, any other type is appended as `T(obj)`.
     +/
    void pushBack( in T obj )
    {
        static if( isBasicType!T ) list ~= obj;
        else static if( isArray!T ) list ~= obj.dup;
        else list = list ~ T(obj);
    }

    /// remove the last element, does nothing if the list is empty
    void popBack() { if( list.length ) list = list[0..$-1]; }

    /++ remove the last element and return it

        The list must not be empty: the value is read through `back`,
        which indexes `list[$-1]`.
     +/
    T popAndReturnBack()
    {
        auto buf = back;
        popBack();
        return buf;
    }

    /++ remove all elements

        Returns: a `dup` copy of the elements held before clearing,
                 in push order
     +/
    T[] clearAndReturnAll()
    {
        auto r = cast(T[])list.dup;
        list.length = 0;
        return r;
    }

    @property
    {
        /// `true` if the list holds no elements
        bool empty() const { return list.length == 0; }

        /++ copy of the last element

            Basic types are returned by value, arrays as an `idup` copy,
            any other type as `T(list[$-1])`.
            The list must not be empty, `list[$-1]` is indexed directly.
         +/
        auto back() const
        {
            static if( isBasicType!T ) return list[$-1];
            else static if( isArray!T ) return list[$-1].idup;
            else return T(list[$-1]);
        }
    }
}


/// test helper: runs the push/pop/clear scenario of `SyncList!T` with values `a` and `b`
version(unittest) void syncTest(T)( T a, T b )
{
    auto sl = new shared SyncList!T;

    assert( sl.empty );
    sl.pushBack( a );
    assert( !sl.empty );
    assert( eq( a, sl.back ) );
    // reading `back` must not remove the element
    assert( !sl.empty );
    assert( eq( a, sl.back ) );
    sl.pushBack( b );
    assert( eq( b, sl.back ) );
    sl.popBack();
    assert( eq( a, sl.back ) );
    auto val = sl.popAndReturnBack();
    assert( sl.empty );
    assert( eq( a, val ) );
    sl.pushBack( a );
    sl.pushBack( b );
    assert( !sl.empty );
    auto arr = sl.clearAndReturnAll();
    assert( sl.empty );
    assert( eq( arr, [ a, b ] ) );
}

unittest
{
    assert( creationTest( CtrlSignal(0) ) );
    assert( creationTest( FThread.Info( FThread.State.PAUSE ) ) );
    assert( creationTest( Event( 0, [1,2] ) ) );

    // one type for every `static if` branch of SyncList: basic, array, others
    syncTest( 1.2, 3.4 );
    syncTest( "hello", "world" );
    syncTest( Command.START, Command.PAUSE );
    syncTest( CtrlSignal(0), CtrlSignal(1) );
    syncTest( FThread.Info( FThread.State.PAUSE ),
              FThread.Info( FThread.State.WORK ));
    syncTest( Event(0,[1,2]), Event(1,[2,3]) );
}

/++ Synchronized output that forwards every pushed value to all
    registered outputs

    Registered outputs are distinguished by identity (`is`), both when
    they are added and when they are removed.
 +/
synchronized class HubOutput(T): SyncOutput!T
{
    /// type of the registered outputs
    alias SyncOutput!T LST;

    /// registered outputs, in order of registration
    protected LST[] listeners;

    /+ Cycle detection: if `checked` is a hub, throws when it is this hub
       and otherwise descends into its listeners. Non-hub outputs are ignored.

       Throws: FlowException if this hub is reachable from `checked`
     +/
    private void check( shared LST checked )
    {
        auto ll = cast(shared HubOutput)checked;
        if( ll is null ) return;
        if( ll is this ) throw new FlowException( "listener found cycle link" );
        foreach( lll; ll.listeners ) check( lll );
    }

    /// forward `val` to every registered output, in order of registration
    void pushBack( in T val )
    {
        foreach( listener; listeners )
            listener.pushBack( val );
    }

    /// `true` if `sync` (compared by identity) is already registered
    bool inList( shared LST sync )
    {
        foreach( l; listeners )
            if( l is sync ) return true;
        return false;
    }

    /++ register outputs

        Outputs that are already registered are skipped.
        In debug builds each output is first checked for a cycle
        leading back to this hub.

        Throws: FlowException (debug builds only) if a cycle is found
     +/
    void add( shared LST[] syncs... )
    {
        foreach( s; syncs )
        {
            debug check(s);
            if( !inList(s) )
                listeners ~= s;
        }
    }

    /++ unregister outputs

        Outputs that are not registered are ignored, the order of the
        remaining outputs is preserved.
     +/
    void del( shared LST[] syncs... )
    {
        typeof(listeners) moved;
        m: foreach( lst; listeners )
        {
            foreach( ds; syncs )
                // identity comparison, the same criterion `inList`/`add`
                // use; `==` would go through `opEquals` and could drop
                // outputs that merely compare equal
                if( ds is lst )
                    continue m;
            moved ~= lst;
        }
        listeners = moved;
    }
}

unittest
{
    auto sl1 = new shared SyncList!int;
    auto sl2 = new shared SyncList!int;

    auto hub = new shared HubOutput!int;

    hub.add( sl1 );
    // duplicate adding
    hub.add( sl1 );
    hub.pushBack( 12 );

    hub.add( sl2 );
    hub.pushBack( 23 );

    // sl1 received each value once in spite of the duplicate add
    assert( eq( sl1.list, [ 12, 23 ] ) );
    assert( eq( sl2.list, [ 23 ] ) );

    hub.del( sl1 );
    hub.pushBack( 34 );
    assert( eq( sl1.list, [ 12, 23 ] ) );
    assert( eq( sl2.list, [ 23, 34 ] ) );

    // sl2 is still registered, only sl1 is actually added
    hub.add( sl1, sl2 );
    hub.pushBack( 45 );
    assert( eq( sl1.list, [ 12, 23, 45 ] ) );
    assert( eq( sl2.list, [ 23, 34, 45 ] ) );
}