3. Memory-Optimized Multiprocess Example
This notebook demonstrates how to effectively run a heavy model with a large data set. The demonstration utilizes techniques such as:
Multiprocessing by ipyparallel
Multiprocessing is a technique for using multiple CPU cores, on one machine or across several machines, by splitting a task into multiple processes. The processes run in parallel on the CPU cores, reducing the total time required to finish the task. The ipyparallel package makes multiprocessing easy and provides the following features that multiprocessing requires:
Handling both synchronous and asynchronous communication.
Sending and receiving NumPy and pandas objects to and from engines quickly.
Communicating with remote machines in the same way as with localhost.
Memory-optimized run is a feature introduced in modelx v0.19. Running a heavy model with a large data set consumes a lot of memory, and this feature makes it possible to run the model using less memory. It requires two runs. The first run, performed with a small set of data, generates a list of actions. The second run executes those actions with the entire data set to produce the desired output. For more about memory-optimized run, see the modelx documentation.
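modelx builds and executes its actions internally, and the sketch below does not reproduce modelx's action format or algorithm. As a toy illustration of the general idea only, it assumes a hypothetical dependency map deps and formula table formulas. The hypothetical plan_actions orders the calculations so that each node follows the nodes it reads, and schedules a "clear" for each intermediate value as soon as nothing else needs it; run_actions replays that plan and reports how many values were held at the peak.

```python
from graphlib import TopologicalSorter


def plan_actions(deps, targets):
    """Toy planner (hypothetical): return ("calc", node) and ("clear", node) steps.

    deps maps each node to the nodes it reads. A non-target node is cleared
    as soon as every node that reads it has been calculated.
    """
    # Keep only the nodes the targets actually need.
    needed, stack = set(), list(targets)
    while stack:
        node = stack.pop()
        if node not in needed:
            needed.add(node)
            stack.extend(deps.get(node, []))
    graph = {node: list(deps.get(node, [])) for node in needed}

    # Count how many needed nodes still have to read each node.
    readers = dict.fromkeys(needed, 0)
    for node in needed:
        for dep in graph[node]:
            readers[dep] += 1

    actions = []
    # static_order() yields every node after the nodes it depends on.
    for node in TopologicalSorter(graph).static_order():
        actions.append(("calc", node))
        for dep in graph[node]:
            readers[dep] -= 1
            if readers[dep] == 0 and dep not in targets:
                actions.append(("clear", dep))
    return actions


def run_actions(actions, formulas):
    """Replay a plan; return the values left at the end and the peak count held."""
    cache, peak = {}, 0
    for op, node in actions:
        if op == "calc":
            cache[node] = formulas[node](cache)
        else:
            del cache[node]
        peak = max(peak, len(cache))
    return cache, peak


deps = {
    "policies": [],
    "premiums": ["policies"],
    "claims": ["policies"],
    "net_cf": ["premiums", "claims"],
}
formulas = {
    "policies": lambda c: [100, 200, 300],  # small data set
    "premiums": lambda c: [p // 10 for p in c["policies"]],
    "claims": lambda c: [p // 50 for p in c["policies"]],
    "net_cf": lambda c: sum(c["premiums"]) - sum(c["claims"]),
}

# Plan once from the dependency structure. (modelx instead obtains its actions
# from an actual run on a small data set; this toy reads deps directly.)
actions = plan_actions(deps, ["net_cf"])
print(run_actions(actions, formulas))  # ({'net_cf': 48}, 3)

# Reuse the same plan unchanged with a larger data set.
big = dict(formulas, policies=lambda c: [100, 200, 300] * 1000)
print(run_actions(actions, big))  # ({'net_cf': 48000}, 3)
```

Without the clear steps, all four values would stay in memory until the end; with them, at most three are held at once and only the target survives. In this toy the plan depends only on the dependency structure, so it runs unchanged on the larger data, which loosely mirrors generating actions with a small model point table and reusing them for the full one.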
Steps in this notebook
The demonstration in this notebook involves the following steps:
A table of 100,000 model points is loaded.
A sample model is loaded in this Python process.
The model is run to generate actions. The actions are saved within the model.
10 IPython engines are launched, and a block of 10,000 model points is sent to each engine.
Each engine loads the saved model and runs it with its 10,000 model points by executing the actions saved in the model.
This process receives the results from all the engines and concatenates them.
The techniques above were previously discussed in blog posts on https://modelx.io.
To run this notebook, the following packages are required:
modelx 0.19.1 or newer
ipyparallel 8.2.0 or newer
If either package is missing or outdated, install or update it using either pip or conda, depending on your Python environment.
As of March 12, 2022, ipyparallel 8.2.0 is available on conda-forge but not in the Anaconda package. If you're using Anaconda, update ipyparallel with:
conda install -c conda-forge ipyparallel=8.2.0
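To check the installed versions against these minimums before running the notebook, here is a minimal sketch using only the standard library. REQUIREMENTS, version_tuple, and check_requirements are hypothetical names, and version_tuple is a deliberate simplification that compares only the numeric release part; packaging.version.Version is the more robust choice when the packaging library is available.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Minimum versions stated in this notebook.
REQUIREMENTS = {"modelx": "0.19.1", "ipyparallel": "8.2.0"}


def version_tuple(text):
    """Turn '8.2.0' into (8, 2, 0).

    Simplification: only the leading numeric release part is kept, so
    pre-release suffixes such as 'rc1' are ignored.
    """
    match = re.match(r"\d+(?:\.\d+)*", text)
    return tuple(int(part) for part in match.group(0).split(".")) if match else ()


def check_requirements(requirements=REQUIREMENTS):
    """Map each package name to 'ok', 'missing', or an 'outdated (...)' message."""
    status = {}
    for name, minimum in requirements.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            status[name] = "missing"
            continue
        if version_tuple(installed) >= version_tuple(minimum):
            status[name] = "ok"
        else:
            status[name] = f"outdated ({installed} < {minimum})"
    return status


print(check_requirements())
```

Comparing tuples rather than strings matters here: as strings, "0.19.10" sorts before "0.19.2", while as tuples (0, 19, 10) correctly compares greater than (0, 19, 2).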
Running this notebook consumes 8 to 9 GB of memory, so make sure your machine has enough free memory before running it.
Sample Model and Model Points
This notebook uses the CashValue_ME model in the savings library with 100,000 model points generated by generate_100K_model_points.ipynb, a notebook included in the library. Running that notebook creates a model point file named model_point_table_100K.xlsx.
This notebook demonstrates how to run modelx models in parallel using ipyparallel. It:
launches 10 IPython engines,
loads 100,000 model points,
sends a block of 10,000 model points to each engine,
runs the model with the 10,000 model points on each engine, and
gets the results from all the engines and concatenates them.
Read the entire model point table, which has 100,000 model points, into a DataFrame in this process. Pass index_col=0 to pd.read_excel so that the first column is used as the DataFrame index.
import pandas as pd

model_point_all = pd.read_excel('model_point_table_100K.xlsx', index_col=0)
model_point_all
100000 rows × 9 columns
Next, read the sample model to generate actions.
import modelx as mx

model = mx.read_model('CashValue_ME')
model_point_table in the Projection space holds the model point table. By default, CashValue_ME has 4 model points, so we use this small table to generate actions.
In this example, we want the value of result_pv in the end, so we pass the node of result_pv as a target to generate_actions. The generate_actions method of the model returns a list of actions that instructs a memory-optimized run to calculate and preserve the value of result_pv.
The node method on the result_pv Cells creates an ItemNode object representing result_pv with no arguments. Since result_pv has no parameters, this is the only node associated with result_pv. Note that generate_actions takes a list of target nodes, so the result_pv node is enclosed in a list before being passed to generate_actions.
The returned list of actions is assigned to a Reference named actions in the Projection space, and the model is saved as 'CashValue_ME_with_actions' so that the IPython engines can load it later. The model used to generate the actions is no longer needed, so it is closed here.
model.Projection.actions = model.generate_actions([model.Projection.result_pv.node()])
model.write('CashValue_ME_with_actions')
model.close()
UserWarning: call stack trace activated
UserWarning: call stack trace deactivated
Start IPython Engines
Start an ipyparallel cluster with 10 engines. rc is a Client object connected to the engines. For details on what the cell below does, see the ipyparallel documentation.
import ipyparallel as ipp

no_engines = 10
cluster = ipp.Cluster(n=no_engines)    # local cluster with 10 engines
rc = cluster.start_and_connect_sync()  # start the engines and return a Client
Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
Now you have 10 IPython engines waiting for code to execute.
Run the Model
Evenly split the entire model point table into 10 blocks and send one block to each engine. On each engine, the block of model points is assigned to a global variable named model_point.
block_size = 10000
for i in range(no_engines):
    # Send rows i*block_size up to (but not including) (i+1)*block_size to engine i
    rc[i]['model_point'] = model_point_all.iloc[i * block_size: (i+1) * block_size]
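The loop above relies on 100,000 dividing evenly by 10. When the row count is not a multiple of the engine count, the fixed block_size would drop the trailing rows. A minimal sketch of a hypothetical helper, block_bounds, that spreads the remainder instead (numpy.array_split uses the same scheme of giving the first blocks one extra element):

```python
def block_bounds(n_rows, n_blocks):
    """Return (start, stop) positions for n_blocks contiguous row blocks.

    The first n_rows % n_blocks blocks get one extra row, so every row is
    assigned to exactly one block and block sizes differ by at most one.
    """
    if n_blocks < 1:
        raise ValueError("n_blocks must be at least 1")
    base, extra = divmod(n_rows, n_blocks)
    bounds, start = [], 0
    for i in range(n_blocks):
        stop = start + base + (1 if i < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds


print(block_bounds(10, 3))  # [(0, 4), (4, 7), (7, 10)]

# In the notebook, this could replace the fixed block_size loop:
# for i, (start, stop) in enumerate(block_bounds(len(model_point_all), no_engines)):
#     rc[i]['model_point'] = model_point_all.iloc[start:stop]
```

For 100,000 rows and 10 engines it returns the same boundaries as the original loop, (0, 10000) through (90000, 100000).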
Send the code to run the model to all the engines. The code:
* imports modelx,
* loads the model saved above as CashValue_ME_with_actions and assigns it to m,
* replaces the original model point table with the block sent above,
* runs the model by calling execute_actions, passing it the actions saved in the model, and
* assigns the value of result_pv to a global variable named result.
The block=True argument tells execute to wait until all the engines finish processing the code.
# Code executed on every engine; model_point is the block sent to that engine above
code = """
import modelx as mx
m = mx.read_model('CashValue_ME_with_actions')
m.Projection.model_point_table = model_point
m.execute_actions(m.Projection.actions)
result = m.Projection.result_pv()
"""
rc[:].execute(code, block=True)
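To make the send/execute/collect flow concrete without starting a cluster, here is a toy in-process simulation. FakeEngine is hypothetical and only mimics the idea that each engine keeps its own global namespace: the same code string runs everywhere, but each engine sees only the block bound to model_point on it. Real engines are separate processes, and ipyparallel handles serialization and communication.

```python
class FakeEngine:
    """Hypothetical in-process stand-in for one engine: just a private namespace."""

    def __init__(self):
        self.namespace = {}

    def push(self, name, value):
        # Like rc[i]['model_point'] = ...: bind a global name on this engine.
        self.namespace[name] = value

    def execute(self, code):
        # Like rc[i].execute(code): run the code with this engine's globals.
        exec(code, self.namespace)

    def pull(self, name):
        # Like rc[i]['result']: read a global name back from this engine.
        return self.namespace[name]


engines = [FakeEngine() for _ in range(3)]
data = list(range(9))
block_size = 3

# Each engine gets its own block under the same global name.
for i, engine in enumerate(engines):
    engine.push("model_point", data[i * block_size:(i + 1) * block_size])

# The same code runs on every engine, each on its own block.
code = "result = sum(model_point)"
for engine in engines:
    engine.execute(code)

# Collect one result per engine, in engine order.
results = [engine.pull("result") for engine in engines]
print(results)  # [3, 12, 21]
```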
Get the Results
Get the results back from all the engines and concatenate them into one DataFrame.
result = pd.concat(rc[:]['result'])
result
Columns: Premiums, Death, Surrender, Maturity, Expenses, Commissions, Investment Income, Change in AV, Net Cashflow
100000 rows × 9 columns
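Because each engine returns only its own block, a quick sanity check is that the concatenated result covers every model point exactly once. A minimal sketch of a hypothetical helper, check_complete, that works on any iterable of IDs; it assumes the result is indexed by model point ID, as the 100,000-row output suggests.

```python
def check_complete(result_index, expected_index):
    """Count duplicated, missing, and unexpected IDs in a combined result.

    Both arguments can be any iterable of IDs, e.g. a list or a pandas Index.
    """
    result_ids = list(result_index)
    result_set = set(result_ids)
    expected_set = set(expected_index)
    return {
        "duplicates": len(result_ids) - len(result_set),
        "missing": len(expected_set - result_set),
        "extra": len(result_set - expected_set),
    }


# Toy IDs: 2 appears twice, 3 and 4 are missing, 5 was never expected.
print(check_complete([1, 2, 2, 5], [1, 2, 3, 4]))
# {'duplicates': 1, 'missing': 2, 'extra': 1}
```

In the notebook, check_complete(result.index, model_point_all.index) would be expected to report zero for all three counts if every block was sent, run, and collected once.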