3. Memory-Optimized Multiprocess Example#

This notebook demonstrates how to efficiently run a heavy model with a large data set. The demonstration utilizes the following techniques:

Multiprocessing with ipyparallel

Multiprocessing is a technique that makes use of multiple CPU cores on one machine or across multiple machines by splitting a task into multiple processes. The processes run in parallel on separate CPU cores, reducing the total run time required to finish the task. The ipyparallel package makes multiprocessing simple by providing the following capabilities:

  • Handling both synchronous and asynchronous communication.

  • Sending and receiving numpy and pandas objects to and from engines quickly.

  • Communicating with remote machines in the same way as with the local machine.
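As a minimal sketch of how ipyparallel is used (the cluster size and variable names here are arbitrary, not part of this notebook's example), the following starts a local cluster, sends a numpy array to every engine, executes code on them, and pulls the results back:

import ipyparallel as ipp
import numpy as np

cluster = ipp.Cluster(n=2)              # launch 2 local engines
rc = cluster.start_and_connect_sync()   # connect and get a Client

rc[:]['x'] = np.arange(10)              # send a numpy array to every engine
rc[:].execute('y = x * 2', block=True)  # run code on all engines, waiting for completion
print(rc[:]['y'])                       # pull the results back from the engines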

Memory-optimized run

Memory-optimized run is a feature introduced in modelx v0.19. Running a heavy model with a large data set consumes a lot of memory, and this feature makes it possible to run the model using less. A memory-optimized run involves two runs: the first run is performed with a small set of data to generate a list of actions, and the second run executes those actions against the entire data set to produce the desired output. For more about memory-optimized runs, see the modelx documentation.
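Schematically, the two runs look like the sketch below, which uses the modelx methods demonstrated later in this notebook; full_table is a placeholder for the complete model point DataFrame, not a name defined anywhere:

import modelx as mx

model = mx.read_model('CashValue_ME')

# First run: with the small default data set, generate the list of actions
# needed to calculate the target node.
actions = model.generate_actions([model.Projection.result_pv.node()])

# Second run: swap in the entire data set and execute the actions, which
# calculates result_pv while discarding intermediate values to save memory.
model.Projection.model_point_table = full_table   # placeholder for the full data set
model.execute_actions(actions)
result = model.Projection.result_pv()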

Steps in this notebook

The demonstration in this notebook involves the following steps:

  • A table of 100,000 model points is loaded.

  • A sample model is loaded in this Python process.

  • The model is run to generate actions. The actions are saved within the model.

  • 10 IPython engines are invoked, and a block of 10,000 model points is sent to each engine.

  • Each engine loads and runs the saved model with its 10,000 model points by executing the actions saved in the model.

  • This process receives the results from all the engines and concatenates them.

Note:

The techniques above have been discussed in blog posts on https://modelx.io.

Prerequisites#

To run this notebook, the following packages are required.

  • modelx 0.19.1 or newer

  • ipyparallel 8.2.0 or newer

  • numpy, pandas, openpyxl

If any of the packages above are missing or outdated, install or update them using either the pip or conda command, depending on your Python environment.
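For example, with pip from within this notebook (the version pins mirror the requirements above):

!pip install -U "modelx>=0.19.1" "ipyparallel>=8.2.0" numpy pandas openpyxl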

As of March 12, 2022, ipyparallel 8.2.0 is available on conda-forge, but not in the anaconda package. So if you're using Anaconda, update ipyparallel by running conda install -c conda-forge ipyparallel=8.2.0.

Running this notebook consumes 8 to 9 GB of memory, so make sure your machine has enough memory available for the run.

Sample Model and Model Points#

This notebook uses the CashValue_ME model in the savings library with 100,000 model points generated by the notebook generate_100K_model_points.ipynb included in the library. Run that notebook first, and a model point file named model_point_table_100K.xlsx will be created.

This notebook demonstrates how to run modelx models in parallel using ipyparallel.

This example:

  • launches 10 IPython engines,

  • loads 100,000 model points,

  • sends a block of 10,000 model points to each engine,

  • runs the model with the 10,000 model points on each engine, and

  • gets the results from all the engines and concatenates them.

Click the badge below to run this notebook online on Google Colab. You need to be logged in to a Google account to run this notebook on Google Colab. Run on Google Colab

The next code cell below is relevant only when you run this notebook on Google Colab. It installs lifelib and creates a copy of the library for this notebook.

[1]:
import sys, os

if 'google.colab' in sys.modules:
    lib = 'savings'; lib_dir = '/content/'+ lib
    if not os.path.exists(lib_dir):
        !pip install lifelib
        !pip install ipyparallel
        import lifelib; lifelib.create(lib, lib_dir)

    %cd $lib_dir

Read the entire model point table into a DataFrame in this process. The table has 100,000 model points. Give index_col=0 to pd.read_excel to use the first column as the DataFrame index.

[2]:
import pandas as pd
model_point_all = pd.read_excel('model_point_table_100K.xlsx', index_col=0)
model_point_all
[2]:
spec_id age_at_entry sex policy_term policy_count sum_assured duration_mth premium_pp av_pp_init
policy_id
1 A 47 M 20 22 804000 0 804000 0
2 C 29 F 9999 75 519000 0 900 0
3 A 51 F 10 5 409000 0 409000 0
4 B 32 M 15 60 128000 0 128000 0
5 D 28 M 9999 45 698000 0 1200 0
... ... ... ... ... ... ... ... ... ...
99996 A 21 M 10 34 152000 0 152000 0
99997 D 24 F 9999 53 928000 0 1400 0
99998 B 46 F 15 72 662000 0 662000 0
99999 A 46 M 15 36 583000 0 583000 0
100000 B 35 M 15 3 638000 0 638000 0

100000 rows × 9 columns

Next, read the sample model to generate actions.

[3]:
import modelx as mx
model = mx.read_model('CashValue_ME')

model_point_table in the Projection space holds the model point table. By default, CashValue_ME has 4 model points, so we use this table for generating the actions.

[4]:
model.Projection.model_point_table
[4]:
spec_id age_at_entry sex policy_term policy_count sum_assured duration_mth premium_pp av_pp_init accum_prem_init_pp
point_id
1 A 20 M 10 100 500000 0 500000 0 0
2 B 50 M 20 100 500000 0 500000 0 0
3 C 20 M 9999 100 500000 0 1000 0 0
4 D 50 M 9999 100 500000 0 1000 0 0

Generate Actions#

In this example, we want to get the value of result_pv in the end, so we give the node of result_pv as a target parameter to generate_actions. The generate_actions method of the model then returns a list of actions that instruct a memory-optimized run to calculate and preserve the value of result_pv.

The node method on the result_pv Cells creates an ItemNode object representing result_pv with no arguments. Since result_pv does not have any parameters, this is the only node associated with result_pv. Note that generate_actions takes a list of target nodes, so a list enclosing the result_pv node is created and passed to generate_actions below.

The returned list of actions is assigned to a Reference named actions in the Projection space, and the model is saved as ‘CashValue_ME_with_actions’ to be loaded later by the IPython engines.

The model used for generating the actions is no longer needed, so it is closed here.

[5]:
model.Projection.actions = model.generate_actions([model.Projection.result_pv.node()])
model.write('CashValue_ME_with_actions')
model.close()
UserWarning: call stack trace activated
UserWarning: call stack trace deactivated

Start IPython engines#

Start an ipyparallel cluster with 10 engines. rc is a Client object. To learn more about what is done in the cell below, consult the ipyparallel documentation.

[6]:
import ipyparallel as ipp
no_engines = 10
cluster = ipp.Cluster(n=no_engines)
rc = cluster.start_and_connect_sync()
Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
100%|███████████████████████████████████████████████████████████████████████████████████| 10/10 [00:05<00:00,  1.88engine/s]

Now you have 10 IPython engines waiting for code to execute.

Run the Model#

Evenly split the entire model point table into 10 blocks and send each block to each engine. The block of model points on each engine is assigned to a global variable named model_point.

[7]:
block_size = 10000
for i in range(no_engines):
    rc[i]['model_point'] = model_point_all.iloc[i * block_size: (i+1) * block_size]

Send the code to run the model. The code:

  • imports modelx,

  • loads the sample model created above and assigns it to m,

  • replaces the original model point table with the new one sent above,

  • runs the model by calling execute_actions, passing it the actions created above and saved in the model, and

  • assigns the value of result_pv to a global variable named result.

The block=True argument tells the execution to wait until all the engines finish processing the code.

[8]:
code = """
import modelx as mx
m = mx.read_model('CashValue_ME_with_actions')
m.Projection.model_point_table = model_point
m.execute_actions(m.Projection.actions)
result = m.Projection.result_pv()
"""
rc[:].execute(code, block=True)
[8]:
<AsyncResult(execute): finished>

Get the Results#

Get the results back from all the engines and concatenate them into one DataFrame.

[9]:
result = pd.concat(rc[:]['result'])
result
[9]:
Premiums Death Surrender Maturity Expenses Commissions Investment Income Change in AV Net Cashflow
policy_id
1 1.768800e+07 4.207421e+05 7.167661e+06 8.447648e+06 2.549999e+05 8.844000e+05 5.002695e+06 2.777545e+06 2.737698e+06
2 1.728737e+07 5.399209e+06 8.199806e+06 0.000000e+00 1.372393e+06 8.643683e+05 8.088528e+06 6.038786e+06 3.501331e+06
3 2.045000e+06 2.352243e+04 6.103083e+05 1.164191e+06 4.394721e+04 1.022500e+05 2.460592e+05 1.528495e+05 1.939911e+05
4 7.680000e+06 5.362278e+04 2.844819e+06 4.554170e+06 6.191257e+05 3.840000e+05 1.803562e+06 1.027055e+06 7.688294e+02
5 1.388815e+07 4.410248e+06 7.023923e+06 0.000000e+00 8.270160e+05 6.944077e+05 6.925307e+06 5.242075e+06 2.615791e+06
... ... ... ... ... ... ... ... ... ...
99996 5.168000e+06 1.433198e+04 1.546823e+06 2.983362e+06 2.994351e+05 2.584000e+05 6.263479e+05 3.887630e+05 3.032331e+05
99997 1.936913e+07 5.442479e+06 1.005014e+07 0.000000e+00 9.893691e+05 9.684563e+05 9.870065e+06 7.846641e+06 3.942107e+06
99998 4.766400e+07 7.247140e+05 1.760871e+07 2.791406e+07 7.413578e+05 2.383200e+06 1.113241e+07 6.340720e+06 3.083640e+06
99999 2.098800e+07 3.009029e+05 7.454509e+06 1.104981e+07 3.706789e+05 1.049400e+06 4.407965e+06 2.510836e+06 2.659825e+06
100000 1.914000e+06 1.542257e+04 7.087322e+05 1.133148e+06 3.094749e+04 9.570000e+04 4.491574e+05 2.557838e+05 1.234236e+05

100000 rows × 9 columns