Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding support for distinctCount on groupBy queries #1

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Wiki is available at:

Issues are tracked at:

[http://linkedin.jira.com/browse/SENSEI](http://linkedin.jira.com/browse/SENSEI)
[https://senseidb.atlassian.net/browse/SENSEI](https://senseidb.atlassian.net/browse/SENSEI)

### Mailing List / Discussion Group

Expand Down
2 changes: 1 addition & 1 deletion clients/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.senseidb</groupId>
<artifactId>sensei-parent</artifactId>
<version>1.5.1-SNAPSHOT</version>
<version>1.6.1-SNAPSHOT</version>
<relativePath>../../sensei-parent/pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<artifactId>sensei-doc</artifactId>

<version>1.5.1-SNAPSHOT</version>
<version>1.6.1-SNAPSHOT</version>

<url>http://www.senseidb.com</url>

Expand Down
2 changes: 1 addition & 1 deletion example/hadoop-indexing/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<artifactId>sensei-example-hadoop</artifactId>
<packaging>jar</packaging>
<name>sensei example hadoop indexing</name>
<version>1.5.1-SNAPSHOT</version>
<version>1.6.1-SNAPSHOT</version>
<description>sensei hadoop indexer example</description>

<!-- Set the compiler to java6 -->
Expand Down
4 changes: 2 additions & 2 deletions example/tweets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>tweet-example</artifactId>
<packaging>jar</packaging>
<name>tweet-examples</name>
<version>1.5.1-SNAPSHOT</version>
<version>1.6.1-SNAPSHOT</version>
<description>sensei examples - tweets</description>

<!-- Set the compiler to java6 -->
Expand Down Expand Up @@ -59,7 +59,7 @@
<dependency>
<groupId>com.senseidb</groupId>
<artifactId>sensei-core</artifactId>
<version>1.5.1-SNAPSHOT</version>
<version>1.6.1-SNAPSHOT</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion perf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.senseidb</groupId>
<artifactId>sensei-perf</artifactId>
<version>1.5.1-SNAPSHOT</version>
<version>1.6.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>sensei-perf</name>
<description>sensei perf</description>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.senseidb</groupId>
<artifactId>sensei-parent</artifactId>
<version>1.5.1-SNAPSHOT</version>
<version>1.6.1-SNAPSHOT</version>
<relativePath>sensei-parent/pom.xml</relativePath>
</parent>

Expand Down
6 changes: 3 additions & 3 deletions sensei-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.senseidb</groupId>
<artifactId>sensei-parent</artifactId>
<version>1.5.1-SNAPSHOT</version>
<version>1.6.1-SNAPSHOT</version>
<relativePath>../sensei-parent/pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -176,7 +176,7 @@
<dependency>
<groupId>com.senseidb.zoie</groupId>
<artifactId>zoie-jms</artifactId>
<version>3.2.1-SNAPSHOT</version>
<version>3.3.1-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -220,7 +220,7 @@
<dependency>
<groupId>com.browseengine.bobo</groupId>
<artifactId>bobo-browse</artifactId>
<version>3.1.2-SNAPSHOT</version>
<version>3.2.1-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;

import javax.management.MBeanServer;
import javax.management.ObjectName;
Expand All @@ -20,6 +21,7 @@

import com.browseengine.bobo.api.FacetSpec;
import com.linkedin.norbert.NorbertException;
import com.linkedin.norbert.cluster.ClusterDisconnectedException;
import com.linkedin.norbert.javacompat.cluster.ClusterClient;
import com.linkedin.norbert.javacompat.cluster.Node;
import com.linkedin.norbert.javacompat.network.PartitionedNetworkClient;
Expand Down Expand Up @@ -49,6 +51,9 @@ public class SenseiBroker extends AbstractConsistentHashBroker<SenseiRequest, Se

private final boolean allowPartialMerge;
private final ClusterClient clusterClient;

private volatile boolean disconnected;

private static Counter numberOfNodesInTheCluster = Metrics.newCounter(new MetricName(SenseiBroker.class, "numberOfNodesInTheCluster"));
public SenseiBroker(PartitionedNetworkClient<String> networkClient, ClusterClient clusterClient, boolean allowPartialMerge)
throws NorbertException {
Expand Down Expand Up @@ -256,6 +261,17 @@ public int getNumberOfNodes() {
}
return count;
}


/**
 * Delegates the scatter-gather call to the parent broker implementation,
 * recording a disconnect flag when the cluster is unreachable so that
 * callers can observe broker health via {@link #isDisconnected()}.
 *
 * @param req the search request to fan out to the cluster
 * @return the per-node results produced by the parent implementation
 * @throws ExecutionException if the underlying call fails
 */
@Override
protected List<SenseiResult> doCall(SenseiRequest req) throws ExecutionException {
try {
return super.doCall(req);
} catch (ClusterDisconnectedException ex) {
// Record the disconnect before propagating; nothing in this class resets
// the flag, so it means "a disconnect has been observed at least once".
disconnected = true;
throw ex;
}
}
/**
 * @return {@code true} once {@link #doCall} has observed a
 *         {@code ClusterDisconnectedException}; not reset anywhere in this
 *         class's visible code.
 */
public boolean isDisconnected() {
return disconnected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ public static void main(String[] args) throws Exception {
public void run(){

try{
server.setAvailable(false);
jettyServer.stop();
} catch (Exception e) {
logger.error(e.getMessage(),e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,16 @@ public void setLoadBalancerFactory(PartitionedLoadBalancerFactory<String> loadBa
public void setAllowPartialMerge(boolean allowPartialMerge) {
this.allowPartialMerge = allowPartialMerge;
}

/**
 * Releases the networking resources held by this object: the cluster client
 * and the partitioned network client. Safe to call more than once — the
 * original version threw a {@code NullPointerException} on a second
 * invocation because the fields were nulled but never re-checked, and it
 * retained a stale {@code clusterClient} reference if its shutdown threw.
 */
public void shutdown() {
  // Detach the cluster client from the config first so nothing picks up a
  // reference to the client we are about to stop.
  networkClientConfig.setClusterClient(null);
  try {
    if (clusterClient != null) {
      clusterClient.shutdown();
    }
  } finally {
    // Clear the reference even if shutdown threw, so repeated calls are no-ops.
    clusterClient = null;
    try {
      if (networkClient != null) {
        networkClient.shutdown();
      }
    } finally {
      networkClient = null;
    }
  }
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static void decorateWithMapReduce(JSONObject jsonObj, java.util.List<Pair
int countSum = 0;
int top = groupBy.optInt("top");
for (Pair<String, String> pair: aggreagationFunctions) {
if (columns.length() == 1 && "sum".equals(pair.getFirst()) && countSum == 0) {
if (columns.length() == 1 && "sum".equalsIgnoreCase(pair.getFirst()) && countSum == 0) {
countSum++;

JSONObject facetSpec = new FastJSONObject().put("expand", false)
Expand All @@ -59,7 +59,7 @@ public static void decorateWithMapReduce(JSONObject jsonObj, java.util.List<Pair
jsonObj.put("facets", new FastJSONObject());
}
jsonObj.getJSONObject("facets").put(SenseiFacetHandlerBuilder.SUM_GROUP_BY_FACET_NAME, facetSpec);
} else if (columns.length() == 1 && "count".equals(pair.getFirst()) ) {
} else if (columns.length() == 1 && "count".equalsIgnoreCase(pair.getFirst()) ) {
JSONObject facetSpec = new FastJSONObject().put("expand", false)
.put("minhit", 0)
.put("max", top);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.senseidb.search.req.mapred.functions.groupby;


import it.unimi.dsi.fastutil.longs.LongOpenHashSet;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -18,6 +20,11 @@
public class AggregateFunctionFactory {
public static AggregateFunction valueOf(String name, String column) {
name = name.toLowerCase();

if (name.equals("distinctcount")) {
return new DistictCountAggregateFunction(column);
}

if (name.endsWith("avg")) {
return new AvgAggregationFunction(column);
}
Expand Down Expand Up @@ -97,6 +104,68 @@ public Object toJson(HashMap<String, SumGroupedValue> reduceResult) {
}

}

/* Distinct Group By Count impl start */

/**
 * Aggregate function implementing "distinctcount": for each group it
 * accumulates the set of distinct long values of a column and reports the
 * set's cardinality.
 *
 * NOTE(review): the class name misspells "Distinct" (missing the first "n").
 * It is referenced by name from AggregateFunctionFactory.valueOf, so renaming
 * must be coordinated with that call site.
 */
public static class DistictCountAggregateFunction implements AggregateFunction<GroupedValue> {
// Column this aggregate was configured for; stored but not read by the
// visible methods here — presumably consumed by the surrounding framework.
private final String column;

public DistictCountAggregateFunction(String column) {
this.column = column;
}


/**
 * Produces a fresh single-element accumulator seeded with the document's
 * long value for the configured column.
 */
@Override
public GroupedValue produceSingleValue(SingleFieldAccessor accessor, int docId) {
DistinctCountGroupValue val = new DistinctCountGroupValue();
val.addToSet(accessor.getLong(docId));
return val;
}

/**
 * Renders the reduced result as a JSON array of
 * {"count": &lt;distinct cardinality&gt;, "group": &lt;group key&gt;} objects,
 * in the order produced by AggregateFunctionFactory.sort.
 */
@Override
public Object toJson(HashMap<String, GroupedValue> reduceResult) {
try {
JSONArray ret = new JSONUtil.FastJSONArray();
for (String key : AggregateFunctionFactory.sort(reduceResult)) {
DistinctCountGroupValue value = (DistinctCountGroupValue) reduceResult.get(key);
ret.put(new JSONUtil.FastJSONObject().put("count", value.longSet.size()).put("group", key));
}
return ret;
} catch (Exception e) {
// JSON assembly failures are surfaced as unchecked exceptions.
throw new RuntimeException(e);
}
}

}

/**
 * Per-group accumulator for the "distinctcount" aggregate: collects the
 * distinct long values observed for a group and orders groups by the
 * cardinality of that set.
 */
public static class DistinctCountGroupValue implements GroupedValue {
  // Set of distinct values seen so far; read directly by
  // DistictCountAggregateFunction.toJson for its size.
  LongOpenHashSet longSet = new LongOpenHashSet();

  // NOTE(review): these two fields are never read or written in this class;
  // kept for binary compatibility with any external code, but they look like
  // dead copy-paste from the other GroupedValue implementations.
  long count = 0;
  int group = 0;

  /** Orders accumulators by the number of distinct values collected. */
  @Override
  public int compareTo(GroupedValue o) {
    // Integer.compare replaces the original subtract-and-branch logic with
    // the same -1/0/1 contract.
    return Integer.compare(longSet.size(), ((DistinctCountGroupValue) o).longSet.size());
  }

  /** Unions another accumulator's distinct-value set into this one. */
  @Override
  public void merge(GroupedValue anotherValue) {
    longSet.addAll(((DistinctCountGroupValue) anotherValue).longSet);
  }

  /** Records one observed value; duplicates are absorbed by the set. */
  public void addToSet(long val) {
    longSet.add(val);
  }
}

/* Distinct Group By Count impl end */

public static class CountGroupedValue implements GroupedValue {
long count = 0;

Expand All @@ -115,6 +184,11 @@ public void merge(GroupedValue anotherValue) {
count += ((CountGroupedValue) anotherValue).count;
}

/** Debug-friendly rendering of this accumulator's running count. */
@Override
public String toString() {
  StringBuilder text = new StringBuilder("CountGroupedValue [count=");
  text.append(count);
  text.append(']');
  return text.toString();
}

}

public static class CountAggregationFunction implements AggregateFunction<CountGroupedValue> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ public MapResult(int initialCapacity, TermValueList[] dictionaries, BoboIndexRea
public Long2ObjectOpenHashMap<GroupedValue> results;
public TermValueList[] dictionaries;
public BoboIndexReader indexReader;
/**
 * Debug rendering of the per-reader aggregation state; the index reader
 * field is not included in the representation.
 */
@Override
public String toString() {
  String dicts = java.util.Arrays.toString(dictionaries);
  return String.format("MapResult [results=%s, dictionaries=%s]", results, dicts);
}

}

public class GroupByMapReduceJob implements SenseiMapReduce<Serializable, HashMap<String, GroupedValue>> {
Expand Down Expand Up @@ -106,8 +111,8 @@ public Serializable map(IntArray docIds, int docIdCount, long[] uids, FieldAcces
}
}

if (mapResult.results.size() > TRIM_SIZE * 20) {
trimToSize(mapResult.results, TRIM_SIZE * 5);
if (mapResult.results.size() > Math.max(TRIM_SIZE, top) * 20) {
trimToSize(mapResult.results, Math.max(TRIM_SIZE, top) * 5);
}

return mapResult;
Expand Down Expand Up @@ -154,7 +159,9 @@ private String decodeKey(String[] str, TermValueList[] dictionaries, int[] numBi

@Override
public List<Serializable> combine(List<Serializable> mapResults, CombinerStage combinerStage) {
/*
// System.out.println("Combine - " + mapResults);

/*
* if (combinerStage == CombinerStage.partitionLevel) { if (map == null)
* { return Collections.EMPTY_LIST; } trimToSize(map, TRIM_SIZE * 5);
* List<HashMap<String, GroupedValue>> ret =
Expand All @@ -173,8 +180,20 @@ public List<Serializable> combine(List<Serializable> mapResults, CombinerStage c
for (int i = 0; i < mapResults.size(); i++) {
MapResult current = (MapResult) mapResults.get(i);
if (results.get(current.indexReader) != null) {
results.get(current.indexReader).results.putAll(current.results);
trimToSize(current.results, TRIM_SIZE);

Long2ObjectOpenHashMap<GroupedValue> currentMergedResults = results.get(current.indexReader).results;

Long2ObjectOpenHashMap<GroupedValue> currentResultsToMerge = current.results;
for (long key : currentResultsToMerge.keySet()) {
GroupedValue groupedValue = currentMergedResults.get(key);
if (groupedValue != null) {
groupedValue.merge(currentResultsToMerge.get(key));
} else {
currentMergedResults.put(key, currentResultsToMerge.get(key));
}
}
// .putAll(currentResultsToMerge);
trimToSize(currentResultsToMerge, Math.max(TRIM_SIZE, top));
} else {
results.put(current.indexReader, current);
}
Expand All @@ -188,6 +207,7 @@ public List<Serializable> combine(List<Serializable> mapResults, CombinerStage c
}

}
// System.out.println("End combine - " + ret);
return java.util.Arrays.asList((Serializable)ret);
}
if (mapResults.size() == 0) {
Expand All @@ -201,7 +221,7 @@ public List<Serializable> combine(List<Serializable> mapResults, CombinerStage c
for (int i = 1; i < mapResults.size(); i++) {
merge(firstMap, (HashMap<String, GroupedValue>) mapResults.get(i));
}
trimToSize(firstMap, TRIM_SIZE);
trimToSize(firstMap, Math.max(TRIM_SIZE, top));
return java.util.Arrays.asList((Serializable)firstMap);

}
Expand Down Expand Up @@ -372,7 +392,7 @@ public JSONObject render(HashMap<String, GroupedValue> reduceResult) {
JSONArray jsonArrResult = (JSONArray) result;
if (jsonArrResult.length() > top) {
JSONArray newArr = new JSONUtil.FastJSONArray();
for (int i = 0; i <= top; i++) {
for (int i = 0; i < top; i++) {
newArr.put(jsonArrResult.get(i));
}
jsonArrResult = newArr;
Expand Down
Loading