Skip to content

Commit

Permalink
Add services and DAO and extracting installation report to a service
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora committed Sep 27, 2024
1 parent f13cde5 commit b99925f
Show file tree
Hide file tree
Showing 16 changed files with 299 additions and 210 deletions.
7 changes: 4 additions & 3 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ rateLimit:
limit: ${RATE_LIMIT_GENERAL_EVENTS_LIMIT:-10000}
durationInSeconds: ${RATE_LIMIT_GENERAL_EVENTS_DURATION_IN_SEC:-60}

usageReport:
enabled: ${OPIK_USAGE_REPORT_ENABLED:-true}
url: ${OPIK_USAGE_REPORT_URL:-https://stats.comet.com/notify/event/}

metadata:
version: ${OPIK_VERSION:-latest}
usageReport:
enabled: ${OPIK_REPORTING_ENABLED:-true}
url: ${OPIK_REPORTING_URL:-https://stats.comet.com/notify/event/}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.comet.opik.infrastructure.db.DatabaseAnalyticsModule;
import com.comet.opik.infrastructure.db.IdGeneratorModule;
import com.comet.opik.infrastructure.db.NameGeneratorModule;
import com.comet.opik.infrastructure.http.HttpModule;
import com.comet.opik.infrastructure.ratelimit.RateLimitModule;
import com.comet.opik.infrastructure.redis.RedisModule;
import com.comet.opik.utils.JsonBigDecimalDeserializer;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void initialize(Bootstrap<OpikConfiguration> bootstrap) {
.bundles(JdbiBundle.<OpikConfiguration>forDatabase((conf, env) -> conf.getDatabase())
.withPlugins(new SqlObjectPlugin(), new Jackson2Plugin()))
.modules(new DatabaseAnalyticsModule(), new IdGeneratorModule(), new AuthModule(), new RedisModule(),
new RateLimitModule(), new NameGeneratorModule())
new RateLimitModule(), new NameGeneratorModule(), new HttpModule())
.listen(new ApplicationStartupListener())
.enableAutoConfig()
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,10 @@
import lombok.Data;

@Data
public class OpikMetadataConfig {

public record UsageReport(@Valid @JsonProperty boolean enabled, @Valid @JsonProperty String url) {
}
public class MetadataConfig {

@Valid
@JsonProperty
@NotNull private String version;

@Valid
@NotNull @JsonProperty
private UsageReport usageReport;
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,9 @@ public class OpikConfiguration extends Configuration {

@Valid
@NotNull @JsonProperty
private OpikMetadataConfig metadata = new OpikMetadataConfig();
private MetadataConfig metadata = new MetadataConfig();

@Valid
@NotNull @JsonProperty
private UsageReportConfig usageReport = new UsageReportConfig();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.comet.opik.infrastructure;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import lombok.Data;

@Data
public class UsageReportConfig {

@Valid
@JsonProperty
private boolean enabled;

@Valid
@JsonProperty
private String url;

}
Original file line number Diff line number Diff line change
@@ -1,39 +1,20 @@
package com.comet.opik.infrastructure.bi;

import com.comet.opik.domain.IdGenerator;
import com.comet.opik.infrastructure.OpikConfiguration;
import com.comet.opik.infrastructure.lock.LockService;
import com.google.inject.Injector;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.vyarus.dropwizard.guice.module.lifecycle.GuiceyLifecycle;
import ru.vyarus.dropwizard.guice.module.lifecycle.GuiceyLifecycleListener;
import ru.vyarus.dropwizard.guice.module.lifecycle.event.GuiceyLifecycleEvent;
import ru.vyarus.dropwizard.guice.module.lifecycle.event.InjectorPhaseEvent;

import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static com.comet.opik.infrastructure.lock.LockService.Lock;

@Slf4j
@RequiredArgsConstructor
public class ApplicationStartupListener implements GuiceyLifecycleListener {

public static final String NOTIFICATION_EVENT_TYPE = "opik_os_startup_be";

// This event cannot depend on authentication
private final Client client = ClientBuilder.newClient();
private final AtomicReference<Injector> injector = new AtomicReference<>();

@Override
Expand All @@ -44,104 +25,10 @@ public void onEvent(GuiceyLifecycleEvent event) {
}

if (event.getType() == GuiceyLifecycle.ApplicationStarted) {
var installationReportService = injector.get().getInstance(InstallationReportService.class);

String eventType = GuiceyLifecycle.ApplicationStarted.name();

var config = (OpikConfiguration) event.getSharedState().getConfiguration().get();

if (!config.getMetadata().getUsageReport().enabled()) {
log.info("Usage report is disabled");
return;
}

if (StringUtils.isEmpty(config.getMetadata().getUsageReport().url())) {
log.warn("Usage report URL is not set");
return;
}

var lockService = injector.get().getInstance(LockService.class);
var generator = injector.get().getInstance(IdGenerator.class);
var usageReport = injector.get().getInstance(UsageReportDAO.class);

var lock = new Lock("opik-%s".formatted(eventType));

lockService.executeWithLock(lock, tryToReportStartupEvent(usageReport, generator, eventType, config))
.subscribeOn(Schedulers.boundedElastic())
.onErrorResume(e -> {
log.warn("Didn't reported due to error", e);
return Mono.empty();
}).block();
installationReportService.reportInstallation();
}
}

private Mono<Void> tryToReportStartupEvent(UsageReportDAO usageReport, IdGenerator generator, String eventType,
OpikConfiguration config) {
return Mono.fromCallable(() -> {

String anonymousId = getAnonymousId(usageReport, generator);

log.info("Anonymous ID: {}", anonymousId);

if (usageReport.isEventReported(eventType)) {
log.info("Event already reported");
return null;
}

reportEvent(anonymousId, eventType, config, usageReport);
return null;
});
}

private String getAnonymousId(UsageReportDAO usageReport, IdGenerator generator) {
var anonymousId = usageReport.getAnonymousId();

if (anonymousId.isEmpty()) {
log.info("Anonymous ID not found, generating a new one");
var newId = generator.generateId();
log.info("Generated new ID: {}", newId);

// Save the new ID
usageReport.saveAnonymousId(newId.toString());

anonymousId = Optional.of(newId.toString());
}

return anonymousId.get();
}

private void reportEvent(String anonymousId, String eventType, OpikConfiguration config,
UsageReportDAO usageReport) {

usageReport.addEvent(eventType);

var startupEvent = new OpikStartupEvent(
anonymousId,
NOTIFICATION_EVENT_TYPE,
Map.of("opik_app_version", config.getMetadata().getVersion()));

try (Response response = client.target(URI.create(config.getMetadata().getUsageReport().url()))
.request()
.accept(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.json(startupEvent))) {

if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL && response.hasEntity()) {

var notificationEventResponse = response.readEntity(NotificationEventResponse.class);

if (notificationEventResponse.success()) {
usageReport.markEventAsReported(eventType);
log.info("Event reported successfully: {}", notificationEventResponse.message());
} else {
log.warn("Failed to report event: {}", notificationEventResponse.message());
}

return;
}

log.warn("Failed to report event: {}", response.getStatusInfo());
if (response.hasEntity()) {
log.warn("Response: {}", response.readEntity(String.class));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.comet.opik.infrastructure.bi;

import com.comet.opik.domain.IdGenerator;
import com.comet.opik.infrastructure.OpikConfiguration;
import com.comet.opik.infrastructure.lock.LockService;
import com.google.inject.ImplementedBy;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.ws.rs.client.Client;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import ru.vyarus.dropwizard.guice.module.lifecycle.GuiceyLifecycle;

import java.net.URI;
import java.util.Map;
import java.util.Optional;

@ImplementedBy(InstallationReportServiceImpl.class)
interface InstallationReportService {
String NOTIFICATION_EVENT_TYPE = "opik_os_startup_be";

void reportInstallation();
}

@Singleton
@Slf4j
@RequiredArgsConstructor(onConstructor_ = @Inject)
class InstallationReportServiceImpl implements InstallationReportService {

private final LockService lockService;
private final IdGenerator generator;
private final UsageReportService usageReport;
private final OpikConfiguration config;
private final Client client;

public void reportInstallation() {

String eventType = GuiceyLifecycle.ApplicationStarted.name();

if (!config.getUsageReport().isEnabled()) {
log.info("Usage report is disabled");
return;
}

if (StringUtils.isEmpty(config.getUsageReport().getUrl())) {
log.warn("Usage report URL is not set");
return;
}

var lock = new LockService.Lock("opik-%s".formatted(eventType));

lockService.executeWithLock(lock, tryToReportStartupEvent(eventType))
.subscribeOn(Schedulers.boundedElastic())
.onErrorResume(e -> {
log.warn("Didn't reported due to error", e);
return Mono.empty();
}).block();

}

private Mono<Void> tryToReportStartupEvent(String eventType) {
return Mono.fromCallable(() -> {

String anonymousId = getAnonymousId();

log.info("Anonymous ID: {}", anonymousId);

if (usageReport.isEventReported(eventType)) {
log.info("Event already reported");
return null;
}

reportEvent(anonymousId, eventType);
return null;
});
}

private String getAnonymousId() {
var anonymousId = usageReport.getAnonymousId();

if (anonymousId.isEmpty()) {
log.info("Anonymous ID not found, generating a new one");
var newId = generator.generateId();
log.info("Generated new ID: {}", newId);

// Save the new ID
usageReport.saveAnonymousId(newId.toString());

anonymousId = Optional.of(newId.toString());
}

return anonymousId.get();
}

private void reportEvent(String anonymousId, String eventType) {

usageReport.addEvent(eventType);

var startupEvent = new OpikStartupEvent(
anonymousId,
NOTIFICATION_EVENT_TYPE,
Map.of("opik_app_version", config.getMetadata().getVersion()));

try (Response response = client.target(URI.create(config.getUsageReport().getUrl()))
.request()
.accept(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.json(startupEvent))) {

if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL && response.hasEntity()) {

var notificationEventResponse = response.readEntity(NotificationEventResponse.class);

if (notificationEventResponse.success()) {
usageReport.markEventAsReported(eventType);
log.info("Event reported successfully: {}", notificationEventResponse.message());
} else {
log.warn("Failed to report event: {}", notificationEventResponse.message());
}

return;
}

log.warn("Failed to report event: {}", response.getStatusInfo());
if (response.hasEntity()) {
log.warn("Response: {}", response.readEntity(String.class));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.comet.opik.infrastructure.bi;

import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;

import java.util.Optional;

interface MetadataDAO {

@SqlQuery("SELECT value FROM metadata WHERE `key` = :key")
Optional<String> getMetadataKey(@Bind("key") String key);

@SqlUpdate("INSERT INTO metadata (`key`, value) VALUES (:key, :value)")
void saveMetadataKey(@Bind("key") String key, @Bind("value") String value);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.comet.opik.infrastructure.bi;


import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
Expand Down
Loading

0 comments on commit b99925f

Please sign in to comment.