- Get the unique query terms and store them in a HashSet (method `getQueryWordsSet()`).
- Create a custom HashMap accumulator, `TotalQueryWordsAccumulator`, and two Long accumulators:
  - `wordCountAccumulator`: the total length of all documents after preprocessing.
  - `docCountAccumulator`: the total number of documents after preprocessing.
  - `TotalQueryWordsAccumulator`: the total number of times each query term appears in the whole corpus.

  Using the variables above, we can compute `averageDocumentLengthInCorpus`.
- Use `PreprocessFlatMap()` to convert the `NewsArticle` Dataset to an `ArticleWordsDic` Dataset and collect it. To reduce the data size, only articles that contain at least one query term are converted to `ArticleWordsDic`.
  - Check whether the title or ID is null, then put the first 5 paragraphs and the title into a List.
  - `wordCountAccumulator` += list size, and `docCountAccumulator` += 1.
  - Using the query term HashSet, count the frequency of each term in the current article and store the counts in a HashMap.
  - If the map size == 0, no query term appears in this article, so return an empty list.
  - Add the HashMap to `TotalQueryWordsAccumulator`.
  - Return the created `ArticleWordsDic`.
- Retrieve the DPH parameters above and the HashMap.
public class ArticleWordsDic implements Serializable {
    String id; // unique article identifier
    String title; // article title
    int length; // document length after preprocessing
    Map<String, Integer> map; // query term -> frequency in this article
}
public class TotalQueryWordsAccumulator extends AccumulatorV2<HashMap<String, Integer>, HashMap<String, Integer>> {
    private HashMap<String, Integer> hashMap = new HashMap<>();
    ....
}
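The counting and accumulation steps above can be sketched in plain Java, without Spark. This is only an illustration under stated assumptions: the class `QueryTermCountSketch` and its method names are hypothetical and do not come from the project, and the `mergeInto` helper merely mimics what the `add`/`merge` logic of a HashMap accumulator might do.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical plain-Java sketch of the per-article counting and corpus-wide merge. */
final class QueryTermCountSketch {

    /** Counts how often each query term occurs in one article's tokens. */
    static Map<String, Integer> countQueryTerms(List<String> tokens, Set<String> queryTerms) {
        Map<String, Integer> counts = new HashMap<>();
        for (String token : tokens) {
            if (queryTerms.contains(token)) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts; // an empty map means the article contains no query term
    }

    /** Adds one article's counts to the running corpus totals (assumed accumulator behaviour). */
    static void mergeInto(Map<String, Integer> totals, Map<String, Integer> articleCounts) {
        articleCounts.forEach((term, n) -> totals.merge(term, n, Integer::sum));
    }

    /** Average document length = total token count / number of documents. */
    static double averageDocumentLength(long wordCount, long docCount) {
        return docCount == 0 ? 0.0 : (double) wordCount / docCount;
    }

    public static void main(String[] args) {
        Set<String> queryTerms = new HashSet<>(Arrays.asList("spark", "java"));
        List<String> doc1 = Arrays.asList("spark", "run", "java", "spark");
        List<String> doc2 = Arrays.asList("cook", "pasta");

        Map<String, Integer> totals = new HashMap<>();
        long wordCount = 0;
        long docCount = 0;
        for (List<String> doc : Arrays.asList(doc1, doc2)) {
            wordCount += doc.size(); // plays the role of wordCountAccumulator
            docCount += 1;           // plays the role of docCountAccumulator
            mergeInto(totals, countQueryTerms(doc, queryTerms));
        }
        System.out.println(totals + " avg=" + averageDocumentLength(wordCount, docCount));
    }
}
```

In the Spark job, the two counters and the totals map would live in accumulators so that updates made on executors are combined on the driver; the arithmetic stays the same.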
- Wrap the DPH parameters and the List into Broadcast variables.
- Based on the queries dataset, call the `QueryToQueryResultMap` method to get the `QueryResultWithArticleId`.
  - Get the score of each article and store the article ID and score in `List<DPHResult> dphResultList`.
  - Sort the `dphResultList`.
  - Calculate the distance between titles and build a list of 10 articles, stored in `queryResultWithArticleId`.
  - Return the `queryResultWithArticleId`.
public class QueryResultWithArticleId implements Serializable {
    Query query;
    List<DPHResult> articleIdList;
}
public class DPHResult implements Serializable {
    String id;
    String title;
    double score;
}
public class QueryToQueryResultMap implements MapFunction<Query, QueryResultWithArticleId> {
    private static final long serialVersionUID = -484410270146328326L;
    Broadcast<List<ArticleWordsDic>> listBroadcast;
    Broadcast<Long> totalDocsInCorpus;
    Broadcast<Double> averageDocumentLengthInCorpus;
    Broadcast<Map<String, Integer>> totalTermFrequencyInCorpusDic;

    public QueryToQueryResultMap(Broadcast<List<ArticleWordsDic>> listBroadcast, Broadcast<Long> totalDocsInCorpus,
            Broadcast<Double> averageDocumentLengthInCorpus,
            Broadcast<Map<String, Integer>> totalTermFrequencyInCorpusDic) {
        this.listBroadcast = listBroadcast;
        this.totalDocsInCorpus = totalDocsInCorpus;
        this.averageDocumentLengthInCorpus = averageDocumentLengthInCorpus;
        this.totalTermFrequencyInCorpusDic = totalTermFrequencyInCorpusDic;
    }
}
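The sort-and-select step inside `QueryToQueryResultMap` might look roughly like the sketch below. It is not the project's implementation: `RankingSketch`, `Scored`, and `topDistinct` are hypothetical names, the title distance function is passed in as a parameter because the source does not say how it is computed, and the rule "skip an article whose title is closer than `minDistance` to one already kept" is an assumption about how the distance is used.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

/** Hypothetical sketch: sort scored articles, then keep the top N with distinct titles. */
final class RankingSketch {

    /** Minimal stand-in for DPHResult (id, title, score). */
    static final class Scored {
        final String id;
        final String title;
        final double score;

        Scored(String id, String title, double score) {
            this.id = id;
            this.title = title;
            this.score = score;
        }
    }

    /**
     * Returns up to {@code limit} results in descending score order, skipping any
     * candidate whose title is closer than {@code minDistance} to a kept title.
     * The threshold rule is an assumption, not taken from the source.
     */
    static List<Scored> topDistinct(List<Scored> scored, int limit, double minDistance,
                                    ToDoubleBiFunction<String, String> distance) {
        List<Scored> sorted = new ArrayList<>(scored);
        sorted.sort(Comparator.comparingDouble((Scored s) -> s.score).reversed());

        List<Scored> kept = new ArrayList<>();
        for (Scored candidate : sorted) {
            if (kept.size() == limit) {
                break;
            }
            boolean tooClose = false;
            for (Scored k : kept) {
                if (distance.applyAsDouble(candidate.title, k.title) < minDistance) {
                    tooClose = true;
                    break;
                }
            }
            if (!tooClose) {
                kept.add(candidate);
            }
        }
        return kept;
    }
}
```

For the behaviour described above, a call would use `limit = 10`; the right `minDistance` and distance function depend on the project and are left open here.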
- `Dataset<QueryResultWithArticleId> queryResultWithArticleIdDataset` contains all the article IDs needed for the result. Use `GetReusltArticleIdFlatMap()` to collect the unique article IDs into a HashSet, then apply `NewsArticleResultFlatMap` to the news dataset with that set to get the Dataset needed to create the DocumentRanking.
- After getting the Dataset, map it to `JavaPairRDD<String, NewsArticle>`, keyed by article ID, and collect it as a Map.
- Then broadcast the Map to the previous `queryResultWithArticleIdDataset` and use the `QueryWithArticleIdToDR` Map to get the final `List<DocumentRanking> documentRankingList`.
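The three steps above amount to a set-based filter followed by a map lookup. The plain-Java sketch below shows that idea without Spark. All names in it (`ResultJoinSketch`, `uniqueIds`, `selectArticles`, `resolve`) are hypothetical, and a `String` stands in for `NewsArticle` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the ID collection, article filtering, and final lookup. */
final class ResultJoinSketch {

    /** Collects the distinct article IDs referenced by all query results. */
    static Set<String> uniqueIds(List<List<String>> idsPerQuery) {
        Set<String> ids = new HashSet<>();
        for (List<String> queryIds : idsPerQuery) {
            ids.addAll(queryIds);
        }
        return ids;
    }

    /** Keeps only the articles whose ID is needed; the result is keyed by article ID. */
    static Map<String, String> selectArticles(Map<String, String> corpus, Set<String> neededIds) {
        Map<String, String> selected = new HashMap<>();
        for (Map.Entry<String, String> entry : corpus.entrySet()) {
            if (neededIds.contains(entry.getKey())) {
                selected.put(entry.getKey(), entry.getValue());
            }
        }
        return selected;
    }

    /** Replaces each ranked ID with its article, preserving rank order; unknown IDs are skipped. */
    static List<String> resolve(List<String> rankedIds, Map<String, String> articlesById) {
        List<String> articles = new ArrayList<>();
        for (String id : rankedIds) {
            String article = articlesById.get(id);
            if (article != null) {
                articles.add(article);
            }
        }
        return articles;
    }
}
```

Filtering the corpus down to the needed IDs first keeps the collected Map small, which matters because it is broadcast to every executor.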
- Renamed the flatmap function `GetReusltArticleIdFlatMap` to `GetResultArticleIdFlatMap` and updated the related classes.
- Rephrased comments in the AssessedExercise and QueryToQueryResultMap files.