Skip to content

Commit

Permalink
Merge pull request #30 from zilliztech/dev
Browse files Browse the repository at this point in the history
dev
  • Loading branch information
nianliuu authored Dec 9, 2024
2 parents 461630d + fa1888c commit 2694a5b
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 79 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.4.1</version>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand All @@ -68,7 +68,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.100.Final</version>
<version>4.1.115.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/milvus/io/kafka/MilvusSinkConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import java.util.List;
import java.util.Map;

public class MilvusSinkConnector extends SinkConnector{
public class MilvusSinkConnector extends SinkConnector {

private static final Logger log = LoggerFactory.getLogger(MilvusSinkConnector.class);
private Map<String, String> configProperties;
Expand All @@ -24,7 +24,7 @@ public void start(Map<String, String> props) {
configProperties = props;
// validation
new MilvusSinkConnectorConfig(props);
}catch (ConfigException e){
} catch (ConfigException e) {
throw new ConfigException("Couldn't start MilvusSinkConnector due to configuration error", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ public Password getToken() {
return getPassword(TOKEN);
}

public String getCollectionName(){return getString(COLLECTION_NAME);}
public String getCollectionName() {
return getString(COLLECTION_NAME);
}
}
27 changes: 14 additions & 13 deletions src/main/java/com/milvus/io/kafka/MilvusSinkTask.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.milvus.io.kafka;

import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import static com.milvus.io.kafka.MilvusSinkConnectorConfig.TOKEN;
import com.milvus.io.kafka.helper.MilvusClientHelper;
import com.milvus.io.kafka.utils.DataConverter;
import com.milvus.io.kafka.utils.Utils;
import com.milvus.io.kafka.utils.VersionUtil;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.request.DescribeCollectionReq;
import io.milvus.v2.service.collection.request.GetLoadStateReq;
import io.milvus.v2.service.collection.request.HasCollectionReq;
Expand All @@ -18,9 +18,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

import static com.milvus.io.kafka.MilvusSinkConnectorConfig.TOKEN;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class MilvusSinkTask extends SinkTask {

Expand Down Expand Up @@ -58,7 +59,7 @@ private void preValidate() {
throw new RuntimeException("Collection not exist" + config.getCollectionName());
}
// check if the collection is loaded
if (!myMilvusClient.getLoadState(GetLoadStateReq.builder().collectionName(config.getCollectionName()).build())){
if (!myMilvusClient.getLoadState(GetLoadStateReq.builder().collectionName(config.getCollectionName()).build())) {
log.error("Collection not loaded");
throw new RuntimeException("Collection not loaded" + config.getCollectionName());
}
Expand All @@ -68,36 +69,36 @@ private void preValidate() {
@Override
public void put(Collection<SinkRecord> records) {
log.info("Putting {} records to Milvus.", records.size());
if(records.isEmpty()) {
if (records.isEmpty()) {
log.info("No records to put.");
return;
}

// Dynamic schema is not supported for now; to support it, we would need to put the data into a JSONObject
List<JSONObject> datas = new ArrayList<>();
List<JsonObject> datas = new ArrayList<>();
for (SinkRecord record : records) {
log.debug("Writing {} to Milvus.", record);
if(record.value() == null) {
if (record.value() == null) {
log.warn("Skipping record with null value.");
continue;
}
try {
JSONObject data = converter.convertRecord(record, response.getCollectionSchema());
JsonObject data = converter.convertRecord(record, response.getCollectionSchema());
datas.add(data);
}catch (Exception e){
} catch (Exception e) {
log.error("Failed to convert record to JSONObject, skip it", e);
}
}

if(!response.getAutoID()){
if (!response.getAutoID()) {
// default to use upsert
UpsertReq upsertReq = UpsertReq.builder()
.collectionName(config.getCollectionName())
.data(datas)
.build();
log.info("Upserting data to collection: {} with datas: {}", config.getCollectionName(), datas);
myMilvusClient.upsert(upsertReq);
}else {
} else {
InsertReq insertReq = InsertReq.builder()
.collectionName(config.getCollectionName())
.data(datas)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import com.milvus.io.kafka.MilvusSinkConnectorConfig;
import com.milvus.io.kafka.utils.Utils;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

Expand Down
85 changes: 45 additions & 40 deletions src/main/java/com/milvus/io/kafka/utils/DataConverter.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.milvus.io.kafka.utils;

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.milvus.io.kafka.MilvusSinkConnectorConfig;
import io.milvus.param.dml.InsertParam;
import io.milvus.common.utils.JsonUtils;
import io.milvus.v2.common.DataType;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -13,63 +14,66 @@
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.HashMap;
import java.util.List;

public class DataConverter {

private final MilvusSinkConnectorConfig config;

private static final Logger log = LoggerFactory.getLogger(DataConverter.class);
private final MilvusSinkConnectorConfig config;

public DataConverter(MilvusSinkConnectorConfig config) {
this.config = config;
}

/*
* Convert SinkRecord to JSONObject
* Convert SinkRecord to JsonObject
*/
public JSONObject convertRecord(SinkRecord sr, CreateCollectionReq.CollectionSchema collectionSchema) {
// parse sinkRecord to get filed name and value
if(sr.value() instanceof Struct) {
return parseValue((Struct)sr.value(), collectionSchema);
}else if (sr.value() instanceof HashMap) {
return parseValue((HashMap<?, ?>)sr.value(), collectionSchema);
}else {
throw new RuntimeException("Unsupported SinkRecord data type" + sr.value());
public JsonObject convertRecord(SinkRecord sr, CreateCollectionReq.CollectionSchema collectionSchema) {
// parse sinkRecord to get field name and value
if (sr.value() instanceof Struct) {
return parseValue((Struct) sr.value(), collectionSchema);
} else if (sr.value() instanceof HashMap) {
return parseValue((HashMap<?, ?>) sr.value(), collectionSchema);
} else {
throw new RuntimeException("Unsupported SinkRecord data type: " + sr.value());
}
}

private JSONObject parseValue(HashMap<?, ?> mapValue, CreateCollectionReq.CollectionSchema collectionSchema) {
JSONObject fields = new JSONObject();
private JsonObject parseValue(HashMap<?, ?> mapValue, CreateCollectionReq.CollectionSchema collectionSchema) {
JsonObject fields = new JsonObject();
Gson gson = new Gson();
mapValue.forEach((field, value) -> {
if(collectionSchema.getField(field.toString())!=null){
if (collectionSchema.getField(field.toString()) != null) {
// if the key matches a field in the collection schema, convert the value according to that field's DataType before storing it
fields.put(field.toString(), castValueToType(value, collectionSchema.getField(field.toString()).getDataType()));
}else {
Object object = convertValueByMilvusType(value, collectionSchema.getField(field.toString()).getDataType());
fields.add(field.toString(), gson.toJsonTree(object));
} else {
log.warn("Field {} not exists in collection", field);
}

});
return fields;
}

private JSONObject parseValue(Struct structValue, CreateCollectionReq.CollectionSchema collectionSchema) {
JSONObject fields = new JSONObject();

private JsonObject parseValue(Struct structValue, CreateCollectionReq.CollectionSchema collectionSchema) {
JsonObject fields = new JsonObject();
Gson gson = new Gson();
structValue.schema().fields().forEach(field -> {
if(collectionSchema.getField(field.name()) != null){
if (collectionSchema.getField(field.name()) != null) {
// if the struct field matches a field in the collection schema, convert the value according to that field's DataType before storing it
fields.put(field.toString(), castValueToType(structValue.get(field.name()), collectionSchema.getField(field.name()).getDataType()));
}else {
Object object = convertValueByMilvusType(structValue.get(field.name()), collectionSchema.getField(field.name()).getDataType());
fields.add(field.name(), gson.toJsonTree(object));
} else {
log.warn("Field {} not exists in collection", field);
}
});

return fields;
}

private Object castValueToType(Object value, DataType dataType) {
switch (dataType){
private Object convertValueByMilvusType(Object value, DataType dataType) {
Gson gson = new Gson();
switch (dataType) {
case Bool:
return Boolean.parseBoolean(value.toString());
case Int8:
Expand All @@ -87,36 +91,37 @@ private Object castValueToType(Object value, DataType dataType) {
case String:
return value.toString();
case JSON:
Gson gson = new Gson();
return gson.toJson(value);
case BinaryVector:
return parseBinaryVectorField(value.toString());
case FloatVector:
return parseFloatVectorField(value.toString());
case SparseFloatVector:
return gson.toJsonTree(value).getAsJsonObject();
default:
throw new RuntimeException("Unsupported data type" + dataType);
throw new RuntimeException("Unsupported data type: " + dataType);
}
}

protected List<Float> parseFloatVectorField(String vectors){
protected List<Float> parseFloatVectorField(String vectors) {
try {
log.debug("parse float vectors: {}", vectors);

String[] vectorArrays = vectors.replaceAll("\\[", "").replaceAll("\\]", "")
.replaceAll(" ","").split(",");
.replaceAll(" ", "").split(",");

List<Float> floatList = Lists.newLinkedList();
for (String vector : vectorArrays) {
floatList.add(Float.valueOf(vector));
}

return floatList;
}catch (Exception e){
throw new RuntimeException("parse float vector field error: " + e.getMessage() + vectors);
} catch (Exception e) {
throw new RuntimeException("parse float vector field error: " + e.getMessage() + " " + vectors);
}

}
protected ByteBuffer parseBinaryVectorField(String vectors){

protected ByteBuffer parseBinaryVectorField(String vectors) {
try {
log.debug("parse binary vectors: {}", vectors);

Expand All @@ -130,8 +135,8 @@ protected ByteBuffer parseBinaryVectorField(String vectors){
}

return buffer;
}catch (Exception e){
throw new RuntimeException("parse binary vector field error: " + e.getMessage() + vectors);
} catch (Exception e) {
throw new RuntimeException("parse binary vector field error: " + e.getMessage() + " " + vectors);
}
}
}
}
Loading

0 comments on commit 2694a5b

Please sign in to comment.