Processes not exiting

ma3mju
      07-31-2009
Hi all,

I'm having trouble with multiprocessing. I'm using it to speed up some
simulations, and I find that for large queues, when a process reaches the
poison pill it does not exit, whereas for smaller queues it works
without any problems. Has anyone else had this trouble? Can anyone
tell me a way around it? The code is in two files below.

Thanks

Matt

parallel.py
===================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
import multiprocessing
############################################################################
# Things You Can Change
############################################################################
#savefiles
savefile = "wattsdata2"
graphfile = "wattsgraphs2"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]

############################################################################
#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.watts_strogatz_graph(500,5,0.01))
#save them for later reference
filehandler = open(graphfile,'w')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#queues
easy_work_queue = multiprocessing.Queue()
hard_work_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
#construct the items in the easy and hard queues
l = 0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <= 4000:
                easy_work_queue.put({'datapt': l, 'graph': graphs[k],
                                     'noise': noise[i],
                                     'number_of_sets_of_data': num_sets_of_data,
                                     'number_of_data_points': int(intervals[j])})
            else:
                hard_work_queue.put({'datapt': l, 'graph': graphs[k],
                                     'noise': noise[i],
                                     'number_of_sets_of_data': num_sets_of_data,
                                     'number_of_data_points': int(intervals[j])})
            l += 1

#get number of cores and set the number of concurrent processes
num_hard_workers = 2
num_workers = multiprocessing.cpu_count()*1.5
easy_workers = []
hard_workers = []
#add poison pill for each worker and create the worker
for i in range(0,num_workers-num_hard_workers):
    easy_work_queue.put(None)
    easy_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(easy_work_queue,result_queue,)))
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))

#run all workers
for worker in hard_workers:
    worker.start()
for worker in easy_workers:
    worker.start()
#wait for easy workers to finish
for worker in easy_workers:
    worker.join()
    print('worker joined')

#set off some of the easy workers on the hard work (maybe double number of hard)
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))
#wait for all hard workers to finish
for worker in hard_workers:
    worker.join()

#construct data from the mess in the result queue
tempdata = np.zeros(l)
while not result_queue.empty():
    data = result_queue.get()
    tempdata[data[0]] = data[1]

finaldata = tempdata.reshape((len(intervals),len(noise),num_graphs))

np.save(savefile,finaldata)
=======================================================
GaussianProcessRegression.py
=======================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy is updated this wont be needed)
import dtrsv
#to use more than one core
import multiprocessing

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in the CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])


    def add_data_points(self,points,vals):
        #add all points to list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #for speed
        pts = self.pts
        between_points = self.C.between_points
        if len(self.K):
            K[:-1,:-1] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(arraysize-len(points),arraysize):
                K[i,j] = between_points(pts[i],pts[j])
                K[j,i] = K[i,j]
        K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] = \
            K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] \
            + self.sigma**2 * np.eye(len(points))
        self.K = K

    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance = []
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L = np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL

        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k = np.zeros((arraysize,len(points)))

        ##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])

        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################

        #result
        mean = dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}


    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function when given
    #number_of_examples examples
    def average_error_over_samples(self,function,sample_size,number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])

            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg

    #calculate the average error over functions over data of size
    #number_of_data_points; for MOST cases this is also the generalization
    #error, a summary of which and approximations to it can be found in:
    #@inproceedings{Sollich99learningcurves,
    #booktitle = {Neural Computation},
    #author = {Sollich, P.},
    #title = {Learning curves for Gaussian process regression: Approximations and bounds},
    #pages = {200-2},
    #year = {1999},
    #}
    def emprical_average_error_over_functions(self,number_of_functions,
            number_of_sets_of_data,number_of_data_points,
            function_detail=0,progress=0):
        avg = 0
        step = float(100)/number_of_functions
        for i in range(0,number_of_functions):
            if progress:
                print step*float(i),"%"
            if function_detail:
                fx = self.C.generate_function(self.sigma,function_detail)
            else:
                fx = self.C.generate_function(self.sigma)
            avg = self.average_error_over_samples(fx,number_of_sets_of_data,
                                                 number_of_data_points) + avg
        avg = avg / number_of_functions
        return avg

    def average_error_over_functions(self,number_of_sets_of_data,
            number_of_data_points,function_detail=0):
        if function_detail:
            fx = self.C.generate_function(self.sigma,function_detail)
        else:
            fx = self.C.generate_function(self.sigma)
        avg = self.average_error_over_samples(fx,number_of_sets_of_data,
                                              number_of_data_points)
        return(avg)


    def function_prediction(self,pts):
        temp = self.point_prediction(pts)
        return {'func':temp['mean'],'varpos':temp['variance'],
                'varneg':-temp['variance']}


######################################################################
#Functions not contained in a class
######################################################################

#function to calculate the generalization error for a RandomWalk
#kernel, averaging over graphs
def RandomWalkGeneralizationError(noise,graphs,number_of_sets_of_data,
        number_of_data_points,a=2,p=10):
    graph_specific = np.zeros(len(graphs))
    avg = 0
    for i in range(0,len(graphs)):
        rw = CF.RandomWalk(a,p,graphs[i])
        GP = GaussianProcessRegression(rw,noise)
        graph_specific[i] = GP.average_error_over_functions(
            number_of_sets_of_data,number_of_data_points)
    avg = np.sum(graph_specific)/len(graphs)
    return avg, graph_specific

#as above but using queues to create parallel architecture
def RandomWalkGeneralizationErrorParallel(work_queue,result_queue,a=2,p=10):
    while True:
        input = work_queue.get()
        if input is None:
            print "poison"
            break
            print 'this should not appear'
        print input['datapt'], ' ', input['number_of_data_points']
        rw = CF.RandomWalk(a,p,input['graph'])
        GP = GaussianProcessRegression(rw,input['noise'])
        err = GP.average_error_over_functions(input['number_of_sets_of_data'],
                                              input['number_of_data_points'])
        result_queue.put([input['datapt'],err])
        print 'here'
    return
 
ma3mju
 
      07-31-2009
Sorry

###############fortran
call#######################################

is meant to be

###############fortran call#######################################

Matt
 
Piet van Oostrum
 
      07-31-2009
>>>>> ma3mju <(E-Mail Removed)> (m) wrote:

>m> Hi all,
>m> I'm having trouble with multiprocessing I'm using it to speed up some
>m> simulations, I find for large queues when the process reaches the
>m> poison pill it does not exit whereas for smaller queues it works
>m> without any problems. Has anyone else had this trouble? Can anyone
>m> tell me a way around it? The code is in two files below.


How do you know it doesn't exit? You haven't shown any of your output.

I did discover a problem in your code, but it should cause an exception:

>m> #set off some of the easy workers on the hard work (maybe double
>m> number of hard)
>m> for i in range(0,num_hard_workers):
>m>     hard_work_queue.put(None)
>m>     hard_workers.append(multiprocessing.Process(
>m>         target=GP.RandomWalkGeneralizationErrorParallel,
>m>         args=(hard_work_queue,result_queue,)))
>m> #wait for all hard workers to finish
>m> for worker in hard_workers:
>m>     worker.join()


Here you create new hard workers, but you never start them. The join
should then give an exception when it reaches these.
--
Piet van Oostrum <(E-Mail Removed)>
URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
Private email: (E-Mail Removed)
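[A minimal sketch of the point Piet makes above, with a hypothetical worker function standing in for the real one; the fix is simply that each newly created Process needs a start() call before it can be joined (Python 3 syntax):]

```python
import multiprocessing

def drain(queue):
    # keep taking work until the poison pill (None) arrives
    while True:
        if queue.get() is None:
            break

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    workers = []
    for _ in range(2):
        queue.put(None)  # one poison pill per worker
        p = multiprocessing.Process(target=drain, args=(queue,))
        p.start()        # without this, the join() below raises an error
        workers.append(p)
    for p in workers:
        p.join()
    print('all joined')
```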
 
MRAB
 
      07-31-2009
ma3mju wrote:
> Hi all,
>
> I'm having trouble with multiprocessing I'm using it to speed up some
> simulations, I find for large queues when the process reaches the
> poison pill it does not exit whereas for smaller queues it works
> without any problems. Has anyone else had this trouble? Can anyone
> tell me a way around it? The code is in two files below.
>

[snip]
>
> #get number of cores and set the number on concurrent processes
> num_hard_workers = 2
> num_workers = multiprocessing.cpu_count()*1.5
> easy_workers = []
> hard_workers = []
> #add poison pill for each worker and create the worker
> for i in range(0,num_workers-num_hard_workers):
>     easy_work_queue.put(None)
>     easy_workers.append(multiprocessing.Process(
>         target=GP.RandomWalkGeneralizationErrorParallel,
>         args=(easy_work_queue,result_queue,)))
> for i in range(0,num_hard_workers):
>     hard_work_queue.put(None)
>     hard_workers.append(multiprocessing.Process(
>         target=GP.RandomWalkGeneralizationErrorParallel,
>         args=(hard_work_queue,result_queue,)))
>
>

You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
What if the number of CPUs was 1? That would give 2 hard and 0 easy!

Also, I recommend that you put only 1 'poison pill' in each queue and
have the workers put it back when they see it.
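[The single-pill scheme MRAB recommends can be sketched as below, with a hypothetical worker standing in for RandomWalkGeneralizationErrorParallel: whichever worker sees the sentinel puts it back before exiting, so every worker eventually sees it no matter how many there are (Python 3 syntax):]

```python
import multiprocessing

def worker(work_queue, result_queue):
    while True:
        job = work_queue.get()
        if job is None:
            work_queue.put(None)       # put the pill back for the next worker
            break
        result_queue.put(job * 2)      # stand-in for the real computation

if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for job in range(10):
        work_queue.put(job)
    work_queue.put(None)               # a single pill, regardless of worker count
    workers = [multiprocessing.Process(target=worker,
                                       args=(work_queue, result_queue))
               for _ in range(4)]
    for p in workers:
        p.start()
    results = [result_queue.get() for _ in range(10)]  # drain before joining
    for p in workers:
        p.join()
    print(sorted(results))
```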

 
ma3mju
 
      08-02-2009
On 31 July, 11:27, Piet van Oostrum <(E-Mail Removed)> wrote:
> How do you know it doesn't exit. You haven't shown any of your output.
>
> I did discover a problem in your code, but it should cause an exception:
>
> [snip]
>
> Here you create new hard workers, but you never start them. The join
> should then give an exception when it reaches these.


Ok thanks, I'll change that in a sec. It never reaches that bit of code
because the easy_workers don't exit, so it never gets past the join().

As far as running it goes, I get the datapt and number of points
printed to the list for everything in both queues. When it reaches the
end of either queue I get "Poison" on the screen, then "here" for each
process, but I don't get "worker joined" and, as expected, don't get
"this should not appear". If I have a look at the processes running
after all queues are supposed to have finished, I see all of them still
running, taking little or no resources. This is on Ubuntu Jaunty at
home, and the same happens on the Debian machine at uni.

The weird thing is that if I run them with fewer points the processes
do manage to exit.

Thanks

Matt
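[For what it's worth, these symptoms (every job's output appears, "poison" and "here" are printed, but join() never returns, and only for large runs) are consistent with a pitfall documented for multiprocessing: a process that has put items on a Queue will not terminate until its feeder thread has flushed the buffered items into the underlying pipe, so joining workers before draining result_queue can deadlock once the accumulated results exceed the pipe buffer. That is only a guess from the symptoms, not something established in this thread. A hypothetical sketch of the safe ordering, in Python 3 syntax: drain the results first, then join.]

```python
import multiprocessing

def worker(work_queue, result_queue):
    while True:
        job = work_queue.get()
        if job is None:
            break
        result_queue.put((job, 'x' * 10000))  # big results fill the pipe buffer

if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    n_jobs = 200
    for job in range(n_jobs):
        work_queue.put(job)
    work_queue.put(None)
    p = multiprocessing.Process(target=worker, args=(work_queue, result_queue))
    p.start()
    # Drain the result queue BEFORE join(): the worker cannot exit while its
    # queue feeder thread still holds undelivered results.
    results = [result_queue.get() for _ in range(n_jobs)]
    p.join()
    print(len(results))
```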
 
ma3mju
 
      08-02-2009
On 31 July, 11:34, MRAB <(E-Mail Removed)> wrote:
> [snip]
>
> You have 2 hard workers and ceil(CPU_count * 1.5) - 2 easy workers.
> What if the number of CPUs was 1? That would give 2 hard and 0 easy!
>
> Also, I recommend that you put only 1 'poison pill' in each queue and
> have the workers put it back when they see it.


I'll give that a go in a sec and see if it helps. The processes quit
out for smaller queues, though, so it should in theory be alright. I'm
not too fussed about the CPUs; that line is only there because I switch
between a uni PC and a home one with a different number of cores in
each, but both greater than one.
 
ma3mju
 
      08-02-2009
On 2 Aug, 15:48, ma3mju <(E-Mail Removed)> wrote:
> On 31 July, 11:34, MRAB <(E-Mail Removed)> wrote:
> > [snip]
> >
> > Also, I recommend that you put only 1 'poison pill' in each queue and
> > have the workers put it back when they see it.
>
> I'll give that a go in a sec and see if it helps.


Just tried changing the poison pill part, to no avail sadly.
 
MRAB
 
      08-02-2009
ma3mju wrote:
> [snip]
>
> Just tried changing the poison pill part to no avail sadly


I wonder whether one of the workers is raising an exception, perhaps due
to lack of memory, when there is a large number of jobs to process.

Another question: why are you distinguishing between easy and hard jobs?
Do you actually get a measurable improvement in performance from doing
it this way instead of having just a single queue of jobs and a single
pool of workers?
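[The single-queue, single-pool alternative MRAB asks about could look roughly like this, with a hypothetical run_job standing in for the per-item work; multiprocessing.Pool handles worker startup, job distribution, and shutdown itself, so no poison pills or manual join bookkeeping are needed (Python 3 syntax):]

```python
import multiprocessing

def run_job(job):
    # stand-in for the real per-job computation
    datapt, n_points = job
    return datapt, n_points ** 2

if __name__ == '__main__':
    jobs = [(i, i + 1) for i in range(8)]
    with multiprocessing.Pool() as pool:    # defaults to cpu_count() workers
        results = pool.map(run_job, jobs)
    print(results)
```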
 
Piet van Oostrum
 
      08-02-2009
>>>>> MRAB <(E-Mail Removed)> (M) wrote:

>M> I wonder whether one of the workers is raising an exception, perhaps due
>M> to lack of memory, when there are large number of jobs to process.


But that wouldn't prevent the join. And you would probably get an
exception traceback printed.

I wonder if something fishy is happening in the multiprocessing
infrastructure. Or maybe the Fortran code goes wrong because it has no
protection against buffer overruns and similar problems.
--
Piet van Oostrum <(E-Mail Removed)>
URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
Private email: (E-Mail Removed)
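[One way to test Piet's suggestion that a worker might be failing silently is to catch everything inside the worker loop and ship it back through the result queue. A debugging sketch with a hypothetical worker, not part of the original code (Python 3 syntax):]

```python
import multiprocessing
import traceback

def worker(work_queue, result_queue):
    while True:
        job = work_queue.get()
        if job is None:
            break
        try:
            result_queue.put(('ok', job, 1.0 / job))   # stand-in computation
        except Exception:
            # report the traceback to the parent instead of dying silently
            result_queue.put(('error', job, traceback.format_exc()))

if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    for job in (2, 0, 5):          # job 0 triggers a ZeroDivisionError
        work_queue.put(job)
    work_queue.put(None)
    p = multiprocessing.Process(target=worker, args=(work_queue, result_queue))
    p.start()
    for _ in range(3):
        status, job, payload = result_queue.get()
        print(status, job)
    p.join()
```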
 
ma3mju
 
      08-03-2009
On 2 Aug, 21:49, Piet van Oostrum <(E-Mail Removed)> wrote:
> >M> I wonder whether one of the workers is raising an exception, perhaps due
> >M> to lack of memory, when there are large number of jobs to process.
>
> But that wouldn't prevent the join. And you would probably get an
> exception traceback printed.
>
> I wonder if something fishy is happening in the multiprocessing
> infrastructure.


I don't think it's a memory problem. The reason for the hard and easy
queues is that the larger examples use far more RAM; if I run all of
the workers on harder problems I do begin to run out of RAM and end up
spending all my time switching in and out of swap, so I limit the
number of harder problems I run at the same time. I've watched it run
to the end (a very boring couple of hours) and it stays out of my swap
space; everything appears to stay in RAM. It just hangs after "poison"
has been printed for each process.

The other thing is that I get the message "here" telling me I broke
out of the loop after seeing the poison pill in each process, and I
get all the queued jobs listed as output. Surely if I were running out
of memory I wouldn't expect all of the jobs to be listed as output.

I have a serial script that works fine, so I know the Fortran code
works individually for each example.

Thanks

Matt
 