Skip to content

Commit

Permalink
Add examples and utility function to deal with Cassio (#689)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Nov 4, 2023
1 parent 3fbbaa6 commit 70fb424
Show file tree
Hide file tree
Showing 6 changed files with 290 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ private void registerFunctions() {
this.expressionContext
.getFunctionMapper()
.mapFunction(
"fn",
"concat",
JstlFunctions.class.getMethod("concat", Object.class, Object.class));
"fn", "concat", JstlFunctions.class.getMethod("concat", Object[].class));
this.expressionContext
.getFunctionMapper()
.mapFunction(
Expand Down Expand Up @@ -127,6 +125,10 @@ private void registerFunctions() {
"fn",
"addAll",
JstlFunctions.class.getMethod("addAll", Object.class, Object.class));
this.expressionContext
.getFunctionMapper()
.mapFunction(
"fn", "listOf", JstlFunctions.class.getMethod("listOf", Object[].class));
this.expressionContext
.getFunctionMapper()
.mapFunction("fn", "emptyList", JstlFunctions.class.getMethod("emptyList"));
Expand All @@ -153,6 +155,24 @@ private void registerFunctions() {
this.expressionContext
.getFunctionMapper()
.mapFunction("fn", "emptyMap", JstlFunctions.class.getMethod("emptyMap"));
this.expressionContext
.getFunctionMapper()
.mapFunction(
"fn",
"mapPut",
JstlFunctions.class.getMethod(
"mapPut", Object.class, Object.class, Object.class));
this.expressionContext
.getFunctionMapper()
.mapFunction("fn", "mapOf", JstlFunctions.class.getMethod("mapOf", Object[].class));

this.expressionContext
.getFunctionMapper()
.mapFunction(
"fn",
"mapRemove",
JstlFunctions.class.getMethod("mapRemove", Object.class, Object.class));

this.expressionContext
.getFunctionMapper()
.mapFunction("fn", "toInt", JstlFunctions.class.getMethod("toInt", Object.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -164,6 +165,52 @@ public static Map<String, Object> emptyMap() {
return Map.of();
}

/**
 * Builds a new mutable map from an alternating sequence of key/value arguments,
 * e.g. {@code mapOf("a", 1, "b", 2)} yields {a=1, b=2}.
 *
 * @param field alternating key/value pairs; each key is converted with toString()
 * @return a new mutable HashMap containing the given entries (empty for no args)
 * @throws IllegalArgumentException if the argument count is odd or a key is null
 */
public static Map<String, Object> mapOf(Object... field) {
    // Validate up front: the original pairwise loop would throw
    // ArrayIndexOutOfBoundsException on an odd argument count and NPE on a null key.
    if (field.length % 2 != 0) {
        throw new IllegalArgumentException(
                "mapOf requires an even number of arguments (key/value pairs)");
    }
    Map<String, Object> result = new HashMap<>();
    for (int i = 0; i < field.length; i += 2) {
        if (field[i] == null) {
            throw new IllegalArgumentException("mapOf doesn't allow a null key");
        }
        result.put(field[i].toString(), field[i + 1]);
    }
    return result;
}

/**
 * Returns a new mutable list containing the given elements in order.
 *
 * @param field the elements of the list (may be empty; null elements allowed)
 * @return a new mutable ArrayList holding the supplied elements
 */
public static List<Object> listOf(Object... field) {
    // ArrayList copy constructor is equivalent to addAll on a fresh list.
    return new ArrayList<>(Arrays.asList(field));
}

/**
 * Returns a copy of the given map with one entry added (or overwritten).
 * The input map is never mutated.
 *
 * @param map the source map; null is treated as an empty map
 * @param field the key to set; converted with toString()
 * @param value the value to associate with the key (may be null)
 * @return a new mutable HashMap with the original entries plus the new one
 * @throws IllegalArgumentException if map is non-null but not a Map, or field is null/empty
 */
public static Map<String, Object> mapPut(Object map, Object field, Object value) {
    Map<String, Object> result = new HashMap<>();
    if (map != null) {
        if (map instanceof Map m) {
            result.putAll(m);
        } else {
            throw new IllegalArgumentException("mapPut doesn't allow a non-map value");
        }
    }
    if (field == null || field.toString().isEmpty()) {
        // Message now covers both rejected cases (previously claimed "null" only).
        throw new IllegalArgumentException("mapPut doesn't allow a null or empty field");
    }
    result.put(field.toString(), value);
    return result;
}

/**
 * Returns a copy of the given map with one entry removed (if present).
 * The input map is never mutated.
 *
 * @param map the source map; null is treated as an empty map
 * @param field the key to remove; converted with toString()
 * @return a new mutable HashMap with the original entries minus the given key
 * @throws IllegalArgumentException if map is non-null but not a Map, or field is null/empty
 */
public static Map<String, Object> mapRemove(Object map, Object field) {
    Map<String, Object> result = new HashMap<>();
    if (map != null) {
        if (map instanceof Map m) {
            result.putAll(m);
        } else {
            // Fixed copy-paste bug: messages previously referred to "mapPut".
            throw new IllegalArgumentException("mapRemove doesn't allow a non-map value");
        }
    }
    if (field == null || field.toString().isEmpty()) {
        throw new IllegalArgumentException("mapRemove doesn't allow a null or empty field");
    }
    result.remove(field.toString());
    return result;
}

public static List<Object> mapToListOfStructs(Object object, String fields) {
if (object == null) {
throw new IllegalArgumentException("listOf doesn't allow a null value");
Expand Down Expand Up @@ -282,12 +329,18 @@ public static String trim(Object input) {
return input == null ? null : toString(input).trim();
}

public static String concat(Object first, Object second) {
return toString(first) + toString(second);
public static String concat(Object... elements) {
StringBuilder sb = new StringBuilder();
for (Object o : elements) {
if (o != null) {
sb.append(toString(o));
}
}
return sb.toString();
}

/**
 * Three-argument convenience variant of {@link #concat(Object...)};
 * kept for backward compatibility with existing expressions.
 *
 * @param first first value (null is skipped)
 * @param second second value (null is skipped)
 * @param third third value (null is skipped)
 * @return the concatenation of the non-null arguments' string forms
 */
public static String concat3(Object first, Object second, Object third) {
    return concat(first, second, third);
}

public static Object coalesce(Object value, Object valueIfNull) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ private Schema getAvroSchema(ComputeFieldType type, Object value) {
case BYTES:
schemaType = Schema.Type.BYTES;
break;
case MAP:
schemaType = Schema.Type.MAP;
break;
case ARRAY:
schemaType = Schema.Type.ARRAY;
break;
Expand All @@ -342,6 +345,10 @@ private Schema getAvroSchema(ComputeFieldType type, Object value) {
return Schema.createArray(
Schema.createMap(Schema.create(Schema.Type.STRING)));
}
if (schemaType == Schema.Type.MAP) {
// we don't know the element type of the array, so we can't create a schema
return Schema.createMap(Schema.create(Schema.Type.STRING));
}

// Handle logical types:
// https://avro.apache.org/docs/1.10.2/spec.html#Logical+Types
Expand Down Expand Up @@ -517,6 +524,9 @@ private ComputeFieldType getFieldType(Object value) {
if (List.class.isAssignableFrom(value.getClass())) {
return ComputeFieldType.ARRAY;
}
if (Map.class.isAssignableFrom(value.getClass())) {
return ComputeFieldType.MAP;
}
throw new UnsupportedOperationException("Got an unsupported type: " + value.getClass());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ public enum ComputeFieldType {
DATETIME,
BYTES,
DECIMAL,
ARRAY
ARRAY,
MAP
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -176,15 +176,15 @@ public void initialise(Map<String, Object> agentConfiguration) {
processor.start(configuration);
}

// Completion handle for each in-flight record; the Cassandra processor reports
// success/failure asynchronously, per record, so a concurrent map keyed by the
// record lets multiple upserts be outstanding at once.
private final Map<Record, CompletableFuture<?>> currentRecordStatus =
        new ConcurrentHashMap<>();

@Override
public CompletableFuture<?> upsert(Record record, Map<String, Object> context) {
    // Register the handle BEFORE submitting to the processor so the async
    // completion callback can always find it.
    CompletableFuture<?> handle = new CompletableFuture<Void>();
    currentRecordStatus.put(record, handle);
    processor.put(List.of(new LangStreamSinkRecordAdapter(record)));
    return handle;
}
Expand All @@ -208,7 +208,8 @@ public String applicationName() {
@Override
protected void handleSuccess(AbstractSinkRecord abstractRecord) {
    Record record = ((LangStreamSinkRecordAdapter) abstractRecord).getRecord();
    // Remove (not get) so the tracking map does not grow unboundedly, then
    // complete the future that upsert() returned for this record.
    // NOTE(review): assumes every success callback has a matching upsert()
    // registration; a missing entry would NPE here — confirm with processor contract.
    CompletableFuture<?> handle = currentRecordStatus.remove(record);
    handle.complete(null);
}

@Override
Expand Down Expand Up @@ -242,12 +243,14 @@ protected void handleFailure(
log.warn("Error decoding/mapping Kafka record {}: {}", record, e.getMessage());
}

CompletableFuture<?> remove = currentRecordStatus.remove(record);

if (ignoreErrors == CassandraSinkConfig.IgnoreErrorsPolicy.NONE
|| (ignoreErrors == CassandraSinkConfig.IgnoreErrorsPolicy.DRIVER
&& !driverFailure)) {
currentRecordStatus.get().completeExceptionally(e);
remove.completeExceptionally(e);
} else {
currentRecordStatus.get().complete(null);
remove.complete(null);
}

failCounter.run();
Expand Down
Loading

0 comments on commit 70fb424

Please sign in to comment.