Change playing of a track to mark nowPlaying rather than scrobble, refactor/tidy up track streaming

This commit is contained in:
simojenki
2021-06-26 18:11:26 +10:00
parent cb987aeacb
commit eec3313587
7 changed files with 418 additions and 183 deletions

View File

@@ -168,6 +168,7 @@ export interface MusicLibrary {
range: string | undefined;
}): Promise<TrackStream>;
coverArt(id: string, type: "album" | "artist", size?: number): Promise<CoverArt | undefined>;
nowPlaying(id: string): Promise<boolean>
scrobble(id: string): Promise<boolean>
searchArtists(query: string): Promise<ArtistSummary[]>;
searchAlbums(query: string): Promise<AlbumSummary[]>;

View File

@@ -549,7 +549,10 @@ export class Navidrome implements MusicService {
.get(
credentials,
`/rest/stream`,
{ id: trackId, c: this.streamClientApplication(track) },
{
id: trackId,
c: this.streamClientApplication(track),
},
{
headers: pipe(
range,
@@ -628,6 +631,13 @@ export class Navidrome implements MusicService {
})
.then((_) => true)
.catch(() => false),
nowPlaying: async (id: string) =>
navidrome
.get(credentials, `/rest/scrobble`, {
id,
})
.then((_) => true)
.catch(() => false),
searchArtists: async (query: string) =>
navidrome
.search3(credentials, { query, artistCount: 20 })

View File

@@ -1,7 +1,10 @@
import { option as O } from "fp-ts";
import express, { Express } from "express";
import * as Eta from "eta";
import morgan from "morgan";
import { PassThrough, Transform, TransformCallback } from "stream";
import { Sonos, Service } from "./sonos";
import {
SOAP_PATH,
@@ -16,13 +19,51 @@ import bindSmapiSoapServiceToExpress from "./smapi";
import { AccessTokens, AccessTokenPerAuthToken } from "./access_tokens";
import logger from "./logger";
import { Clock, SystemClock } from "./clock";
import { pipe } from "fp-ts/lib/function";
export const BONOB_ACCESS_TOKEN_HEADER = "bonob-access-token";
interface RangeFilter extends Transform {
range: (length: number) => string;
}
export function rangeFilterFor(rangeHeader: string): RangeFilter {
// if (rangeHeader == undefined) return new PassThrough();
const match = rangeHeader.match(/^bytes=(\d+)-$/);
if (match) return new RangeBytesFromFilter(Number.parseInt(match[1]!));
else throw `Unsupported range: ${rangeHeader}`;
}
export class RangeBytesFromFilter extends Transform {
from: number;
count: number = 0;
constructor(f: number) {
super();
this.from = f;
}
_transform(chunk: any, _: BufferEncoding, next: TransformCallback) {
if (this.count + chunk.length <= this.from) {
// before start
next();
} else if (this.from <= this.count) {
// off the end
next(null, chunk);
} else {
// from somewhere in chunk
next(null, chunk.slice(this.from - this.count));
}
this.count = this.count + chunk.length;
}
range = (number: number) => `${this.from}-${number - 1}/${number}`;
}
function server(
sonos: Sonos,
service: Service,
webAddress: string | "http://localhost:4534",
webAddress: string,
musicService: MusicService,
linkCodes: LinkCodes = new InMemoryLinkCodes(),
accessTokens: AccessTokens = new AccessTokenPerAuthToken(),
@@ -139,58 +180,102 @@ function server(
</Presentation>`);
});
app.head("/stream/track/:id", async (req, res) => {
const id = req.params["id"]!;
const accessToken = req.headers[BONOB_ACCESS_TOKEN_HEADER] as string;
logger.info(`Stream HEAD requested for ${id}, accessToken=${accessToken}`)
const authToken = accessTokens.authTokenFor(accessToken);
if (!authToken) {
return res.status(401).send();
} else {
return musicService
.login(authToken)
.then((it) =>
it.stream({ trackId: id, range: req.headers["range"] || undefined })
)
.then((trackStream) => {
res.status(trackStream.status);
Object.entries(trackStream.headers)
.filter(([_, v]) => v !== undefined)
.forEach(([header, value]) => res.setHeader(header, value));
res.send();
});
}
});
app.get("/stream/track/:id", async (req, res) => {
const id = req.params["id"]!;
const accessToken = req.headers[BONOB_ACCESS_TOKEN_HEADER] as string;
logger.info(`Stream requested for ${id}, accessToken=${accessToken}`)
const authToken = accessTokens.authTokenFor(accessToken);
logger.info(
`-> /stream/track/${id}, headers=${JSON.stringify(req.headers)}`
);
const authToken = pipe(
req.header(BONOB_ACCESS_TOKEN_HEADER),
O.fromNullable,
O.map((accessToken) => accessTokens.authTokenFor(accessToken)),
O.getOrElseW(() => undefined)
);
if (!authToken) {
return res.status(401).send();
} else {
return musicService
.login(authToken)
.then((it) =>
it.scrobble(id).then((scrobbleSuccess) => {
if (scrobbleSuccess) logger.info(`Scrobbled ${id}`);
else logger.warn(`Failed to scrobble ${id}....`);
return it;
})
it
.stream({
trackId: id,
range: req.headers["range"] || undefined,
})
.then((stream) => ({ musicLibrary: it, stream }))
)
.then((it) =>
it.stream({ trackId: id, range: req.headers["range"] || undefined })
)
.then((trackStream) => {
logger.info(`Streaming ${id}, status=${trackStream.status}, headers=(${JSON.stringify(trackStream.headers)})`)
res.status(trackStream.status);
Object.entries(trackStream.headers)
.filter(([_, v]) => v !== undefined)
.forEach(([header, value]) => res.setHeader(header, value));
.then(({ musicLibrary, stream }) => {
logger.info(
`stream response from music service for ${id}, status=${
stream.status
}, headers=(${JSON.stringify(stream.headers)})`
);
trackStream.stream.pipe(res);
const respondWith = ({
status,
filter,
headers,
sendStream,
nowPlaying,
}: {
status: number;
filter: Transform;
headers: Record<string, string | undefined>;
sendStream: boolean;
nowPlaying: boolean;
}) => {
logger.info(
`<- /stream/track/${id}, status=${status}, headers=${JSON.stringify(
headers
)}`
);
(nowPlaying
? musicLibrary.nowPlaying(id)
: Promise.resolve(true)
).then((_) => {
res.status(status);
Object.entries(stream.headers)
.filter(([_, v]) => v !== undefined)
.forEach(([header, value]) => res.setHeader(header, value));
if (sendStream) stream.stream.pipe(filter).pipe(res);
else res.send();
});
};
if (stream.status == 200) {
respondWith({
status: 200,
filter: new PassThrough(),
headers: {
"content-type": stream.headers["content-type"],
"content-length": stream.headers["content-length"],
"accept-ranges": stream.headers["accept-ranges"],
},
sendStream: req.method == "GET",
nowPlaying: req.method == "GET",
});
} else if (stream.status == 206) {
respondWith({
status: 206,
filter: new PassThrough(),
headers: {
"content-type": stream.headers["content-type"],
"content-length": stream.headers["content-length"],
"content-range": stream.headers["content-range"],
"accept-ranges": stream.headers["accept-ranges"],
},
sendStream: req.method == "GET",
nowPlaying: req.method == "GET",
});
} else {
respondWith({
status: stream.status,
filter: new PassThrough(),
headers: {},
sendStream: req.method == "GET",
nowPlaying: false,
});
}
});
}
});