

Expectation Maximization for Poisson process

The problem

What if you want to run a regression to estimate the coefficients and recover the data generating process, but you are in a scenario with censored data? How does that affect the estimation? Can we do better?

Censored data

In short, you have censored data when some observations are only partially observed or constrained, for a reason that is natural and unavoidable in how the data is collected.

Unobserved

For instance, in survival analysis, if a subject is not yet dead (luckily) you cannot observe their time of death, and hence you don't know whether they will die tomorrow or in two years. The death date is simply not observable yet.

Constrained

For goods with limited stock, once you deplete the inventory you cannot, by definition, sell more, so you never observe the demand you would have had if more items had been available. Likewise, an airline selling seats cannot observe the full demand once all the seats are sold: maybe more people were willing to fly.

We will simulate an example of the latter case, an airline, with simple toy data.

import numpy as np
import statsmodels.api as sm
from scipy.stats import poisson
import matplotlib.pyplot as plt

We create some fake data where seats sold are generated from a Poisson distribution, with an intercept (2) and a coefficient on how many days are left until departure (-0.2): the further from departure, the fewer seats are sold.

We also constrain the demand on the last date: instead of the full number of seats that would have been sold under the Poisson distribution, we record that amount minus 3. Only we know this, because we are the ones tweaking the data. In real life we would only see data like the censored version, and we could never observe the sales that the complete Poisson generating process (which is how tickets are actually sold in our example) would have produced.

# Step 1: Generate data
np.random.seed(100)
n_samples = 20
n_features = 1
substract_sales = 3

# Generate features: days to departure, from n_samples+1 down to 2
X = np.array(range(n_samples + 1, 1, -1))
X = sm.add_constant(X)  # Add intercept term

# True coefficients
beta_true = np.array([2, -0.2])

# Generate Poisson-distributed target values
linear_pred = np.dot(X, beta_true)
lambda_true = np.exp(linear_pred)
y = np.random.poisson(lambda_true)
# y

# Right-censoring threshold: non-binding (y+1) for every date except the last one,
# where demand is capped at the true value minus substract_sales (the fare is closed)
censor_threshold = np.concatenate([(y + 1)[:-1], np.array([y[-1] - substract_sales])])
y_censored = np.where(y > censor_threshold, censor_threshold, y)
is_censored = (y > censor_threshold)

Maybe it's easier to visualize:

  • The green curve is the true lambda for each date, computed from the intercept and the true coefficient multiplied by the days to departure.
  • The blue dots are the data points we actually observe, for every date except the last one.
  • The grey dot is what we would see on the last date if there were no limit on how many seats could be sold, i.e. the raw draw from the Poisson distribution.
  • The red dot is the data point we actually have: on the last day the airline sold all the remaining seats, could not fulfill the true demand, and hence we see lower sales than the Poisson process generated.

# Plot the data
plt.xlabel('Days to departure')
plt.ylabel('Bookings')
plt.title('Data Plot')
plt.plot(X[:-1, 1], y[:-1], 'o')
plt.plot(X[:, 1], lambda_true, label='Lambda True', color="green")
# plt.plot(X[:, 1], lambda_est, label='Lambda Estimated')
plt.scatter(X[-1, 1], y[-1:],  color="grey", label="Real unseen demand" )
plt.scatter(X[-1, 1], y[-1:]-substract_sales, color='red', label='Constrained')
plt.legend()
plt.show()

[Figure: bookings by days to departure, with the true lambda curve, the observed points, the real unseen demand and the constrained last-day point]

Estimation
Poisson Regression

First we estimate the parameters using a plain Poisson regression: we assume the data follows a Poisson process and do nothing to account for the constrained observation.
The results are reasonable, the estimates are in the neighborhood of the true parameters, but they are clearly biased.

model_censored = sm.GLM(y_censored, X, family=sm.families.Poisson())
results_censored = model_censored.fit()
print(f'Estimated Censored coefficients: {results_censored.params}')
## Estimated Censored coefficients: [ 1.86976922 -0.21124541]
Expectation Maximization

OK, here is our alternative. Something a bit more sophisticated we can do is to apply the EM algorithm, an iterative procedure, to estimate the coefficients of the Poisson regression while incorporating the knowledge we have: that some data points may be constrained.
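The key quantity in the E-step is the conditional mean of a censored observation given that the true demand exceeded its threshold. For a Poisson variable with rate \(\lambda\) censored at \(c\), this expectation is \(E[Y \mid Y > c] = \frac{\lambda \, (1 - F(c - 1; \lambda))}{1 - F(c; \lambda)}\), where \(F\) is the Poisson CDF. The E-step in the code below fills each censored position with an approximation of this quantity, and the M-step refits the Poisson regression on the completed data.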

# Step 2: Initialize parameters
beta_est = np.zeros(n_features + 1)
tol = 1e-4
max_iter = 1000

for iteration in range(max_iter):
    # Step 3: E-step
    # Estimate the expected values of censored data
    lambda_est = np.exp(np.dot(X, beta_est))
    expected_y = np.where(
        is_censored,
        (censor_threshold + 1) / (1 - poisson.cdf(censor_threshold, lambda_est)),
        y_censored
    )
    # Ensure expected_y values are valid
    expected_y = np.nan_to_num(expected_y, nan=np.mean(y_censored), posinf=np.max(y_censored), neginf=0)
    # Step 4: M-step
    # Update parameter estimates using Poisson regression
    model = sm.GLM(expected_y, X, family=sm.families.Poisson())
    results = model.fit(method="lbfgs")
    # results = model.fit_regularized(L1_wt=0, alpha=0.1) 
    new_beta_est = results.params

    # Check convergence
    if np.linalg.norm(new_beta_est - beta_est) < tol:
        break

    beta_est = new_beta_est
## <string>:7: RuntimeWarning: divide by zero encountered in divide
print(f'Estimated coefficients: {beta_est}')

## Estimated coefficients: [ 2.11015996 -0.23573144]
We can see that our estimates are of course not perfect, but the intercept is closer to the true value.
Let's visualize the results to get a clearer intuition.

uncensored_predictions = results.predict(X)
censored_predictions = results_censored.predict(X)
Promising!
For the dates furthest from departure we see a bias in both approaches; we are not doing better than the plain Poisson regression, but not worse either.
As we move closer to departure, the EM predictions get closer and closer to the true lambdas, while the regular Poisson regression continues its biased trajectory.
The EM procedure adjusted the coefficients to better match the constrained data.

plt.plot(X[:, 1], lambda_true, label='Lambda True', color="green")
plt.plot(X[:, 1], uncensored_predictions, label='Uncensored Predictions (EM)', color="blue")
plt.plot(X[:, 1], censored_predictions, label='Censored Predictions', color="red")
plt.xlabel('Days to departure')
plt.ylabel('Lambda - Poisson regression')
plt.title('Lambda Predictions')
plt.legend()
plt.show()

[Figure: true lambda vs. EM and censored-regression predictions, by days to departure]

Weighted regression

Weighted regression consists of assigning a different weight to each observation, and hence giving it more or less importance when fitting the regression.

One way to look at it is as solving the regression problem by minimizing the Weighted Mean Squared Error (WMSE) instead of the Mean Squared Error (MSE).

\(WMSE(\beta, w) = \frac{1}{n} \sum_{i=1}^n w_i(y_i - \overrightarrow{x_i} \beta)^2\) Intuitively, we are looking for the coefficients that minimize the squared errors, but giving a different weight to each observation. OLS is the particular case where all \(w_i = 1\).
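As a minimal sketch of that formula, here is the weighted solution computed in closed form with NumPy, \(\hat\beta = (X^\top W X)^{-1} X^\top W y\); the simulated data and weights are invented for illustration and mirror the R example further below.

import numpy as np

rng = np.random.default_rng(23)
n = 1000
x = rng.uniform(0, 10, n)
y = 5 * x + rng.normal(0, x / 1.5)    # error variance grows with x

X = np.column_stack([np.ones(n), x])  # design matrix with an intercept
w = 1 / (x / 1.5) ** 2                # weights = 1 / sigma_i^2

# Weighted least squares in closed form: beta_hat = (X' W X)^-1 X' W y
W = np.diag(w)
beta_wls = np.linalg.solve(X.T @ W @ X, X.T @ W @ y)

# OLS is the special case where every weight equals 1
beta_ols = np.linalg.solve(X.T @ X, X.T @ y)
print(beta_ols, beta_wls)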

Why do this? A few reasons (Shalizi 2015, Chapter 7.1):

  • Focusing accuracy: we want to predict especially well some particular points or region of points, maybe because that is the focus in production, or because being wrong on those observations has a huge cost, etc. Weighted regression will make an extra effort to match that data.

  • Discounting imprecision: OLS returns the maximum likelihood estimates when the residuals are independent and normal, with mean 0 and constant variance. When we face non-constant variance, OLS no longer returns the MLE. The logic behind weighted regression is that it makes no sense to pay equal attention to all the observations, since some of them have higher variance and are less indicative of the conditional mean. We should put more emphasis on the low-variance regions, predict them well, and "expect to fit poorly where the noise is big".
    The weights that recover the MLE are \(w_i = \frac{1}{\sigma_i^2}\).

  • Sampling bias: if we think or know that the observations in our data are not completely random and some subset of the population might be under-represented (in a survey, for example, or because of data availability), it can make sense to weight observations inversely to their probability of being included in the sample. Under-represented observations get more weight and over-represented ones less.
    Another similar situation is covariate shift. If the distribution of a variable x changes over time, we can use weights defined as the ratio of the probability density functions:

    "If the old pdf was p(x) and the new one is q(x), the weight we'd want to use is \(w_i = q(x_i)/p(x_i)\)"

  • Other: related to GLMs, where the conditional mean is a non-linear function of a linear predictor (not explained further in the book at this point).

Is there a scenario where OLS is better than weighted regression, assuming we can compute the weights?

Example.

First we will see the impact of using weighted regression in a simulated scenario where we actually know the variance of the error of each observation. This is not realistic, but useful to see it in action.

library(tidyverse)

We generate 1000 data points with a linear relation between y and x: intercept = 0, slope = 5. We let the variance of the error depend on the value of x: higher values of x are associated with a higher error variance.

set.seed(23)
n=1000
x = runif(n,0,10)
error = rnorm(n,0, x/1.5)
df = data.frame(x)
df = df %>% mutate(y = 5*x + error)
Visually:

ggplot(data=df, aes(x=x, y=y)) +
  geom_point(alpha=0.3) + 
  # geom_smooth(color="blue") +
  # geom_smooth(method = "lm", mapping = aes(weight = (1/sqrt(x)^2)),
  #             color = "red", show.legend = FALSE) +
  NULL

[Figure: scatterplot of y vs. x, with the spread of the points growing as x increases]

Linear regression
ols = lm(formula = "y~x", data=df)
summary(ols)

## 
## Call:
## lm(formula = "y~x", data = df)
## 
## Residuals:
##     Min      1Q  Median      3Q     Max 
## -14.868  -1.720  -0.137   1.918  14.722 
## 
## Coefficients:
##             Estimate Std. Error t value Pr(>|t|)    
## (Intercept)  0.19192    0.24278   0.791    0.429    
## x            4.95585    0.04148 119.489   <2e-16 ***
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## Residual standard error: 3.855 on 998 degrees of freedom
## Multiple R-squared:  0.9347, Adjusted R-squared:  0.9346 
## F-statistic: 1.428e+04 on 1 and 998 DF,  p-value: < 2.2e-16
We get an intercept of 0.19 (non-significant, when the actual value is 0) and a slope of 4.96, when the actual value is 5.

Weighted linear regression
wols = lm(formula = "y~x", data=df, weights = (1/sqrt(x)^2) )
summary(wols)
## 
## Call:
## lm(formula = "y~x", data = df, weights = (1/sqrt(x)^2))
## 
## Weighted Residuals:
##     Min      1Q  Median      3Q     Max 
## -4.8880 -0.8601 -0.0016  0.8936  4.6535 
## 
## Coefficients:
##             Estimate Std. Error t value Pr(>|t|)    
## (Intercept) 0.001483   0.030072   0.049    0.961    
## x           4.993473   0.021874 228.286   <2e-16 ***
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## Residual standard error: 1.498 on 998 degrees of freedom
## Multiple R-squared:  0.9812, Adjusted R-squared:  0.9812 
## F-statistic: 5.211e+04 on 1 and 998 DF,  p-value: < 2.2e-16

We get an intercept of essentially 0, also non-significant but much closer to 0 and with a lower standard error, and a slope of 4.99, much closer to the actual value of 5 and also with a lower standard error.

Conclusion: if we know the right weights, we can get better estimates from a linear regression in the presence of heteroscedasticity.

Spark and Pyspark

What's Spark?

The definition says:

Spark is a fast and general processing engine compatible with Hadoop data. It can run in Hadoop clusters through YARN or Spark's standalone mode, and it can process data in HDFS, HBase, Cassandra, Hive, and any Hadoop InputFormat. It is designed to perform both batch processing (similar to MapReduce) and new workloads like streaming, interactive queries, and machine learning.

Basically, it is a framework to work with big amounts of data stored in distributed systems instead of a single machine. This allows parallelization and hence much faster computation.
Its biggest difference from plain Hadoop is that Spark processes data in RAM while Hadoop doesn't.

Not being a data engineer myself, I can tell you that you can use Spark to work with data stored in HDFS, S3 buckets or a data lake, for example. All distributed systems.

Since those usually store huge amounts of data, you can see how all of this relates. The use case I have been exposed to, as a data scientist, is to query this distributed data and process it before using it for some purpose (modeling, reporting, etc.).

How to use it?

I haven't deployed a distributed storage system myself, but it's safe to assume that this amount of data is gathered in big organizations and that some data engineer has probably already done all the setup. You just want to access the data from an environment connected to the Spark cluster.

There are several languages that can interact with Spark. Scala is the original one, but you could use Java or Python. As data scientists we are probably more familiar with Python, so I will show you Pyspark.

Pyspark

Pyspark is an API for working with Spark from Python. To run it you also need Java and Apache Spark installed. In our fictional organization, a data engineer might have set up a server with Jupyter notebooks linked to the data lake and with all the dependencies.

There are probably ways to connect to the remote spark server from your local machine but I haven't done that.

So, Pyspark lets you query the data lake / big data storage from a Jupyter notebook and then convert the result to a Pandas DataFrame and work the way you are used to.

Spark/Pyspark has a particular syntax that is quite clear, but it has some peculiarities that follow from the parallelization model. Many functions don't actually retrieve any data; that only happens when you explicitly ask for it. For example, show() or collect() do retrieve the data (and can take a while if you are working with a lot of it), while filter() or withColumn() don't.

Another thing to note is that you will need to create/initiate a sparkContext before actually being able to query data.
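A minimal local sketch of those two points (creating the session and the lazy vs. eager distinction), assuming a local Pyspark installation; the data and names are made up:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Entry point: a SparkSession (it wraps the sparkContext)
spark = SparkSession.builder.appName("practice").getOrCreate()

# A tiny DataFrame created locally, standing in for a table in the data lake
bookings = spark.createDataFrame(
    [(1, 120.0), (2, 80.0), (3, 45.5)],
    ["booking_id", "amount"],
)

# Transformations: lazy, nothing is computed yet
cheap = (bookings
         .withColumn("amount_eur", F.col("amount") * 0.9)
         .filter(F.col("amount_eur") < 100))

# Actions: here Spark actually runs the job and returns data
cheap.show()
rows = cheap.collect()   # list of Row objects
pdf = cheap.toPandas()   # back to a familiar Pandas DataFrame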

To understand this, and to find a good number of examples of the functions and syntax, I highly recommend THIS SITE.

How to practice?

You can practice Pyspark queries and scripts by installing Pyspark on your local machine, even without a cluster holding distributed data. With Pyspark installed you can create some data and use it as if it were real.
You will be able to use all the functions and check them by yourself.

How to install it? You can check THIS GUIDE FOR WINDOWS.

I struggled a bit to make it work, so here are some things I learned along the way.

  • I downloaded Java 8 since that's what the guide says and what I use at my current organization.
  • To avoid creating an Oracle account to download Java you can check THIS SOLUTION.
  • When creating environment variables, avoid blank spaces.
  • If Pyspark doesn't run because it can't find Java, check the %JAVA_HOME% path.
  • If the error is about Python3 missing, check the %PYTHONPATH% and create, in the Anaconda path, a copy of python.exe renamed to python3.exe.

Softmax vs sigmoid

When using neural nets for a multiclass classification problem, it's standard to have a softmax layer at the end to normalize the probabilities of each class. This means the output of the net is a vector of probabilities (one per class) that sums to 1. Without a softmax layer at the end, the net outputs one value per class cell, but with no bounded range: just a set of numbers where the highest usually corresponds to the most probable class, though it's not obvious how to interpret the differences between them.

So, you have an ordered set of numbers and you know which one corresponds to the most probable class, but you want to turn them into proper probabilities. That is what the softmax layer does.

You could instead use a sigmoid activation in each of the last cells to get individual probabilities: for each class, it transforms the net's output into a probability. However, the sum of those probabilities is not guaranteed to be 1, and in practice it won't be. It's a simple proxy, but softmax gives clearer intuitions.
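A tiny numeric illustration of that difference; the logit values here are arbitrary, made up just for the example:

import torch

logits = torch.tensor([2.0, 1.0, 0.1])   # raw outputs of the last layer, one per class

probs_softmax = torch.softmax(logits, dim=0)
print(probs_softmax)        # approximately [0.66, 0.24, 0.10]
print(probs_softmax.sum())  # exactly 1

probs_sigmoid = torch.sigmoid(logits)
print(probs_sigmoid)        # approximately [0.88, 0.73, 0.53], each in (0, 1) on its own
print(probs_sigmoid.sum())  # approximately 2.14, not 1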

We will compare how these two approaches affect the last group of weights by inspecting the gradient after calculating the loss for an observation.

I'm using the reticulate package in R to include Python code in Rmarkdown. Pretty nice.

library(reticulate)

We import pytorch to handle tensors and neural net functions.

import numpy as np
import torch
import torch.nn.functional as F
torch.manual_seed(99)
## <torch._C.Generator object at 0x00000262714CF730>
  • 1 observation
  • 5 features (X)
  • 3 possible classes (index 1 = class 2)
  • W: 3 output cells, each one with 5 weights (one per feature)
  • W1 = W2 because we run the forward pass twice (two scenarios) and we can't reuse the same weights once a gradient has been computed on them
X = torch.randn(5)
W1 = torch.randn(3,5)
W2 = W1.detach().clone() 
y = torch.tensor([1]) 

We make everything positive to keep it cleaner, and we call requires_grad_(), which tells pytorch that these tensors need gradients backpropagated to them during training.

X = X.abs()
W1 = W1.abs().requires_grad_()
W2 = W2.abs().requires_grad_()

We define both losses (softmax and sigmoid).

Softmax

  • Weights * input: cell values
  • We reshape the output to use it as input for softmax
  • We calculate the softmax (probabilities for each class, summing to 1)
  • We apply log because we will use the negative log likelihood
  • We calculate the loss (log of the softmax probabilities vs. the actual class)

Sigmoid

  • Weights * input: cell values
  • We reshape the output to use it as input for sigmoid
  • We calculate the sigmoid (an individual probability for each class)
  • We apply log because we will use the negative log likelihood
  • We calculate the loss (log of the sigmoid probabilities vs. the actual class)
# loss using a softmax at the end
def softmax_loss(W):
    z = W @ X
    z = z.unsqueeze(0)
    z = torch.softmax(z, dim=1)
    z = torch.log(z)
    return F.nll_loss(z, y)

# loss using a sigmoid activation per output cell
def sigmoid_loss(W):
    z = W @ X
    z = z.unsqueeze(0)
    z = torch.sigmoid(z)
    z = torch.log(z)
    return F.nll_loss(z, y)

We run the forward pass and compute the loss for the sigmoid first. Then we look at the gradient.
As the results show, only the weights going into the correct class's output cell are modified; classes one and three remain untouched. This is because the sigmoid activation only involves that cell's own weights (and the loss only looks at the correct class).

out_sigmoid = sigmoid_loss(W1)
out_sigmoid.backward()
W1.grad

## tensor([[ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000],
##         [-0.0452, -0.0867, -0.0564, -0.0492, -0.0549],
##         [ 0.0000,  0.0000,  0.0000,  0.0000,  0.0000]])
On the contrary, when running the same net with a softmax layer, we see that all the weights are updated. The correct class gets a gradient with the same sign as in the sigmoid example, while the other two classes get gradients of the opposite sign (which makes sense, since we want them pushed in the other direction).
This happens because the softmax involves the other classes in each cell: they are needed to normalize and return probabilities that sum to 1.

out_softmax = softmax_loss(W2)
out_softmax.backward()
W2.grad

## tensor([[ 0.5393,  1.0346,  0.6731,  0.5868,  0.6552],
##         [-0.5576, -1.0697, -0.6959, -0.6066, -0.6775],
##         [ 0.0183,  0.0351,  0.0228,  0.0199,  0.0222]])
This is a simple case with just one layer of weights, so the effect is easy to see. In a fully connected net with more layers, this holds only for the last layer, because the gradient is backpropagated and the weights on "other paths" still affect the cell that corresponds to the second class (the correct one).

Conclusion

With either final activation the net should evolve in a similar way during training, but how it gets there differs, and that is what we have tried to show here. In the end, the sigmoid still reflects the preference for one of the classes, and in each epoch it will move in the desired direction, but it only updates some of the weights rather than all of them at once.

From R to Python 1

A series documenting practical things I discover as I start to get into Python a bit more seriously. They are mostly reminders for my future self about mechanics I understand today but will forget.

Many of the objects have no relation to each other, and the code cannot be run directly, since these are random copy/paste snippets from different scripts.

Using objects from other scripts

If, for tidiness, you create models, dataframes, etc. in another script and want to use them in the main one (or any other, really), the advisable thing is to export them as a pickle object (somewhat like RDS files in R).

library(reticulate)
import pickle
import pandas as pd

# export: object, file, mode
pickle.dump(OBJETO, open("working/qualifying.p", "wb"))

# import
leer = pd.read_pickle('archivo.p')

Selecting dataframe columns by a name pattern

To select columns based on whether their name contains a given string, and not only by full name or index.

# Remember that iloc selects by index
# Lambda function that converts the column index to strings and returns a mask (True/False)
# based on whether it contains a given pattern. I think any regex can be used
df2 = df.iloc[:, lambda df:df.columns.str.contains('_points')] # select column based on name

If we want to combine this with other columns matching a different pattern, I haven't found a simpler way for now than selecting them separately and combining the lists. It may get tedious if there are many.

# Note that in point_vars we apply the mask to the column listing again
# to keep the actual names and be able to add them to the other lists;
# we then convert it to a list because otherwise the object is of Index type.
target = ['position']
qualy_vars = ['grid', 'dif_to_min_perc']
point_vars = list(results.columns[results.columns.str.contains('_points')])

vars_keep = target + qualy_vars + point_vars

Joining dataframes by index

DataFrames come with an index by default. If you work on a copy of the original DF to create new columns, the index is preserved (unless we reset it, of course). This is also useful when several tables share the same index.

This lets us join tables without an explicit merge on specific columns when we don't have those keys.

results = results.join(driver_points_merge) # join by index (no need to merge with column)
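A self-contained toy version of that line; the two frames are made-up stand-ins for results and driver_points_merge:

import pandas as pd

# Two frames that share the same index (e.g. a result id)
results = pd.DataFrame({'position': [1, 3, 2]}, index=[100, 101, 102])
driver_points_merge = pd.DataFrame({'driver_points': [25, 15, 18]}, index=[100, 101, 102])

# join aligns on the index, no merge keys needed
results = results.join(driver_points_merge)
print(results)  # one row per index, with both columns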

Ifelse

The equivalent of IFELSE in R, to quickly create a column based on others, row by row.

import numpy as np
#               = condition, value if True, value if False
df['position'] = np.where(df['position'] > 1, 0, 1)

Dropping a column from a DF

Useful to make sure the target is removed from the X...

X = df.drop(columns="position")

Replacing a given value with NaN (or another one)

df.replace

qualifying = qualifying.replace('\\N', np.nan)

APPLY. Applying functions to each row or column

It applies a function per row or column. The function is applied to the series (the row or column), and the series keeps its index. If we use apply with axis = 1, each series is a whole row, and we can access the corresponding cell using ['column'].

Apply is like the different versions of apply in R and/or MAP from the tidyverse when applied to a DF.

import pandas as pd
rectangles = [
    { 'height': 40, 'width': 10 },
    { 'height': 20, 'width': 9 },
    { 'height': 3.4, 'width': 4 }
]

rectangles_df = pd.DataFrame(rectangles)
rectangles_df
##    height  width
## 0    40.0     10
## 1    20.0      9
## 2     3.4      4

# Sum of all the cells ("rows") of each column
suma_por_columna = rectangles_df.apply(sum)
print(suma_por_columna)
## height    63.4
## width     23.0
## dtype: float64

# Sum of all the cells ("columns") of each row
suma_por_fila = rectangles_df.apply(sum, axis = 1)
print(suma_por_fila)
## 0    50.0
## 1    29.0
## 2     7.4
## dtype: float64
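The row-wise access by column name mentioned above (apply with axis = 1 plus ['column']), in a minimal self-contained sketch; the area calculation is just an invented example:

import pandas as pd

rectangles_df = pd.DataFrame([
    { 'height': 40, 'width': 10 },
    { 'height': 20, 'width': 9 },
    { 'height': 3.4, 'width': 4 }
])

# With axis = 1 each row arrives as a Series and cells are accessed by column name
area = rectangles_df.apply(lambda row: row['height'] * row['width'], axis = 1)
print(area)
## 0    400.0
## 1    180.0
## 2     13.6
## dtype: float64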

Apply with lambda to pass custom functions on the fly

import pandas as pd
rectangles = [
    { 'height': 40, 'width': 10 },
    { 'height': 20, 'width': 9 },
    { 'height': 3.4, 'width': 4 }
]

rectangles_df = pd.DataFrame(rectangles)

def multiplicar_2(x):
   return x*2

# Case where we pass a predefined custom function
rectangles_df.apply(multiplicar_2)
##    height  width
## 0    80.0     20
## 1    40.0     18
## 2     6.8      8

# The same thing, but defined on the fly
rectangles_df.apply(lambda x: x*2)
##    height  width
## 0    80.0     20
## 1    40.0     18
## 2     6.8      8

Calculations by group

Like group_by in the tidyverse.

# Equivalent to group_by(raceId) %>% summarise(newcol = min(best_qualy_ms))
min_qualy_by_race = qualifying.groupby('raceId')['best_qualy_ms'].min()

A more complex group-by, with a cumulative calculation over a given window of observations for each row

# rolling sum of the last 4 periods
# then groupby(level = 0).shift() lags it within each group
# the final reset_index removes the index from this last grouping
driver_points.groupby('driverId')['points'].rolling(4, min_periods = 4).sum().groupby(level = 0).shift().fillna(0).reset_index(level = 0)['points']
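A toy version with invented data, just to see what that chain returns: for driver 1 the last row gets 10 + 8 + 6 + 12 = 36, the sum of its previous four races, while rows with fewer than four prior races get 0.

import pandas as pd

driver_points = pd.DataFrame({
    'driverId': [1, 1, 1, 1, 1, 2, 2, 2, 2, 2],
    'points':   [10, 8, 6, 12, 0, 4, 2, 0, 6, 8],
})

# Rolling sum of the last 4 races, computed within each driver and then
# lagged one row so each race only "sees" points earned before it
prev_4 = (driver_points.groupby('driverId')['points']
          .rolling(4, min_periods = 4).sum()
          .groupby(level = 0).shift()
          .fillna(0)
          .reset_index(level = 0))
print(prev_4)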