Merge commit '4896d2c4c6d3bd6b878c5a075b6611c65d4203b2' into glitch-soc/merge-upstream

Conflicts:
- `app/views/settings/preferences/appearance/show.html.haml`:
  Upstream changed stuff too close to glitch-soc's theming system changes.
  Applied upstream's changes.
- `streaming/index.js`:
  Upstream refactored a bunch of stuff where our code was different due to
  local-only posts.
  Applied upstream's changes while taking care of local-only posts.
This commit is contained in:
Claire
2025-10-28 22:10:12 +01:00
136 changed files with 1041 additions and 1409 deletions

View File

@@ -19,6 +19,7 @@ import * as Redis from './redis.js';
import { isTruthy, normalizeHashtag, firstParam } from './utils.js';
const environment = process.env.NODE_ENV || 'development';
const PERMISSION_VIEW_FEEDS = 0x0000000000100000;
// Correctly detect and load .env or .env.production file based on environment:
const dotenvFile = environment === 'production' ? '.env.production' : '.env';
@@ -44,6 +45,18 @@ initializeLogLevel(process.env, environment);
* @property {string[]} scopes
* @property {string} accountId
* @property {string[]} chosenLanguages
* @property {number} permissions
*/
/**
* @typedef {http.IncomingMessage & ResolvedAccount & {
* path: string
* query: Record<string, unknown>
* remoteAddress?: string
* cachedFilters: unknown
* scopes: string[]
* necessaryScopes: string[]
* }} Request
*/
@@ -52,8 +65,8 @@ initializeLogLevel(process.env, environment);
* from redis and when receiving a message from a client over a websocket
* connection, this is why it accepts a `req` argument.
* @param {string} json
* @param {any?} req
* @returns {Object.<string, any>|null}
* @param {Request?} req
* @returns {Object.<string, unknown>|null}
*/
const parseJSON = (json, req) => {
try {
@@ -170,6 +183,7 @@ const startServer = async () => {
let resolvedAccount;
try {
// @ts-expect-error
resolvedAccount = await accountFromRequest(request);
} catch (err) {
// Unfortunately for using the on('upgrade') setup, we need to manually
@@ -220,7 +234,7 @@ const startServer = async () => {
});
/**
* @type {Object.<string, Array.<function(Object<string, any>): void>>}
* @type {Object.<string, Array.<function(Object<string, unknown>): void>>}
*/
const subs = {};
@@ -338,7 +352,7 @@ const startServer = async () => {
};
/**
* @param {http.IncomingMessage & ResolvedAccount} req
* @param {Request} req
* @param {string[]} necessaryScopes
* @returns {boolean}
*/
@@ -347,11 +361,11 @@ const startServer = async () => {
/**
* @param {string} token
* @param {any} req
* @param {Request} req
* @returns {Promise<ResolvedAccount>}
*/
const accountFromToken = async (token, req) => {
const result = await pgPool.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id INNER JOIN accounts ON accounts.id = users.account_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL AND users.disabled IS FALSE AND accounts.suspended_at IS NULL LIMIT 1', [token]);
const result = await pgPool.query('SELECT oauth_access_tokens.id, oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes, COALESCE(user_roles.permissions, 0) AS permissions FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id INNER JOIN accounts ON accounts.id = users.account_id LEFT OUTER JOIN user_roles ON user_roles.id = users.role_id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL AND users.disabled IS FALSE AND accounts.suspended_at IS NULL LIMIT 1', [token]);
if (result.rows.length === 0) {
throw new AuthenticationError('Invalid access token');
@@ -367,17 +381,18 @@ const startServer = async () => {
scopes: result.rows[0].scopes.split(' '),
accountId: result.rows[0].account_id,
chosenLanguages: result.rows[0].chosen_languages,
permissions: result.rows[0].permissions,
};
};
/**
* @param {any} req
* @param {Request} req
* @returns {Promise<ResolvedAccount>}
*/
const accountFromRequest = (req) => new Promise((resolve, reject) => {
const authorization = req.headers.authorization;
const location = url.parse(req.url, true);
const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];
const location = req.url ? url.parse(req.url, true) : undefined;
const accessToken = location?.query.access_token || req.headers['sec-websocket-protocol'];
if (!authorization && !accessToken) {
reject(new AuthenticationError('Missing access token'));
@@ -386,11 +401,12 @@ const startServer = async () => {
const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
// @ts-expect-error
resolve(accountFromToken(token, req));
});
/**
* @param {any} req
* @param {Request} req
* @returns {string|undefined}
*/
const channelNameFromPath = req => {
@@ -422,7 +438,7 @@ const startServer = async () => {
};
/**
* @param {http.IncomingMessage & ResolvedAccount} req
* @param {Request} req
* @param {import('pino').Logger} logger
* @param {string|undefined} channelName
* @returns {Promise.<void>}
@@ -460,7 +476,7 @@ const startServer = async () => {
*/
/**
* @param {any} req
* @param {Request} req
* @param {SystemMessageHandlers} eventHandlers
* @returns {SubscriptionListener}
*/
@@ -485,7 +501,7 @@ const startServer = async () => {
};
/**
* @param {http.IncomingMessage & ResolvedAccount} req
* @param {Request} req
* @param {http.OutgoingMessage} res
*/
const subscribeHttpToSystemChannel = (req, res) => {
@@ -512,8 +528,8 @@ const startServer = async () => {
};
/**
* @param {any} req
* @param {any} res
* @param {Request} req
* @param {http.ServerResponse} res
* @param {function(Error=): void} next
*/
const authenticationMiddleware = (req, res, next) => {
@@ -542,8 +558,8 @@ const startServer = async () => {
/**
* @param {Error} err
* @param {any} req
* @param {any} res
* @param {Request} req
* @param {http.ServerResponse} res
* @param {function(Error=): void} next
*/
const errorMiddleware = (err, req, res, next) => {
@@ -561,16 +577,15 @@ const startServer = async () => {
};
/**
* @param {any[]} arr
* @param {string[]} arr
* @param {number=} shift
* @returns {string}
*/
// @ts-ignore
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
/**
* @param {string} listId
* @param {any} req
* @param {Request} req
* @returns {Promise.<void>}
*/
const authorizeListAccess = async (listId, req) => {
@@ -583,18 +598,56 @@ const startServer = async () => {
}
};
/**
* @param {string} kind
* @param {ResolvedAccount} account
* @returns {Promise.<{ localAccess: boolean, remoteAccess: boolean }>}
*/
const getFeedAccessSettings = async (kind, account) => {
const access = { localAccess: true, remoteAccess: true };
if (account.permissions & PERMISSION_VIEW_FEEDS) {
return access;
}
let localAccessVar, remoteAccessVar;
if (kind === 'hashtag') {
localAccessVar = 'local_topic_feed_access';
remoteAccessVar = 'remote_topic_feed_access';
} else {
localAccessVar = 'local_live_feed_access';
remoteAccessVar = 'remote_live_feed_access';
}
const result = await pgPool.query('SELECT var, value FROM settings WHERE var IN ($1, $2)', [localAccessVar, remoteAccessVar]);
result.rows.forEach((row) => {
if (row.var === localAccessVar) {
access.localAccess = row.value !== "--- disabled\n";
} else {
access.remoteAccess = row.value !== "--- disabled\n";
}
});
return access;
};
/**
* @param {string[]} channelIds
* @param {http.IncomingMessage & ResolvedAccount} req
* @param {Request} req
* @param {import('pino').Logger} log
* @param {function(string, string): void} output
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
* @param {'websocket' | 'eventsource'} destinationType
* @param {boolean=} needsFiltering
* @param {boolean=} allowLocalOnly
* @param {Object} options
* @param {boolean} options.needsFiltering
* @param {boolean=} options.filterLocal
* @param {boolean=} options.filterRemote
* @param {boolean=} options.allowLocalOnly
* @returns {SubscriptionListener}
*/
const streamFrom = (channelIds, req, log, output, attachCloseHandler, destinationType, needsFiltering = false, allowLocalOnly = false) => {
const streamFrom = (channelIds, req, log, output, attachCloseHandler, destinationType, { needsFiltering, filterLocal, filterRemote, allowLocalOnly } = { needsFiltering: false, filterLocal: false, filterRemote: false, allowLocalOnly: false }) => {
log.info({ channelIds }, `Starting stream`);
/**
@@ -641,6 +694,7 @@ const startServer = async () => {
// The channels that need filtering are determined in the function
// `channelNameToIds` defined below:
if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
// @ts-expect-error
transmit(event, payload);
return;
}
@@ -648,8 +702,16 @@ const startServer = async () => {
// The rest of the logic from here on in this function is to handle
// filtering of statuses:
const localPayload = payload.account.username === payload.account.acct;
if (localPayload ? filterLocal : filterRemote) {
log.debug(`Message ${payload.id} filtered by feed settings`);
return;
}
// Filter based on language:
// @ts-expect-error
if (Array.isArray(req.chosenLanguages) && req.chosenLanguages.indexOf(payload.language) === -1) {
// @ts-expect-error
log.debug(`Message ${payload.id} filtered by language (${payload.language})`);
return;
}
@@ -661,8 +723,9 @@ const startServer = async () => {
}
// Filter based on domain blocks, blocks, mutes, or custom filters:
// @ts-ignore
// @ts-expect-error
const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
// @ts-expect-error
const accountDomain = payload.account.acct.split('@')[1];
// TODO: Move this logic out of the message handling loop
@@ -673,7 +736,7 @@ const startServer = async () => {
}
const queries = [
// @ts-ignore
// @ts-expect-error
client.query(`SELECT 1
FROM blocks
WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)}))
@@ -682,17 +745,19 @@ const startServer = async () => {
SELECT 1
FROM mutes
WHERE account_id = $1
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.
// @ts-expect-error
account.id].concat(targetAccountIds)),
];
if (accountDomain) {
// @ts-ignore
// @ts-expect-error
queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
}
// @ts-ignore
// @ts-expect-error
if (!payload.filtered && !req.cachedFilters) {
// @ts-ignore
// @ts-expect-error
queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
}
@@ -701,6 +766,7 @@ const startServer = async () => {
// Handling blocks & mutes and domain blocks: If one of those applies,
// then we don't transmit the payload of the event to the client
// @ts-expect-error
if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
return;
}
@@ -717,9 +783,9 @@ const startServer = async () => {
// TODO: Move this logic out of the message handling lifecycle
// @ts-ignore
if (!req.cachedFilters) {
// @ts-expect-error
const filterRows = values[accountDomain ? 2 : 1].rows;
// @ts-ignore
req.cachedFilters = filterRows.reduce((cache, filter) => {
if (cache[filter.id]) {
cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
@@ -749,9 +815,9 @@ const startServer = async () => {
// needs to be done in a separate loop as the database returns one
// filterRow per keyword, so we need all the keywords before
// constructing the regular expression
// @ts-ignore
// @ts-expect-error
Object.keys(req.cachedFilters).forEach((key) => {
// @ts-ignore
// @ts-expect-error
req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
@@ -772,16 +838,14 @@ const startServer = async () => {
// Apply cachedFilters against the payload, constructing a
// `filter_results` array of FilterResult entities
// @ts-ignore
if (req.cachedFilters) {
const status = payload;
// TODO: Calculate searchableContent in Ruby on Rails:
// @ts-ignore
// @ts-expect-error
const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
const now = new Date();
// @ts-ignore
const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
// Check the filter hasn't expired before applying:
if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
@@ -841,8 +905,8 @@ const startServer = async () => {
};
/**
* @param {any} req
* @param {any} res
* @param {Request} req
* @param {http.ServerResponse} res
* @returns {function(string, string): void}
*/
const streamToHttp = (req, res) => {
@@ -884,7 +948,7 @@ const startServer = async () => {
};
/**
* @param {any} req
* @param {Request} req
* @param {function(): void} [closeHandler]
* @returns {function(string[], SubscriptionListener): void}
*/
@@ -934,10 +998,13 @@ const startServer = async () => {
app.use(api);
// @ts-expect-error
api.use(authenticationMiddleware);
// @ts-expect-error
api.use(errorMiddleware);
api.get('/api/v1/streaming/*', (req, res) => {
// @ts-expect-error
const channelName = channelNameFromPath(req);
// FIXME: In theory we'd never actually reach here due to
@@ -948,12 +1015,15 @@ const startServer = async () => {
return;
}
// @ts-expect-error
channelNameToIds(req, channelName, req.query).then(({ channelIds, options }) => {
// @ts-expect-error
const onSend = streamToHttp(req, res);
// @ts-expect-error
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
// @ts-ignore
streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options.needsFiltering, options.allowLocalOnly);
streamFrom(channelIds, req, req.log, onSend, onEnd, 'eventsource', options);
}).catch(err => {
const {statusCode, errorMessage } = extractErrorStatusAndMessage(err);
@@ -972,7 +1042,7 @@ const startServer = async () => {
*/
/**
* @param {any} req
* @param {Request} req
* @returns {string[]}
*/
const channelsForUserStream = req => {
@@ -986,12 +1056,28 @@ const startServer = async () => {
};
/**
* @param {any} req
* @param {Request} req
* @param {string} name
* @param {StreamParams} params
* @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
* @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, filterLocal?: boolean, filterRemote?: boolean, allowLocalOnly?: boolean } }>}
*/
const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
/**
* @param {string} feedKind
* @param {string} channelId
* @param {{ needsFiltering: boolean, allowLocalOnly: boolean }} options
*/
const resolveFeed = (feedKind, channelId, options) => {
getFeedAccessSettings(feedKind, req).then(({ localAccess, remoteAccess }) => {
resolve({
channelIds: [channelId],
options: { ...options, filterLocal: !localAccess, filterRemote: !remoteAccess },
});
}).catch(() => {
reject(new Error('Error getting feed access settings'));
});
};
switch (name) {
case 'user':
resolve({
@@ -1008,60 +1094,28 @@ const startServer = async () => {
break;
case 'public':
resolve({
channelIds: ['timeline:public'],
options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) },
});
resolveFeed('public', 'timeline:public', { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) });
break;
case 'public:allow_local_only':
resolve({
channelIds: ['timeline:public'],
options: { needsFiltering: true, allowLocalOnly: true },
});
resolveFeed('public', 'timeline:public', { needsFiltering: true, allowLocalOnly: true });
break;
case 'public:local':
resolve({
channelIds: ['timeline:public:local'],
options: { needsFiltering: true, allowLocalOnly: true },
});
resolveFeed('public', 'timeline:public:local', { needsFiltering: true, allowLocalOnly: true });
break;
case 'public:remote':
resolve({
channelIds: ['timeline:public:remote'],
options: { needsFiltering: true, allowLocalOnly: false },
});
resolveFeed('public', 'timeline:public:remote', { needsFiltering: true, allowLocalOnly: false });
break;
case 'public:media':
resolve({
channelIds: ['timeline:public:media'],
options: { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) },
});
resolveFeed('public', 'timeline:public:media', { needsFiltering: true, allowLocalOnly: isTruthy(params.allow_local_only) });
break;
case 'public:allow_local_only:media':
resolve({
channelIds: ['timeline:public:media'],
options: { needsFiltering: true, allowLocalOnly: true },
});
resolveFeed('public', 'timeline:public:media', { needsFiltering: true, allowLocalOnly: true });
break;
case 'public:local:media':
resolve({
channelIds: ['timeline:public:local:media'],
options: { needsFiltering: true, allowLocalOnly: true },
});
resolveFeed('public', 'timeline:public:local:media', { needsFiltering: true, allowLocalOnly: true });
break;
case 'public:remote:media':
resolve({
channelIds: ['timeline:public:remote:media'],
options: { needsFiltering: true, allowLocalOnly: false },
});
resolveFeed('public', 'timeline:public:remote:media', { needsFiltering: true, allowLocalOnly: false });
break;
case 'direct':
resolve({
@@ -1073,24 +1127,20 @@ const startServer = async () => {
case 'hashtag':
if (!params.tag) {
reject(new RequestError('Missing tag name parameter'));
} else {
resolve({
channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}`],
options: { needsFiltering: true, allowLocalOnly: true },
});
return;
}
resolveFeed('hashtag', `timeline:hashtag:${normalizeHashtag(params.tag)}`, { needsFiltering: true, allowLocalOnly: true });
break;
case 'hashtag:local':
if (!params.tag) {
reject(new RequestError('Missing tag name parameter'));
} else {
resolve({
channelIds: [`timeline:hashtag:${normalizeHashtag(params.tag)}:local`],
options: { needsFiltering: true, allowLocalOnly: true },
});
return;
}
resolveFeed('hashtag', `timeline:hashtag:${normalizeHashtag(params.tag)}:local`, { needsFiltering: true, allowLocalOnly: true });
break;
case 'list':
if (!params.list) {
@@ -1131,7 +1181,7 @@ const startServer = async () => {
/**
* @typedef WebSocketSession
* @property {import('ws').WebSocket & { isAlive: boolean}} websocket
* @property {http.IncomingMessage & ResolvedAccount} request
* @property {Request} request
* @property {import('pino').Logger} logger
* @property {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
*/
@@ -1153,7 +1203,7 @@ const startServer = async () => {
const onSend = streamToWs(request, websocket, streamNameFromChannelName(channelName, params));
const stopHeartbeat = subscriptionHeartbeat(channelIds);
const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering, options.allowLocalOnly);
const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options);
metrics.connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();
@@ -1257,7 +1307,7 @@ const startServer = async () => {
/**
* @param {import('ws').WebSocket & { isAlive: boolean }} ws
* @param {http.IncomingMessage & ResolvedAccount} req
* @param {Request} req
* @param {import('pino').Logger} log
*/
function onConnection(ws, req, log) {
@@ -1324,9 +1374,19 @@ const startServer = async () => {
const { type, stream, ...params } = json;
if (type === 'subscribe') {
subscribeWebsocketToChannel(session, firstParam(stream), params);
subscribeWebsocketToChannel(
session,
// @ts-expect-error
firstParam(stream),
params
);
} else if (type === 'unsubscribe') {
unsubscribeWebsocketFromChannel(session, firstParam(stream), params);
unsubscribeWebsocketFromChannel(
session,
// @ts-expect-error
firstParam(stream),
params
);
} else {
// Unknown action type
}
@@ -1346,13 +1406,13 @@ const startServer = async () => {
setInterval(() => {
wss.clients.forEach(ws => {
// @ts-ignore
// @ts-expect-error
if (ws.isAlive === false) {
ws.terminate();
return;
}
// @ts-ignore
// @ts-expect-error
ws.isAlive = false;
ws.ping('', false);
});
@@ -1382,14 +1442,16 @@ const startServer = async () => {
};
/**
* @param {any} server
* @param {http.Server} server
* @param {function(string): void} [onSuccess]
*/
const attachServerWithConfig = (server, onSuccess) => {
if (process.env.SOCKET) {
server.listen(process.env.SOCKET, () => {
if (onSuccess) {
// @ts-expect-error
fs.chmodSync(server.address(), 0o666);
// @ts-expect-error
onSuccess(server.address());
}
});
@@ -1404,6 +1466,7 @@ const attachServerWithConfig = (server, onSuccess) => {
server.listen(port, bind, () => {
if (onSuccess) {
// @ts-expect-error
onSuccess(`${server.address().address}:${server.address().port}`);
}
});