1 module des.flow.thread;
2 
import std.string;
4 
5 import std.algorithm;
6 import std.array;
7 
8 import std.datetime;
9 import core.thread;
10 
11 import des.arch.emm;
12 
13 import des.flow.base;
14 import des.flow.event;
15 import des.flow.element;
16 import des.flow.signal;
17 import des.flow.sync;
18 import des.flow.sysevdata;
19 
/// Exception type of this module; thrown e.g. when the element creation function returns null
class FThreadException : FlowException
{
    /// Passes the message and the throw location on to `FlowException`
    this( string msg, string file = __FILE__, size_t line = __LINE__ ) @safe pure nothrow
    {
        super( msg, file, line );
    }
}
27 
/// Wrapper that runs a work element inside its own `core.thread.Thread`
class FThread
{
protected:

    /// channels (info, signals, commands, eventbus, listener) handed to the thread function
    Communication com;

    /// underlying core.thread.Thread
    Thread thread;

    /// name passed to the constructor
    string self_name;

public:

    /// state reported by the worker
    enum State
    {
        NONE,  /// not inited
        PAUSE, /// inited, not worked
        WORK   /// inited, worked
    }

    /// additional info part
    enum Error
    {
        NONE,    /// no error
        FTHREAD, /// `FThreadException` was caught
        FLOW,    /// `FlowException` was caught
        EXCEPT,  /// any other `Exception` was caught
        FATAL    /// unrecoverable error
    }

    /// One record of the worker state
    static struct Info
    {
        /// reported state
        State state;
        /// error kind, `Error.NONE` if there is none
        Error error;
        /// free-form text, the exception message for error records
        string message;
        /// value of `currentTick` taken when the record was constructed
        ulong timestamp;

        /// Builds a new record and stamps it with `currentTick`
        this( State state, Error error=Error.NONE, string msg="" )
        {
            timestamp = currentTick;
            message = msg;
            this.error = error;
            this.state = state;
        }

        // source of a field-by-field copying constructor,
        // mixed in below once per type qualifier
        private enum ctor_text = ` this( in Info fts )
        {
            state = fts.state;
            error = fts.error;
            message = fts.message;
            timestamp = fts.timestamp;
        }
        `;

        mixin( ctor_text );
        mixin( "const" ~ ctor_text );
        mixin( "immutable" ~ ctor_text );
        mixin( "shared" ~ ctor_text );
        mixin( "shared const" ~ ctor_text );
    }

    /++ Initializes the communication channels and starts the thread.

        Params:
        name = name of thread
        func = work element creation function
        args = args for function
     +/
    this(Args...)( string name, WorkElement function(Args) func, Args args )
    in { assert( func !is null ); } body
    {
        self_name = name;
        com.initialize();

        // the thread body only runs after start(), i.e. with `com` initialized
        thread = new Thread({ tmain( com, func, args ); });
        thread.name = name;
        thread.start();

        debug logger.Debug( "name: '%s'", name );
    }

    @property
    {
        /++ getting last information of thread

            Returns:
            copy of the last `Info` record in `com.info`
         +/
        auto info() const
        {
            return Info( com.info.back );
        }

        /// getting name of fthread
        auto name() const
        {
            return self_name;
        }
    }

    /// take all signals from work element
    auto takeAllSignals()
    {
        return com.signals.clearAndReturnAll();
    }

    /// push command for changing state of thread
    void pushCommand( Command cmd )
    {
        com.commands.pushBack( cmd );
        debug logger.trace( "thread: '%s', command: '%s'", name, cmd );
    }

    /// push event for processing in work element
    void pushEvent( in Event ev )
    {
        com.eventbus.pushBack( ev );
        debug logger.trace( "thread: '%s', event code: %d", name, ev.code );
    }

    /// core.thread.Thread.join
    void join()
    {
        thread.join();
    }

    /// add listener FThread, listeners gets events from work element
    void addListener( FThread[] thrs... )
    {
        foreach( listener; thrs )
            com.listener.add( listener.com.eventbus );

        debug logger.Debug( "thread: '%s', listeners: %s", name, thrs.map!(a => a.name).array );
    }

    /// delete listener
    void delListener( FThread th )
    {
        com.listener.del( th.com.eventbus );
        debug logger.Debug( "thread: '%s', listener: %s", name, th.name );
    }
}
164 
version(none)
{
// Disabled smoke test: sends the whole command sequence to a thread
// whose element only reports its life-cycle on stderr.
unittest
{
    static class TestElement : WorkElement
    {
        this()
        {
            stderr.writeln( "init" );
        }

        override EventProcessor[] getEventProcessors()
        {
            // single processor that prints the payload of system events
            EventProcessor reporter = new FunctionEventProcessor( (in Event ev)
            {
                if( !ev.isSystem ) return;
                stderr.writeln( "system event: ",
                    ((cast(Event)ev).data.as!SysEvData).msg );
            });
            return [ reporter ];
        }

        override void process()
        {
            stderr.writeln( "process" );
        }

        protected void selfDestroy()
        {
            stderr.writeln( "destroy" );
        }
    }

    auto worker = new FThread( "test", { return new TestElement; } );

    worker.pushCommand( Command.START );
    Thread.sleep( usecs(20) );

    worker.pushCommand( Command.PAUSE );
    Thread.sleep( msecs(20) );

    worker.pushCommand( Command.START );
    Thread.sleep( usecs(20) );

    worker.pushCommand( Command.REINIT );
    Thread.sleep( usecs(20) );

    worker.pushCommand( Command.START );
    Thread.sleep( usecs(20) );

    worker.pushCommand( Command.CLOSE );
    worker.join();
}
}
202 
203 private
204 {
205     void tmain(Args...)( Communication com, WorkElement function(Args) func, Args args )
206     {
207 
208         Logger logger = new InstanceLogger( __MODULE__ ~ ".WorkProcessor", Thread.getThis().name );
209         auto wp = createProcessor( com, logger, func, args );
210         if( wp is null ) return;
211         scope(exit) fullTerminate(wp);
212         try while( wp.hasWork ) wp.process();
213         catch( Throwable e ) com.info.pushBack( convertToErrorInfo(logger,e) );
214     }
215 
216     auto createProcessor(Args...)( Communication com, Logger logger, WorkElement function(Args) func, Args args )
217     {
218         try return new WorkProcessor!Args( com, logger, func, args );
219         catch( Throwable e )
220         {
221             com.info.pushBack( convertToErrorInfo(logger,e) );
222             return null;
223         }
224     }
225 
226     final class WorkProcessor(Args...) : ExternalMemoryManager, CtrlSignalProcessor, EventProcessor
227     {
228         mixin EMM;
229 
230         Args args;
231         WorkElement function(Args) func;
232         WorkElement elem;
233         EventProcessor[] evprocs;
234 
235         Communication com;
236 
237         bool work = false;
238         bool has_work = true;
239 
240         Logger logger;
241 
242         this( Communication com, Logger logger, WorkElement function(Args) func, Args args )
243         {
244             this.com = com;
245             this.func = func;
246             this.args = args;
247 
248             this.logger = logger;
249 
250             init();
251             debug logger.Debug( "pass" );
252         }
253 
254         @property bool hasWork() const { return has_work; }
255 
256         void process()
257         {
258             debug logger.trace( "start process" );
259             if( work )
260             {
261                 foreach( e; com.eventbus.clearAndReturnAll() )
262                     transmitEvent( e );
263 
264                 elem.process();
265 
266                 debug logger.trace( "events processed" );
267             }
268 
269             foreach( scmd; com.commands.clearAndReturnAll() )
270             {
271                 auto cmd = cast(Command)scmd;
272 
273                 debug logger.trace( "process command '%s'", cmd );
274                 final switch( cmd )
275                 {
276                     case Command.START:  start();  break;
277                     case Command.PAUSE:  pause();  break;
278                     case Command.STOP:   stop();   break;
279                     case Command.REINIT: reinit(); break;
280                     case Command.CLOSE:  close();  break;
281                 }
282                 debug logger.trace( "process command '%s' pass", cmd );
283             }
284             debug logger.trace( "process pass" );
285         }
286 
287         void transmitEvent( in Event event )
288         {
289             foreach( ep; evprocs )
290                 ep.processEvent( event );
291         }
292 
293         // SignalProcessor
294         void processCtrlSignal( in CtrlSignal sig ) { com.signals.pushBack(sig); }
295 
296         // EventProcessor
297         void processEvent( in Event ev ) { com.listener.pushBack(ev); }
298 
299         void pushEvent( in Event ev ) { com.eventbus.pushBack( ev ); }
300 
301         void pushInfo( FThread.State state, FThread.Error error=FThread.Error.NONE, string msg="" )
302         { com.info.pushBack( FThread.Info( state, error, msg ) ); }
303 
304         void init()
305         {
306             debug logger.Debug( "start" );
307             elem = func(args);
308 
309             if( elem is null )
310                 throw new FThreadException( "creation func return null" );
311 
312             elem.setCtrlSignalProcessor( this );
313             elem.setEventListener( this );
314 
315             evprocs = elem.getEventProcessors();
316 
317             pause();
318             debug logger.Debug( "pass" );
319         }
320 
321         void start()
322         {
323             pushInfo( FThread.State.WORK );
324             transmitEvent( Event.system( SysEvData.work ) );
325             work = true;
326             debug logger.Debug( "pass" );
327         }
328 
329         void pause()
330         {
331             pushInfo( FThread.State.PAUSE );
332             transmitEvent( Event.system( SysEvData.pause ) );
333             work = false;
334             debug logger.Debug( "pass" );
335         }
336 
337         void stop()
338         {
339             pushInfo( FThread.State.NONE );
340             if( elem is null ) return;
341             pause();
342             evprocs.length = 0;
343             elem.destroy();
344             elem = null;
345             debug logger.Debug( "pass" );
346         }
347 
348         void reinit()
349         {
350             stop();
351             init();
352         }
353 
354         void close()
355         {
356             stop();
357             has_work = false;
358             debug logger.Debug( "pass" );
359         }
360 
361     protected:
362 
363         void selfDestroy()
364         {
365             stop();
366             debug logger.Debug( "pass" );
367         }
368     }
369 
370     void fullTerminate(T)( ref T obj )
371     {
372         if( is( T : ExternalMemoryManager ) && obj !is null )
373             obj.destroy();
374         obj = null;
375     }
376 
377     FThread.Info convertToErrorInfo( Logger logger, Throwable e )
378     in{ assert( logger !is null ); } body
379     {
380         if( auto pe = cast(FThreadException)e ) 
381         {
382             logger.error( "EXCEPTION: fthread exc: %s", pe );
383             return errorInfo( FThread.Error.FTHREAD, e.msg );
384         }
385         else if( auto fe = cast(FlowException)e )
386         {
387             logger.error( "EXCEPTION: flow exc: %s", fe );
388             return errorInfo( FThread.Error.FLOW, e.msg );
389         }
390         else if( auto se = cast(Exception)e )
391         {
392             logger.error( "EXCEPTION: %s", se );
393             return errorInfo( FThread.Error.EXCEPT, e.msg );
394         }
395         else if( auto te = cast(Throwable)e )
396         {
397             logger.error( "FATAL: %s", te );
398             return errorInfo( FThread.Error.FATAL, e.msg );
399         }
400         assert(0,"unknown exception");
401     }
402 
403     FThread.Info errorInfo( FThread.Error error, string msg )
404     { return FThread.Info( FThread.State.NONE, error, msg ); }
405 }