diff --git a/CHANGELOG.md b/CHANGELOG.md index ad084c8a56..c2648b84b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,28 @@ All notable changes to this project will be documented in this file. +## [4.4.6] - 2025-10-13 + +### Security + +- Update dependencies `rack` and `uri` +- Fix streaming server connection not being closed on user suspension (by @ThisIsMissEm, [GHSA-r2fh-jr9c-9pxh](https://github.com/mastodon/mastodon/security/advisories/GHSA-r2fh-jr9c-9pxh)) +- Fix password change through admin CLI not invalidating existing sessions and access tokens (by @ThisIsMissEm, [GHSA-f3q3-rmf7-9655](https://github.com/mastodon/mastodon/security/advisories/GHSA-f3q3-rmf7-9655)) +- Fix streaming server allowing access to public timelines even without the `read` or `read:statuses` OAuth scopes (by @ThisIsMissEm, [GHSA-7gwh-mw97-qjgp](https://github.com/mastodon/mastodon/security/advisories/GHSA-7gwh-mw97-qjgp)) + +### Added + +- Add support for processing quotes of deleted posts signaled through a `Tombstone` (#36381 by @ClearlyClaire) + +### Fixed + +- Fix quote post state sometimes not being updated through streaming server (#36408 by @ClearlyClaire) +- Fix inconsistent “pending tags” count on admin dashboard (#36404 by @mjankowski) +- Fix JSON payload being potentially mutated when processing interaction policies (#36392 by @ClearlyClaire) +- Fix quotes not being displayed in email notifications (#36379 by @diondiondion) +- Fix redirect to external object when URL is missing or malformed (#36347 by @ClearlyClaire) +- Fix quotes not being displayed in the featured carousel (#36335 by @diondiondion) + ## [4.4.5] - 2025-09-23 ### Security diff --git a/Gemfile.lock b/Gemfile.lock index 88f6818319..553cac49fe 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -494,7 +494,7 @@ GEM tzinfo validate_url webfinger (~> 2.0) - openssl (3.3.0) + openssl (3.3.1) openssl-signature_algorithm (1.3.0) openssl (> 2.0) opentelemetry-api (1.5.0) @@ -642,7 +642,7 @@ GEM activesupport (>= 3.0.0) raabro (1.4.0) racc (1.8.1) - rack (3.1.17) + rack (3.1.18) rack-attack (6.7.0) rack (>= 1.0, < 4) rack-cors (3.0.0) diff --git a/app/models/concerns/account/suspensions.rb b/app/models/concerns/account/suspensions.rb index c981fb5a29..4c9ca593ad 100644 --- a/app/models/concerns/account/suspensions.rb +++ b/app/models/concerns/account/suspensions.rb @@ -32,6 +32,10 @@ module Account::Suspensions update!(suspended_at: date, suspension_origin: origin) create_canonical_email_block! if block_email end + + # This terminates all connections for the given account with the streaming + # server: + redis.publish("timeline:system:#{id}", Oj.dump(event: :kill)) if local? end def unsuspend! diff --git a/app/models/user.rb b/app/models/user.rb index 01965a67f4..12532652da 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -197,6 +197,10 @@ class User < ApplicationRecord def disable! update!(disabled: true) + + # This terminates all connections for the given account with the streaming + # server: + redis.publish("timeline:system:#{account.id}", Oj.dump(event: :kill)) end def enable! @@ -389,17 +393,22 @@ class User < ApplicationRecord end def reset_password! 
+ # First, change the password to something random; this revokes sessions and ongoing access: + change_password!(SecureRandom.hex) + + # Finally, send a reset password prompt to the user + send_reset_password_instructions + end + + def change_password!(new_password) # First, change password to something random and deactivate all sessions transaction do - update(password: SecureRandom.hex) + update(password: new_password) session_activations.destroy_all end # Then, remove all authorized applications and connected push subscriptions revoke_access! - - # Finally, send a reset password prompt to the user - send_reset_password_instructions end protected diff --git a/docker-compose.yml b/docker-compose.yml index 788f4aa0b2..83ff4336d0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -59,7 +59,7 @@ services: web: # You can uncomment the following line if you want to not use the prebuilt image, for example if you have local code changes # build: . - image: ghcr.io/glitch-soc/mastodon:v4.4.5 + image: ghcr.io/glitch-soc/mastodon:v4.4.6 restart: always env_file: .env.production command: bundle exec puma -C config/puma.rb @@ -83,7 +83,7 @@ services: # build: # dockerfile: ./streaming/Dockerfile # context: . - image: ghcr.io/glitch-soc/mastodon-streaming:v4.4.5 + image: ghcr.io/glitch-soc/mastodon-streaming:v4.4.6 restart: always env_file: .env.production command: node ./streaming/index.js @@ -102,7 +102,7 @@ services: sidekiq: # You can uncomment the following line if you want to not use the prebuilt image, for example if you have local code changes # build: . - image: ghcr.io/glitch-soc/mastodon:v4.4.5 + image: ghcr.io/glitch-soc/mastodon:v4.4.6 restart: always env_file: .env.production command: bundle exec sidekiq diff --git a/lib/mastodon/cli/accounts.rb b/lib/mastodon/cli/accounts.rb index 1b33f56055..25e966bd8e 100644 --- a/lib/mastodon/cli/accounts.rb +++ b/lib/mastodon/cli/accounts.rb @@ -165,14 +165,17 @@ module Mastodon::CLI user.role_id = nil end - password = SecureRandom.hex if options[:reset_password] - user.password = password if options[:reset_password] user.email = options[:email] if options[:email] user.disabled = false if options[:enable] user.disabled = true if options[:disable] user.approved = true if options[:approve] user.disable_two_factor!
if options[:disable_2fa] + # Password changes are a little different, as we also need to ensure + # sessions, subscriptions, and access tokens are revoked after changing: + password = SecureRandom.hex if options[:reset_password] + user.change_password!(password) if options[:reset_password] + if user.save user.confirm if options[:confirm] diff --git a/lib/mastodon/version.rb b/lib/mastodon/version.rb index 6ad44cff7d..069d0ea491 100644 --- a/lib/mastodon/version.rb +++ b/lib/mastodon/version.rb @@ -13,7 +13,7 @@ module Mastodon end def patch - 5 + 6 end def default_prerelease diff --git a/spec/lib/mastodon/cli/accounts_spec.rb b/spec/lib/mastodon/cli/accounts_spec.rb index 7287b6a034..d9c6aaddec 100644 --- a/spec/lib/mastodon/cli/accounts_spec.rb +++ b/spec/lib/mastodon/cli/accounts_spec.rb @@ -361,11 +361,20 @@ RSpec.describe Mastodon::CLI::Accounts do context 'with --reset-password option' do let(:options) { { reset_password: true } } + let(:user) { Fabricate(:user, password: original_password) } + let(:original_password) { 'foobar12345' } + let(:new_password) { 'new_password12345' } + it 'returns a new password for the user' do - allow(SecureRandom).to receive(:hex).and_return('new_password') + allow(SecureRandom).to receive(:hex).and_return(new_password) + allow(Account).to receive(:find_local).and_return(user.account) + allow(user).to receive(:change_password!).and_call_original expect { subject } - .to output_results('new_password') + .to output_results(new_password) + + expect(user).to have_received(:change_password!).with(new_password) + expect(user.reload).to_not be_external_or_valid_password(original_password) end end diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index b732b2d84d..3119d95cce 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -476,12 +476,15 @@ RSpec.describe User do let(:current_sign_in_at) { Time.zone.now } - before do - user.disable! - end - it 'disables user' do + allow(redis).to receive(:publish) + + user.disable! + expect(user).to have_attributes(disabled: true) + + expect(redis) + .to have_received(:publish).with("timeline:system:#{user.account.id}", Oj.dump(event: :kill)).once end end diff --git a/spec/system/streaming/streaming_spec.rb b/spec/system/streaming/streaming_spec.rb index c12bd1b18f..f5d3ba1142 100644 --- a/spec/system/streaming/streaming_spec.rb +++ b/spec/system/streaming/streaming_spec.rb @@ -74,4 +74,52 @@ RSpec.describe 'Streaming', :inline_jobs, :streaming do expect(streaming_client.open?).to be(false) end end + + context 'with a disabled user account' do + before do + user.disable! + end + + it 'receives an 401 unauthorized error when trying to connect' do + streaming_client.connect + + expect(streaming_client.status).to eq(401) + expect(streaming_client.open?).to be(false) + end + end + + context 'when the user account is disabled whilst connected' do + it 'terminates the connection for the user' do + streaming_client.connect + + user.disable! + + expect(streaming_client.wait_for(:closed).code).to be(1000) + expect(streaming_client.open?).to be(false) + end + end + + context 'with a suspended user account' do + before do + user.account.suspend! 
+ end + + it 'receives an 401 unauthorized error when trying to connect' do + streaming_client.connect + + expect(streaming_client.status).to eq(401) + expect(streaming_client.open?).to be(false) + end + end + + context 'when the user account is suspended whilst connected' do + it 'terminates the connection for the user' do + streaming_client.connect + + user.account.suspend! + + expect(streaming_client.wait_for(:closed).code).to be(1000) + expect(streaming_client.open?).to be(false) + end + end end diff --git a/streaming/index.js b/streaming/index.js index d53f4ffcd4..abad6b323e 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -12,8 +12,18 @@ import { JSDOM } from 'jsdom'; import { WebSocketServer } from 'ws'; import * as Database from './database.js'; -import { AuthenticationError, RequestError, extractStatusAndMessage as extractErrorStatusAndMessage } from './errors.js'; -import { logger, httpLogger, initializeLogLevel, attachWebsocketHttpLogger, createWebsocketLogger } from './logging.js'; +import { + AuthenticationError, + RequestError, + extractStatusAndMessage as extractErrorStatusAndMessage, +} from './errors.js'; +import { + logger, + httpLogger, + initializeLogLevel, + attachWebsocketHttpLogger, + createWebsocketLogger, +} from './logging.js'; import { setupMetrics } from './metrics.js'; import * as Redis from './redis.js'; import { isTruthy, normalizeHashtag, firstParam } from './utils.js'; @@ -23,13 +33,11 @@ const environment = process.env.NODE_ENV || 'development'; // Correctly detect and load .env or .env.production file based on environment: const dotenvFile = environment === 'production' ? '.env.production' : '.env'; const dotenvFilePath = path.resolve( - url.fileURLToPath( - new URL(path.join('..', dotenvFile), import.meta.url) - ) + url.fileURLToPath(new URL(path.join('..', dotenvFile), import.meta.url)), ); dotenv.config({ - path: dotenvFilePath + path: dotenvFilePath, }); initializeLogLevel(process.env, environment); @@ -46,7 +54,6 @@ initializeLogLevel(process.env, environment); * @property {string[]} chosenLanguages */ - /** * Attempts to safely parse a string as JSON, used when both receiving a message * from redis and when receiving a message from a client over a websocket @@ -67,9 +74,15 @@ const parseJSON = (json, req) => { */ if (req) { if (req.accountId) { - req.log.error({ err }, `Error parsing message from user ${req.accountId}`); + req.log.error( + { err }, + `Error parsing message from user ${req.accountId}`, + ); } else { - req.log.error({ err }, `Error parsing message from ${req.remoteAddress}`); + req.log.error( + { err }, + `Error parsing message from ${req.remoteAddress}`, + ); } } else { logger.error({ err }, `Error parsing message from redis`); @@ -78,17 +91,6 @@ const parseJSON = (json, req) => { } }; -const PUBLIC_CHANNELS = [ - 'public', - 'public:media', - 'public:local', - 'public:local:media', - 'public:remote', - 'public:remote:media', - 'hashtag', - 'hashtag:local', -]; - // Used for priming the counters/gauges for the various metrics that are // per-channel const CHANNEL_NAMES = [ @@ -97,7 +99,14 @@ const CHANNEL_NAMES = [ 'user:notification', 'list', 'direct', - ...PUBLIC_CHANNELS + 'public', + 'public:media', + 'public:local', + 'public:local:media', + 'public:remote', + 'public:remote:media', + 'hashtag', + 'hashtag:local', ]; const startServer = async () => { @@ -131,7 +140,7 @@ const startServer = async () => { * @returns {string} */ function redisUnnamespaced(channel) { - if (typeof redisConfig.namespace === "string") { + if 
(typeof redisConfig.namespace === 'string') { // Note: this removes the configured namespace and the colon that is used // to separate it: return channel.slice(redisConfig.namespace.length + 1); @@ -141,13 +150,18 @@ const startServer = async () => { } // Set the X-Request-Id header on WebSockets: - wss.on("headers", function onHeaders(headers, req) { + wss.on('headers', function onHeaders(headers, req) { headers.push(`X-Request-Id: ${req.id}`); }); const app = express(); - app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal'); + app.set( + 'trust proxy', + process.env.TRUSTED_PROXY_IP + ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) + : 'loopback,uniquelocal', + ); app.use(httpLogger); app.use(cors()); @@ -161,7 +175,7 @@ const startServer = async () => { // logger. This decorates the `request` object. attachWebsocketHttpLogger(request); - request.log.info("HTTP Upgrade Requested"); + request.log.info('HTTP Upgrade Requested'); /** @param {Error} err */ const onSocketError = (err) => { @@ -179,15 +193,15 @@ const startServer = async () => { // Unfortunately for using the on('upgrade') setup, we need to manually // write a HTTP Response to the Socket to close the connection upgrade // attempt, so the following code is to handle all of that. - const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); + const { statusCode, errorMessage } = extractErrorStatusAndMessage(err); /** @type {Record} */ const headers = { - 'Connection': 'close', + Connection: 'close', 'Content-Type': 'text/plain', 'Content-Length': 0, 'X-Request-Id': request.id, - 'X-Error-Message': errorMessage + 'X-Error-Message': errorMessage, }; // Ensure the socket is closed once we've finished writing to it: @@ -196,16 +210,25 @@ const startServer = async () => { }); // Write the HTTP response manually: - socket.end(`HTTP/1.1 ${statusCode} ${http.STATUS_CODES[statusCode]}\r\n${Object.keys(headers).map((key) => `${key}: ${headers[key]}`).join('\r\n')}\r\n\r\n`); + socket.end( + `HTTP/1.1 ${statusCode} ${http.STATUS_CODES[statusCode]}\r\n${Object.keys( + headers, + ) + .map((key) => `${key}: ${headers[key]}`) + .join('\r\n')}\r\n\r\n`, + ); // Finally, log the error: - request.log.error({ - err, - res: { - statusCode, - headers - } - }, errorMessage); + request.log.error( + { + err, + res: { + statusCode, + headers, + }, + }, + errorMessage, + ); return; } @@ -214,7 +237,9 @@ const startServer = async () => { socket.removeListener('error', onSocketError); wss.handleUpgrade(request, socket, head, function done(ws) { - request.log.info("Authenticated request & upgraded to WebSocket connection"); + request.log.info( + 'Authenticated request & upgraded to WebSocket connection', + ); const wsLogger = createWebsocketLogger(request, resolvedAccount); @@ -236,7 +261,10 @@ const startServer = async () => { app.get('/favicon.ico', (_req, res) => res.status(404).end()); app.get('/api/v1/streaming/health', (_req, res) => { - res.writeHead(200, { 'Content-Type': 'text/plain', 'Cache-Control': 'private, no-store' }); + res.writeHead(200, { + 'Content-Type': 'text/plain', + 'Cache-Control': 'private, no-store', + }); res.end('OK'); }); @@ -246,11 +274,18 @@ const startServer = async () => { * @param {string[]} channels * @returns {function(): void} */ - const subscriptionHeartbeat = channels => { + const subscriptionHeartbeat = (channels) => { const interval = 6 * 60; const tellSubscribed = () => { - channels.forEach(channel => 
redisClient.set(redisNamespaced(`subscribed:${channel}`), '1', 'EX', interval * 3)); + channels.forEach((channel) => + redisClient.set( + redisNamespaced(`subscribed:${channel}`), + '1', + 'EX', + interval * 3, + ), + ); }; tellSubscribed(); @@ -279,9 +314,9 @@ const startServer = async () => { const json = parseJSON(message, null); if (!json) return; - callbacks.forEach(callback => callback(json)); + callbacks.forEach((callback) => callback(json)); }; - redisSubscribeClient.on("message", onRedisMessage); + redisSubscribeClient.on('message', onRedisMessage); /** * @callback SubscriptionListener @@ -324,19 +359,22 @@ const startServer = async () => { return; } - subs[channel] = subs[channel].filter(item => item !== callback); + subs[channel] = subs[channel].filter((item) => item !== callback); if (subs[channel].length === 0) { logger.debug(`Unsubscribe ${channel}`); // FIXME: https://github.com/redis/ioredis/issues/1910 - redisSubscribeClient.unsubscribe(redisNamespaced(channel), (err, count) => { - if (err) { - logger.error(`Error unsubscribing to ${channel}`); - } else if (typeof count === 'number') { - metrics.redisSubscriptions.set(count); - } - }); + redisSubscribeClient.unsubscribe( + redisNamespaced(channel), + (err, count) => { + if (err) { + logger.error(`Error unsubscribing to ${channel}`); + } else if (typeof count === 'number') { + metrics.redisSubscriptions.set(count); + } + }, + ); delete subs[channel]; } }; @@ -347,7 +385,7 @@ const startServer = async () => { * @returns {boolean} */ const isInScope = (req, necessaryScopes) => - req.scopes.some(scope => necessaryScopes.includes(scope)); + req.scopes.some((scope) => necessaryScopes.includes(scope)); /** * @param {string} token @@ -355,7 +393,10 @@ const startServer = async () => { * @returns {Promise} */ const accountFromToken = async (token, req) => { - const result = await pgPool.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token]); + const result = await pgPool.query( + 'SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id INNER JOIN accounts ON accounts.id = users.account_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL AND users.disabled IS FALSE AND accounts.suspended_at IS NULL LIMIT 1', + [token], + ); if (result.rows.length === 0) { throw new AuthenticationError('Invalid access token'); @@ -378,50 +419,54 @@ const startServer = async () => { * @param {any} req * @returns {Promise} */ - const accountFromRequest = (req) => new Promise((resolve, reject) => { - const authorization = req.headers.authorization; - const location = url.parse(req.url, true); - const accessToken = location.query.access_token || req.headers['sec-websocket-protocol']; + const accountFromRequest = (req) => + new Promise((resolve, reject) => { + const authorization = req.headers.authorization; + const location = url.parse(req.url, true); + const accessToken = + location.query.access_token || req.headers['sec-websocket-protocol']; - if (!authorization && !accessToken) { - reject(new AuthenticationError('Missing access token')); - return; - } + if 
(!authorization && !accessToken) { + reject(new AuthenticationError('Missing access token')); + return; + } - const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken; + const token = authorization + ? authorization.replace(/^Bearer /, '') + : accessToken; - resolve(accountFromToken(token, req)); - }); + resolve(accountFromToken(token, req)); + }); /** * @param {any} req * @returns {string|undefined} */ - const channelNameFromPath = req => { + const channelNameFromPath = (req) => { const { path, query } = req; const onlyMedia = isTruthy(query.only_media); switch (path) { - case '/api/v1/streaming/user': - return 'user'; - case '/api/v1/streaming/user/notification': - return 'user:notification'; - case '/api/v1/streaming/public': - return onlyMedia ? 'public:media' : 'public'; - case '/api/v1/streaming/public/local': - return onlyMedia ? 'public:local:media' : 'public:local'; - case '/api/v1/streaming/public/remote': - return onlyMedia ? 'public:remote:media' : 'public:remote'; - case '/api/v1/streaming/hashtag': - return 'hashtag'; - case '/api/v1/streaming/hashtag/local': - return 'hashtag:local'; - case '/api/v1/streaming/direct': - return 'direct'; - case '/api/v1/streaming/list': - return 'list'; - default: - return undefined; + case '/api/v1/streaming/user': + return 'user'; + case '/api/v1/streaming/user/notification': + return 'user:notification'; + case '/api/v1/streaming/public': + return onlyMedia ? 'public:media' : 'public'; + case '/api/v1/streaming/public/local': + return onlyMedia ? 'public:local:media' : 'public:local'; + case '/api/v1/streaming/public/remote': + return onlyMedia ? 'public:remote:media' : 'public:remote'; + case '/api/v1/streaming/hashtag': + return 'hashtag'; + case '/api/v1/streaming/hashtag/local': + return 'hashtag:local'; + case '/api/v1/streaming/direct': + return 'direct'; + case '/api/v1/streaming/list': + return 'list'; + default: + return undefined; } }; @@ -431,38 +476,42 @@ const startServer = async () => { * @param {string|undefined} channelName * @returns {Promise.} */ - const checkScopes = (req, logger, channelName) => new Promise((resolve, reject) => { - logger.debug(`Checking OAuth scopes for ${channelName}`); + const checkScopes = (req, logger, channelName) => + new Promise((resolve, reject) => { + logger.debug(`Checking OAuth scopes for ${channelName}`); - // When accessing public channels, no scopes are needed - if (channelName && PUBLIC_CHANNELS.includes(channelName)) { - resolve(); - return; - } + // The `read` scope has the highest priority, if the token has it + // then it can access all streams + const requiredScopes = ['read']; - // The `read` scope has the highest priority, if the token has it - // then it can access all streams - const requiredScopes = ['read']; + // When accessing specifically the notifications stream, + // we need a read:notifications, while in all other cases, + // we can allow access with read:statuses. Mind that the + // user stream will not contain notifications unless + // the token has either read or read:notifications scope + // as well, this is handled separately. + if (channelName === 'user:notification') { + requiredScopes.push('read:notifications'); + } else { + requiredScopes.push('read:statuses'); + } - // When accessing specifically the notifications stream, - // we need a read:notifications, while in all other cases, - // we can allow access with read:statuses. 
Mind that the - // user stream will not contain notifications unless - // the token has either read or read:notifications scope - // as well, this is handled separately. - if (channelName === 'user:notification') { - requiredScopes.push('read:notifications'); - } else { - requiredScopes.push('read:statuses'); - } + if ( + req.scopes && + requiredScopes.some((requiredScope) => + req.scopes.includes(requiredScope), + ) + ) { + resolve(); + return; + } - if (req.scopes && requiredScopes.some(requiredScope => req.scopes.includes(requiredScope))) { - resolve(); - return; - } - - reject(new AuthenticationError('Access token does not have the required scopes')); - }); + reject( + new AuthenticationError( + 'Access token does not have the required scopes', + ), + ); + }); /** * @typedef SystemMessageHandlers @@ -475,7 +524,7 @@ const startServer = async () => { * @returns {SubscriptionListener} */ const createSystemMessageListener = (req, eventHandlers) => { - return message => { + return (message) => { if (!message?.event) { return; } @@ -485,7 +534,9 @@ const startServer = async () => { req.log.debug(`System message for ${req.accountId}: ${event}`); if (event === 'kill') { - req.log.debug(`Closing connection for ${req.accountId} due to expired access token`); + req.log.debug( + `Closing connection for ${req.accountId} due to expired access token`, + ); eventHandlers.onKill(); } else if (event === 'filters_changed') { req.log.debug(`Invalidating filters cache for ${req.accountId}`); @@ -512,13 +563,17 @@ const startServer = async () => { unsubscribe(accessTokenChannelId, listener); unsubscribe(systemChannelId, listener); - metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); + metrics.connectedChannels + .labels({ type: 'eventsource', channel: 'system' }) + .dec(2); }); subscribe(accessTokenChannelId, listener); subscribe(systemChannelId, listener); - metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2); + metrics.connectedChannels + .labels({ type: 'eventsource', channel: 'system' }) + .inc(2); }; /** @@ -541,13 +596,17 @@ const startServer = async () => { return; } - accountFromRequest(req).then(() => checkScopes(req, req.log, channelName)).then(() => { - subscribeHttpToSystemChannel(req, res); - }).then(() => { - next(); - }).catch(err => { - next(err); - }); + accountFromRequest(req) + .then(() => checkScopes(req, req.log, channelName)) + .then(() => { + subscribeHttpToSystemChannel(req, res); + }) + .then(() => { + next(); + }) + .catch((err) => { + next(err); + }); }; /** @@ -564,7 +623,7 @@ const startServer = async () => { return; } - const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); + const { statusCode, errorMessage } = extractErrorStatusAndMessage(err); res.writeHead(statusCode, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: errorMessage })); @@ -576,7 +635,8 @@ const startServer = async () => { * @returns {string} */ // @ts-ignore - const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); + const placeholders = (arr, shift = 0) => + arr.map((_, i) => `$${i + 1 + shift}`).join(', '); /** * @param {string} listId @@ -586,7 +646,10 @@ const startServer = async () => { const authorizeListAccess = async (listId, req) => { const { accountId } = req; - const result = await pgPool.query('SELECT id, account_id FROM lists WHERE id = $1 AND account_id = $2 LIMIT 1', [listId, accountId]); + const result = await pgPool.query( + 'SELECT id, account_id FROM 
lists WHERE id = $1 AND account_id = $2 LIMIT 1', + [listId, accountId], + ); if (result.rows.length === 0) { throw new AuthenticationError('List not found'); @@ -604,7 +667,16 @@ const startServer = async () => { * @param {boolean=} allowLocalOnly * @returns {SubscriptionListener} */ - const streamFrom = (channelIds, req, log, output, attachCloseHandler, destinationType, needsFiltering = false, allowLocalOnly = false) => { + const streamFrom = ( + channelIds, + req, + log, + output, + attachCloseHandler, + destinationType, + needsFiltering = false, + allowLocalOnly = false, + ) => { log.info({ channelIds }, `Starting stream`); /** @@ -613,11 +685,15 @@ const startServer = async () => { */ const transmit = (event, payload) => { // TODO: Replace "string"-based delete payloads with object payloads: - const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; + const encodedPayload = + typeof payload === 'object' ? JSON.stringify(payload) : payload; metrics.messagesSent.labels({ type: destinationType }).inc(1); - log.debug({ event, payload }, `Transmitting ${event} to ${req.accountId}`); + log.debug( + { event, payload }, + `Transmitting ${event} to ${req.accountId}`, + ); output(event, encodedPayload); }; @@ -627,7 +703,7 @@ const startServer = async () => { // events also include a queued_at value, but this is being removed shortly. /** @type {SubscriptionListener} */ - const listener = message => { + const listener = (message) => { if (!message?.event || !message?.payload) { return; } @@ -635,7 +711,11 @@ const startServer = async () => { const { event, payload } = message; // Only send local-only statuses to logged-in users - if ((event === 'update' || event === 'status.update') && payload.local_only && !(req.accountId && allowLocalOnly)) { + if ( + (event === 'update' || event === 'status.update') && + payload.local_only && + !(req.accountId && allowLocalOnly) + ) { log.debug(`Message ${payload.id} filtered because it was local-only`); return; } @@ -650,7 +730,10 @@ const startServer = async () => { // // The channels that need filtering are determined in the function // `channelNameToIds` defined below: - if (!needsFiltering || (event !== 'update' && event !== 'status.update')) { + if ( + !needsFiltering || + (event !== 'update' && event !== 'status.update') + ) { transmit(event, payload); return; } @@ -659,8 +742,13 @@ const startServer = async () => { // filtering of statuses: // Filter based on language: - if (Array.isArray(req.chosenLanguages) && req.chosenLanguages.indexOf(payload.language) === -1) { - log.debug(`Message ${payload.id} filtered by language (${payload.language})`); + if ( + Array.isArray(req.chosenLanguages) && + req.chosenLanguages.indexOf(payload.language) === -1 + ) { + log.debug( + `Message ${payload.id} filtered by language (${payload.language})`, + ); return; } @@ -672,7 +760,9 @@ const startServer = async () => { // Filter based on domain blocks, blocks, mutes, or custom filters: // @ts-ignore - const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id)); + const targetAccountIds = [payload.account.id].concat( + payload.mentions.map((item) => item.id), + ); const accountDomain = payload.account.acct.split('@')[1]; // TODO: Move this logic out of the message handling loop @@ -684,7 +774,8 @@ const startServer = async () => { const queries = [ // @ts-ignore - client.query(`SELECT 1 + client.query( + `SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 
2)})) OR (account_id = $2 AND target_account_id = $1) @@ -692,154 +783,200 @@ const startServer = async () => { SELECT 1 FROM mutes WHERE account_id = $1 - AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)), + AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, + [req.accountId, payload.account.id].concat(targetAccountIds), + ), ]; if (accountDomain) { // @ts-ignore - queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain])); + queries.push( + client.query( + 'SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', + [req.accountId, accountDomain], + ), + ); } // @ts-ignore if (!payload.filtered && !req.cachedFilters) { // @ts-ignore - queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId])); + queries.push( + client.query( + 'SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', + [req.accountId], + ), + ); } - Promise.all(queries).then(values => { - releasePgConnection(); + Promise.all(queries) + .then((values) => { + releasePgConnection(); - // Handling blocks & mutes and domain blocks: If one of those applies, - // then we don't transmit the payload of the event to the client - if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) { - return; - } + // Handling blocks & mutes and domain blocks: If one of those applies, + // then we don't transmit the payload of the event to the client + if ( + values[0].rows.length > 0 || + (accountDomain && values[1].rows.length > 0) + ) { + return; + } - // If the payload already contains the `filtered` property, it means - // that filtering has been applied on the ruby on rails side, as - // such, we don't need to construct or apply the filters in streaming: - if (Object.hasOwn(payload, "filtered")) { - transmit(event, payload); - return; - } - - // Handling for constructing the custom filters and caching them on the request - // TODO: Move this logic out of the message handling lifecycle - // @ts-ignore - if (!req.cachedFilters) { - const filterRows = values[accountDomain ? 
2 : 1].rows; + // If the payload already contains the `filtered` property, it means + // that filtering has been applied on the ruby on rails side, as + // such, we don't need to construct or apply the filters in streaming: + if (Object.hasOwn(payload, 'filtered')) { + transmit(event, payload); + return; + } + // Handling for constructing the custom filters and caching them on the request + // TODO: Move this logic out of the message handling lifecycle // @ts-ignore - req.cachedFilters = filterRows.reduce((cache, filter) => { - if (cache[filter.id]) { - cache[filter.id].keywords.push([filter.keyword, filter.whole_word]); - } else { - cache[filter.id] = { - keywords: [[filter.keyword, filter.whole_word]], - expires_at: filter.expires_at, - filter: { - id: filter.id, - title: filter.title, - context: filter.context, - expires_at: filter.expires_at, - // filter.filter_action is the value from the - // custom_filters.action database column, it is an integer - // representing a value in an enum defined by Ruby on Rails: - // - // enum { warn: 0, hide: 1 } - filter_action: ['warn', 'hide'][filter.filter_action], - }, - }; - } + if (!req.cachedFilters) { + const filterRows = values[accountDomain ? 2 : 1].rows; - return cache; - }, {}); - - // Construct the regular expressions for the custom filters: This - // needs to be done in a separate loop as the database returns one - // filterRow per keyword, so we need all the keywords before - // constructing the regular expression - // @ts-ignore - Object.keys(req.cachedFilters).forEach((key) => { // @ts-ignore - req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => { - let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); - - if (whole_word) { - if (/^[\w]/.test(expr)) { - expr = `\\b${expr}`; - } - - if (/[\w]$/.test(expr)) { - expr = `${expr}\\b`; - } + req.cachedFilters = filterRows.reduce((cache, filter) => { + if (cache[filter.id]) { + cache[filter.id].keywords.push([ + filter.keyword, + filter.whole_word, + ]); + } else { + cache[filter.id] = { + keywords: [[filter.keyword, filter.whole_word]], + expires_at: filter.expires_at, + filter: { + id: filter.id, + title: filter.title, + context: filter.context, + expires_at: filter.expires_at, + // filter.filter_action is the value from the + // custom_filters.action database column, it is an integer + // representing a value in an enum defined by Ruby on Rails: + // + // enum { warn: 0, hide: 1 } + filter_action: ['warn', 'hide'][filter.filter_action], + }, + }; } - return expr; - }).join('|'), 'i'); - }); - } + return cache; + }, {}); - // Apply cachedFilters against the payload, constructing a - // `filter_results` array of FilterResult entities - // @ts-ignore - if (req.cachedFilters) { - const status = payload; - // TODO: Calculate searchableContent in Ruby on Rails: + // Construct the regular expressions for the custom filters: This + // needs to be done in a separate loop as the database returns one + // filterRow per keyword, so we need all the keywords before + // constructing the regular expression + // @ts-ignore + Object.keys(req.cachedFilters).forEach((key) => { + // @ts-ignore + req.cachedFilters[key].regexp = new RegExp( + req.cachedFilters[key].keywords + .map(([keyword, whole_word]) => { + let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + + if (whole_word) { + if (/^[\w]/.test(expr)) { + expr = `\\b${expr}`; + } + + if (/[\w]$/.test(expr)) { + expr = `${expr}\\b`; + } + } + + return expr; + }) + .join('|'), + 'i', + ); 
+ }); + } + + // Apply cachedFilters against the payload, constructing a + // `filter_results` array of FilterResult entities // @ts-ignore - const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>
/g, '\n\n'); - const searchableTextContent = JSDOM.fragment(searchableContent).textContent; + if (req.cachedFilters) { + const status = payload; + // TODO: Calculate searchableContent in Ruby on Rails: + // @ts-ignore + const searchableContent = [ + status.spoiler_text || '', + status.content, + ] + .concat( + status.poll && status.poll.options + ? status.poll.options.map((option) => option.title) + : [], + ) + .concat(status.media_attachments.map((att) => att.description)) + .join('\n\n') + .replace(/<br\s*\/?>/g, '\n') + .replace(/<\/p><p>
/g, '\n\n'); + const searchableTextContent = + JSDOM.fragment(searchableContent).textContent; - const now = new Date(); - // @ts-ignore - const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => { - // Check the filter hasn't expired before applying: - if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) { - return results; - } + const now = new Date(); + // @ts-ignore + const filter_results = Object.values(req.cachedFilters).reduce( + (results, cachedFilter) => { + // Check the filter hasn't expired before applying: + if ( + cachedFilter.expires_at !== null && + cachedFilter.expires_at < now + ) { + return results; + } - // Just in-case JSDOM fails to find textContent in searchableContent - if (!searchableTextContent) { - return results; - } + // Just in-case JSDOM fails to find textContent in searchableContent + if (!searchableTextContent) { + return results; + } - const keyword_matches = searchableTextContent.match(cachedFilter.regexp); - if (keyword_matches) { - // results is an Array of FilterResult; status_matches is always - // null as we only are only applying the keyword-based custom - // filters, not the status-based custom filters. - // https://docs.joinmastodon.org/entities/FilterResult/ - results.push({ - filter: cachedFilter.filter, - keyword_matches, - status_matches: null - }); - } + const keyword_matches = searchableTextContent.match( + cachedFilter.regexp, + ); + if (keyword_matches) { + // results is an Array of FilterResult; status_matches is always + // null as we only are only applying the keyword-based custom + // filters, not the status-based custom filters. + // https://docs.joinmastodon.org/entities/FilterResult/ + results.push({ + filter: cachedFilter.filter, + keyword_matches, + status_matches: null, + }); + } - return results; - }, []); + return results; + }, + [], + ); - // Send the payload + the FilterResults as the `filtered` property - // to the streaming connection. To reach this code, the `event` must - // have been either `update` or `status.update`, meaning the - // `payload` is a Status entity, which has a `filtered` property: - // - // filtered: https://docs.joinmastodon.org/entities/Status/#filtered - transmit(event, { - ...payload, - filtered: filter_results - }); - } else { - transmit(event, payload); - } - }).catch(err => { - log.error(err); - releasePgConnection(); - }); + // Send the payload + the FilterResults as the `filtered` property + // to the streaming connection. 
To reach this code, the `event` must + // have been either `update` or `status.update`, meaning the + // `payload` is a Status entity, which has a `filtered` property: + // + // filtered: https://docs.joinmastodon.org/entities/Status/#filtered + transmit(event, { + ...payload, + filtered: filter_results, + }); + } else { + transmit(event, payload); + } + }) + .catch((err) => { + log.error(err); + releasePgConnection(); + }); }); }; - channelIds.forEach(id => { + channelIds.forEach((id) => { subscribe(id, listener); }); @@ -862,7 +999,9 @@ const startServer = async () => { // In theory we'll always have a channel name, but channelNameFromPath can return undefined: if (typeof channelName === 'string') { - metrics.connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc(); + metrics.connectedChannels + .labels({ type: 'eventsource', channel: channelName }) + .inc(); } res.setHeader('Content-Type', 'text/event-stream'); @@ -881,7 +1020,9 @@ const startServer = async () => { metrics.connectedClients.labels({ type: 'eventsource' }).dec(); // In theory we'll always have a channel name, but channelNameFromPath can return undefined: if (typeof channelName === 'string') { - metrics.connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec(); + metrics.connectedChannels + .labels({ type: 'eventsource', channel: channelName }) + .dec(); } clearInterval(heartbeat); @@ -899,17 +1040,19 @@ const startServer = async () => { * @returns {function(string[], SubscriptionListener): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { - req.on('close', () => { - ids.forEach(id => { - unsubscribe(id, listener); - }); + const streamHttpEnd = + (req, closeHandler = undefined) => + (ids, listener) => { + req.on('close', () => { + ids.forEach((id) => { + unsubscribe(id, listener); + }); - if (closeHandler) { - closeHandler(); - } - }); - }; + if (closeHandler) { + closeHandler(); + } + }); + }; /** * @param {http.IncomingMessage} req @@ -927,7 +1070,7 @@ const startServer = async () => { ws.send(message, (/** @type {Error|undefined} */ err) => { if (err) { - req.log.error({err}, `Failed to send to websocket`); + req.log.error({ err }, `Failed to send to websocket`); } }); }; @@ -935,7 +1078,7 @@ const startServer = async () => { /** * @param {http.ServerResponse} res */ - const httpNotFound = res => { + const httpNotFound = (res) => { res.writeHead(404, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Not found' })); }; @@ -958,20 +1101,31 @@ const startServer = async () => { return; } - channelNameToIds(req, channelName, req.query).then(({ channelIds, options }) => { - const onSend = streamToHttp(req, res); - const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); + channelNameToIds(req, channelName, req.query) + .then(({ channelIds, options }) => { + const onSend = streamToHttp(req, res); + const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); - // @ts-ignore - streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering, options.allowLocalOnly); - }).catch(err => { - const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); + // @ts-ignore + streamFrom( + channelIds, + req, + req.log, + onSend, + onEnd, + 'eventsource', + options.needsFiltering, + options.allowLocalOnly, + ); + }) + .catch((err) => { + const { statusCode, errorMessage } = extractErrorStatusAndMessage(err); - res.log.info({ err }, 'Eventsource subscription error'); + res.log.info({ 
err }, 'Eventsource subscription error'); - res.writeHead(statusCode, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: errorMessage })); - }); + res.writeHead(statusCode, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: errorMessage })); + }); }); /** @@ -985,7 +1139,7 @@ const startServer = async () => { * @param {any} req * @returns {string[]} */ - const channelsForUserStream = req => { + const channelsForUserStream = (req) => { const arr = [`timeline:${req.accountId}`]; if (isInScope(req, ['read', 'read:notifications'])) { @@ -1001,127 +1155,140 @@ const startServer = async () => { * @param {StreamParams} params * @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>} */ - const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { - switch (name) { - case 'user': - resolve({ - channelIds: channelsForUserStream(req), - options: { needsFiltering: false, allowLocalOnly: true }, - }); + const channelNameToIds = (req, name, params) => + new Promise((resolve, reject) => { + switch (name) { + case 'user': + resolve({ + channelIds: channelsForUserStream(req), + options: { needsFiltering: false, allowLocalOnly: true }, + }); - break; - case 'user:notification': - resolve({ - channelIds: [`timeline:${req.accountId}:notifications`], - options: { needsFiltering: false, allowLocalOnly: true }, - }); + break; + case 'user:notification': + resolve({ + channelIds: [`timeline:${req.accountId}:notifications`], + options: { needsFiltering: false, allowLocalOnly: true }, + }); - break; - case 'public': - resolve({ - channelIds: ['timeline:public'], - options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) }, - }); + break; + case 'public': + resolve({ + channelIds: ['timeline:public'], + options: { + needsFiltering: true, + allowLocalOnly: isTruthy(params.allow_local_only), + }, + }); - break; - case 'public:allow_local_only': - resolve({ - channelIds: ['timeline:public'], - options: { needsFiltering: true, allowLocalOnly: true }, - }); + break; + case 'public:allow_local_only': + resolve({ + channelIds: ['timeline:public'], + options: { needsFiltering: true, allowLocalOnly: true }, + }); - break; - case 'public:local': - resolve({ - channelIds: ['timeline:public:local'], - options: { needsFiltering: true, allowLocalOnly: true }, - }); + break; + case 'public:local': + resolve({ + channelIds: ['timeline:public:local'], + options: { needsFiltering: true, allowLocalOnly: true }, + }); - break; - case 'public:remote': - resolve({ - channelIds: ['timeline:public:remote'], - options: { needsFiltering: true, allowLocalOnly: false }, - }); + break; + case 'public:remote': + resolve({ + channelIds: ['timeline:public:remote'], + options: { needsFiltering: true, allowLocalOnly: false }, + }); - break; - case 'public:media': - resolve({ - channelIds: ['timeline:public:media'], - options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) }, - }); + break; + case 'public:media': + resolve({ + channelIds: ['timeline:public:media'], + options: { + needsFiltering: true, + allowLocalOnly: isTruthy(params.allow_local_only), + }, + }); - break; - case 'public:allow_local_only:media': - resolve({ - channelIds: ['timeline:public:media'], - options: { needsFiltering: true, allowLocalOnly: true }, - }); + break; + case 'public:allow_local_only:media': + resolve({ + channelIds: ['timeline:public:media'], + options: { needsFiltering: true, allowLocalOnly: true }, + }); - break; 
- case 'public:local:media': - resolve({ - channelIds: ['timeline:public:local:media'], - options: { needsFiltering: true, allowLocalOnly: true }, - }); + break; + case 'public:local:media': + resolve({ + channelIds: ['timeline:public:local:media'], + options: { needsFiltering: true, allowLocalOnly: true }, + }); - break; - case 'public:remote:media': - resolve({ - channelIds: ['timeline:public:remote:media'], - options: { needsFiltering: true, allowLocalOnly: false }, - }); + break; + case 'public:remote:media': + resolve({ + channelIds: ['timeline:public:remote:media'], + options: { needsFiltering: true, allowLocalOnly: false }, + }); - break; - case 'direct': - resolve({ - channelIds: [`timeline:direct:${req.accountId}`], - options: { needsFiltering: false, allowLocalOnly: true }, - }); + break; + case 'direct': + resolve({ + channelIds: [`timeline:direct:${req.accountId}`], + options: { needsFiltering: false, allowLocalOnly: true }, + }); - break; - case 'hashtag': - if (!params.tag) { - reject(new RequestError('Missing tag name parameter')); - } else { - resolve({ - channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`], - options: { needsFiltering: true, allowLocalOnly: true }, - }); + break; + case 'hashtag': + if (!params.tag) { + reject(new RequestError('Missing tag name parameter')); + } else { + resolve({ + channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`], + options: { needsFiltering: true, allowLocalOnly: true }, + }); + } + + break; + case 'hashtag:local': + if (!params.tag) { + reject(new RequestError('Missing tag name parameter')); + } else { + resolve({ + channelIds: [ + `timeline:hashtag:${normalizeHashtag(params.tag)}:local`, + ], + options: { needsFiltering: true, allowLocalOnly: true }, + }); + } + + break; + case 'list': + if (!params.list) { + reject(new RequestError('Missing list name parameter')); + return; + } + + authorizeListAccess(params.list, req) + .then(() => { + resolve({ + channelIds: [`timeline:list:${params.list}`], + options: { needsFiltering: false, allowLocalOnly: true }, + }); + }) + .catch(() => { + reject( + new AuthenticationError('Not authorized to stream this list'), + ); + }); + + break; + default: + reject(new RequestError('Unknown stream type')); } - - break; - case 'hashtag:local': - if (!params.tag) { - reject(new RequestError('Missing tag name parameter')); - } else { - resolve({ - channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`], - options: { needsFiltering: true, allowLocalOnly: true }, - }); - } - - break; - case 'list': - if (!params.list) { - reject(new RequestError('Missing list name parameter')); - return; - } - - authorizeListAccess(params.list, req).then(() => { - resolve({ - channelIds: [`timeline:list:${params.list}`], - options: { needsFiltering: false, allowLocalOnly: true }, - }); - }).catch(() => { - reject(new AuthenticationError('Not authorized to stream this list')); - }); - - break; - default: - reject(new RequestError('Unknown stream type')); - } - }); + }); /** * @param {string} channelName @@ -1131,7 +1298,10 @@ const startServer = async () => { const streamNameFromChannelName = (channelName, params) => { if (channelName === 'list' && params.list) { return [channelName, params.list]; - } else if (['hashtag', 'hashtag:local'].includes(channelName) && params.tag) { + } else if ( + ['hashtag', 'hashtag:local'].includes(channelName) && + params.tag + ) { return [channelName, params.tag]; } else { return [channelName]; @@ -1152,46 +1322,70 @@ const startServer = async () => { 
* @param {StreamParams} params * @returns {void} */ - const subscribeWebsocketToChannel = ({ websocket, request, logger, subscriptions }, channelName, params) => { - checkScopes(request, logger, channelName).then(() => channelNameToIds(request, channelName, params)).then(({ - channelIds, - options, - }) => { - if (subscriptions[channelIds.join(';')]) { - return; - } + const subscribeWebsocketToChannel = ( + { websocket, request, logger, subscriptions }, + channelName, + params, + ) => { + checkScopes(request, logger, channelName) + .then(() => channelNameToIds(request, channelName, params)) + .then(({ channelIds, options }) => { + if (subscriptions[channelIds.join(';')]) { + return; + } - const onSend = streamToWs(request, websocket, streamNameFromChannelName(channelName, params)); - const stopHeartbeat = subscriptionHeartbeat(channelIds); - const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering, options.allowLocalOnly); + const onSend = streamToWs( + request, + websocket, + streamNameFromChannelName(channelName, params), + ); + const stopHeartbeat = subscriptionHeartbeat(channelIds); + const listener = streamFrom( + channelIds, + request, + logger, + onSend, + undefined, + 'websocket', + options.needsFiltering, + options.allowLocalOnly, + ); - metrics.connectedChannels.labels({ type: 'websocket', channel: channelName }).inc(); + metrics.connectedChannels + .labels({ type: 'websocket', channel: channelName }) + .inc(); - subscriptions[channelIds.join(';')] = { - channelName, - listener, - stopHeartbeat, - }; - }).catch(err => { - const {statusCode, errorMessage } = extractErrorStatusAndMessage(err); + subscriptions[channelIds.join(';')] = { + channelName, + listener, + stopHeartbeat, + }; + }) + .catch((err) => { + const { statusCode, errorMessage } = extractErrorStatusAndMessage(err); - logger.error({ err }, 'Websocket subscription error'); + logger.error({ err }, 'Websocket subscription error'); - // If we have a socket that is alive and open still, send the error back to the client: - if (websocket.isAlive && websocket.readyState === websocket.OPEN) { - websocket.send(JSON.stringify({ - error: errorMessage, - status: statusCode - })); - } - }); + // If we have a socket that is alive and open still, send the error back to the client: + if (websocket.isAlive && websocket.readyState === websocket.OPEN) { + websocket.send( + JSON.stringify({ + error: errorMessage, + status: statusCode, + }), + ); + } + }); }; /** * @param {WebSocketSession} session * @param {string[]} channelIds */ - const removeSubscription = ({ request, logger, subscriptions }, channelIds) => { + const removeSubscription = ( + { request, logger, subscriptions }, + channelIds, + ) => { logger.info({ channelIds, accountId: request.accountId }, `Ending stream`); const subscription = subscriptions[channelIds.join(';')]; @@ -1200,11 +1394,13 @@ const startServer = async () => { return; } - channelIds.forEach(channelId => { + channelIds.forEach((channelId) => { unsubscribe(channelId, subscription.listener); }); - metrics.connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); + metrics.connectedChannels + .labels({ type: 'websocket', channel: subscription.channelName }) + .dec(); subscription.stopHeartbeat(); delete subscriptions[channelIds.join(';')]; @@ -1219,23 +1415,31 @@ const startServer = async () => { const unsubscribeWebsocketFromChannel = (session, channelName, params) => { const { websocket, request, logger } = session; - 
channelNameToIds(request, channelName, params).then(({ channelIds }) => { - removeSubscription(session, channelIds); - }).catch(err => { - logger.error({err}, 'Websocket unsubscribe error'); + channelNameToIds(request, channelName, params) + .then(({ channelIds }) => { + removeSubscription(session, channelIds); + }) + .catch((err) => { + logger.error({ err }, 'Websocket unsubscribe error'); - // If we have a socket that is alive and open still, send the error back to the client: - if (websocket.isAlive && websocket.readyState === websocket.OPEN) { - // TODO: Use a better error response here - websocket.send(JSON.stringify({ error: "Error unsubscribing from channel" })); - } - }); + // If we have a socket that is alive and open still, send the error back to the client: + if (websocket.isAlive && websocket.readyState === websocket.OPEN) { + // TODO: Use a better error response here + websocket.send( + JSON.stringify({ error: 'Error unsubscribing from channel' }), + ); + } + }); }; /** * @param {WebSocketSession} session */ - const subscribeWebsocketToSystemChannel = ({ websocket, request, subscriptions }) => { + const subscribeWebsocketToSystemChannel = ({ + websocket, + request, + subscriptions, + }) => { const accessTokenChannelId = `timeline:access_token:${request.accessTokenId}`; const systemChannelId = `timeline:system:${request.accountId}`; @@ -1251,18 +1455,18 @@ const startServer = async () => { subscriptions[accessTokenChannelId] = { channelName: 'system', listener, - stopHeartbeat: () => { - }, + stopHeartbeat: () => {}, }; subscriptions[systemChannelId] = { channelName: 'system', listener, - stopHeartbeat: () => { - }, + stopHeartbeat: () => {}, }; - metrics.connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2); + metrics.connectedChannels + .labels({ type: 'websocket', channel: 'system' }) + .inc(2); }; /** @@ -1295,7 +1499,7 @@ const startServer = async () => { ws.on('close', function onWebsocketClose() { const subscriptions = Object.keys(session.subscriptions); - subscriptions.forEach(channelIds => { + subscriptions.forEach((channelIds) => { removeSubscription(session, channelIds.split(';')); }); @@ -1322,7 +1526,10 @@ const startServer = async () => { ws.on('message', (data, isBinary) => { if (isBinary) { log.warn('Received binary data, closing connection'); - ws.close(1003, 'The mastodon streaming server does not support binary messages'); + ws.close( + 1003, + 'The mastodon streaming server does not support binary messages', + ); return; } const message = data.toString('utf8'); @@ -1348,14 +1555,18 @@ const startServer = async () => { const location = req.url && url.parse(req.url, true); if (location && location.query.stream) { - subscribeWebsocketToChannel(session, firstParam(location.query.stream), location.query); + subscribeWebsocketToChannel( + session, + firstParam(location.query.stream), + location.query, + ); } } wss.on('connection', onConnection); setInterval(() => { - wss.clients.forEach(ws => { + wss.clients.forEach((ws) => { // @ts-ignore if (ws.isAlive === false) { ws.terminate(); @@ -1368,7 +1579,7 @@ const startServer = async () => { }); }, 30000); - attachServerWithConfig(server, address => { + attachServerWithConfig(server, (address) => { logger.info(`Streaming API now listening on ${address}`); });
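Note on the new kill switch: `Account#suspend!` and `User#disable!` now publish a `kill` event on the per-account `timeline:system:<account id>` Redis channel, and the streaming server's existing system-channel listener closes every open WebSocket and EventSource connection for that account when it receives it. A minimal operator-side sketch of the same publish (assumptions: the redis and json gems, a REDIS_URL pointing at Mastodon's Redis, and no REDIS_NAMESPACE set; with a namespace the channel is prefixed with "<namespace>:"; this script is illustrative, not part of the diff):

    require 'json'
    require 'redis'

    # Account#id of the local user whose streaming connections should close:
    account_id = Integer(ARGV.fetch(0))
    redis = Redis.new(url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0'))

    # Same payload the new model callbacks publish; the streaming server's
    # system-channel listener closes the account's connections on `kill`:
    redis.publish("timeline:system:#{account_id}", JSON.generate(event: 'kill'))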
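Note on `User#change_password!`: both the admin CLI `--reset-password` path and `reset_password!` now funnel through a single method that updates the password, destroys all session_activations inside the same transaction, and then calls `revoke_access!`, which revokes the user's Doorkeeper access tokens and connected push subscriptions. A quick `rails console` sanity check (a sketch; the email address is a placeholder, and the expectations assume the associations shown in this diff):

    user = User.find_by!(email: 'someone@example.com') # placeholder address
    user.change_password!(SecureRandom.hex)

    user.session_activations.count
    # => 0, every session was destroyed
    Doorkeeper::AccessToken.where(resource_owner_id: user.id, revoked_at: nil).count
    # => 0, every token was revoked by revoke_access!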
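Note on the streaming scope fix: the old PUBLIC_CHANNELS bypass in checkScopes is removed, so every channel (including `public`, `public:local`, and the hashtag channels) now requires the `read` or `read:statuses` scope (`read:notifications` for `user:notification`), and the token-lookup SQL additionally joins users and accounts to reject disabled users and suspended accounts outright. A rough way to verify this against a running instance (Ruby sketch; mastodon.example and the token variable are placeholders; on a successful connect the endpoint streams indefinitely, hence the read timeout):

    require 'net/http'

    uri = URI('https://mastodon.example/api/v1/streaming/public')
    request = Net::HTTP::Get.new(uri)
    request['Authorization'] = "Bearer #{ENV.fetch('UNDERSCOPED_TOKEN')}"

    begin
      response = Net::HTTP.start(uri.host, uri.port, use_ssl: true, read_timeout: 5) do |http|
        http.request(request)
      end
      # Tokens lacking read/read:statuses are now rejected up front:
      puts response.code # expected: 401
    rescue Net::ReadTimeout
      puts 'stream stayed open: token has sufficient scopes'
    end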
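Note on the custom-filter regexp construction (behavior unchanged by this diff, only reformatted): each filter's keywords are regex-escaped, optionally wrapped in \b word boundaries when whole_word is set, and then OR-joined into one case-insensitive pattern, which is why all keyword rows must be collected before the pattern is built. The same construction expressed in Ruby (an illustrative sketch mirroring the JavaScript, not code from the diff):

    keywords = [['cat', true], ['dog!', false]] # [keyword, whole_word] pairs
    pattern = keywords.map do |keyword, whole_word|
      expr = Regexp.escape(keyword)
      if whole_word
        # Only add boundaries next to word characters, as \b needs a \w edge:
        expr = "\\b#{expr}" if expr.match?(/\A\w/)
        expr = "#{expr}\\b" if expr.match?(/\w\z/)
      end
      expr
    end.join('|')

    regexp = Regexp.new(pattern, Regexp::IGNORECASE)
    regexp.match?('my cat sleeps') # => true, "cat" matches as a whole word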