Balázs Hidasi
2018-11-06 18:20:51 UTC
I'm trying to query a single model from multiple threads. The use case is
that a single query on the model doesn't utilize the full power of the GPU,
so the throughput could be improved by starting multiple queries in
parallel instead of serializing the requests. I don't want to load multiple
instances of the model onto the GPU due to memory constraints. Since I
think that theano is not fully thread-safe, the idea is to compile multiple
instances of the computational graph (theano.function) and have each worker
of the thread pool use one graph at a time. The shared variables of
the model are only loaded once. See the example code below (the model there
is just a dummy model for demonstrating the problem).
This works if the model is on the CPU, but fails if it is on the GPU. I get the
following error:

GpuArrayException: b'cuEventCreate: CUDA_ERROR_INVALID_CONTEXT: invalid device context'

Is there a way to initialize the context for each individual thread, or to clone
the context of the parent thread? Thanks in advance!
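I believe the problem boils down to calling a theano.function that was compiled in the
main thread from inside a worker thread. A stripped-down sketch of just that (untested
in this minimal form; it assumes device=cuda, i.e. the gpuarray backend, is set in .theanorc):
------------------------------------------------------------------------------------------
# Minimal sketch: a theano.function compiled in the main thread (where the GPU
# context gets created), then called from a worker thread.
import numpy as np
import theano
from theano import tensor as T
from concurrent.futures import ThreadPoolExecutor

X = T.fvector()
f = theano.function([X], (X * 2).sum())  # compiled in the main thread

with ThreadPoolExecutor(max_workers=1) as pool:
    # On the CPU this prints 90.0; on the GPU this is where I would expect the
    # same CUDA_ERROR_INVALID_CONTEXT error from cuEventCreate.
    print(pool.submit(f, np.arange(10).astype('float32')).result())
------------------------------------------------------------------------------------------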
The full example code:
------------------------------------------------------------------------------------------
import theano
from theano import tensor as T
import numpy as np
from collections import OrderedDict, namedtuple
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

# This huge matrix will be shared between the computational graphs.
W1 = theano.shared(np.random.rand(100, 1000000).astype('float32'))

# Function for defining the computational graphs.
def create_graph():
    H = theano.shared(np.zeros(100, dtype='float32'))
    updatesH = OrderedDict()
    updatesH[H] = T.zeros_like(H)
    reset = theano.function([], updates=updatesH)
    X = T.fvector()
    updatesH[H] = H + X
    upd = theano.function([X], updates=updatesH)
    Y = T.dot(H, W1).mean()
    calc = theano.function([], Y)
    return reset, upd, calc

# Queue of computational graphs; the workers will remove one computational
# graph at a time, use it for computations and then put it back.
class GraphPoolHandler:
    def __init__(self, n_threads):
        self.graph_queue = Queue()
        for i in range(n_threads):
            self.graph_queue.put(create_graph())  # fill the queue with computational graphs

    def run(self, n, x):
        reset, upd, calc = self.graph_queue.get()
        reset()
        for i in range(n):
            upd(x)
        res = calc()
        self.graph_queue.put((reset, upd, calc))
        return res

graph_pool = GraphPoolHandler(4)
thread_pool = ThreadPoolExecutor(max_workers=4)

def print_callback(v):
    print(v.result(), end='\n')

f1 = thread_pool.submit(graph_pool.run, 2, np.arange(100).astype('float32'))
f2 = thread_pool.submit(graph_pool.run, 1, np.arange(100).astype('float32'))
f3 = thread_pool.submit(graph_pool.run, 4, np.arange(100).astype('float32'))
f4 = thread_pool.submit(graph_pool.run, 3, np.arange(100).astype('float32'))
f5 = thread_pool.submit(graph_pool.run, 5, np.arange(100).astype('float32'))
f1.add_done_callback(print_callback)
f2.add_done_callback(print_callback)
f3.add_done_callback(print_callback)
f4.add_done_callback(print_callback)
f5.add_done_callback(print_callback)
------------------------------------------------------------------------------------------
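For reference, the expected values can be checked against a plain numpy computation
(assuming the example above has already been run, so W1 is defined): after n calls to
upd(x), H equals n*x, so calc() should return mean(dot(n*x, W1)).
------------------------------------------------------------------------------------------
# numpy reference for the five submitted jobs (n = 2, 1, 4, 3, 5)
x = np.arange(100).astype('float32')
for n in (2, 1, 4, 3, 5):
    print(n, float((n * x).dot(W1.get_value()).mean()))
------------------------------------------------------------------------------------------
Note that the callbacks fire as the futures complete, so the printed results do not
necessarily appear in submission order.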