Skip to content
This repository has been archived by the owner on Dec 8, 2019. It is now read-only.

Commit

Permalink
Send messages via relays (fixes #26).
Browse files Browse the repository at this point in the history
  • Loading branch information
Nutomic committed Jun 24, 2016
1 parent 1add05b commit 4a36fdb
Show file tree
Hide file tree
Showing 17 changed files with 255 additions and 98 deletions.
6 changes: 3 additions & 3 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ version, type and ID, followed by the length of the message.
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Version | Protocol-Type | Hop Limit | Hop Count |
| Version | Protocol-Type | Tokens | Hop Limit |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Expand All @@ -111,8 +111,8 @@ where such a packet came from MAY be closed.
Protocol-Type is one of those specified in section Protocol Messages,
or 255 for Content Messages.

Hop Limit SHOULD be set to `20` on message creation, and
MUST NOT be changed by a forwarding node.
Tokens is the number of times this message should be copied to
different relays.

Hop Count specifies the number of nodes a message may pass. When
creating a package, it is initialized to 0. Whenever a node forwards
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class BluetoothTransferThread(context: Context, device: Device, socket: Bluetoot
new IntentFilter(BluetoothDevice.ACTION_ACL_DISCONNECTED))

send(crypto.sign(new Message(new MessageHeader(ConnectionInfo.Type,
Address.Null, Address.Null, 0), new ConnectionInfo(crypto.getLocalPublicKey))))
Address.Null, Address.Null, 0, 0), new ConnectionInfo(crypto.getLocalPublicKey))))

while (socket.isConnected) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.nutomic.ensichat.core

import java.security.InvalidKeyException
import java.util.{TimerTask, Timer, Date}
import java.util.Date

import com.nutomic.ensichat.core.body._
import com.nutomic.ensichat.core.header.{ContentHeader, MessageHeader}
import com.nutomic.ensichat.core.header.{AbstractHeader, ContentHeader, MessageHeader}
import com.nutomic.ensichat.core.interfaces._
import com.nutomic.ensichat.core.internet.InternetInterface
import com.nutomic.ensichat.core.util._
import com.typesafe.scalalogging.Logger
import org.joda.time.{DateTime, Duration}
import org.joda.time.Duration

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand All @@ -27,8 +27,6 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,

private val logger = Logger(this.getClass)

private val CheckMessageRetryInterval = Duration.standardMinutes(1)

private var transmissionInterfaces = Set[TransmissionInterface]()

private lazy val seqNumGenerator = new SeqNumGenerator(settings)
Expand Down Expand Up @@ -83,12 +81,13 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
FutureHelper {
val messageId = settings.get("message_id", 0L)
val header = new ContentHeader(crypto.localAddress, target, seqNumGenerator.next(),
body.contentType, Some(messageId), Some(new Date()))
body.contentType, Some(messageId), Some(new Date()), AbstractHeader.InitialForwardingTokens)
settings.put("message_id", messageId + 1)

val msg = new Message(header, body)
val encrypted = crypto.encryptAndSign(msg)
router.forwardMessage(encrypted)
forwardMessageToRelays(encrypted)
onNewMessage(msg)
}
}
Expand All @@ -98,7 +97,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
val seqNum = seqNumGenerator.next()
val targetSeqNum = localRoutesInfo.getRoute(target).map(_.seqNum).getOrElse(-1)
val body = new RouteRequest(target, seqNum, targetSeqNum, 0)
val header = new MessageHeader(body.protocolType, crypto.localAddress, Address.Broadcast, seqNum)
val header = new MessageHeader(body.protocolType, crypto.localAddress, Address.Broadcast, seqNum, 0)

val signed = crypto.sign(new Message(header, body))
logger.trace(s"sending new $signed")
Expand All @@ -108,7 +107,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
private def replyRoute(target: Address, replyTo: Address): Unit = {
val seqNum = seqNumGenerator.next()
val body = new RouteReply(seqNum, 0)
val header = new MessageHeader(body.protocolType, crypto.localAddress, replyTo, seqNum)
val header = new MessageHeader(body.protocolType, crypto.localAddress, replyTo, seqNum, 0)

val signed = crypto.sign(new Message(header, body))
logger.trace(s"sending new $signed")
Expand All @@ -118,7 +117,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
private def routeError(address: Address, packetSource: Option[Address]): Unit = {
val destination = packetSource.getOrElse(Address.Broadcast)
val header = new MessageHeader(RouteError.Type, crypto.localAddress, destination,
seqNumGenerator.next())
seqNumGenerator.next(), 0)
val seqNum = localRoutesInfo.getRoute(address).map(_.seqNum).getOrElse(-1)
val body = new RouteError(address, seqNum)

Expand Down Expand Up @@ -211,6 +210,7 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,

if (msg.header.target != crypto.localAddress) {
router.forwardMessage(msg)
forwardMessageToRelays(msg)
return
}

Expand All @@ -234,6 +234,20 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
onNewMessage(plainMsg)
}

private def forwardMessageToRelays(message: Message): Unit = {
var tokens = message.header.tokens
val relays = database.pickLongestConnectionDevice(connections())
var index = 0
while (tokens > 1) {
val forwardTokens = tokens / 2
val headerCopy = message.header.asInstanceOf[ContentHeader].copy(tokens = forwardTokens)
router.forwardMessage(message.copy(header = headerCopy), relays.lift(index))
tokens -= forwardTokens
database.updateMessageForwardingTokens(message, tokens)
index += 1
}
}

/**
* Tries to send messages in [[MessageBuffer]] again, after we acquired a new route.
*/
Expand All @@ -244,11 +258,8 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
}

private def noRouteFound(message: Message): Unit = {
if (message.header.origin == crypto.localAddress) {
messageBuffer.addMessage(message)
requestRoute(message.header.target)
} else
routeError(message.header.target, Option(message.header.origin))
messageBuffer.addMessage(message)
requestRoute(message.header.target)
}

/**
Expand Down Expand Up @@ -317,6 +328,9 @@ final class ConnectionHandler(settings: SettingsInterface, database: Database,
settings.get(SettingsInterface.KeyUserStatus, "")))
callbacks.onConnectionsChanged()
resendMissingRouteMessages()
messageBuffer.getAllMessages
.filter(_.header.tokens > 1)
.foreach(forwardMessageToRelays)
true
}

Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/com/nutomic/ensichat/core/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.nutomic.ensichat.core.util.LocalRoutesInfo

object Router extends Comparator[Int] {

private val HopLimit = 20

/**
* Compares which sequence number is newer.
*
Expand Down Expand Up @@ -50,7 +52,7 @@ private[core] class Router(routesInfo: LocalRoutesInfo, send: (Address, Message)
* true.
*/
def forwardMessage(msg: Message, nextHopOption: Option[Address] = None): Unit = {
if (msg.header.hopCount + 1 >= msg.header.hopLimit)
if (msg.header.hopCount + 1 >= Router.HopLimit)
return

val nextHop = nextHopOption.getOrElse(msg.header.target)
Expand Down Expand Up @@ -79,10 +81,8 @@ private[core] class Router(routesInfo: LocalRoutesInfo, send: (Address, Message)
*/
private def incHopCount(msg: Message): Message = {
val updatedHeader = msg.header match {
case ch: ContentHeader => new ContentHeader(ch.origin, ch.target, ch.seqNum, ch.contentType,
ch.messageId, ch.time, ch.hopCount + 1, ch.hopLimit)
case mh: MessageHeader => new MessageHeader(mh.protocolType, mh.origin, mh.target, mh.seqNum,
mh.hopCount + 1, mh.hopLimit)
case ch: ContentHeader => ch.copy(hopCount = ch.hopCount + 1)
case mh: MessageHeader => mh.copy(hopCount = mh.hopCount + 1)
}
new Message(updatedHeader, msg.crypto, msg.body)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.nutomic.ensichat.core.util.BufferUtils

object AbstractHeader {

val DefaultHopLimit = 20
val InitialForwardingTokens = 3

val Version = 0

Expand All @@ -25,7 +25,7 @@ object AbstractHeader {
trait AbstractHeader {

def protocolType: Int
def hopLimit: Int
def tokens: Int
def hopCount: Int
def origin: Address
def target: Address
Expand All @@ -41,7 +41,7 @@ trait AbstractHeader {

BufferUtils.putUnsignedByte(b, AbstractHeader.Version)
BufferUtils.putUnsignedByte(b, protocolType)
BufferUtils.putUnsignedByte(b, hopLimit)
BufferUtils.putUnsignedByte(b, tokens)
BufferUtils.putUnsignedByte(b, hopCount)

BufferUtils.putUnsignedInt(b, length + contentLength)
Expand All @@ -63,7 +63,7 @@ trait AbstractHeader {
override def equals(a: Any): Boolean = a match {
case o: AbstractHeader =>
protocolType == o.protocolType &&
hopLimit == o.hopLimit &&
tokens == o.tokens &&
hopCount == o.hopCount &&
origin == o.origin &&
target == o.target &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object ContentHeader {
val time = BufferUtils.getUnsignedInt(b)

val ch = new ContentHeader(mh.origin, mh.target, mh.seqNum, contentType, Some(messageId),
Some(new Date(time * 1000)), mh.hopCount)
Some(new Date(time * 1000)), mh.tokens, mh.hopCount)

val remaining = new Array[Byte](b.remaining())
b.get(remaining, 0, b.remaining())
Expand All @@ -45,8 +45,8 @@ final case class ContentHeader(override val origin: Address,
contentType: Int,
override val messageId: Some[Long],
override val time: Some[Date],
override val hopCount: Int = 0,
override val hopLimit: Int = AbstractHeader.DefaultHopLimit)
override val tokens: Int,
override val hopCount: Int = 0)
extends AbstractHeader {

override val protocolType = ContentHeader.ContentMessageType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object MessageHeader {
if (version != AbstractHeader.Version)
throw new ReadMessageException("Failed to parse message with unsupported version " + version)
val protocolType = BufferUtils.getUnsignedByte(b)
val hopLimit = BufferUtils.getUnsignedByte(b)
val tokens = BufferUtils.getUnsignedByte(b)
val hopCount = BufferUtils.getUnsignedByte(b)

val length = BufferUtils.getUnsignedInt(b)
Expand All @@ -34,7 +34,7 @@ object MessageHeader {

val seqNum = BufferUtils.getUnsignedShort(b)

(new MessageHeader(protocolType, origin, target, seqNum, hopCount, hopLimit), length.toInt)
(new MessageHeader(protocolType, origin, target, seqNum, tokens, hopCount), length.toInt)
}

}
Expand All @@ -48,8 +48,8 @@ final case class MessageHeader(override val protocolType: Int,
override val origin: Address,
override val target: Address,
override val seqNum: Int,
override val hopCount: Int = 0,
override val hopLimit: Int = AbstractHeader.DefaultHopLimit)
override val tokens: Int,
override val hopCount: Int = 0)
extends AbstractHeader {

def length: Int = MessageHeader.Length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[core] class InternetConnectionThread(socket: Socket, crypto: Crypto,
logger.info("Connection opened to " + socket.getInetAddress)

send(crypto.sign(new Message(new MessageHeader(ConnectionInfo.Type,
Address.Null, Address.Null, 0), new ConnectionInfo(crypto.getLocalPublicKey))))
Address.Null, Address.Null, 0, 0), new ConnectionInfo(crypto.getLocalPublicKey))))

try {
socket.setKeepAlive(true)
Expand Down
61 changes: 52 additions & 9 deletions core/src/main/scala/com/nutomic/ensichat/core/util/Database.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.nutomic.ensichat.core.util

import java.io.File
import java.sql.DriverManager
import java.util.Date

import com.nutomic.ensichat.core.body.Text
import com.nutomic.ensichat.core.header.ContentHeader
import com.nutomic.ensichat.core.interfaces.CallbackInterface
import com.nutomic.ensichat.core.interfaces.{CallbackInterface, SettingsInterface}
import com.nutomic.ensichat.core.{Address, Message, User}
import com.typesafe.scalalogging.Logger
import org.joda.time
Expand All @@ -20,31 +21,38 @@ import scala.concurrent.duration.Duration
*
* @param path The database file.
*/
class Database(path: File, callbackInterface: CallbackInterface) {
class Database(path: File, settings: SettingsInterface, callbackInterface: CallbackInterface) {

private val logger = Logger(this.getClass)

private val DatabaseVersionKey = "database_version"
private val DatabaseVersion = 2

private val DatabasePath = "jdbc:h2:" + path.getAbsolutePath + ";DATABASE_TO_UPPER=false"

private class Messages(tag: Tag) extends Table[Message](tag, "MESSAGES") {
def id = primaryKey("id", (origin, messageId))
def origin = column[String]("origin")
def target = column[String]("target")
def messageId = column[Long]("message_id")
def text = column[String]("text")
def date = column[Long]("date")
def * = (origin, target, messageId, text, date) <> [Message, (String, String, Long, String, Long)]( {
def tokens = column[Int]("tokens")
def * = (origin, target, messageId, text, date, tokens) <> [Message, (String, String, Long, String, Long, Int)]( {
tuple =>
val header = new ContentHeader(new Address(tuple._1),
new Address(tuple._2),
-1,
Text.Type,
Some(tuple._3),
Some(new Date(tuple._5)))
Some(new Date(tuple._5)),
tuple._6)
val body = new Text(tuple._4)
new Message(header, body)
}, message =>
Option((message.header.origin.toString(), message.header.target.toString(),
message.header.messageId.get, message.body.asInstanceOf[Text].text,
message.header.time.get.getTime))
message.header.time.get.getTime, message.header.tokens))
)
}
private val messages = TableQuery[Messages]
Expand All @@ -67,15 +75,33 @@ class Database(path: File, callbackInterface: CallbackInterface) {
}
private val knownDevices = TableQuery[KnownDevices]

private val db = Database.forURL("jdbc:h2:" + path.getAbsolutePath, driver = "org.h2.Driver")
private val db = Database.forURL(DatabasePath, driver = "org.h2.Driver")

// Create tables if database doesn't exist.
{
// H2 appends a .mv.db suffix to the path which we can't change, so we have to check that file.
val dbFile = new File(path.getAbsolutePath + ".mv.db")
if (!dbFile.exists()) {
logger.info("Database does not exist, creating tables")
Await.result(db.run((messages.schema ++ contacts.schema).create), Duration.Inf)
val query = (messages.schema ++ contacts.schema ++ knownDevices.schema).create
Await.result(db.run(query), Duration.Inf)
settings.put(DatabaseVersionKey, DatabaseVersion)
}
}

// Apparently, slick doesn't support ALTER TABLE, so we have to write raw SQL for this...
{
val oldVersion = settings.get(DatabaseVersionKey, 0)
if (oldVersion != DatabaseVersion) {
logger.info(s"Upgrading database from version $oldVersion to $DatabaseVersion")
val connection = DriverManager.getConnection(DatabasePath)
if (oldVersion <= 2) {
connection.createStatement().executeUpdate("ALTER TABLE MESSAGES ADD COLUMN (tokens INT);")
connection.commit()
Await.result(db.run(knownDevices.schema.create), Duration.Inf)
}
connection.close()
settings.put(DatabaseVersionKey, DatabaseVersion)
}
}

Expand Down Expand Up @@ -138,8 +164,25 @@ class Database(path: File, callbackInterface: CallbackInterface) {
Await.result(db.run(query), Duration.Inf)
}

def getKnownDevices: Seq[(Address, time.Duration)] = {
Await.result(db.run(knownDevices.result), Duration.Inf)
/**
* Returns neighbors sorted by connection time, according to [[KnownDevices]].
*/
def pickLongestConnectionDevice(connections: Set[Address]): List[Address] = {
val map = Await.result(db.run(knownDevices.result), Duration.Inf).toMap
connections
.toList
.sortBy(map(_).getMillis)
.reverse
}

def updateMessageForwardingTokens(message: Message, tokens: Int): Unit = {
val query = messages.filter { c =>
c.origin === message.header.origin.toString &&
c.messageId === message.header.messageId
}
.map(_.tokens)
.update(tokens)
Await.result(db.run(query), Duration.Inf)
}

}
Loading

0 comments on commit 4a36fdb

Please sign in to comment.