Skip to content
This repository has been archived by the owner on Dec 8, 2019. It is now read-only.

Commit

Permalink
Added message confirmations to protocol (ref #22).
Browse files Browse the repository at this point in the history
  • Loading branch information
Nutomic committed Jul 17, 2016
1 parent 338a51f commit 579d1f5
Show file tree
Hide file tree
Showing 16 changed files with 207 additions and 75 deletions.
13 changes: 12 additions & 1 deletion PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ version, type and ID, followed by the length of the message.
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Version | Protocol-Type | Tokens | Hop Limit |
| Version | Protocol-Type | Tokens | Hop Count |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Expand Down Expand Up @@ -327,3 +327,14 @@ Text the string to be transferred, encoded as UTF-8.

Contains the sender's name and status, which should be used for
display to users.

### MessageReceived (Content-Type = 8)

0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Message ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Confirms that a previous content message has been received by the
target node. Message ID is the ID of that message.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ class ChatService extends Service {

private val callbackHandler = new CallbackHandler(this, notificationHandler)

lazy val database = new Database(getDatabasePath("database"), callbackHandler)
private def settingsWrapper = new SettingsWrapper(this)

lazy val database = new Database(getDatabasePath("database"), settingsWrapper, callbackHandler)

private lazy val connectionHandler =
new ConnectionHandler(new SettingsWrapper(this), database, callbackHandler,
ChatService.newCrypto(this), 1)
new ConnectionHandler(settingsWrapper, database, callbackHandler, ChatService.newCrypto(this), 1)

private val networkReceiver = new NetworkChangedReceiver()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package com.nutomic.ensichat.core

import java.security.InvalidKeyException
import java.util.Date

import com.nutomic.ensichat.core.body._
import com.nutomic.ensichat.core.header.{AbstractHeader, ContentHeader, MessageHeader}
import com.nutomic.ensichat.core.interfaces._
import com.nutomic.ensichat.core.internet.InternetInterface
import com.nutomic.ensichat.core.util._
import com.typesafe.scalalogging.Logger
import org.joda.time.Duration
import org.joda.time.{DateTime, Duration}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand Down Expand Up @@ -39,7 +38,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
(a, m) => transmissionInterfaces.foreach(_.send(a, m)),
noRouteFound)

private val messageBuffer = new MessageBuffer(requestRoute)
private lazy val messageBuffer = new MessageBuffer(crypto.localAddress, requestRoute)

/**
* Holds all known users.
Expand All @@ -64,6 +63,11 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
transmissionInterfaces +=
new InternetInterface(this, crypto, settings, maxInternetConnections, port)
transmissionInterfaces.foreach(_.create())
database.getUnconfirmedMessages.foreach { m =>
val encrypted = crypto.encryptAndSign(m)
messageBuffer.addMessage(encrypted)
requestRoute(encrypted.header.target)
}
}
}

Expand All @@ -81,7 +85,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
FutureHelper {
val messageId = settings.get("message_id", 0L)
val header = new ContentHeader(crypto.localAddress, target, seqNumGenerator.next(),
body.contentType, Some(messageId), Some(new Date()), AbstractHeader.InitialForwardingTokens)
body.contentType, Some(messageId), Some(DateTime.now), AbstractHeader.InitialForwardingTokens)
settings.put("message_id", messageId + 1)

val msg = new Message(header, body)
Expand All @@ -100,7 +104,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
val header = new MessageHeader(body.protocolType, crypto.localAddress, Address.Broadcast, seqNum, 0)

val signed = crypto.sign(new Message(header, body))
logger.trace(s"sending new $signed")
router.forwardMessage(signed)
}

Expand All @@ -110,7 +113,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
val header = new MessageHeader(body.protocolType, crypto.localAddress, replyTo, seqNum, 0)

val signed = crypto.sign(new Message(header, body))
logger.trace(s"sending new $signed")
router.forwardMessage(signed)
}

Expand All @@ -122,7 +124,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
val body = new RouteError(address, seqNum)

val signed = crypto.sign(new Message(header, body))
logger.trace(s"sending new $signed")
router.forwardMessage(signed)
}

Expand All @@ -149,7 +150,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,

msg.body match {
case rreq: RouteRequest =>
logger.trace(s"Received $msg")
localRoutesInfo.addRoute(msg.header.origin, rreq.originSeqNum, previousHop, rreq.originMetric)
resendMissingRouteMessages()
// TODO: Respecting this causes the RERR test to fail. We have to fix the implementation
Expand All @@ -172,7 +172,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
}
return
case rrep: RouteReply =>
logger.trace(s"Received $msg")
localRoutesInfo.addRoute(msg.header.origin, rrep.originSeqNum, previousHop, 0)
// TODO: See above (in RREQ handler).
if (routeMessageInfo.isMessageRedundant(msg)) {
Expand All @@ -198,7 +197,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
router.forwardMessage(forwardMsg)
return
case rerr: RouteError =>
logger.trace(s"Received $msg")
localRoutesInfo.getRoute(rerr.address).foreach { route =>
if (route.nextHop == msg.header.origin && (rerr.seqNum == 0 || rerr.seqNum > route.seqNum)) {
localRoutesInfo.connectionClosed(rerr.address)
Expand Down Expand Up @@ -231,6 +229,19 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
return
}

// This is necessary because a message is sent to the destination and relays seperately,
// with different sequence numbers. Because of this, we also have to check the message ID
// to avoid duplicate messages.
if (database.getMessages(msg.header.origin).exists(m => m.header.origin == plainMsg.header.origin && m.header.messageId == plainMsg.header.messageId)) {
logger.trace(s"Received message $msg again, ignoring")
return
}

if (plainMsg.body.contentType == Text.Type) {
logger.trace(s"Sending confirmation for $plainMsg")
sendTo(plainMsg.header.origin, new MessageReceived(plainMsg.header.messageId.get))
}

onNewMessage(plainMsg)
}

Expand Down Expand Up @@ -273,6 +284,8 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
database.updateContact(contact)

callbacks.onConnectionsChanged()
case mr: MessageReceived =>
database.setMessageConfirmed(mr.messageId)
case _ =>
val origin = msg.header.origin
if (origin != crypto.localAddress && database.getContact(origin).isEmpty)
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/com/nutomic/ensichat/core/Crypto.scala
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class Crypto(settings: SettingsInterface, keyFolder: File) {
val body = msg.header.asInstanceOf[ContentHeader].contentType match {
case Text.Type => Text.read(decrypted)
case UserInfo.Type => UserInfo.read(decrypted)
case MessageReceived.Type => MessageReceived.read(decrypted)
}
new Message(msg.header, msg.crypto, body)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.nutomic.ensichat.core.body

import java.nio.ByteBuffer

import com.nutomic.ensichat.core.Message
import com.nutomic.ensichat.core.util.BufferUtils

import scala.Predef.String

object MessageReceived {

val Type = 8

/**
* Constructs [[Text]] instance from byte array.
*/
def read(array: Array[Byte]): MessageReceived = {
val b = ByteBuffer.wrap(array)
val messageId = BufferUtils.getUnsignedInt(b)
new MessageReceived(messageId)
}

}

/**
* Holds a plain text message.
*/
final case class MessageReceived(messageId: Long) extends MessageBody {

override def protocolType = -1

override def contentType = MessageReceived.Type

override def write: Array[Byte] = {
val b = ByteBuffer.allocate(length)
// TODO: This should be putUnsignedLong, but doesn't seem possible in the JVM.
// Alternatively, we could use signed ints instead.
BufferUtils.putUnsignedInt(b, messageId)
b.array()
}

override def length = 4

override def equals(a: Any): Boolean = a match {
case o: MessageReceived => messageId == o.messageId
case _ => false
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.nutomic.ensichat.core.header

import java.nio.ByteBuffer
import java.util.Date

import com.nutomic.ensichat.core.Address
import com.nutomic.ensichat.core.util.BufferUtils
import org.joda.time.DateTime

object AbstractHeader {

Expand Down Expand Up @@ -33,7 +33,7 @@ trait AbstractHeader {
def target: Address
def seqNum: Int
def messageId: Option[Long] = None
def time: Option[Date] = None
def time: Option[DateTime] = None

/**
* Writes the header to byte array.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.nutomic.ensichat.core.header

import java.nio.ByteBuffer
import java.util.Date

import com.nutomic.ensichat.core.Address
import com.nutomic.ensichat.core.util.BufferUtils
import org.joda.time.DateTime

object ContentHeader {

Expand All @@ -25,7 +25,7 @@ object ContentHeader {
val time = BufferUtils.getUnsignedInt(b)

val ch = new ContentHeader(mh.origin, mh.target, mh.seqNum, contentType, Some(messageId),
Some(new Date(time * 1000)), mh.tokens, mh.hopCount)
Some(new DateTime(time * 1000)), mh.tokens, mh.hopCount)

val remaining = new Array[Byte](b.remaining())
b.get(remaining, 0, b.remaining())
Expand All @@ -44,7 +44,7 @@ final case class ContentHeader(override val origin: Address,
override val seqNum: Int,
contentType: Int,
override val messageId: Some[Long],
override val time: Some[Date],
override val time: Some[DateTime],
override val tokens: Int,
override val hopCount: Int = 0)
extends AbstractHeader {
Expand All @@ -61,7 +61,7 @@ final case class ContentHeader(override val origin: Address,

BufferUtils.putUnsignedShort(b, contentType)
BufferUtils.putUnsignedInt(b, messageId.get)
BufferUtils.putUnsignedInt(b, time.get.getTime / 1000)
BufferUtils.putUnsignedInt(b, time.get.getMillis / 1000)

b.array()
}
Expand All @@ -73,7 +73,7 @@ final case class ContentHeader(override val origin: Address,
super.equals(a) &&
contentType == o.contentType &&
messageId == o.messageId &&
time.get.getTime / 1000 == o.time.get.getTime / 1000
time.get.getMillis / 1000 == o.time.get.getMillis / 1000
case _ => false
}

Expand Down
Loading

0 comments on commit 579d1f5

Please sign in to comment.