Skip to content

Commit

Permalink
Merge pull request #25280 from vespa-engine/fix-dimension-hashing-bug
Browse files Browse the repository at this point in the history
Fix dimension hashing bug
  • Loading branch information
baldersheim authored Dec 18, 2022
2 parents b36fded + a8751ad commit d98918a
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ public List<MetricsPacket.Builder> getMetrics(List<VespaService> services, Consu
systemCheck.ifPresent(metricsPackets::add);

MetricAggregator aggregator = new MetricAggregator(service.getDimensions());
MetricsParser.Consumer metricsConsumer = (consumerId != null)
? new GetServiceMetricsConsumer(metricsConsumers, aggregator, consumerId)
: new GetServiceMetricsConsumerForAll(metricsConsumers, aggregator);
MetricsParser.Collector metricsConsumer = (consumerId != null)
? new ServiceMetricsCollector(metricsConsumers, aggregator, consumerId)
: new ServiceMetricsCollectorForAll(metricsConsumers, aggregator);
service.consumeMetrics(metricsConsumer);

if (! aggregator.getAggregated().isEmpty()) {
Expand Down Expand Up @@ -118,10 +118,10 @@ private MetricsPacket.Builder getHealth(VespaService service) {
* In order to include a metric, it must exist in the given map of metric to consumers.
* Each returned metric will contain a collection of consumers that it should be routed to.
*/
private static abstract class GetServiceMetricsConsumerBase implements MetricsParser.Consumer {
private static abstract class ServiceMetricsCollectorBase implements MetricsParser.Collector {
protected final MetricAggregator aggregator;

GetServiceMetricsConsumerBase(MetricAggregator aggregator) {
ServiceMetricsCollectorBase(MetricAggregator aggregator) {
this.aggregator = aggregator;
}

Expand Down Expand Up @@ -150,18 +150,18 @@ private static Set<ConsumerId> extractConsumers(Set<ConsumerId> configuredConsum
}
}

private static class GetServiceMetricsConsumer extends GetServiceMetricsConsumerBase {
private static class ServiceMetricsCollector extends ServiceMetricsCollectorBase {
private final Map<MetricId, ConfiguredMetric> configuredMetrics;
private final Set<ConsumerId> consumerId;

GetServiceMetricsConsumer(MetricsConsumers metricsConsumers, MetricAggregator aggregator, ConsumerId consumerId) {
ServiceMetricsCollector(MetricsConsumers metricsConsumers, MetricAggregator aggregator, ConsumerId consumerId) {
super(aggregator);
this.consumerId = Set.of(consumerId);
this.configuredMetrics = metricsConsumers.getMetricsForConsumer(consumerId);
}

@Override
public void consume(Metric candidate) {
public void accept(Metric candidate) {
ConfiguredMetric configuredMetric = configuredMetrics.get(candidate.getName());
if (configuredMetric != null) {
aggregator.aggregate(
Expand All @@ -170,16 +170,16 @@ public void consume(Metric candidate) {
}
}

private static class GetServiceMetricsConsumerForAll extends GetServiceMetricsConsumerBase {
private static class ServiceMetricsCollectorForAll extends ServiceMetricsCollectorBase {
private final MetricsConsumers metricsConsumers;

GetServiceMetricsConsumerForAll(MetricsConsumers metricsConsumers, MetricAggregator aggregator) {
ServiceMetricsCollectorForAll(MetricsConsumers metricsConsumers, MetricAggregator aggregator) {
super(aggregator);
this.metricsConsumers = metricsConsumers;
}

@Override
public void consume(Metric candidate) {
public void accept(Metric candidate) {
Map<ConfiguredMetric, Set<ConsumerId>> consumersByMetric = metricsConsumers.getConsumersByMetric(candidate.getName());
if (consumersByMetric != null) {
consumersByMetric.keySet().forEach(
Expand Down Expand Up @@ -234,11 +234,11 @@ private static void setMetaInfo(MetricsPacket.Builder builder, Instant timestamp
.statusMessage("Data collected successfully");
}

private class MetricStringBuilder implements MetricsParser.Consumer {
private class MetricStringBuilder implements MetricsParser.Collector {
private final StringBuilder sb = new StringBuilder();
private VespaService service;
@Override
public void consume(Metric metric) {
public void accept(Metric metric) {
MetricId key = metric.getName();
MetricId alias = key;

Expand Down Expand Up @@ -272,15 +272,15 @@ public String getMetricsAsString(List<VespaService> services) {
return msb.toString();
}

private class MetricNamesBuilder implements MetricsParser.Consumer {
private class MetricNamesBuilder implements MetricsParser.Collector {
private final StringBuilder bufferOn = new StringBuilder();
private final StringBuilder bufferOff = new StringBuilder();
private final ConsumerId consumer;
MetricNamesBuilder(ConsumerId consumer) {
this.consumer = consumer;
}
@Override
public void consume(Metric m) {
public void accept(Metric m) {
String description = m.getDescription();
MetricId alias = MetricId.empty;
boolean isForwarded = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ public class DummyMetricsFetcher extends RemoteMetricsFetcher {
/**
* Connect to remote service over http and fetch metrics
*/
public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) {
public void getMetrics(MetricsParser.Collector consumer, int fetchCount) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static ai.vespa.metricsproxy.metric.model.DimensionId.toDimensionId;
Expand All @@ -25,20 +27,22 @@
* @author Jo Kristian Bergum
*/
public class MetricsParser {
public interface Consumer {
void consume(Metric metric);
public interface Collector {
void accept(Metric metric);
}

private static final ObjectMapper jsonMapper = new ObjectMapper();

public static void parse(String data, Consumer consumer) throws IOException {
public static void parse(String data, Collector consumer) throws IOException {
parse(jsonMapper.createParser(data), consumer);
}

static void parse(InputStream data, Consumer consumer) throws IOException {
static void parse(InputStream data, Collector consumer) throws IOException {
parse(jsonMapper.createParser(data), consumer);
}
private static void parse(JsonParser parser, Consumer consumer) throws IOException {

// Top level 'metrics' object, with e.g. 'time', 'status' and 'metrics'.
private static void parse(JsonParser parser, Collector consumer) throws IOException {
if (parser.nextToken() != JsonToken.START_OBJECT) {
throw new IOException("Expected start of object, got " + parser.currentToken());
}
Expand All @@ -55,6 +59,7 @@ private static void parse(JsonParser parser, Consumer consumer) throws IOExcepti
}
}
}

static private Instant parseSnapshot(JsonParser parser) throws IOException {
if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
throw new IOException("Expected start of 'snapshot' object, got " + parser.currentToken());
Expand All @@ -75,59 +80,97 @@ static private Instant parseSnapshot(JsonParser parser) throws IOException {
return timestamp;
}

static private void parseMetricValues(JsonParser parser, Instant timestamp, Consumer consumer) throws IOException {
// 'metrics' object with 'snapshot' and 'values' arrays
static private void parseMetrics(JsonParser parser, Collector consumer) throws IOException {
if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
throw new IOException("Expected start of 'metrics' object, got " + parser.currentToken());
}
Instant timestamp = Instant.now();
for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) {
String fieldName = parser.getCurrentName();
JsonToken token = parser.nextToken();
if (fieldName.equals("snapshot")) {
timestamp = parseSnapshot(parser);
} else if (fieldName.equals("values")) {
parseMetricValues(parser, timestamp, consumer);
} else {
if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) {
parser.skipChildren();
}
}
}
}

// 'values' array
static private void parseMetricValues(JsonParser parser, Instant timestamp, Collector consumer) throws IOException {
if (parser.getCurrentToken() != JsonToken.START_ARRAY) {
throw new IOException("Expected start of 'metrics:values' array, got " + parser.currentToken());
}

Map<Long, Map<DimensionId, String>> uniqueDimensions = new HashMap<>();
Map<Set<Dimension>, Map<DimensionId, String>> uniqueDimensions = new HashMap<>();
while (parser.nextToken() == JsonToken.START_OBJECT) {
handleValue(parser, timestamp, consumer, uniqueDimensions);
}
}

static private void parseMetrics(JsonParser parser, Consumer consumer) throws IOException {
if (parser.getCurrentToken() != JsonToken.START_OBJECT) {
throw new IOException("Expected start of 'metrics' object, got " + parser.currentToken());
}
Instant timestamp = Instant.now();
// One item in the 'values' array, where each item has 'name', 'values' and 'dimensions'
static private void handleValue(JsonParser parser, Instant timestamp, Collector consumer,
Map<Set<Dimension>, Map<DimensionId, String>> uniqueDimensions) throws IOException {
String name = "";
String description = "";
Map<DimensionId, String> dim = Map.of();
List<Map.Entry<String, Number>> values = List.of();
for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) {
String fieldName = parser.getCurrentName();
JsonToken token = parser.nextToken();
if (fieldName.equals("snapshot")) {
timestamp = parseSnapshot(parser);
if (fieldName.equals("name")) {
name = parser.getText();
} else if (fieldName.equals("description")) {
description = parser.getText();
} else if (fieldName.equals("dimensions")) {
dim = parseDimensions(parser, uniqueDimensions);
} else if (fieldName.equals("values")) {
parseMetricValues(parser, timestamp, consumer);
values = parseValues(name+".", parser);
} else {
if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) {
parser.skipChildren();
}
}
}
for (Map.Entry<String, Number> value : values) {
consumer.accept(new Metric(MetricId.toMetricId(value.getKey()), value.getValue(), timestamp, dim, description));
}
}

private static Map<DimensionId, String> parseDimensions(JsonParser parser,
Map<Long, Map<DimensionId, String>> uniqueDimensions) throws IOException {
List<Map.Entry<String, String>> dims = new ArrayList<>();
int keyHash = 0;
int valueHash = 0;
Map<Set<Dimension>, Map<DimensionId, String>> uniqueDimensions) throws IOException {

Set<Dimension> dimensions = new HashSet<>();

for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) {
String fieldName = parser.getCurrentName();
JsonToken token = parser.nextToken();

if (token == JsonToken.VALUE_STRING){
String value = parser.getValueAsString();
dims.add(Map.entry(fieldName, value));
keyHash ^= fieldName.hashCode();
valueHash ^= value.hashCode();
dimensions.add(Dimension.of(fieldName, value));
} else if (token == JsonToken.VALUE_NULL) {
// TODO Should log a warning if this happens
} else {
throw new IllegalArgumentException("Dimension '" + fieldName + "' must be a string");
}
}
Long uniqueKey = (((long) keyHash) << 32) | (valueHash & 0xffffffffL);
return uniqueDimensions.computeIfAbsent(uniqueKey, key -> dims.stream().collect(Collectors.toUnmodifiableMap(e -> toDimensionId(e.getKey()), Map.Entry::getValue)));
return uniqueDimensions.computeIfAbsent(dimensions,
key -> dimensions.stream().collect(Collectors.toUnmodifiableMap(
dim -> toDimensionId(dim.id), dim -> dim.value)));
}

record Dimension(String id, String value) {
static Dimension of(String id, String value) {
return new Dimension(id, value);
}
}

private static List<Map.Entry<String, Number>> parseValues(String prefix, JsonParser parser) throws IOException {
List<Map.Entry<String, Number>> metrics = new ArrayList<>();
for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) {
Expand All @@ -144,31 +187,5 @@ private static List<Map.Entry<String, Number>> parseValues(String prefix, JsonPa
}
return metrics;
}
static private void handleValue(JsonParser parser, Instant timestamp, Consumer consumer,
Map<Long, Map<DimensionId, String>> uniqueDimensions) throws IOException {
String name = "";
String description = "";
Map<DimensionId, String> dim = Map.of();
List<Map.Entry<String, Number>> values = List.of();
for (parser.nextToken(); parser.getCurrentToken() != JsonToken.END_OBJECT; parser.nextToken()) {
String fieldName = parser.getCurrentName();
JsonToken token = parser.nextToken();
if (fieldName.equals("name")) {
name = parser.getText();
} else if (fieldName.equals("description")) {
description = parser.getText();
} else if (fieldName.equals("dimensions")) {
dim = parseDimensions(parser, uniqueDimensions);
} else if (fieldName.equals("values")) {
values = parseValues(name+".", parser);
} else {
if (token == JsonToken.START_OBJECT || token == JsonToken.START_ARRAY) {
parser.skipChildren();
}
}
}
for (Map.Entry<String, Number> value : values) {
consumer.consume(new Metric(MetricId.toMetricId(value.getKey()), value.getValue(), timestamp, dim, description));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class RemoteMetricsFetcher extends HttpMetricFetcher {
/**
* Connect to remote service over http and fetch metrics
*/
public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) {
public void getMetrics(MetricsParser.Collector consumer, int fetchCount) {
try (CloseableHttpResponse response = getResponse()) {
HttpEntity entity = response.getEntity();
try {
Expand All @@ -37,7 +37,7 @@ public void getMetrics(MetricsParser.Consumer consumer, int fetchCount) {
} catch (IOException ignored) {}
}

void createMetrics(String data, MetricsParser.Consumer consumer, int fetchCount) throws IOException {
void createMetrics(String data, MetricsParser.Collector consumer, int fetchCount) throws IOException {
MetricsParser.parse(data, consumer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ public Metrics getSystemMetrics() {
* Get the Metrics registered for this service. Metrics are fetched over HTTP
* if a metric http port has been defined, otherwise from log file
*/
public void consumeMetrics(MetricsParser.Consumer consumer) {
public void consumeMetrics(MetricsParser.Collector consumer) {
remoteMetricsFetcher.getMetrics(consumer, metricsFetchCount.get());
metricsFetchCount.getAndIncrement();
}

private static class CollectMetrics implements MetricsParser.Consumer {
private static class CollectMetrics implements MetricsParser.Collector {
private final Metrics metrics = new Metrics();
@Override
public void consume(Metric metric) {
public void accept(Metric metric) {
metrics.add(metric);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package ai.vespa.metricsproxy.service;

import ai.vespa.metricsproxy.metric.HealthMetric;
import ai.vespa.metricsproxy.metric.Metrics;

/**
* @author gjoranv
Expand All @@ -18,7 +17,7 @@ public DownService(HealthMetric healthMetric) {
}

@Override
public void consumeMetrics(MetricsParser.Consumer consumer) {
public void consumeMetrics(MetricsParser.Collector consumer) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public DummyService(int num, String configid) {
}

@Override
public void consumeMetrics(MetricsParser.Consumer consumer) {
public void consumeMetrics(MetricsParser.Collector consumer) {
long timestamp = System.currentTimeMillis() / 1000;
consumer.consume(new Metric(MetricId.toMetricId(METRIC_1), 5 * num + 1, timestamp));
consumer.consume(new Metric(MetricId.toMetricId(METRIC_2), 1.3 * num + 1.05, timestamp));
consumer.accept(new Metric(MetricId.toMetricId(METRIC_1), 5 * num + 1, timestamp));
consumer.accept(new Metric(MetricId.toMetricId(METRIC_2), 1.3 * num + 1.05, timestamp));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ public class MetricsFetcherTest {

private static final int port = 9; //port number is not used in this test

private static class MetricsConsumer implements MetricsParser.Consumer {
private static class MetricsCollector implements MetricsParser.Collector {
Metrics metrics = new Metrics();
@Override
public void consume(Metric metric) {
public void accept(Metric metric) {
metrics.add(metric);
}
}
Metrics fetch(String data) throws IOException {
RemoteMetricsFetcher fetcher = new RemoteMetricsFetcher(new DummyService(0, "dummy/id/0"), port);
MetricsConsumer consumer = new MetricsConsumer();
fetcher.createMetrics(data, consumer, 0);
return consumer.metrics;
MetricsCollector collector = new MetricsCollector();
fetcher.createMetrics(data, collector, 0);
return collector.metrics;
}

@Test
Expand Down
Loading

0 comments on commit d98918a

Please sign in to comment.