module des.flow.thread;

import std.string;

import std.algorithm;
import std.array;

import std.datetime;
import core.thread;

import des.arch.emm;

import des.flow.base;
import des.flow.event;
import des.flow.element;
import des.flow.signal;
import des.flow.sync;
import des.flow.sysevdata;

/// Exception thrown by the thread wrapper itself
/// (in this module: when the element creation function returns null)
class FThreadException : FlowException
{
    ///
    this( string msg, string file=__FILE__, size_t line=__LINE__ ) @safe pure nothrow
    { super( msg, file, line ); }
}

/// Thread wrap: owns a core.thread.Thread that drives a WorkElement and
/// talks to it only through the `Communication` object
class FThread
{
protected:

    /// channels shared with the worker (commands, events, signals, info, listeners)
    Communication com;

    /// core.thread.Thread
    Thread thread;

    /// name passed to the constructor, also assigned to `thread.name`
    string self_name;

public:

    /// state of the worker as published through `Info`
    enum State
    {
        NONE,  /// not inited
        PAUSE, /// inited, not worked
        WORK   /// inited, worked
    };

    /// additional info part
    enum Error
    {
        NONE,    /// no error
        FTHREAD, /// FThreadException was caught
        FLOW,    /// FlowException was caught
        EXCEPT,  /// any other Exception was caught
        FATAL    /// unrecoverable error
    };

    /// snapshot of the worker state
    static struct Info
    {
        ///
        State state;
        ///
        Error error;
        /// text of the caught throwable (empty if there is no error)
        string message;
        /// value of `currentTick` at the moment the info was created
        ulong timestamp;

        ///
        this( State state, Error error=Error.NONE, string msg="" )
        {
            this.state = state;
            this.error = error;
            message = msg;
            timestamp = currentTick;
        }

        // copy constructor text, mixed in once per type qualifier below
        private enum ctor_text = ` this( in Info fts )
        {
            state = fts.state;
            error = fts.error;
            message = fts.message;
            timestamp = fts.timestamp;
        }
        `;

        mixin( ctor_text );
        mixin( "const" ~ ctor_text );
        mixin( "immutable" ~ ctor_text );
        mixin( "shared" ~ ctor_text );
        mixin( "shared const" ~ ctor_text );
    }

    /++
        Creates the worker thread and starts it immediately.

        params:
        name = name of thread
        func = work element creation function
        args = args for function
     +/
    this(Args...)( string name, WorkElement function(Args) func, Args args )
    in { assert( func !is null ); } body
    {
        thread = new Thread({ tmain( com, func, args ); });
        thread.name = name;
        self_name = name;
        // must happen before start(): the thread body reads `com`
        com.initialize();
        thread.start();
        debug logger.Debug( "name: '%s'", name );
    }

    @property
    {
        /++ getting last information of thread
            returns:
            `Info`
         +/
        auto info() const { return Info(com.info.back); }

        /// getting name of fthread
        auto name() const { return self_name; }
    }

    /// take all signals from work element
    auto takeAllSignals()
    { return com.signals.clearAndReturnAll(); }

    /// push command for changing state of thread
    void pushCommand( Command cmd )
    {
        com.commands.pushBack( cmd );
        debug logger.trace( "thread: '%s', command: '%s'", name, cmd );
    }

    /// push event for processing in work element
    void pushEvent( in Event ev )
    {
        com.eventbus.pushBack( ev );
        debug logger.trace( "thread: '%s', event code: %d", name, ev.code );
    }

    /// core.thread.Thread.join
    void join() { thread.join(); }

    /// add listener FThread, listeners gets events from work element
    void addListener( FThread[] thrs... )
    {
        foreach( t; thrs )
            com.listener.add( t.com.eventbus );
        debug logger.Debug( "thread: '%s', listeners: %s", name, array(map!(a=>a.name)(thrs)) );
    }

    /// delete listener
    void delListener( FThread th )
    {
        com.listener.del( th.com.eventbus );
        debug logger.Debug( "thread: '%s', listener: %s", name, th.name );
    }
}

// disabled usage example: drives a thread through START/PAUSE/REINIT/CLOSE
version(none)
{
unittest
{
    static class TestElement : WorkElement
    {
        this() { stderr.writeln( "init" ); }
        override EventProcessor[] getEventProcessors()
        {
            return [
                new FunctionEventProcessor( (in Event ev)
                {
                    if( ev.isSystem )
                        stderr.writeln( "system event: ",
                            ((cast(Event)ev).data.as!SysEvData).msg );
                })
            ];
        }
        override void process() { stderr.writeln( "process" ); }
        protected void selfDestroy() { stderr.writeln( "destroy" ); }
    }

    auto fth = new FThread( "test", { return new TestElement; } );
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.PAUSE );
    Thread.sleep(dur!"msecs"(20));
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.REINIT );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.START );
    Thread.sleep(dur!"usecs"(20));
    fth.pushCommand( Command.CLOSE );
    fth.join();
}
}

private
{
    /// Body of the worker thread: builds the processor and runs it until it
    /// reports no more work. Any throwable is converted to an error `Info`.
    void tmain(Args...)( Communication com, WorkElement function(Args) func, Args args )
    {

        Logger logger = new InstanceLogger( __MODULE__ ~ ".WorkProcessor", Thread.getThis().name );
        auto wp = createProcessor( com, logger, func, args );
        // creation failure was already reported through com.info
        if( wp is null ) return;
        scope(exit) fullTerminate(wp);
        try while( wp.hasWork ) wp.process();
        catch( Throwable e ) com.info.pushBack( convertToErrorInfo(logger,e) );
    }

    /// Creates a WorkProcessor; on any throwable publishes an error `Info`
    /// and returns null instead of propagating
    auto createProcessor(Args...)( Communication com, Logger logger, WorkElement function(Args) func, Args args )
    {
        try return new WorkProcessor!Args( com, logger, func, args );
        catch( Throwable e )
        {
            com.info.pushBack( convertToErrorInfo(logger,e) );
            return null;
        }
    }

    /// Runs inside the worker thread: owns the WorkElement, executes commands
    /// and forwards events/signals between the element and `Communication`
    final class WorkProcessor(Args...) : ExternalMemoryManager, CtrlSignalProcessor, EventProcessor
    {
        mixin EMM;

        /// stored so that the element can be recreated
        Args args;
        WorkElement function(Args) func;
        /// current element, null while stopped
        WorkElement elem;
        EventProcessor[] evprocs;

        Communication com;

        /// true while the element must be processed
        bool work = false;
        /// becomes false after CLOSE, which ends the loop in tmain
        bool has_work = true;

        Logger logger;

        /// stores the arguments and creates the element (see `init`)
        this( Communication com, Logger logger, WorkElement function(Args) func, Args args )
        {
            this.com = com;
            this.func = func;
            this.args = args;

            this.logger = logger;

            init();
            debug logger.Debug( "pass" );
        }

        @property bool hasWork() const { return has_work; }

        /// one iteration: deliver pending events and step the element (only
        /// in work state), then execute all pending commands
        void process()
        {
            debug logger.trace( "start process" );
            if( work )
            {
                foreach( e; com.eventbus.clearAndReturnAll() )
                    transmitEvent( e );

                elem.process();

                debug logger.trace( "events processed" );
            }

            foreach( scmd; com.commands.clearAndReturnAll() )
            {
                auto cmd = cast(Command)scmd;

                debug logger.trace( "process command '%s'", cmd );
                final switch( cmd )
                {
                    case Command.START:  start();  break;
                    case Command.PAUSE:  pause();  break;
                    case Command.STOP:   stop();   break;
                    case Command.REINIT: reinit(); break;
                    case Command.CLOSE:  close();  break;
                }
                debug logger.trace( "process command '%s' pass", cmd );
            }
            debug logger.trace( "process pass" );
        }

        /// pass event to every event processor of the element
        void transmitEvent( in Event event )
        {
            foreach( ep; evprocs )
                ep.processEvent( event );
        }

        // SignalProcessor
        void processCtrlSignal( in CtrlSignal sig ) { com.signals.pushBack(sig); }

        // EventProcessor
        void processEvent( in Event ev ) { com.listener.pushBack(ev); }

        void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); }

        /// publish new state information
        void pushInfo( FThread.State state, FThread.Error error=FThread.Error.NONE, string msg="" )
        { com.info.pushBack( FThread.Info( state, error, msg ) ); }

        /// create the element with the stored function and leave it paused
        /// throws: FThreadException if the creation function returns null
        void init()
        {
            debug logger.Debug( "start" );
            elem = func(args);

            if( elem is null )
                throw new FThreadException( "creation func return null" );

            elem.setCtrlSignalProcessor( this );
            elem.setEventListener( this );

            evprocs = elem.getEventProcessors();

            pause();
            debug logger.Debug( "pass" );
        }

        /// switch to work state
        void start()
        {
            // after STOP there is no element: recreate it, otherwise the next
            // process() call would dereference null in elem.process()
            if( elem is null ) init();

            pushInfo( FThread.State.WORK );
            transmitEvent( Event.system( SysEvData.work ) );
            work = true;
            debug logger.Debug( "pass" );
        }

        /// switch to pause state
        void pause()
        {
            pushInfo( FThread.State.PAUSE );
            transmitEvent( Event.system( SysEvData.pause ) );
            work = false;
            debug logger.Debug( "pass" );
        }

        /// destroy the element (if any) and publish state NONE
        void stop()
        {
            if( elem !is null )
            {
                pause();
                evprocs.length = 0;
                elem.destroy();
                elem = null;
                debug logger.Debug( "pass" );
            }
            // published last, so the latest info is NONE and not the PAUSE
            // pushed by pause() during the teardown
            pushInfo( FThread.State.NONE );
        }

        /// destroy the element and create a new one
        void reinit()
        {
            stop();
            init();
        }

        /// destroy the element and finish the processing loop
        void close()
        {
            stop();
            has_work = false;
            debug logger.Debug( "pass" );
        }

    protected:

        void selfDestroy()
        {
            stop();
            debug logger.Debug( "pass" );
        }
    }

    /// destroy `obj` if its type is an ExternalMemoryManager, then null the reference
    void fullTerminate(T)( ref T obj )
    {
        // the type relation is known at compile time
        static if( is( T : ExternalMemoryManager ) )
        {
            if( obj !is null ) obj.destroy();
        }
        obj = null;
    }

    /// log the throwable and build an error `Info` whose `Error` kind is
    /// chosen by the most specific matching type
    FThread.Info convertToErrorInfo( Logger logger, Throwable e )
    in{ assert( logger !is null ); } body
    {
        // order matters: the casts are tried from the most specific type
        if( auto pe = cast(FThreadException)e )
        {
            logger.error( "EXCEPTION: fthread exc: %s", pe );
            return errorInfo( FThread.Error.FTHREAD, e.msg );
        }
        else if( auto fe = cast(FlowException)e )
        {
            logger.error( "EXCEPTION: flow exc: %s", fe );
            return errorInfo( FThread.Error.FLOW, e.msg );
        }
        else if( auto se = cast(Exception)e )
        {
            logger.error( "EXCEPTION: %s", se );
            return errorInfo( FThread.Error.EXCEPT, e.msg );
        }
        else if( auto te = cast(Throwable)e )
        {
            logger.error( "FATAL: %s", te );
            return errorInfo( FThread.Error.FATAL, e.msg );
        }
        // reached only if `e` is null
        assert(0,"unknown exception");
    }

    /// error info always carries state NONE
    FThread.Info errorInfo( FThread.Error error, string msg )
    { return FThread.Info( FThread.State.NONE, error, msg ); }
}