Source code for lerot.environment.FederatedClickModel

# This file is part of Lerot.
#
# Lerot is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Lerot is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Lerot.  If not, see <http://www.gnu.org/licenses/>.

from collections import defaultdict
import itertools
import numpy as np

from AbstractUserModel import AbstractUserModel


class FederatedClickModel(AbstractUserModel):

    def __init__(self, arg_str):
        self.parmh = {'text': [0.95, 0.3, 0.25, 0.2, 0.15, 0.1, 0.05, 0.05,
                               0.05, 0.05, 0.05, 0.05],
                      'media': [0.95, 0.9, 0.85, 0.8, 0.75, 0.7, 0.3, 0.25,
                                0.2, 0.15, 0.10, 0.05]}
        self.parmphi = [.68, .61, .48, .34, .28, .2, .11, .1, .08, .06]
        args = arg_str.split()
        self.pargamma = {'text': float(args[0]), 'media': float(args[1])}
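
    # Note on the tables above (inferred from how they are used below, not part
    # of the original listing): `parmh` holds rank-dependent probabilities that
    # a vertical block placed at a given rank catches the user's attention,
    # with separate tables for 'text' and 'media' verticals; `parmphi` holds
    # the rank-dependent baseline examination probabilities; `pargamma` holds
    # the per-class decay constants gamma used in the distance-based attention
    # bonus b(i) = min(1, 1 / (|i| + gamma)).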

    def h(self, i, serp_len, vert):
        return self.getParamRescaled(i, serp_len,
                                     self.parmh[self.getVertClass(vert)])

    def p(self, i, serp_len):
        return self.getParamRescaled(i, serp_len, self.parmphi)

    def b(self, i, vert):
        return min(1, 1.0 / (abs(i) + self.pargamma[self.getVertClass(vert)]))
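
    # Illustration (plain arithmetic, not part of the original listing): with
    # pargamma['media'] = 2.0, a document right at the vertical block gets the
    # bonus b(0) = min(1, 1 / 2.0) = 0.5, while a document three positions away
    # gets b(3) = 1 / 5.0 = 0.2, so the bonus decays with distance from the
    # block.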

    @staticmethod
    def getParamRescaled(rank, serp_len, param_vector):
        assert rank < serp_len
        if serp_len <= len(param_vector):
            return param_vector[rank]
        origin_rank = float(rank) / (serp_len - 1) * (len(param_vector) - 1)
        left = int(origin_rank)
        delta = origin_rank - left
        if delta < 0.01:
            return param_vector[left]
        return (param_vector[left] * (1 - delta) +
                param_vector[left + 1] * delta)
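
    # Worked example of the rescaling (illustrative only): for a 20-document
    # SERP and the 10-entry parmphi table, rank 5 maps to
    # origin_rank = 5 / 19 * 9 ~= 2.37, so the result interpolates between
    # parmphi[2] = 0.48 and parmphi[3] = 0.34:
    # 0.48 * (1 - 0.37) + 0.34 * 0.37 ~= 0.43.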

    @staticmethod
    def getVertClass(vert_type):
        if vert_type in ['Answer', 'Blog', 'Books', 'Discussion', 'News',
                         'Scholar', 'Wiki']:
            return 'text'
        elif vert_type in ['Image', 'Recipe', 'Shopping', 'Video', 'Apps']:
            return 'media'
        else:
            raise NotImplementedError('Unknown vertical type: %s' % vert_type)

    def get_clicks(self, result_list, labels, **kwargs):
        """Simulate clicks on the result_list.

        - labels contain relevance labels indexed by the docid
        """
        N = len(result_list)
        orientation = kwargs.get('orientation')
        if orientation is None:
            orientation = defaultdict(lambda: 1.0)
        vert_types = set(d.get_type() for d in result_list
                         if d.get_type() != 'Web')
        biased_verticals = set()  # the set of verticals for which A^j == True
        for vert in vert_types:
            hposs = [i for i, d in enumerate(result_list)
                     if d.get_type() == vert]
            A = np.random.binomial(
                1, self.h(hposs[0], N, vert) * orientation[vert])
            if A:
                biased_verticals.add(vert)
        examination_probs = self._examination_prob(result_list, biased_verticals)
        return [1 if labels[d.get_id()] > 0 and np.random.binomial(1, e) else 0
                for (e, d) in zip(examination_probs, result_list)]
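
    # Reading of the sampling procedure above (summary, not original code): for
    # each non-Web vertical j the model draws an attention indicator
    # A_j ~ Bernoulli(h(top position of j, N, j) * orientation[j]); each
    # document is then examined with the probability returned by
    # _examination_prob, and a click is recorded only when the document is both
    # relevant (label > 0) and examined.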

    def get_examination_prob(self, result_list, **kwargs):
        N = len(result_list)
        orientation = kwargs.get('orientation')
        if orientation is None:
            orientation = defaultdict(lambda: 1.0)
        vert_types = list(set(d.get_type() for d in result_list
                              if d.get_type() != 'Web'))
        # P(A_j = 1)
        p_A_j_1 = np.zeros(len(vert_types))
        for j, vert in enumerate(vert_types):
            hposs = [i for i, d in enumerate(result_list)
                     if d.get_type() == vert]
            p_A_j_1[j] = self.h(hposs[0], N, vert) * orientation[vert]
        # P(E_i = 1) = \sum_A P(A) \cdot P(E_i = 1 \mid A)
        p_E = np.zeros(N)
        # A is a vector of attractiveness values of length `len(vert_types)`
        for A in itertools.product([0, 1], repeat=len(vert_types)):
            biased_verticals = set(v for (a, v) in zip(A, vert_types) if a)
            # P(A) = \prod_j P(A_j)
            p_A = 1.0
            for j, a in enumerate(A):
                p_A *= p_A_j_1[j] if a else (1 - p_A_j_1[j])
            # P(E = 1 \mid A)
            p_E_1_mid_A = self._examination_prob(result_list, biased_verticals)
            p_E += p_A * np.array(p_E_1_mid_A)
        return p_E
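
    # Note (summary, not original code): the loop above marginalises over all
    # 2^|vert_types| attention configurations, so the returned vector is the
    # exact expectation P(E_i = 1) = sum_A prod_j P(A_j = a_j) * P(E_i = 1 | A).
    # This matches the randomness used in get_clicks, but its cost grows
    # exponentially with the number of distinct vertical types on the SERP.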

    def _examination_prob(self, result_list, biased_verticals):
        N = len(result_list)
        examination_probs = []
        for pos, d in enumerate(result_list):
            beta = 0
            for vert in biased_verticals:
                nearest = min((i for i, doc in enumerate(result_list)
                               if doc.get_type() == vert),
                              key=lambda i: (abs(i - pos), i))
                beta = max(beta, self.b(nearest - pos, vert))
            beta = min(1, beta)
            phi = self.p(pos, N)
            e = phi + (1 - phi) * beta
            examination_probs.append(e)
        return examination_probs
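

# Minimal usage sketch (illustrative only, not part of the original module).
# It assumes only what the model itself requires of documents, namely get_id()
# and get_type(); _StubDoc below is a hypothetical stand-in for Lerot's
# document objects, and the gamma arguments '1.0 2.0' are arbitrary.
if __name__ == '__main__':
    class _StubDoc(object):
        def __init__(self, docid, vert_type):
            self.docid = docid
            self.vert_type = vert_type

        def get_id(self):
            return self.docid

        def get_type(self):
            return self.vert_type

    model = FederatedClickModel('1.0 2.0')
    # Ten results with an Image vertical block at ranks 3-5.
    ranking = [_StubDoc(i, 'Image' if 3 <= i <= 5 else 'Web')
               for i in range(10)]
    labels = dict((i, i % 2) for i in range(10))  # every other document relevant
    orientation = defaultdict(lambda: 0.8)
    print(model.get_examination_prob(ranking, orientation=orientation))
    print(model.get_clicks(ranking, labels, orientation=orientation))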