diff --git a/src/MatrixClientPeg.js b/src/MatrixClientPeg.js index bebb254afc..5c5ee6e4ec 100644 --- a/src/MatrixClientPeg.js +++ b/src/MatrixClientPeg.js @@ -30,6 +30,7 @@ import {verificationMethods} from 'matrix-js-sdk/lib/crypto'; import MatrixClientBackedSettingsHandler from "./settings/handlers/MatrixClientBackedSettingsHandler"; import * as StorageManager from './utils/StorageManager'; import IdentityAuthClient from './IdentityAuthClient'; +import PlatformPeg from "./PlatformPeg"; interface MatrixClientCreds { homeserverUrl: string, @@ -222,6 +223,9 @@ class MatrixClientPeg { this.matrixClient = createMatrixClient(opts); + const platform = PlatformPeg.get(); + if (platform.supportsEventIndexing()) platform.initEventIndex(creds.userId); + // we're going to add eventlisteners for each matrix event tile, so the // potential number of event listeners is quite high. this.matrixClient.setMaxListeners(500); diff --git a/src/components/structures/MatrixChat.js b/src/components/structures/MatrixChat.js index da67416400..218b7e4d4e 100644 --- a/src/components/structures/MatrixChat.js +++ b/src/components/structures/MatrixChat.js @@ -1262,6 +1262,7 @@ export default createReactClass({ // to do the first sync this.firstSyncComplete = false; this.firstSyncPromise = Promise.defer(); + this.crawlerChekpoints = []; const cli = MatrixClientPeg.get(); const IncomingSasDialog = sdk.getComponent('views.dialogs.IncomingSasDialog'); @@ -1287,6 +1288,75 @@ export default createReactClass({ return self._loggedInView.child.canResetTimelineInRoom(roomId); }); + cli.on('sync', async (state, prevState, data) => { + const platform = PlatformPeg.get(); + if (!platform.supportsEventIndexing()) return; + + if (prevState === null && state === "PREPARED") { + /// Load our stored checkpoints, if any. + self.crawlerChekpoints = await platform.loadCheckpoints(); + console.log("Seshat: Loaded checkpoints", + self.crawlerChekpoints); + return; + } + + if (prevState === "PREPARED" && state === "SYNCING") { + const addInitialCheckpoints = async () => { + const client = MatrixClientPeg.get(); + const rooms = client.getRooms(); + + const isRoomEncrypted = (room) => { + return client.isRoomEncrypted(room.roomId); + }; + + // We only care to crawl the encrypted rooms, non-encrytped + // rooms can use the search provided by the Homeserver. + const encryptedRooms = rooms.filter(isRoomEncrypted); + + console.log("Seshat: Adding initial crawler checkpoints"); + + // Gather the prev_batch tokens and create checkpoints for + // our message crawler. + await Promise.all(encryptedRooms.map(async (room) => { + const timeline = room.getLiveTimeline(); + const token = timeline.getPaginationToken("b"); + + console.log("Seshat: Got token for indexer", + room.roomId, token); + + const backCheckpoint = { + roomId: room.roomId, + token: token, + direction: "b", + }; + + const forwardCheckpoint = { + roomId: room.roomId, + token: token, + direction: "f", + }; + + await platform.addCrawlerCheckpoint(backCheckpoint); + await platform.addCrawlerCheckpoint(forwardCheckpoint); + self.crawlerChekpoints.push(backCheckpoint); + self.crawlerChekpoints.push(forwardCheckpoint); + })); + }; + + // If our indexer is empty we're most likely running Riot the + // first time with indexing support or running it with an + // initial sync. Add checkpoints to crawl our encrypted rooms. + const eventIndexWasEmpty = await platform.isEventIndexEmpty(); + if (eventIndexWasEmpty) await addInitialCheckpoints(); + + // Start our crawler. + const crawlerHandle = {}; + self.crawlerFunc(crawlerHandle); + self.crawlerRef = crawlerHandle; + return; + } + }); + cli.on('sync', function(state, prevState, data) { // LifecycleStore and others cannot directly subscribe to matrix client for // events because flux only allows store state changes during flux dispatches. @@ -1930,4 +2000,165 @@ export default createReactClass({ {view} ; }, + + async crawlerFunc(handle) { + // TODO either put this in a better place or find a library provided + // method that does this. + const sleep = async (ms) => { + return new Promise(resolve => setTimeout(resolve, ms)); + }; + + let cancelled = false; + + console.log("Seshat: Started crawler function"); + + const client = MatrixClientPeg.get(); + const platform = PlatformPeg.get(); + + handle.cancel = () => { + cancelled = true; + }; + + while (!cancelled) { + // This is a low priority task and we don't want to spam our + // Homeserver with /messages requests so we set a hefty 3s timeout + // here. + await sleep(3000); + + if (cancelled) { + break; + } + + const checkpoint = this.crawlerChekpoints.shift(); + + /// There is no checkpoint available currently, one may appear if + // a sync with limited room timelines happens, so go back to sleep. + if (checkpoint === undefined) { + continue; + } + + console.log("Seshat: crawling using checkpoint", checkpoint); + + // We have a checkpoint, let us fetch some messages, again, very + // conservatively to not bother our Homeserver too much. + const eventMapper = client.getEventMapper(); + // TODO we need to ensure to use member lazy loading with this + // request so we get the correct profiles. + const res = await client._createMessagesRequest(checkpoint.roomId, + checkpoint.token, 100, checkpoint.direction); + + if (res.chunk.length === 0) { + // We got to the start/end of our timeline, lets just + // delete our checkpoint and go back to sleep. + await platform.removeCrawlerCheckpoint(checkpoint); + continue; + } + + // Convert the plain JSON events into Matrix events so they get + // decrypted if necessary. + const matrixEvents = res.chunk.map(eventMapper); + const stateEvents = res.state.map(eventMapper); + + const profiles = {}; + + stateEvents.forEach(ev => { + if (ev.event.content && + ev.event.content.membership === "join") { + profiles[ev.event.sender] = { + displayname: ev.event.content.displayname, + avatar_url: ev.event.content.avatar_url, + }; + } + }); + + const decryptionPromises = []; + + matrixEvents.forEach(ev => { + if (ev.isBeingDecrypted() || ev.isDecryptionFailure()) { + // TODO the decryption promise is a private property, this + // should either be made public or we should convert the + // event that gets fired when decryption is done into a + // promise using the once event emitter method: + // https://nodejs.org/api/events.html#events_events_once_emitter_name + decryptionPromises.push(ev._decryptionPromise); + } + }); + + // Let us wait for all the events to get decrypted. + await Promise.all(decryptionPromises); + + // We filter out events for which decryption failed, are redacted + // or aren't of a type that we know how to index. + const isValidEvent = (value) => { + return ([ + "m.room.message", + "m.room.name", + "m.room.topic", + ].indexOf(value.getType()) >= 0 + && !value.isRedacted() && !value.isDecryptionFailure() + ); + // TODO do we need to check if the event has all the valid + // attributes? + }; + + // TODO if there ar no events at this point we're missing a lot + // decryption keys, do we wan't to retry this checkpoint at a later + // stage? + const filteredEvents = matrixEvents.filter(isValidEvent); + + // Let us convert the events back into a format that Seshat can + // consume. + const events = filteredEvents.map((ev) => { + const jsonEvent = ev.toJSON(); + + let e; + if (ev.isEncrypted()) e = jsonEvent.decrypted; + else e = jsonEvent; + + let profile = {}; + if (e.sender in profiles) profile = profiles[e.sender]; + const object = { + event: e, + profile: profile, + }; + return object; + }); + + // Create a new checkpoint so we can continue crawling the room for + // messages. + const newCheckpoint = { + roomId: checkpoint.roomId, + token: res.end, + fullCrawl: checkpoint.fullCrawl, + direction: checkpoint.direction, + }; + + console.log( + "Seshat: Crawled room", + client.getRoom(checkpoint.roomId).name, + "and fetched", events.length, "events.", + ); + + try { + const eventsAlreadyAdded = await platform.addHistoricEvents( + events, newCheckpoint, checkpoint); + // If all events were already indexed we assume that we catched + // up with our index and don't need to crawl the room further. + // Let us delete the checkpoint in that case, otherwise push + // the new checkpoint to be used by the crawler. + if (eventsAlreadyAdded === true && newCheckpoint.fullCrawl !== true) { + await platform.removeCrawlerCheckpoint(newCheckpoint); + } else { + this.crawlerChekpoints.push(newCheckpoint); + } + } catch (e) { + console.log("Seshat: Error durring a crawl", e); + // An error occured, put the checkpoint back so we + // can retry. + this.crawlerChekpoints.push(checkpoint); + } + } + + console.log("Seshat: Stopping crawler function"); + }, });