
Can I Call A Thread Recurrently From Within A Thread?

I'm trying to transfer samples from thread A ('Acquisition') to thread B ('P300') using a queue, but I can't read any data in thread B, although samples are being allocated in thread

Solution 1:

Being a noob can be tricky... So I'll answer my own question to help other beginners that may come across this issue too.

Well, first things first: no, it's not possible to call a thread recurrently from within another thread, because each Thread object can be started only once. A second call to start() on the same object raises a RuntimeError.
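To make the "only once" rule concrete, here is a minimal sketch. The `work` function and the helper names are made up for illustration and are not part of my original code. It tries to start the same Thread object twice, then shows the usual workaround of creating a fresh Thread object for every run.

```python
import threading


def work(results):
    # Hypothetical stand-in for a real task: record which thread ran it.
    results.append(threading.current_thread().name)


def restart_same_thread():
    """Start one Thread object twice; return the error message, if any."""
    results = []
    t = threading.Thread(target=work, args=(results,))
    t.start()
    t.join()
    try:
        t.start()  # a Thread object can be started at most once
    except RuntimeError as err:
        return str(err)
    return None


def run_fresh_threads(n):
    """Create a new Thread object per run instead of reusing one."""
    results = []
    for k in range(n):
        t = threading.Thread(target=work, args=(results,), name=f"run-{k}")
        t.start()
        t.join()  # wait for this run before starting the next one
    return results


if __name__ == "__main__":
    print(restart_same_thread())
    print(run_fresh_threads(3))
```

Creating a new thread per run works, but for something that has to react repeatedly to incoming data it is usually simpler to keep one thread alive and signal it, which is what the rest of this answer does.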

But there is a way to keep the threads from ending: make them wait for triggers that allow them to continue. After some more research, I came across a question that showed me it is possible to create events for threads; they are documented under threading.Event in the Python documentation. And it's quite straightforward: event objects behave like flags that can be set() (indicating True) or clear()ed (indicating False, which is the initial value). To test an event, one can use the is_set() method for a boolean check, or use the wait() method, which blocks until the event is set or an optional timeout expires, instead of polling with a timer. In my case, it saved me some queues I was going to use:

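import threading
import queue
from queue import Empty
import numpy as np
# Assumed source of StreamInlet and resolve_stream, which are used in the main block below
from pylsl import StreamInlet, resolve_stream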


class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, saveQ):
        threading.Thread.__init__(self)
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
FeatQ1 = queue.Queue()
FeatQ2 = queue.Queue()

#NEW:: initializes Events
E = threading.Event()
EP300 = threading.Event()
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1)
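As a quick aside, the Event methods mentioned earlier (set(), clear(), is_set() and wait()) can be tried in isolation. This is a minimal sketch that is separate from the EEG code; the function names `event_demo` and `wait_demo` are made up for illustration.

```python
import threading


def event_demo():
    """Record the flag state after each operation."""
    flag = threading.Event()
    states = [flag.is_set()]      # a new Event starts out cleared
    flag.set()
    states.append(flag.is_set())
    flag.clear()
    states.append(flag.is_set())
    return states


def wait_demo(delay=0.05):
    """Block on wait() until another thread sets the event."""
    flag = threading.Event()
    timer = threading.Timer(delay, flag.set)  # sets the flag after `delay` seconds
    timer.start()
    signalled = flag.wait(timeout=2.0)        # True if set before the timeout
    timer.join()
    return signalled


if __name__ == "__main__":
    print(event_demo())
    print(wait_demo())
```

The point of wait() is that the waiting thread sleeps until it is signalled, so it does not burn CPU time checking the flag in a tight loop.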

And it allows me to "call" thread B "recurrently": the outer while loop in thread B stays active until event E is set, and the processing part runs only when event EP300 is set. EP300 is then cleared once the processing is done:

def Acquisition(inlet, dataOutQ1, dataOutQ2, saveQ):
    i = 0
    print('Starting...')
    while i < 1250:
        sample, timestamp = inlet.pull_sample()
        ##Normalization, filtering##
        if _condition_:
            threadLock.acquire()
            dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
            threadLock.release()
            EP300.set()  #NEW:: allows the P300 function to collect data from queue
        i += 1
    E.set()  #NEW:: flags end of data collection


def P300fun(dataInQ, featureQ):
    p300sample = []
    p300timestamp = []
    while not E.is_set():  #NEW:: loop until collection is ended
        if EP300.is_set():  #NEW:: activated when Event is triggered
            while dataInQ.qsize():
                try:
                    print("DataInQ has data")
                    ss, ts = dataInQ.get(0)  # non-blocking get; raises Empty if the queue is empty
                    print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
                except Empty:
                    return
        if not E.is_set():  #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
            EP300.clear()
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")
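Since the code above needs an EEG stream to run, here is a self-contained sketch of the same two-event pattern with simulated samples. Everything in it is an assumption made for illustration: the names `producer`, `consumer`, `drain` and `run_demo`, the integer "samples", and the chunk size. It also differs from my code in two ways: the consumer blocks on wait(timeout=...) instead of polling is_set(), and it drains the queue one last time after the end-of-acquisition event, so a chunk queued right before the end is not missed.

```python
import queue
import threading


def producer(out_q, data_ready, done, n_samples=10, chunk=5):
    """Simulated acquisition: queue a chunk every `chunk` samples."""
    buffer = []
    for i in range(n_samples):
        buffer.append(i)           # stand-in for inlet.pull_sample()
        if len(buffer) == chunk:
            out_q.put(list(buffer))
            buffer.clear()
            data_ready.set()       # tell the consumer there is data
    done.set()                     # acquisition finished
    data_ready.set()               # wake the consumer so it notices `done`


def drain(in_q, received):
    """Move everything currently in the queue into `received`."""
    while True:
        try:
            received.append(in_q.get_nowait())
        except queue.Empty:
            return


def consumer(in_q, data_ready, done, received):
    while not done.is_set():
        # Sleep until signalled; the timeout lets the loop re-check `done`.
        if data_ready.wait(timeout=0.1):
            data_ready.clear()     # clear first so a later set() is not lost
            drain(in_q, received)
    drain(in_q, received)          # pick up anything queued just before `done`


def run_demo():
    q = queue.Queue()
    data_ready, done = threading.Event(), threading.Event()
    received = []
    threads = [
        threading.Thread(target=producer, args=(q, data_ready, done)),
        threading.Thread(target=consumer, args=(q, data_ready, done, received)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return received


if __name__ == "__main__":
    print(run_demo())
```

With the default arguments, the consumer ends up with two chunks of five samples each, in the order they were queued, regardless of how the two threads happen to be scheduled.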
