Skip to content

Commit

Permalink
Fix facet serialization on multi-node cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
gornik committed Oct 29, 2013
1 parent 8249e44 commit 61f44b7
Show file tree
Hide file tree
Showing 13 changed files with 260 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

import org.elasticsearch.common.geo.GeoHashUtils;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

/**
* Represents a cell established by the geo hash prefix.
Expand Down Expand Up @@ -79,4 +83,13 @@ public int hashCode() {
public String toString() {
return getCenter().toString();
}

public void writeTo(StreamOutput out) throws IOException {
out.writeString(geoHashPrefix);
}

public static GeoHashCell readFrom(StreamInput in) throws IOException {
String geoHashPrefix = in.readString();
return new GeoHashCell(geoHashPrefix);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.elasticsearch.plugin.geohashcellfacet;

import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -48,4 +50,15 @@ public GeoHashCellWithGroupings createCellWithGroupings(AtomicLong value) {
public String getKey() {
return cell.toString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString("GeoHashCellEntry");
cell.writeTo(out);
}

public static GeoHashCellFacetEntry readFrom(StreamInput in) throws IOException {
GeoHashCell cell = GeoHashCell.readFrom(in);
return new GeoHashCellEntry(cell);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.geo.GeoHashUtils;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.facet.Facet;
Expand All @@ -28,12 +31,12 @@ public class GeoHashCellFacet extends InternalFacet {

public static final String TYPE = "geohashcell";
private static final BytesReference STREAM_TYPE = new BytesArray(TYPE);
private final MapBox mapBox;
private MapBox mapBox;

private Map<GeoHashCellFacetEntry, AtomicLong> counts = Maps.newHashMap();
private final String fieldName;
private final int userLevel;
private final String additionalGroupingField;
private String fieldName;
private int userLevel;
private String additionalGroupingField;

private GeoHashCellFacetResult results;

Expand Down Expand Up @@ -84,6 +87,46 @@ public BytesReference streamType() {
return STREAM_TYPE;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.mapBox = MapBox.readFrom(in);

int countsSize = in.readInt();

for (int i = 0; i<countsSize; i++) {
GeoHashCellFacetEntry entry = GeoHashCellFacetEntryReader.readFrom(in);
long value = in.readLong();
counts.put(entry, new AtomicLong(value));
}

this.results = GeoHashCellFacetResult.readFrom(in);

this.fieldName = in.readString();
this.userLevel = in.readInt();
this.additionalGroupingField = in.readOptionalString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

this.mapBox.writeTo(out);

out.writeInt(counts.size());

for (Map.Entry<GeoHashCellFacetEntry, AtomicLong> entry : counts.entrySet()) {
entry.getKey().writeTo(out);
out.writeLong(entry.getValue().get());
}

this.results.writeTo(out);

out.writeString(this.fieldName);
out.writeInt(this.userLevel);
out.writeOptionalString(this.additionalGroupingField);
}

@Override
public Facet reduce(ReduceContext reduceContext) {
List<Facet> facets = reduceContext.facets();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.elasticsearch.plugin.geohashcellfacet;

import com.sun.tools.corba.se.idl.InterfaceState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -11,4 +13,7 @@ public interface GeoHashCellFacetEntry {
ResultWithGroupings createCellWithGroupings(AtomicLong value);

String getKey();

void writeTo(StreamOutput out) throws IOException;
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.elasticsearch.plugin.geohashcellfacet;

import org.elasticsearch.common.io.stream.StreamInput;

import java.io.IOException;

public class GeoHashCellFacetEntryReader {

public static GeoHashCellFacetEntry readFrom(StreamInput in) throws IOException {
String type = in.readString();

if (type.equals("GeoHashCellEntry"))
return GeoHashCellEntry.readFrom(in);

if (type.equals("GeoHashCellWithGroupingEntry"))
return GeoHashCellWithGroupingEntry.readFrom(in);

if (type.equals("MissingGeoValueEntry"))
return new MissingGeoValueEntry();

throw new IllegalArgumentException("Type not supported: " + type);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.elasticsearch.plugin.geohashcellfacet;

import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -62,6 +64,29 @@ public void toXContent(XContentBuilder builder) throws IOException {
entry.getValue().toXContent(builder);
}
}

public void writeTo(StreamOutput out) throws IOException {
out.writeInt(counts.size());

for (Map.Entry<String, ResultWithGroupings> entry : counts.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
}

}

public static GeoHashCellFacetResult readFrom(StreamInput in) throws IOException {
int countsSize = in.readInt();

Map<String, ResultWithGroupings> counts = Maps.newHashMap();

for (int i = 0; i<countsSize; i++) {
String key = in.readString();
counts.put(key, ResultWithGroupingsReader.readFrom(in));
}

return new GeoHashCellFacetResult(counts);
}
}


Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.elasticsearch.plugin.geohashcellfacet;

import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -38,4 +41,17 @@ public int hashCode() {
result = 31 * result + additionalGroupingValue.hashCode();
return result;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString("GeoHashCellWithGroupingEntry");
cell.writeTo(out);
out.writeString(additionalGroupingValue);
}

public static GeoHashCellFacetEntry readFrom(StreamInput in) throws IOException {
GeoHashCell cell = GeoHashCell.readFrom(in);
String additionalGroupingValue = in.readString();
return new GeoHashCellWithGroupingEntry(cell, additionalGroupingValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -60,6 +62,36 @@ public void toXContent(XContentBuilder builder) throws IOException {

builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString("GeoHashCellWithGroupings");
cell.writeTo(out);
out.writeLong(total);
out.writeInt(groupings.size());

for (Map.Entry<String, AtomicLong> entry : groupings.entrySet()) {
out.writeString(entry.getKey());
out.writeLong(entry.getValue().get());
}
}

public static ResultWithGroupings readFrom(StreamInput in) throws IOException {
GeoHashCell cell = GeoHashCell.readFrom(in);
long total = in.readLong();
int groupingsSize = in.readInt();

Map<String, AtomicLong> groupings = Maps.newHashMap();

for (int i = 0; i<groupingsSize; i++) {
String key = in.readString();
long value = in.readLong();

groupings.put(key, new AtomicLong(value));
}

return new GeoHashCellWithGroupings(cell, total, groupings);
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
import org.elasticsearch.common.geo.GeoDistance;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.GeoUtils;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.DistanceUnit;

import java.io.IOException;

/**
* A box representing the current map viewport.
*/
Expand Down Expand Up @@ -110,4 +114,24 @@ private double calculateHeight() {
bottomRight.lat(), topLeft.lon(),
DistanceUnit.METERS);
}

public static MapBox readFrom(StreamInput in) throws IOException {
GeoPoint topLeft = new GeoPoint();
GeoPoint bottomRight = new GeoPoint();

String topLeftHash = in.readString();
String bottomRightHash = in.readString();
topLeft.resetFromGeoHash(topLeftHash);
bottomRight.resetFromGeoHash(bottomRightHash);

return new MapBox(topLeft, bottomRight);
}

public void writeTo(StreamOutput out) throws IOException {
String topLeftHash = this.getTopLeft().geohash();
String bottomRightHash = this.getBottomRight().geohash();

out.writeString(topLeftHash);
out.writeString(bottomRightHash);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.elasticsearch.plugin.geohashcellfacet;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -29,4 +30,9 @@ public ResultWithGroupings createCellWithGroupings(AtomicLong value) {
public String getKey() {
return MISSING_ENTRY;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString("MissingGeoValueEntry");
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.elasticsearch.plugin.geohashcellfacet;

import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -46,4 +48,34 @@ public void toXContent(XContentBuilder builder) throws IOException {
}
builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString("MissingGeoValueWithGroupings");

out.writeLong(total.get());
out.writeInt(groupings.size());

for (Map.Entry<String, AtomicLong> entry : groupings.entrySet()) {
out.writeString(entry.getKey());
out.writeLong(entry.getValue().get());
}
}

public static ResultWithGroupings readFrom(StreamInput in) throws IOException {

long total = in.readLong();
int groupingsSize = in.readInt();

Map<String, AtomicLong> groupings = Maps.newHashMap();

for (int i = 0; i<groupingsSize; i++) {
String key = in.readString();
long value = in.readLong();

groupings.put(key, new AtomicLong(value));
}

return new MissingGeoValueWithGroupings(new AtomicLong(total), groupings);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.elasticsearch.plugin.geohashcellfacet;

import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -26,4 +27,6 @@ protected Map<String, AtomicLong> mergeGroupings(
}

public abstract void toXContent(XContentBuilder builder) throws IOException;

public abstract void writeTo(StreamOutput out) throws IOException;
}
Loading

0 comments on commit 61f44b7

Please sign in to comment.