[theano-users] Querying a single model on the GPU from multiple threads (GPU context initialization problem)
Balázs Hidasi
2018-11-06 18:20:51 UTC
I'm trying to query a single model from multiple threads. The use case is
that a single query on the model doesn't utilize the full power of the GPU,
so throughput could be improved by running multiple queries in parallel
instead of serializing the requests. I don't want to load multiple
instances of the model onto the GPU due to memory constraints. Since I
think that theano is not fully thread-safe, the idea is to compile multiple
instances of the computational graph (theano.function) and have each worker
of the thread pool use one graph at a time. The shared variables of the
model are only loaded once. See the example code below (the model there is
just a dummy model that demonstrates the problem).

This works if the model is on the CPU, but fails if it is on the GPU with
the following error:

GpuArrayException: b'cuEventCreate: CUDA_ERROR_INVALID_CONTEXT: invalid device context'

Is there a way to initialize the context for each individual thread, or to
clone the context of the parent thread? Thanks in advance!

Example code:
------------------------------------------------------------------------------------------
import theano
from theano import tensor as T
import numpy as np
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

#this huge matrix will be shared between computational graphs
W1 = theano.shared(np.random.rand(100,1000000).astype('float32'))

#function for defining the computational graphs
def create_graph():
    H = theano.shared(np.zeros(100, dtype='float32'))
    updatesH = OrderedDict()
    updatesH[H] = T.zeros_like(H)
    reset = theano.function([], updates=updatesH)
    X = T.fvector()
    updatesH[H] = H + X
    upd = theano.function([X], updates=updatesH)
    Y = T.dot(H, W1).mean()
    calc = theano.function([], Y)
    return reset, upd, calc

#queue of computational graphs; the workers will remove one computational
#graph at a time, use it for computations and then put it back
class GraphPoolHandler:
    def __init__(self, n_threads):
        self.graph_queue = Queue()
        for i in range(n_threads):
            self.graph_queue.put(create_graph()) #fill the queue with computational graphs
    def run(self, n, x):
        reset, upd, calc = self.graph_queue.get()
        reset()
        for i in range(n):
            upd(x)
        res = calc()
        self.graph_queue.put((reset, upd, calc))
        return res

graph_pool = GraphPoolHandler(4)
thread_pool = ThreadPoolExecutor(max_workers=4)

def print_callback(v):
    print(v.result(), end='\n')

f1 = thread_pool.submit(graph_pool.run, 2, np.arange(100).astype('float32'))
f2 = thread_pool.submit(graph_pool.run, 1, np.arange(100).astype('float32'))
f3 = thread_pool.submit(graph_pool.run, 4, np.arange(100).astype('float32'))
f4 = thread_pool.submit(graph_pool.run, 3, np.arange(100).astype('float32'))
f5 = thread_pool.submit(graph_pool.run, 5, np.arange(100).astype('float32'))
f1.add_done_callback(print_callback)
f2.add_done_callback(print_callback)
f3.add_done_callback(print_callback)
f4.add_done_callback(print_callback)
f5.add_done_callback(print_callback)
------------------------------------------------------------------------------------------
Arnaud Bergeron
2018-11-06 19:01:55 UTC
What you want to do is certainly not supported. You might be able to force a different context for each thread by using a different device name and manually registering contexts on the same GPU for those names.
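
For reference, a rough sketch of that direction, based on the multi-GPU "contexts" flag from the Theano documentation. Mapping two names onto the same physical GPU is exactly the unsupported part, so treat this as an untested starting point (the dev0/dev1 names are just the convention from the docs):

------------------------------------------------------------------------------------------
#start the process with two named contexts that both map to the same GPU, e.g.:
#  THEANO_FLAGS="contexts=dev0->cuda0;dev1->cuda0" python script.py
import numpy as np
import theano
from theano import tensor as T

#each context holds its own copy of the variables, which is why the weights
#would have to be shared between contexts some other way (see below)
v0 = theano.shared(np.random.rand(100, 1000).astype('float32'), target='dev0')
v1 = theano.shared(np.random.rand(100, 1000).astype('float32'), target='dev1')

#each compiled function is then bound to a single context
f0 = theano.function([], T.dot(v0, v0.T).mean())
f1 = theano.function([], T.dot(v1, v1.T).mean())
------------------------------------------------------------------------------------------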

Then you would have to share the shared variables over IPC or some other sort of CUDA memory sharing because different contexts require different variables.
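
As a very rough illustration of the IPC mechanism only (using PyCUDA rather than Theano, and processes rather than threads, since a CUDA IPC handle cannot be opened in the same process that exported it; this is a sketch of the general CUDA facility, not something Theano does for you):

------------------------------------------------------------------------------------------
import multiprocessing as mp
import numpy as np

def consumer(handle, shape, dtype, q):
    #every process must create its own CUDA context before opening the handle
    import pycuda.driver as drv
    drv.init()
    ctx = drv.Device(0).make_context()
    try:
        mem = drv.IPCMemoryHandle(handle) #map the allocation exported by the parent
        host = np.empty(shape, dtype)
        drv.memcpy_dtoh(host, mem)
        q.put(host.sum())
    finally:
        ctx.pop()

if __name__ == '__main__':
    import pycuda.driver as drv
    drv.init()
    ctx = drv.Device(0).make_context()
    data = np.arange(100, dtype=np.float32)
    gpu_buf = drv.mem_alloc(data.nbytes)
    drv.memcpy_htod(gpu_buf, data)
    handle = drv.mem_get_ipc_handle(gpu_buf) #exportable reference to the allocation
    mp.set_start_method('spawn') #CUDA contexts do not survive fork()
    q = mp.Queue()
    p = mp.Process(target=consumer, args=(handle, data.shape, data.dtype, q))
    p.start()
    print(q.get()) #4950.0, computed from memory allocated by the parent process
    p.join()
    ctx.pop()
------------------------------------------------------------------------------------------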

And even if you manage to do all of that, there is no guarantee that nothing else will break. None of this will ever be officially supported, and any and all problems that you encounter with such a setup are your sole responsibility.

Arnaud