import dask # parallel library
import tqdm # creating progress bars
import time # artificially slow down the code
from dask.diagnostics import ProgressBar
import numpy as np
Serial (slow) baseline code
# True coefficient vector (k = 5) drawn once and reused by every simulation below
beta = np.random.normal(size=5)
def simulate_beta_hat(n, beta):
    """Draw one OLS sample of size n with true coefficients beta and
    return the least-squares estimate of beta.

    The design matrix and the error term are both standard normal, so
    beta_hat solves the normal equations (X'X) b = X'Y.
    """
    k = len(beta)
    # Draw order matters for reproducibility under a fixed seed:
    # first the (n, k) design matrix, then the n errors.
    X = np.random.normal(size=(n, k))
    eps = np.random.normal(size=(n))
    y = X @ beta + eps
    gram = X.T @ X
    moment = X.T @ y
    return np.linalg.solve(gram, moment)
# Run 10 serial Monte-Carlo replications and collect the OLS estimates
beta_hat_list = []
for r in tqdm.tqdm(range(10)):
    beta_hat_list.append(simulate_beta_hat(int(1e6), beta))
# Per-coefficient dispersion across replications: rows are replications, so
# the Monte-Carlo std is along axis 0.  The original used .std(1), which
# instead computed the std across the 5 coefficients within each replication
# (note its output had 10 entries, one per replication, not 5).
np.array(beta_hat_list).std(0)
100%|██████████| 10/10 [00:03<00:00, 2.81it/s]
array([0.8837639 , 0.88393607, 0.88213816, 0.88362958, 0.88311311,
0.88262567, 0.88220251, 0.88293129, 0.88234124, 0.88307744])
Using dask delayed
# Eager (blocking) run: returns the estimate immediately.
beta_hat_serial = simulate_beta_hat(int(5e6), beta)
# Lazy run: wrapping the function with dask.delayed builds a task graph
# instead of executing; nothing happens until .compute() is called.
lazy_simulate = dask.delayed(simulate_beta_hat)
beta_hat_dask = lazy_simulate(int(5e6), beta)
beta_hat_dask.compute()
array([ 0.08197091, -0.62044626, 0.79480141, 0.52191349, 2.05038949])
Create a client
from dask.distributed import Client
# Start a local cluster; with no arguments Client() spawns local workers
# (the summary below shows 4 workers / 4 cores on this machine)
client = Client()
client  # displaying the client renders the cluster summary
Client
|
Cluster
- Workers: 4
- Cores: 4
- Memory: 17.18 GB
|
# Queue up 20 lazy replications.  Only task-graph construction happens here,
# which is why the progress bar finishes almost instantly (~1400 it/s).
V = []
for r in tqdm.tqdm(range(20)):
    task = dask.delayed(simulate_beta_hat)(int(1e6), beta)
    V.append(task)
100%|██████████| 20/20 [00:00<00:00, 1371.85it/s]
# Trigger execution of all 20 delayed tasks in parallel on the cluster
dask.compute(*V)
(array([ 0.08161511, -0.61953092, 0.79522272, 0.52044203, 2.05054851]),
array([ 0.08285794, -0.62167223, 0.79310007, 0.52217079, 2.05098477]),
array([ 0.07986196, -0.62199832, 0.79457482, 0.52237575, 2.05020908]),
array([ 0.08100286, -0.61960148, 0.79531005, 0.52029908, 2.05085504]),
array([ 0.08178035, -0.61845347, 0.79573612, 0.52183041, 2.05165835]),
array([ 0.08270565, -0.62064098, 0.79495527, 0.52167249, 2.04932456]),
array([ 0.0826866 , -0.61942732, 0.7962837 , 0.52037415, 2.04987182]),
array([ 0.08140251, -0.62063702, 0.79273842, 0.52031335, 2.04983376]),
array([ 0.08282938, -0.6208797 , 0.79485908, 0.5210823 , 2.04963631]),
array([ 0.08210893, -0.62119996, 0.79470062, 0.52027452, 2.05000452]),
array([ 0.08140285, -0.62193338, 0.7931142 , 0.51959118, 2.04951389]),
array([ 0.08266208, -0.62223969, 0.79423316, 0.52094465, 2.0507154 ]),
array([ 0.08106881, -0.61904197, 0.79503763, 0.52083106, 2.05040489]),
array([ 0.08186745, -0.62058955, 0.79500309, 0.52093739, 2.04960608]),
array([ 0.08116398, -0.62078771, 0.79489224, 0.52070781, 2.05127837]),
array([ 0.08135074, -0.61943381, 0.79535478, 0.52226915, 2.05107564]),
array([ 0.08243417, -0.61824012, 0.79552023, 0.52061954, 2.04887393]),
array([ 0.08248117, -0.62044762, 0.79482442, 0.52089336, 2.0499758 ]),
array([ 0.08342013, -0.62032896, 0.79246806, 0.52153688, 2.05029783]),
array([ 0.08369605, -0.61907743, 0.79485533, 0.52189748, 2.05049225]))
# V still holds Delayed objects (dask.compute(*V) above returns the results
# without modifying V), so np.array(V) builds an object array of Delayed and
# .std() does not act on the numeric estimates.  Compute first, then take the
# per-coefficient std across the 20 replications (axis 0).
np.array(dask.compute(*V)).std(0)
client.close()
Fictitious Monte-Carlo workflow
@dask.delayed
def simulate_beta_hat_delayed(n, beta, seed=0):
    """Delayed OLS simulation: draw a seeded sample of size n with true
    coefficients beta and return the least-squares estimate.

    seed controls the random draws so replications are reproducible and
    can be made independent by passing distinct seeds.
    """
    nk = len(beta)
    # BUG FIX: the original called np.random.seed(r), silently reading the
    # *global* loop variable r instead of the seed parameter.  Every task
    # therefore used the same seed and produced identical estimates — which
    # is why the reported Monte-Carlo std below is exactly zero.
    np.random.seed(seed)
    X = np.random.normal(size=(n, nk))
    E = np.random.normal(size=(n))
    Y = np.matmul(X, beta) + E
    beta_hat = np.linalg.solve(np.matmul(X.T, X), np.matmul(X.T, Y))
    return beta_hat
@dask.delayed
def simulate_extract(V):
    """Summarize a list of estimates: per-coefficient mean and std."""
    time.sleep(10)  # artificial 10 s pause to mimic an expensive reduction
    estimates = np.array(V)
    return (estimates.mean(0), estimates.std(0))
@dask.delayed
def simulate_combine(R):
    """Stack the per-sample-size summaries into one array."""
    time.sleep(5)  # artificial 5 s pause to mimic a slow final combine
    combined = np.array(R)
    return combined
# Five delayed replications at n = 1e6.  Pass a distinct seed per replication
# so the simulated samples differ; with a shared seed every estimate is
# identical and the Monte-Carlo std collapses to zero (as in the output below).
V = []
for r in range(5):
    V.append(simulate_beta_hat_delayed(int(1e6), beta, seed=r))
R = simulate_extract(V)
# Render the task graph (requires the optional graphviz dependency)
R.visualize()
# Execute the graph; returns the (mean, std) tuple from simulate_extract
R.compute()
(array([ 0.08207483, -0.61952246, 0.79486013, 0.52201741, 2.0493561 ]),
array([0., 0., 0., 0., 0.]))
# Start a fresh local cluster for the multi-sample-size workflow
client = Client()
client  # displaying the client renders the cluster summary
Client
|
Cluster
- Workers: 4
- Cores: 4
- Memory: 17.18 GB
|
# Full workflow: for each sample size, run 5 replications, summarize them,
# then combine the two summaries into one array.  A distinct seed per
# replication keeps the draws independent (shared seeds would make every
# replication identical and the std exactly zero).
S = []
for s in [int(1e5), int(1e6)]:
    V = []
    for r in range(5):
        V.append(simulate_beta_hat_delayed(s, beta, seed=r))
    S.append(simulate_extract(V))
R = simulate_combine(S)
# Render the combined task graph: two branches (one per sample size) merged
# by simulate_combine.
R.visualize()
# As extracted, the body of the with-statement lost its indentation
# (a SyntaxError); restored here.  dask.compute returns a one-element tuple.
with ProgressBar():
    out = dask.compute(R)
client.close()
out
(array([[[ 0.08322991, -0.61971241, 0.79743907, 0.51906852,
2.05001336],
[ 0. , 0. , 0. , 0. ,
0. ]],
[[ 0.08207483, -0.61952246, 0.79486013, 0.52201741,
2.0493561 ],
[ 0. , 0. , 0. , 0. ,
0. ]]]),)