# Source code for lerot.experiment.VASyntheticComparisonExperiment

# This file is part of Lerot.
#
# Lerot is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Lerot is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Lerot.  If not, see <http://www.gnu.org/licenses/>.

import argparse
from collections import defaultdict
import copy
import glob
import itertools
import json
import numpy as np
import os
import random
import scipy
import scipy.stats
import sys
import traceback

from ..document import Document
from ..ranker import (ModelRankingFunction,
                      SyntheticDeterministicRankingFunction,
                      StatelessRankingFunction)
from ..utils import get_class


VERTICALS = ["Web", "News", "Image", "Video"]

T_TEST_THRESHOLD_ALPHA = 0.05

# Number of times we run a click model to compute online metrics.
CLICK_METRIC_RUNS = 50


def normalize(l):
    s = sum(l)
    for i in range(len(l)):
        l[i] /= s
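
# Example of the in-place normalization above (illustrative):
#
#   >>> probs = [1.0, 3.0]
#   >>> normalize(probs)
#   >>> probs
#   [0.25, 0.75]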


class VASyntheticComparisonExperiment():
    """Represents an experiment in which synthetic rankers are compared to
    investigate theoretical properties / guarantees.
    """

    def __init__(self, log_fh, args):
        """Initialize an experiment using the provided arguments."""
        self.verbose = args["verbose"]
        self.log_fh = log_fh
        # additional configuration: number of relevant documents
        # (number or "random")
        self.length = args["result_length"]
        self.rankings = args["rankings"]
        self.block_size = args["vertical_blocksize"]
        self.um_class = get_class(args["user_model"])
        self.um_args = args["user_model_args"]
        self.um = self.um_class(self.um_args)
        self.pareto_um_class = get_class(args["pareto_um_class"])
        self.pareto_um = self.pareto_um_class(args["pareto_um_args"])
        self.fixed_doc_placement = (args["vertical_placement"] == "fixed")
        # initialize interleaved comparison methods according to configuration
        parser = argparse.ArgumentParser(
            description="parse arguments of an evaluation method.",
            prog="evaluation method configuration")
        parser.add_argument("--class_name")
        parser.add_argument("-i", "--interleave_method")
        self.methods = {}
        # init live methods
        if "evaluation_methods" in args:
            for method_id, method in enumerate(args["evaluation_methods"]):
                self.methods[method] = {}
                method_args_str = \
                    args["evaluation_methods_args"][method_id]
                method_args = vars(
                    parser.parse_known_args(method_args_str.split())[0])
                class_name = method_args["class_name"]
                self.methods[method]["instance"] = \
                    get_class(class_name)(method_args_str)
        self.offline_metrics = {}
        for metric in args.get("offline_metrics", "").split(","):
            metric = metric.rstrip()
            if metric:
                metric_short = metric.split(".")[-1].split("Eval")[0]
                self.offline_metrics[metric_short] = get_class(metric)()
        self.online_metrics = set([])  # will be filled later
        self.compute_online_metrics = args["compute_online_metrics"]
        self.compute_interleaved_metrics = args["compute_interleaved_metrics"]
        self.num_repeat_interleaving = args["num_repeat_interleaving"]
        self.random_query_draw = args.get("num_random_draws") is not None
        self.system_comparison = args["system_comparison"]
        if self.system_comparison == "none":
            dominates = lambda a, b, l: True
        elif self.system_comparison == "pareto":
            dominates = self._pareto_dominates
        else:
            raise Exception(
                "Unsupported system comparison: %s" % self.system_comparison)
        if self.rankings == "synthetic":
            documents_A, documents_B, self.labels = (
                VASyntheticComparisonExperiment.generate_ranking_pair(
                    self.length, args["num_relevant"],
                    pos_method=args["vertical_posmethod"],
                    vert_rel=args["vertical_vertrel"],
                    block_size=self.block_size,
                    verticals=[v.strip()
                               for v in args["verticals"].split(",")],
                    fixed=self.fixed_doc_placement,
                    dominates=dominates))
            self.rankers = (
                SyntheticDeterministicRankingFunction(documents_A),
                SyntheticDeterministicRankingFunction(documents_B))
            self.topics = ["dummy"]
        elif self.rankings == "model":
            # TODO(chuklin): make self.qrels, vert_map, self.Iprob to be
            # lazily initialized class attributes.
            self.Iprob = {}
            if args.get("vert_map_file") is not None and \
                    args.get("Iprob_file") is not None:
                vert_map = {}
                with open(args["vert_map_file"]) as vert_map_f:
                    for line in vert_map_f:
                        vertical, vert_id = line.split()
                        vert_map[vert_id] = vertical
                with open(args["Iprob_file"]) as Iprob:
                    for line in Iprob:
                        topic, vert_id, prob = line.split()
                        self.Iprob.setdefault(topic, {})
                        self.Iprob[topic][vert_map[vert_id]] = float(prob)
            else:
                self.Iprob = defaultdict(lambda: defaultdict(lambda: 1.0))
            page_files = glob.glob(
                os.path.join(os.path.abspath(args["run_dir"]), "*"))
            page_files.sort()
            total_rankers = len(page_files)
            # indexes of rankers to compare
            all_pairs = []
            for i in xrange(1, total_rankers):
                for j in xrange(i):
                    all_pairs.append((i, j))
            i, j = all_pairs[args["ranker_pair_idx"]]
            print "Comparing rankers %s and %s" % (page_files[i],
                                                   page_files[j])
            self.rankers = (ModelRankingFunction(), ModelRankingFunction())
            self.doc_to_id = {}
            self.current_doc_id = 0
            self.topics = set([])
            self.ideal_ranker = ModelRankingFunction()
            # Add ideal page to the list of ranker files such that it can be
            # accessed using -1 index.
            page_files.append(args.get("ideal_page_as_rbp"))
            for r, idx in zip(
                    [self.rankers[0], self.rankers[1], self.ideal_ranker],
                    [i, j, -1]):
                if page_files[idx] is None:
                    continue
                with open(page_files[idx]) as f:
                    current_file_query_docs = set([])
                    for line in f:
                        topic, _, doc, rank, system, vertical = line.split()
                        self.topics.add(topic)
                        if (topic, doc) in current_file_query_docs:
                            # Skip duplicate docs
                            continue
                        current_file_query_docs.add((topic, doc))
                        if doc not in self.doc_to_id:
                            self.doc_to_id[doc] = self.current_doc_id
                            self.current_doc_id += 1
                        doc_id = self.doc_to_id[doc]
                        r.add_doc_for_query(topic, Document(doc_id, vertical))
            self.qrels = {}
            if args.get("qrel_file") is not None:
                with open(args["qrel_file"]) as qrels:
                    for line in qrels:
                        topic, doc, rel = line.split()
                        self.qrels.setdefault(topic, defaultdict(lambda: 0))
                        if doc in self.doc_to_id:
                            self.qrels[topic][self.doc_to_id[doc]] = \
                                int(rel[1:])
            else:
                self.qrels = defaultdict(lambda: defaultdict(lambda: 0.0))
            if "ideal_page_as_rbp" in args and \
                    "AsRbp" in self.offline_metrics:
                self.ideal_as_rbp = dict(
                    (q, self.offline_metrics["AsRbp"].get_value(
                        ranking, self.qrels[q], self.Iprob[q], self.length))
                    for (q, ranking) in self.ideal_ranker.pages.iteritems())
            self.topics = sorted(self.topics)
        else:
            raise Exception("unknown input rankings specified: %s" %
                            args["rankings"])
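
    # Illustrative sketch of the configuration dictionary consumed by
    # __init__; the keys follow the code above, the values are hypothetical:
    #
    #   args = {
    #       "verbose": False,
    #       "result_length": 10,
    #       "rankings": "synthetic",        # or "model"
    #       "vertical_blocksize": 3,
    #       "vertical_placement": "fixed",  # anything else counts as not fixed
    #       "vertical_posmethod": "beyondten",
    #       "vertical_vertrel": "non-relevant",
    #       "verticals": "News",
    #       "num_relevant": "1-3",          # a number, "random", or "min-max"
    #       "user_model": "lerot.environment.CascadeUserModel",
    #       "user_model_args": "...",       # passed on to the user model
    #       "pareto_um_class": "lerot.environment.CascadeUserModel",
    #       "pareto_um_args": "...",
    #       "evaluation_methods": ["TeamDraft"],
    #       "evaluation_methods_args": ["--class_name lerot.comparison.TeamDraft"],
    #       "offline_metrics": "lerot.evaluation.NdcgEval",
    #       "compute_online_metrics": True,
    #       "compute_interleaved_metrics": False,
    #       "num_repeat_interleaving": 1,
    #       "system_comparison": "pareto",  # or "none"
    #   }
    #   experiment = VASyntheticComparisonExperiment(open("out.json", "w"),
    #                                                args)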

    def init_rankers(self, query):
        """Init the rankers for a query.

        Since a ranker may be stateful, it has to be (re-)initialized every
        time its documents are accessed.
        """
        self.rankers[0].init_ranking(query)
        self.rankers[1].init_ranking(query)

    def run(self):
        output = {
            "outcomes": defaultdict(lambda: []),
            "relaxed": defaultdict(lambda: []),
            "click_counts": defaultdict(lambda: []),
            "block_counts": defaultdict(lambda: []),
            "block_pos": defaultdict(lambda: []),
            "block_sizes": defaultdict(lambda: []),
        }
        result_length = self.length
        if self.fixed_doc_placement and self.rankings == "synthetic":
            result_length += self.block_size
        for r in self.rankers:
            assert isinstance(r, StatelessRankingFunction)
        topics = [random.choice(self.topics)] if self.random_query_draw \
            else self.topics
        for topic in topics:
            if self.rankings == "synthetic":
                labels = self.labels
                orientations = defaultdict(lambda: 1.0)
            else:  # model
                labels = self.qrels[topic]
                orientations = self.Iprob[topic]
            as_rbp_denominator = 1.0
            if self.rankings == "model" and "AsRbp" in self.offline_metrics:
                as_rbp_denominator = self.ideal_as_rbp[topic]
                if as_rbp_denominator == 0.0:
                    as_rbp_denominator = 1.0
            for metric_name, metric_instance in \
                    self.offline_metrics.iteritems():
                output.setdefault(metric_name, defaultdict(lambda: []))
                self.init_rankers(topic)
                if metric_name == "RP" and self.rankings == "model":
                    metric_A, metric_B = [metric_instance.get_value(
                        self.rankers[i].getDocs(result_length),
                        labels, orientations, result_length,
                        ideal_ranking=self.ideal_ranker.pages[topic])
                        for i in [0, 1]]
                else:
                    metric_A = metric_instance.get_value(
                        self.rankers[0].getDocs(result_length),
                        labels, orientations, result_length)
                    metric_B = metric_instance.get_value(
                        self.rankers[1].getDocs(result_length),
                        labels, orientations, result_length)
                if metric_name == "AsRbp" and self.rankings == "model":
                    metric_A /= as_rbp_denominator
                    metric_B /= as_rbp_denominator
                # Repeat the value n times such that metrics of the original
                # and the interleaved systems have the same lengths.
                output[metric_name]["A"].append(metric_A)
                output[metric_name]["B"].append(metric_B)
            if self.system_comparison == "pareto":
                # Store pareto-dominance similar to how we store metric
                # values so that they can be accessed uniformly.
output.setdefault("pareto", defaultdict(lambda: [])) self.init_rankers(topic) docsA = self.rankers[0].getDocs(result_length) docsB = self.rankers[1].getDocs(result_length) if self._pareto_dominates(docsA, docsB, labels, orientation=orientations): output["pareto"]["A"].append(1) output["pareto"]["B"].append(0) elif self._pareto_dominates(docsB, docsA, labels, orientation=orientations): output["pareto"]["A"].append(0) output["pareto"]["B"].append(1) else: output["pareto"]["A"].append(0) output["pareto"]["B"].append(0) continue if self.compute_online_metrics: self.init_rankers(topic) for side, ranker in zip(["A", "B"], self.rankers): docs = ranker.getDocs(result_length) for run in xrange(CLICK_METRIC_RUNS): online_metrics = self.get_online_metrics( self.um.get_clicks(docs, labels, orientation=orientations), docs ) for metric_name, v in online_metrics.iteritems(): output.setdefault(metric_name, defaultdict(lambda: [])) output[metric_name][side].append(v) self.online_metrics.add(metric_name) for method_id, method in self.methods.iteritems(): self.init_rankers(topic) current_verticals = (self.rankers[0].verticals(result_length) | self.rankers[1].verticals(result_length)) try: # Interleave should call reset for query interleaver = method["instance"] n_interleavings = interleaver.interleave_n( self.rankers[1], self.rankers[0], topic, result_length, self.num_repeat_interleaving ) if hasattr(interleaver, "relaxed"): output["relaxed"][method_id].append(1 if interleaver.relaxed else 0) except Exception as e: print >>sys.stderr, method_id, "failed. Exception =", e self.init_rankers(topic) print >>sys.stderr, ("Topic:", topic, "Rankings:", self.rankers[0].getDocs(), self.rankers[1].getDocs()) traceback.print_exc() continue for (interleaved, a) in n_interleavings: # record outcomes and number of clicks output["outcomes"][method_id].append( method["instance"].infer_outcome( [x.get_id() for x in interleaved], a, self.um.get_clicks(interleaved, labels, orientation=orientations), None ) ) if self.compute_interleaved_metrics: block_cnts = VASyntheticComparisonExperiment.block_counts(interleaved) block_szs = VASyntheticComparisonExperiment.block_sizes(interleaved) for vert in current_verticals: output["block_counts"][method_id].append(block_cnts[vert]) output["block_sizes"][method_id].append( float(block_szs[vert]) / self.block_size) output["block_pos"][method_id].append( VASyntheticComparisonExperiment.block_position1(interleaved, result_length)) for metric_name, metric_instance in self.offline_metrics.iteritems(): if metric_name == "RP" and self.rankings == "model": metric_L = metric_instance.get_value( interleaved, labels, orientations, result_length, ideal_ranking=self.ideal_ranker.pages[topic]) else: metric_L = metric_instance.get_value( interleaved, labels, orientations, result_length) if metric_name == "AsRbp" and self.rankings == "model": metric_L /= as_rbp_denominator output[metric_name][method_id].append(metric_L) if self.compute_online_metrics: for run in xrange(CLICK_METRIC_RUNS): online_metrics = self.get_online_metrics( self.um.get_clicks(interleaved, labels, orientation=orientations), interleaved ) for metric_name, v in online_metrics.iteritems(): output[metric_name][method_id].append(v) if self.compute_interleaved_metrics: for metric_name in itertools.chain( self.offline_metrics.iterkeys(), self.online_metrics): side_metrics = {} for side in ["A", "B"]: side_metrics[side] = np.mean(output[metric_name][side]) # A is always the side with higher metric value; swap if needed A, B = sorted(["A", 
"B"], key=lambda side: side_metrics[side], reverse=True) side_metrics["A"] = A side_metrics["B"] = B for method_id in self.methods.iterkeys(): vals = output[metric_name][method_id] mean_val = np.mean(vals) degrades = False if mean_val < side_metrics[B]: _, p_value = scipy.stats.ttest_rel(vals, output[metric_name][B]) if p_value <= T_TEST_THRESHOLD_ALPHA: degrades = True output[metric_name][method_id + "_degrades"] = [1 if degrades else 0] for method, o in output["outcomes"].iteritems(): N = len(o) for part in xrange(5): num_impr = N // 5 * (part + 1) name = "outcomes_significant_%d" % num_impr output.setdefault(name, defaultdict(lambda: [])) output[name][method] = [1] if scipy.stats.ttest_1samp(o[:num_impr], 0.0)[1] <= T_TEST_THRESHOLD_ALPHA else [0] # record ranker pairs, comparison outcomes json.dump(output, self.log_fh)

    @staticmethod
    def block_counts(l):
        counts = defaultdict(lambda: 0)
        for i in xrange(len(l)):
            cur_type = l[i].get_type()
            if cur_type != "Web" and (
                    i + 1 == len(l) or cur_type != l[i + 1].get_type()):
                counts[cur_type] += 1
        return counts

    @staticmethod
    def block_position1(l, result_length):
        for i in xrange(len(l)):
            cur_type = l[i].get_type()
            if cur_type != "Web":
                return i
        return result_length

    @staticmethod
    def block_sizes(l):
        sizes = defaultdict(lambda: 0)
        for d in l:
            cur_type = d.get_type()
            if cur_type != "Web":
                sizes[cur_type] += 1
        return sizes
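
    # Illustrative behaviour of the three block_* helpers above for a
    # hypothetical ranking [Web, News, News, Web]:
    #
    #   block_counts(ranking)        -> {"News": 1}  (one contiguous block)
    #   block_sizes(ranking)         -> {"News": 2}  (two News documents)
    #   block_position1(ranking, 10) -> 1            (rank of first non-Web doc)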

    @staticmethod
    def _vertpos(min_len, pos_method):
        if pos_method == "beyondten":
            # This distribution is taken from figure 2 in "Beyond Ten Blue
            # Links: Enabling User Click Modeling in Federated Web Search" by
            # Chen et al. The distribution is scaled back to the 1-10
            # interval.
            posdist = [0.13950538998097659, 0.027266962587190878,
                       0.24096385542168688, 0.1407736207989854,
                       0.030437539632213073, 0.019023462270133174,
                       0.0025364616360177474, 0.04438807863031058,
                       0.3119847812301839, 0.04311984781230175]
            pos = int(np.where(np.random.multinomial(1, posdist) == 1)[0])
        elif pos_method == "uniform":
            pos = random.randint(0, min_len)
        return pos

    @staticmethod
    def _set_vertical(r1, r2, olabels, length, pos_method="beyondten",
                      vert_rel="non-relevant", block_size=3, verticals=None):
        if verticals is None:
            verticals = [VERTICALS[1]]
        # Select position to insert vertical blocks (at random).
        positions = {}
        for vert in verticals:
            pos = VASyntheticComparisonExperiment._vertpos(
                min(len(r1), len(r2)), pos_method)
            positions[vert] = pos
        max_id = max(d.get_id() for d in r1 + r2)
        r1 = [d.set_type("Web") for d in r1]
        r2 = [d.set_type("Web") for d in r2]
        for vert in verticals:
            pos = positions[vert]
            for i in range(block_size):
                r1.insert(pos, Document(max_id + i + 1, vert))
                r2.insert(pos, Document(max_id + i + 1, vert))
            # Adjust positions according to the just inserted block.
            del positions[vert]
            max_id += block_size
            for v, p in positions.iteritems():
                if p >= pos:
                    positions[v] += block_size
        assert all(
            all(v == 1 for v in
                VASyntheticComparisonExperiment.block_counts(r).itervalues())
            for r in [r1, r2])
        labels = olabels[:]
        for doc in set(r1 + r2):
            if doc.get_type() == "Web":
                continue
            vdoc = doc.get_id()
            if vdoc >= len(labels):
                labels += [0] * (vdoc - len(labels) + 1)
            if vert_rel == "non-relevant":
                labels[vdoc] = 0
            elif vert_rel == "all-relevant":
                labels[vdoc] = 1
            elif vert_rel == "relevant":
                ratio = float(sum(olabels)) / length
                labels[vdoc] = np.random.binomial(1, ratio)
        return r1, r2, labels
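
    # Note on the two helpers above (illustrative summary): for
    # pos_method="beyondten" the insertion rank of a vertical block is drawn
    # from the empirical top-10 position distribution of Chen et al. (posdist
    # sums to ~1.0), while "uniform" draws it uniformly at random;
    # _set_vertical then inserts the same block_size fresh vertical documents
    # into both rankings at that rank and labels them according to vert_rel.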

    @staticmethod
    def generate_ranking_pair(result_length, num_relevant,
                              pos_method="beyondten",
                              vert_rel="non-relevant", block_size=3,
                              verticals=None, fixed=False,
                              dominates=lambda a, b, l: True):
        """Generate a pair of synthetic rankings.

        See Appendix A, https://bitbucket.org/varepsilon/tois2013-interleaving
        """
        if verticals is None:
            verticals = [VERTICALS[1]]
        # This is what is called 'd' in the paper.
        pool_length = result_length + 2
        if fixed:
            document_pool, labels = (
                VASyntheticComparisonExperiment._generate_document_pool(
                    pool_length, num_relevant))
        else:
            assert block_size * len(verticals) <= pool_length
            prob = float(block_size) / pool_length
            document_pool, labels = (
                VASyntheticComparisonExperiment._generate_document_pool(
                    pool_length, num_relevant, verticals,
                    [prob] * len(verticals), vert_rel))
        for _ in xrange(1000):
            ranking_A = VASyntheticComparisonExperiment._generate_ranking(
                document_pool)
            ranking_B = VASyntheticComparisonExperiment._generate_ranking(
                document_pool)
            if fixed:
                ranking_A, ranking_B, labels = (
                    VASyntheticComparisonExperiment._set_vertical(
                        ranking_A, ranking_B, labels, result_length,
                        pos_method, vert_rel, block_size, verticals))
            if dominates(ranking_A, ranking_B, labels):
                return ranking_A, ranking_B, labels
            elif dominates(ranking_B, ranking_A, labels):
                return ranking_B, ranking_A, labels
        raise Exception("Could not find a Pareto-dominated ranking for labels"
                        " %s after 1000 trials." %
                        ", ".join(str(x) for x in labels))
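
    # Illustrative call (hypothetical parameter values): draw a pair of
    # 10-document rankings over a shared pool with 3 relevant documents and a
    # fixed 3-document News block; with the default `dominates` argument the
    # first sampled pair is returned, otherwise sampling is repeated until
    # one ranking dominates the other.
    #
    #   docs_A, docs_B, labels = (
    #       VASyntheticComparisonExperiment.generate_ranking_pair(
    #           10, 3, pos_method="uniform", vert_rel="non-relevant",
    #           block_size=3, verticals=["News"], fixed=True))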

    @staticmethod
    def _generate_document_pool(num_documents, num_relevant, verticals=None,
                                prob_of_doc_vert=None,
                                vert_rel="non-relevant"):
        """
        num_documents --- total number of documents in the pool. The closer
            it is to the serp_size, the more similar the two generated SERPs
            will be.
        num_relevant --- number of relevant documents among those.
        prob_of_doc_vert --- probability of a document being a vertical
            document. prob_of_doc_vert == [0.1, 0.3, 0.2] means that each
            document belongs to Vert1 with probability 0.1, to Vert2 with
            probability 0.3, to Vert3 with probability 0.2, and to Web with
            probability 0.4 (1 - 0.1 - 0.3 - 0.2).
        vert_rel --- specifies whether the vertical documents are
            "non-relevant", "all-relevant" or just as "relevant" as the
            non-vertical documents.

        Returns: a list of documents and the labels, where
            labels[doc.get_id()] == 1 <=> doc is relevant.
        """
        if prob_of_doc_vert is None:
            prob_of_doc_vert = []
        if verticals is None:
            verticals = VERTICALS
        if num_relevant == "random":
            num_relevant = random.randint(1, num_documents // 2)
        elif type(num_relevant) == str and "-" in num_relevant:
            min_rel, max_rel = num_relevant.split("-")
            num_relevant = random.randint(int(min_rel), int(max_rel))
        else:
            num_relevant = int(num_relevant)
        assert num_documents > 0
        assert num_relevant > 0
        assert num_relevant <= num_documents
        ranking_with_vert_assignment = []
        for doc in range(num_documents):
            r = random.random()
            s = 0
            vert_index = 0
            for vert_index, prob in enumerate(prob_of_doc_vert):
                s += prob
                if s > r:
                    ranking_with_vert_assignment.append(
                        Document(doc, verticals[vert_index]))
                    break
            else:
                # non-vertical
                ranking_with_vert_assignment.append(Document(doc, "Web"))
        labels = [0] * num_documents
        documents_with_random_relevance = range(num_documents)
        if vert_rel != "relevant":  # "non-relevant" or "all-relevant"
            # Assign relevance for vertical and non-vertical documents
            # separately.
            documents_with_random_relevance = [
                id for id in xrange(num_documents)
                if ranking_with_vert_assignment[id].get_type() == "Web"]
        for id in random.sample(documents_with_random_relevance,
                                num_relevant):
            labels[id] = 1
        if vert_rel == "all-relevant":
            for id in xrange(num_documents):
                if ranking_with_vert_assignment[id].get_type() != "Web":
                    labels[id] = 1
        return ranking_with_vert_assignment, labels

    @staticmethod
    def _generate_ranking(base_documents, serp_size=10, decay_power=5):
        # Select documents from the base_documents list
        # with decreasing softmax probability.
        probs = [1.0 / (i + 1) ** decay_power
                 for i in range(len(base_documents))]
        # create a copy
        docs = [d for d in base_documents]
        s = sum(probs)
        normalize(probs)
        ranking = []
        for iteration_number in range(serp_size):
            r = random.random()
            s = 0
            for pos, prob in enumerate(probs):
                s += prob
                if s >= r:
                    break
            ranking.append(docs[pos])
            # Exclude the docid that we've just used.
            probs = [prob for pos1, prob in enumerate(probs) if pos1 != pos]
            docs = [rank for pos1, rank in enumerate(docs) if pos1 != pos]
            normalize(probs)
            # assert 0.95 <= sum(probs) <= 1.05
        # Now re-order the documents to ensure that the vertical ones are
        # grouped.
        reordered_ranking = []
        formed_blocks = set()
        for doc in ranking:
            if doc.get_type() == "Web":
                reordered_ranking.append(doc)
            elif doc.get_type() not in formed_blocks:
                reordered_ranking += (
                    [d for d in ranking if d.get_type() == doc.get_type()])
                formed_blocks.add(doc.get_type())
        return reordered_ranking

    def _pareto_dominates(self, a, b, labels, orientation=None):
        # Cut and sort by examination probability according to the user model.
        a = a[:self.length]
        b = b[:self.length]
        # Here we rely on implicit orientation values (1.0), since we don't
        # have them anyway for synthetic data.
        examination_a = self.pareto_um.get_examination_prob(
            a, orientation=orientation)
        examination_b = self.pareto_um.get_examination_prob(
            b, orientation=orientation)
        a.sort(key=dict(zip(a, examination_a)).get, reverse=True)
        b.sort(key=dict(zip(b, examination_b)).get, reverse=True)
        rel_a = [index for index, item in enumerate(a)
                 if labels[item.get_id()] > 0]
        rel_b = [index for index, item in enumerate(b)
                 if labels[item.get_id()] > 0]
        # if a has fewer relevant documents it cannot dominate b
        if len(rel_a) < len(rel_b):
            return False
        distance = 0
        for index_a, index_b in zip(rel_a, rel_b):
            if index_a > index_b:
                return False
            elif index_a < index_b:
                distance += index_b - index_a
            # if b has fewer relevant documents and none of its elements
            # violate pareto dominance
            if len(rel_a) > len(rel_b) and index_b == rel_b[-1]:
                return True
        if distance > 0:
            return True
        return False
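
    # Worked example (illustrative): with labels = [1, 0, 1, 0] and equal
    # examination probabilities, a ranking that puts documents 0 and 2 at
    # ranks 0 and 1 Pareto-dominates one that puts them at ranks 1 and 3:
    # every relevant document is ranked at least as high, and at least one is
    # ranked strictly higher (distance > 0).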

    @staticmethod
    def get_online_metrics(clicks, ranking):
        if all(c == 0 for c in clicks):
            return {
                "MaxRR": 0.0,
                "MinRR": 0.0,
                "MeanRR": 0.0,
                "PLC": 0.0,
                "Clicks@1": 0.0,
                "VertClick": 0.0,
                "click_counts": 0.0,
            }
        else:
            scores = {}
            minClickRank = min(r for r, c in enumerate(clicks) if c)
            maxClickRank = max(r for r, c in enumerate(clicks) if c)
            numClicks = sum(1 for c in clicks if c)
            scores["MaxRR"] = 1.0 / (minClickRank + 1)
            scores["MinRR"] = 1.0 / (maxClickRank + 1)
            scores["MeanRR"] = (
                sum(1.0 / (r + 1) for r, c in enumerate(clicks) if c) /
                numClicks)
            scores["PLC"] = float(numClicks) / (maxClickRank + 1)
            scores["Clicks@1"] = 1.0 if clicks[0] else 0.0
            scores["VertClick"] = (
                1.0 if any(c for (d, c) in zip(ranking, clicks)
                           if d.get_type() != "Web")
                else 0.0)
            scores["click_counts"] = sum(1 for c in clicks if c)
            return scores
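
    # Illustrative values (hypothetical clicks): for clicks = [0, 1, 1] on a
    # three-document ranking, get_online_metrics returns MaxRR = 1/2,
    # MinRR = 1/3, MeanRR = (1/2 + 1/3) / 2, PLC = 2/3, Clicks@1 = 0.0,
    # click_counts = 2, and VertClick = 1.0 iff at least one vertical
    # (non-"Web") document was clicked.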