diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index e5662c6..e8c1468 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,7 @@ +### 0.1.4 - 03.08.2017 + +* Fixed `FetchResponse` `MessageTooBigException` when a message set has been compressed. (#160) + ### 0.1.4-beta - 25.07.2017 * Fixed v0.10.1 protocol bug for `Offset` API. diff --git a/src/kafunk/Compression.fs b/src/kafunk/Compression.fs index 00e2192..eb8e768 100644 --- a/src/kafunk/Compression.fs +++ b/src/kafunk/Compression.fs @@ -31,7 +31,7 @@ module internal Stream = use compStream = makeStream inputStream compStream.CopyTo(outputStream) let buf = Binary.Segment(outputStream.GetBuffer(), 0, int outputStream.Length) - MessageSet.Read (messageVer, 0, 0s, buf.Count, BinaryZipper(buf)) + MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf)) [] module GZip = @@ -151,7 +151,7 @@ module Snappy = let decompress (messageVer:ApiVersion) (m:Message) = let buf = CompressedMessage.decompress m.value - MessageSet.Read (messageVer, 0, 0s, buf.Count, BinaryZipper(buf)) + MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf)) let compress (messageVer:int16) (compression:byte) (ms:MessageSet) = match compression with diff --git a/src/kafunk/Protocol.fs b/src/kafunk/Protocol.fs index 4d175b5..f8bb802 100644 --- a/src/kafunk/Protocol.fs +++ b/src/kafunk/Protocol.fs @@ -434,7 +434,9 @@ module Protocol = buf.WriteInt32 x.messageSize Message.Write (messageVer, x.message, buf) - static member internal Read (messageVer:ApiVersion, partition:Partition, ec:ErrorCode, messageSetSize:int, buf:BinaryZipper) = + // NB: skipTooLarge=true is for scenarios where decompression is involved and a message set is being decoded from an individual message + // which was itself too small. + static member internal Read (messageVer:ApiVersion, partition:Partition, ec:ErrorCode, messageSetSize:int, skipTooLarge:bool, buf:BinaryZipper) = let mutable consumed = 0 let arr = ResizeArray<_>() while consumed < messageSetSize && buf.Buffer.Count > 0 do @@ -445,7 +447,20 @@ module Protocol = let (messageSize:MessageSize) = buf.ReadInt32 () let messageSetRemainder = messageSetRemainder - 12 // (Offset + MessageSize) if messageSize > messageSetSize then - raise (MessageTooBigException(sprintf "partition=%i offset=%i message_set_size=%i message_size=%i" partition offset messageSetSize messageSize)) + let errMsg = sprintf "partition=%i offset=%i message_set_size=%i message_size=%i consumed_bytes=%i consumed_count=%i" + partition offset messageSetSize messageSize consumed arr.Count + if not skipTooLarge then + raise (MessageTooBigException(errMsg)) + else +// let payload = Binary.toString buf.Buffer +// printfn "|WARN|MessageTooBig|%s" errMsg +// printfn "|WARN|MessageTooBig|payload=%s" payload +// try +// let message = Message.Read (messageVer,buf) +// printfn "|WARN|MessageTooBig|payload=%s" (Binary.toString message.value) +// with ex -> +// printfn "ERROR DECODING MESSAGE|%O" ex + () try if messageSetRemainder >= messageSize && buf.Buffer.Count >= messageSize then let message = Message.Read (messageVer,buf) @@ -740,7 +755,7 @@ module Protocol = let errorCode = buf.ReadInt16 () let hwo = buf.ReadInt64 () let mss = buf.ReadInt32 () - let ms = MessageSet.Read (MessageVersions.fetchResMessage ver,partition,errorCode,mss,buf) + let ms = MessageSet.Read (MessageVersions.fetchResMessage ver,partition,errorCode,mss,false,buf) ps.[j] <- partition, errorCode, hwo, mss, ms topics.[i] <- (t,ps) let res = FetchResponse(throttleTime, topics) diff --git a/tests/kafunk.Tests/Consumer.fsx b/tests/kafunk.Tests/Consumer.fsx index 08c4c96..b2f15af 100644 --- a/tests/kafunk.Tests/Consumer.fsx +++ b/tests/kafunk.Tests/Consumer.fsx @@ -5,7 +5,7 @@ open FSharp.Control open Kafunk open System -Log.MinLevel <- LogLevel.Trace +//Log.MinLevel <- LogLevel.Trace let Log = Log.create __SOURCE_FILE__ let argiDefault i def = fsi.CommandLineArgs |> Seq.tryItem i |> Option.getOr def @@ -20,9 +20,9 @@ let go = async { let connConfig = let chanConfig = ChanConfig.create ( - requestTimeout = TimeSpan.FromSeconds 30.0, - receiveBufferSize = 8192 * 20, - sendBufferSize = 8192 * 10, + requestTimeout = TimeSpan.FromSeconds 60.0, + receiveBufferSize = 8192 * 50, + sendBufferSize = 8192 * 50, connectRetryPolicy = ChanConfig.DefaultConnectRetryPolicy, requestRetryPolicy = ChanConfig.DefaultRequestRetryPolicy) KafkaConfig.create ( @@ -30,8 +30,8 @@ let go = async { //[KafkaUri.parse "localhost:9092" ; KafkaUri.parse "localhost:9093" ; KafkaUri.parse "localhost:9094"], tcpConfig = chanConfig, requestRetryPolicy = KafkaConfig.DefaultRequestRetryPolicy, - version = Versions.V_0_10_1, - autoApiVersions = true) + version = Versions.V_0_9_0, + autoApiVersions = false) Kafka.connAsync connConfig let consumerConfig = ConsumerConfig.create (