
Missing insertion data exists during bulk write #11

Open
Uijeong97 opened this issue Mar 27, 2024 · 20 comments

@Uijeong97

Uijeong97 commented Mar 27, 2024

Hi, I'm using the spark-milvus connector to do a bulk insert, but some of the data goes missing.

I'm getting an error from the proxy; it is a describe-collection error.
Missing data is a fatal problem for us. Please take a look.

milvus_writer = df.write \
    .format("milvus") \
    .mode("append") \
    .option("milvus.host", host) \
    .option("milvus.port", port) \
    .option("milvus.database.name", db_name) \
    .option("milvus.collection.name", collection_name) \
    .option("milvus.collection.vectorField", "embedding") \
    .option("milvus.collection.vectorDim", "768") \
    .option("milvus.collection.primaryKeyField", "poi_id")

I made sure all segments were flushed and ran a count check after enough time had passed.

  • expected collection count (df count)
[screenshot: 2024-03-27 22:06:20]
  • real collection count (after bulk write using spark-milvus connector)
[screenshot: 2024-03-27 22:06:27]
  • spark error log
2024-03-27 21:21:40,046 ERROR client.AbstractMilvusGrpcClient: DescribeCollectionRequest failed:can't find collection collection not found[collection=448192185218736664]
2024-03-27 21:21:40,062 ERROR client.AbstractMilvusGrpcClient: Failed to describe collection: cp_poi_embedding
  • This shows a failed request from the proxy. Errors often occur in the describe collection query.
[screenshot: 2024-03-27 22:07:07]
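
For reference, a count check like the one described above could be done with a short pymilvus script along these lines (a minimal sketch: the connection values are placeholders, the collection name is taken from the logs in this thread, and the count(*) query assumes Milvus 2.3+; on older versions, collection.num_entities after a flush serves the same purpose):

from pymilvus import Collection, connections

# Hypothetical connection values; use the same host/port/db as the writer options.
connections.connect(host="localhost", port="19530", db_name="default")
collection = Collection("cp_poi_embedding")

collection.flush()        # seal growing segments so the count is stable
collection.load()
res = collection.query(expr="", output_fields=["count(*)"])
print("real collection count:", res[0]["count(*)"])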

And another log:

2024-03-27 21:21:43,242 ERROR internal.ManagedChannelOrphanWrapper: *~*~*~ Channel ManagedChannelImpl{logId=135, target=gateway-pai-milvus.pai-staging-milvus.svc.pr1.io.navercorp.com:10001} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
	at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:630)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:297)
	at io.milvus.client.MilvusServiceClient.<init>(MilvusServiceClient.java:143)
	at zilliztech.spark.milvus.MilvusConnection$.acquire(MilvusConnection.scala:30)
	at zilliztech.spark.milvus.writer.MilvusDataWriter.<init>(MilvusDataWriter.scala:18)
	at zilliztech.spark.milvus.writer.MilvusDataWriterFactory.createWriter(MilvusDataWriterFactory.scala:11)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:459)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:448)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:514)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:411)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
@xiaofan-luan

/assign @wayblink

@xiaofan-luan

Is there an easy way to compare the data imported into Milvus with the data that was supposed to be imported?

@xiaofan-luan

I think we could build a Merkle tree on top of all the data? The easiest way would be to iterate over all the data directly, querying in batches, to find the missing rows.
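
A minimal sketch of that batch-query idea with pymilvus (the collection and field names are taken from this thread; the connection details and the source ID list are placeholders):

from pymilvus import Collection, connections

# Hypothetical sketch: find primary keys that never made it into Milvus.
connections.connect(host="localhost", port="19530")
collection = Collection("cp_poi_embedding")
collection.load()

source_ids = ["poi-1", "poi-2", "poi-3"]  # primary keys of the data you tried to import
batch_size = 1000
missing = []
for i in range(0, len(source_ids), batch_size):
    batch = source_ids[i:i + batch_size]
    expr = 'poi_id in ["' + '", "'.join(batch) + '"]'
    found = {row["poi_id"] for row in collection.query(expr=expr, output_fields=["poi_id"])}
    missing.extend(pk for pk in batch if pk not in found)

print(f"{len(missing)} rows missing from Milvus")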

@Uijeong97
Author

Is there an easy way to compare the data imported into Milvus with the data that was supposed to be imported?

It's hard to compare all of them, so I extracted only some of them.

part-00000-4c722b82-de61-4204-9946-c089f22da462-c000.snappy.parquet.zip

When I use the "upsert" and "delete" queries using pymilvus client on the data extracted above, it is added or deleted normally.

However, when I bulk write to the spark connector, the same data is not saved.

the collection schema is below.

[screenshot: 2024-03-29 12:38:34]

@wayblink
Contributor

@Uijeong97 Hi, sorry to hear that. How much data did you lose? By the way, the spark-milvus connector only supports insert. The error message above doesn't seem related to the data loss. Did you get any more relevant error messages?

@wayblink
Contributor

@Uijeong97 Or could you provide your code and data to me? Maybe I can try to reproduce your issue.

@Uijeong97
Author

Uijeong97 commented Mar 29, 2024

@wayblink

hello.

My data is in Parquet format.
I have a collection with a string field called 'poi_id' and an 'embedding' field with 768 dimensions.

This is the collection schema and index information.

from pymilvus import CollectionSchema, DataType, FieldSchema

poi_id = FieldSchema(
    name="poi_id",
    dtype=DataType.VARCHAR,
    max_length=64,
    is_primary=True,
)
embedding = FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768)

schema = CollectionSchema(
    fields=[poi_id, embedding],
    description="CP Matching Vector DB",
    enable_dynamic_field=True,
)

from pymilvus import Collection, utility
collection = Collection(
    name=collection_name,
    schema=schema,
    # shards_num=2
)


index_settings = [
    {
        "field_name": "embedding",
        "index_name": "vector_index",
        "index_params": {
            "metric_type": "IP",
            "index_type": "FLAT",
        },
    },
    {
        "field_name": "poi_id",
        "index_name": "scalar_index",
    },
]


from pymilvus import utility
for setting in index_settings:
    collection.create_index(**setting)
    utility.index_building_progress(collection_name, index_name=setting["index_name"])

And I tried to write it into Milvus using the spark-milvus connector in the following way.

# https://milvus.io/docs/integrate_with_spark.md

milvus_writer = poi_vector_db_df.write \
    .format("milvus") \
    .mode("append") \
    .option("milvus.host", host) \
    .option("milvus.port", port) \
    .option("milvus.database.name", db_name) \
    .option("milvus.collection.name", collection_name) \
    .option("milvus.collection.vectorField", "embedding") \
    .option("milvus.collection.vectorDim", "768") \
    .option("milvus.collection.primaryKeyField", "poi_id")

I've extracted only the data that went missing.

part-00000-4c722b82-de61-4204-9946-c089f22da462-c000.snappy.parquet.zip

@Uijeong97
Author

@wayblink

I think the problem is that data went missing, yet the Spark write operation finished successfully.

Did you get any more relevant error messages?

Um.. That seems to be the only thing that looks like an error in the logs.

@wayblink
Contributor

wayblink commented Mar 29, 2024

@Uijeong97 Alright, I will try to reproduce it. Can it be reproduced in your environment? What is your Milvus version?

@Uijeong97
Author

@wayblink

Thanks.
In my environment, it reproduces when loading the extracted data above.

  • spark version: 3.4.2
  • milvus connector version: 1.0.0-SNAPSHOT, downloaded via

wget https://github.com/zilliztech/spark-milvus/raw/1.0.0-SNAPSHOT/output/spark-milvus-1.0.0-SNAPSHOT.jar

@sayanbiswas59

sayanbiswas59 commented Aug 29, 2024

Hi @Uijeong97, I wanted to quickly check whether you have verified that the embeddings were inserted and stored correctly in Milvus. I am doing something similar, but my embeddings get changed when inserted using the spark-milvus connector, as shown below:

Raw values of the embeddings that we want to ingest:
[0.04970105364918709, 0.03401502966880798, 0.003363868221640587,..]

The embedding values that appear in Milvus are:
[-1.0842022e-19, -1.2552958, -0.0, ..]

Could you please help me resolve this and please let me know if I am missing something? TIA!

@wayblink
Contributor

Hi @Uijeong97, I wanted to quickly check whether you have verified that the embeddings were inserted and stored correctly in Milvus. I am doing something similar, but my embeddings get changed when inserted using the spark-milvus connector, as shown below:

Raw values of the embeddings that we want to ingest: [0.04970105364918709, 0.03401502966880798, 0.003363868221640587,..]

The embedding values that appear in Milvus are: [-1.0842022e-19, -1.2552958, -0.0, ..]

Could you please help me resolve this and please let me know if I am missing something? TIA!

Hi, this seems like a serious problem and I'd like to look into it. Can you provide more info, such as your code? I will try to reproduce it.

@sayanbiswas59

sayanbiswas59 commented Aug 29, 2024

Hi @wayblink It seems like the issue was due to a datatype mismatch of the embedding field between Milvus and the Spark DataFrame. In Milvus we use DataType.FLOAT_VECTOR, while in the Spark DataFrame the embedding field is of type array of double. The issue is resolved when we convert the embedding field to array of float in the Spark DataFrame before ingesting it into Milvus.
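
For anyone hitting the same mismatch, a minimal PySpark sketch of the cast described above (the column name embedding follows this thread; the DataFrame df is a placeholder):

from pyspark.sql.functions import col

# The embedding column arrives as array<double>; cast it to array<float> so it
# matches the collection's FLOAT_VECTOR field before writing through the connector.
df = df.withColumn("embedding", col("embedding").cast("array<float>"))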

@wayblink
Contributor

Hi @wayblink It seems like the issue was due to a datatype mismatch of the embedding field between Milvus and the Spark DataFrame. In Milvus we use DataType.FLOAT_VECTOR, while in the Spark DataFrame the embedding field is of type array of double. The issue is resolved when we convert the embedding field to array of float in the Spark DataFrame before ingesting it into Milvus.

Cool! Thanks. Please let me know if you have any issues or suggestions for the spark-milvus connector.

@sayanbiswas59

Hi @wayblink, we are currently planning to ingest approximately 500 million image embeddings into Milvus, leveraging the spark-milvus connector and PySpark. We have noticed that two data formats, 'Milvus' and 'mjson', are recommended for data ingestion. Additionally, we are aware that MilvusUtils is currently only supported for Scala. Given these factors, we would greatly appreciate it if you could guide us on how to effectively perform a bulk insert into Milvus using PySpark. Thank you in advance for your assistance.

@sayanbiswas59

Hi @wayblink Could you please help guide us on how to effectively perform a bulk insert into Milvus using PySpark? Thanks!

@wayblink
Contributor

@sayanbiswas59 OK, I'd like to help you with this. I can provide a demo for you. What is your original data format?

@wayblink
Contributor

wayblink commented Sep 10, 2024

@sayanbiswas59 By the way, what is your Milvus version? Since Milvus already supports bulk insert of the Parquet format, there is no need to use 'mjson'. Doing some processing in PySpark, storing the data in Parquet format, and then bulk-inserting it into Milvus could be an approach. Maybe you can try it with a small piece of data first?
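
A rough PySpark sketch of that approach (the DataFrame df, the cast, and the s3a path are placeholders for illustration):

from pyspark.sql.functions import col

# Hypothetical sketch: cast embeddings to float and write the processed
# DataFrame as Parquet files that a later Milvus bulk-insert step can read.
df.withColumn("embedding", col("embedding").cast("array<float>")) \
  .write \
  .mode("overwrite") \
  .parquet("s3a://milvus-bucket/bulkinsert/embeddings/")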

@sayanbiswas59

Hi @wayblink, thank you for your response. We are using Milvus 2.4.4. The id, embeddings, and url are stored in Parquet format. We want to bulk-ingest them into Milvus using PySpark (Spark 3.3.2).

@wayblink
Contributor

Hi @wayblink, thank you for your response. We are using Milvus 2.4.4. The id, embeddings, and url are stored in Parquet format. We want to bulk-ingest them into Milvus using PySpark (Spark 3.3.2).

Yes, I think you don't need the spark connector. What you need to do:

  1. move the data to Milvus's storage (MinIO or S3), so bulkinsert can read the files
  2. create a collection based on your schema
  3. write a script that lists the Parquet directory and calls bulkinsert file by file (see the sketch below)

Does this meet your need?
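
A minimal pymilvus sketch of step 3 (the connection details, collection name, and file paths relative to the Milvus bucket are placeholders):

from pymilvus import connections, utility

connections.connect(host="localhost", port="19530")

# Hypothetical list of Parquet files already uploaded to the bucket Milvus uses.
parquet_files = [
    "bulkinsert/embeddings/part-00000.parquet",
    "bulkinsert/embeddings/part-00001.parquet",
]

# One bulk-insert task per file; Milvus reads each file from its own storage.
task_ids = [
    utility.do_bulk_insert(collection_name="image_embeddings", files=[f])
    for f in parquet_files
]

# Check task status (in practice, poll until each task completes).
for task_id in task_ids:
    state = utility.get_bulk_insert_state(task_id=task_id)
    print(task_id, state.state_name, state.row_count)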
