Parameter Estimation for Differential Equations using Scipy Least Square

home > Research

The post is about parameter estimation based on gradient descent specifically for the differential equation \frac{dn}{dt}=G-k_1 n-k_2 n^2-k_3 n^3, i.e. we are looking for a good (G,k_1,k_2,k_3) that best fits the experimental data. We will go through the methods of choosing initial parameter guesses and running the optimization. This post is more fully explained in this note: Differential Equation Parameter Estimations. The data used here is a fraction of the sample data I was given. The codes and sample data can also be found here.

As usual, do use virtual environment for cleaner package management (you can see here). The packages needed can be installed via pip.

pip install numpy
pip install scipy
pip install matplotlib

Put the scripts scipyode_header.pycubicODE.py and cubicODE_post_processing.py into the project folder – in the virtual environment if you are using one. The code can be found at the end of the post.

  1. PART 1. Loading data. In scipyode_header.py, customize load_data() as you see fit. As it is, it will load the data starting from a maximum point, and assume that the csv file contains 2 columns: column 1 is for variable t, column 2 is n. The file does not have any header.
    Go into cubicODE.py, set the following toggles.

    plot_data True
    initial_guess_mode False
    optimize_mode False

    Run cubicODE.py. You will see figure like figure 1. In this example, the relevant data points (blue) are the points that overlap with the orange plot.

  2. PART 1. Define the ODE. Write the ODE in def f(t,n,f_args).
  3. PART 2. Choosing initial guesses. Set any number of initial guesses you want to try out with by setting collection_of_initial_guesses.  Since we optimize [G,k1,k2,k3], an example guess is [0,1e-3,0.5e-3,1.2e-7]. Set the following toggles and then run cubicODE.py. Plot like figure 1 will be generated.
    plot_data False
    initial_guess_mode True
    optimize_mode False
  4. PART 2. Optimization. Once the choices of initial guesses are decided upon, set the following toggles and then run cubicODE.py. The optimized parameters from each initial guess will be saved as a .par file, and the save name is set by the variable save_name. The code in our example used 3 initial guesses and save_name = “20190110_cubic”. Thus, 20190110_cubic_1.par, 20190110_cubic_2.par and 20190110_cubic_3.par will be created.
    plot_data False
    initial_guess_mode False
    optimize_mode True
  5. Loading optimized parameters. Go to cubicODE_post_processing.py. If we want to load all the 3 saved .par files, then set load_name = “20190110_cubic” and serial_number = [1,2,3]. Then run cubicODE_post_processing.py. The plot results are shown in figure 2(A, B, C), showing good fit. The optimized parameters for each initial guess can be read off from the console, as in figure 2(D). The smaller cost value is, the better the fit is generally.

The optimized parameters seem to produce good fitting. We are done!

guess1

Figure 1.Green plot shows the experimental data points while the rest are generated using RK45 using 3 different initial guesses, given by collection_of_initial_guesses = [  [0,1e-3,0.5e-3,1.2e-7], [0,1e-2,0,1e-7], [0,0,1.5e-3,1e-7] ].

opres.JPG

Figure 2. (A, B, C) shows the results of parameter estimation from 3 different initial guesses. (D) shows the results in the console. For each serial number, the optimized parameters are printed as [G, k1, k2, k3].

scipyode_header.py

import numpy as np
from scipy.integrate import RK45
import matplotlib.pyplot as plt
import csv

def RK45_wrapper(x0, time_checkpoints, f, f_args, stepsize=None):
	if stepsize is None:
		stepsize = 0.01*(time_checkpoints[-1]-time_checkpoints[0])
	t_set = [time_checkpoints[0]]
	x_set = [x0]
	t_checkpoint_new = [time_checkpoints[0]]
	x_checkpoint = [x0]

	f_temp = lambda t,y : f(t,y, f_args)

	for i in range(len(time_checkpoints)-1):	
		t1 = time_checkpoints[i+1]
		if i>0:
			x0 = ox.y
			t0 = ox.t
			
		else:
			t0 = time_checkpoints[i]
		ox = RK45(f_temp,t0,x0,t1,max_step=stepsize)
		
		while ox.t < t1:
			msg = ox.step()
			t_set.append(ox.t)
			x_set.append(ox.y)
		x_checkpoint.append(ox.y)
		t_checkpoint_new.append(ox.t)
	return x_set, t_set, x_checkpoint, t_checkpoint_new

def read_csv(filename, header_skip=1,get_the_first_N_rows = 0):
    # filename : string, name of csv file without extension
    # headerskip : nonnegative integer, skip the first few rows
    # get_the_first_N_rows : integer, the first few rows after header_skip to be read
    #   if all rows are to be read, set to zero

    # example
    # xx = read_csv('tryy', 3)
    # xx = [[np.float(x) for x in y] for y in xx] # nice here
    # print(xx)
    # print(np.sum(xx[0]))
    
    out = []
    with open(str(filename)+'.csv') as csv_file:
        data_reader = csv.reader(csv_file)
        count = 0
        i = 0
        for row in data_reader:
            if count < header_skip:
                count = count + 1
            else:
                out.append(row)
                if get_the_first_N_rows>0:
                	i = i + 1
                	if i == get_the_first_N_rows:
                		break
    return out

def load_data(plot_data=False):
	print(" --+ load_data().")

	# load data
	filename = 'sample_data'
	all_data = read_csv(filename, header_skip=0)
	all_data = [[np.float(x) for x in y] for y in all_data]

	X=[x[0] for x in all_data]
	Y=[x[1] for x in all_data]

	# customize your data loading here
	max_index = np.argmax(Y)
	relevant_data= all_data[max_index:]
	Xr = [x[0] for x in relevant_data]
	Yr = [x[1] for x in relevant_data]

	print("     data size = ", len(Xr))
	print("     peak = ", X[max_index],",", Y[max_index])
	if plot_data:
		fig = plt.figure()
		ax=fig.add_subplot(111)
		ax.scatter(X,Y,3)
		ax.scatter(X[max_index],Y[max_index])
		ax.plot(Xr,Yr,'r')
		ax.set_xlabel("t")
		ax.set_ylabel("n")
		plt.show()

	return Xr, Yr

cubicODE.py

import numpy as np
import matplotlib.pyplot as plt
import scipyode_header as sh
import pickle
from scipy.optimize import least_squares

# TOGGLES:
initial_guess_mode = 0
optimize_mode = 1
# SETTINGS:
save_name = "20190110_cubic"

# --------------- PART 1 ----------------
#
# Load data, prepare the differential equation

def f(t,n, f_args): # function for pendulum motion with friction 
	dndt = [f_args[0] - f_args[1] * n - f_args[2] * n**2 - f_args[3] * n**3]
	return dndt
print("\n------------- LOADING DATA ----------------\n")
Xr, Yr = sh.load_data(plot_data=0)

x0 = np.array([Yr[0]])
t_set_expt = [np.array([x]) for x in Xr]
n_set_expt = [np.array([x]) for x in Yr]
print("x0 = ",x0)
print("Xr[:5] = ",Xr[:5])
print("Yr[:5] = ",Yr[:5])
print("t_set_expt[:5] = ",t_set_expt[:5])
print("n_set_expt[:5] = ",n_set_expt[:5])

# --------------- PART 2 ----------------
#
# try out initial guesses

# initial guess [G, k1, k2, k3]
collection_of_initial_guesses = [ 
 [0,1e-3,0.5e-3,1.2e-7],
 [0,1e-2,0,1e-7],
 [0,0,1.5e-3,1e-7]
]

if initial_guess_mode:
	print("\n------------- INITIAL TESTING ----------------\n")
	color_scheme = np.linspace(0,1,len(collection_of_initial_guesses))
	fig = plt.figure()
	ax = fig.add_subplot(111)
	ax.scatter(Xr,Yr,3,color='g',label="expt data")
	for i in range(len(collection_of_initial_guesses)):
		f_args0 = collection_of_initial_guesses[i]
		# print(f_args0)
		this_label = "initial guess" + str(i+1)
		_, _, x_test, t_test = sh.RK45_wrapper(x0, t_set_expt, f, f_args0, stepsize=None)		
		ax.scatter(t_test,x_test,3,color=(1-color_scheme[i],0,color_scheme[i]),label=this_label)
	ax.legend()
	ax.set_xlabel("t")
	ax.set_ylabel("n")	

if optimize_mode:
	def target_function(f_args):
		# "checkpoint" here refers to the intended data points
		#   since RK45 will generate more points in between the checkpoints.
		#   What happens is just that with smaller time steps in between the checkpoints,
		#   we can compute the checkpoints more precisely.
		_, _, x_checkpoint, t_checkpoint_new = sh.RK45_wrapper(x0, t_set_expt, f, f_args, stepsize=None)
		
		collection_of_f_i = []
		# each i is a data point in this particular example
		for x1, x2 in zip(x_checkpoint,n_set_expt):
			this_norm = np.linalg.norm(np.array(x1)-np.array(x2))
			collection_of_f_i.append(this_norm)
		return np.array(collection_of_f_i)
	print("\n------------- OPTIMIZATION STAGE ----------------\n")

	for i in range(len(collection_of_initial_guesses)):
		f_args0 = collection_of_initial_guesses[i]
		print(" initial guess = ", f_args0)
		optimized_res = least_squares(target_function,f_args0)
		f_args_optimized = optimized_res.x

		SAVE_DATA = {
			"initial_guess": f_args0,
			"optimized": f_args_optimized,
			"cost":optimized_res.cost
		}

		print(" + optimized result = ",f_args_optimized)
		print(" + cost = ",optimized_res.cost)
		output = open(save_name+"_"+ str(i+1) +".par", 'wb')
		pickle.dump(SAVE_DATA, output)
		output.close()


plt.show()

cubicODE_post_processing.py

import numpy as np
import matplotlib.pyplot as plt
import pickle
import scipyode_header as sh

# SETTINGS
load_name = "20190110_cubic"
serial_number = [1,2,3] 

print("\n------------- LOADING DATA ----------------\n")
def f(t,n, f_args): # function for pendulum motion with friction 
	dndt = [f_args[0] - f_args[1] * n - f_args[2] * n**2 - f_args[3] * n**3]
	return dndt
Xr, Yr = sh.load_data(plot_data=0)
x0 = np.array([Yr[0]])
t_set_expt = [np.array([x]) for x in Xr]
n_set_expt = [np.array([x]) for x in Yr]
print("x0 = ",x0)
print("Xr[:5] = ",Xr[:5])
print("Yr[:5] = ",Yr[:5])
print("t_set_expt[:5] = ",t_set_expt[:5])
print("n_set_expt[:5] = ",n_set_expt[:5])

print("\n------------- PLOTTING ----------------\n")
color_scheme = np.linspace(0,1,len(serial_number))

for j in range(len(serial_number)):
	fig = plt.figure()
	ax = fig.add_subplot(111)
	ax.scatter(Xr,Yr,3,color='g',label="expt data")
	i = serial_number[j]
	this_label = "initial guess" + str(i+1)
	pkl_file = open(load_name+'_'+str(i)+'.par', 'rb')
	SAVE_DATA = None
	SAVE_DATA = pickle.load(pkl_file)
	pkl_file.close()
	# print(SAVE_DATA)
	f_args_initial = SAVE_DATA["initial_guess"]
	f_args_optimized = SAVE_DATA["optimized"]
	cost  = SAVE_DATA["cost"]
	_, _, x_op, t_op = sh.RK45_wrapper(x0, t_set_expt, f, f_args_optimized, stepsize=None)	
	print("Serial number:",j+1, " - Optimized parameters = " )
	print(" ",f_args_optimized)
	print(" cost = ", cost)
	title_msg = load_name +":"+ str(j+1)
	ax.plot(t_op,x_op,color=(1-color_scheme[j],0,color_scheme[j]),label=this_label)
	ax.legend()
	ax.set_title(title_msg)
	ax.set_xlabel("t")
	ax.set_ylabel("n")

plt.show()