Source

Media.js

const { MediaStreamTrack } = require("msc-node");
const EventEmitter = require("events");
const fs = require("fs");
const ffmpeg = require("ffmpeg-static");
const prism = require("prism-media");

/**
 * @class
 * @classdesc Basic class to process audio streams
 */
class Media {
  /**
   * @description Init the media object
   *
   * @param {boolean} logs=false Wether or not to output logs
   * @param {number} port=5030 A ffmpeg rtp port that this instance will be using.
   * @param {PacketHandler} packetHandler=(packet)=>{this.track.writeRtp(packet);} The function that determines how audio packets are handled.
   *
   * @return {Media} The new Media object instance
   */
  constructor(logs=false, port=5030, packetHandler=(packet)=>{this.track.writeRtp(packet);}, inputFormat="") {
    this.track = new MediaStreamTrack({ kind: "audio" });
    this.socket = require("dgram").createSocket("udp4");
    this.socket.bind(port);

    this.socket.on("message", (packet) => {
      packetHandler(packet); // defined in constructor params
    })

    this.inputFormat = inputFormat.trim() + " ";
    this.port = port;
    this.logs = logs;
    this.playing = false;
    this.isMedia = true;
    this.readAtNative = true;

    this.ffmpeg = require("child_process").spawn(ffmpeg, this.ffmpegArgs(port));
    if (logs) {
      this.ffmpeg.stdout.on("data", (data) => {
        console.log(Buffer.from(data).toString());
      })
      this.ffmpeg.stderr.on("data", (data) => {
        console.log(Buffer.from(data).toString());
      });
    }

    return this;
  }
  on(event, cb) {
    return "Unimplemented";
  }
  once(event, cb) {
    return "Unimplemented";
  }

  ffmpegArgs(port) {
    return (((this.readAtNative) ? "-re " : "") + this.inputFormat + "-i - -reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 4 -map 0:a -b:a 48k -maxrate 48k -acodec libopus -ar 48000 -ac 2 -f rtp rtp://127.0.0.1:" + port).split(" ");
  }
  /**
   * Returns an array of arguments that can be passed to ffmpeg
   *
   * @param  {string} start="00:00:00" The position in the audio to start the conversion.
   * @return {Array<string>}           The arguments.
   */
  createFfmpegArgs(start="00:00:00") {
    return this.ffmpegArgs(this.port);
  }
  /**
   * @description Returns the current mediasoup media track
   *
   * @return {MediaStreamTrack}  The mediasoup MediaStreamTrack
   */
  getMediaTrack() {
    return this.track;
  }
  /**
   * Load and process an audio file
   *
   * @param  {string} path The file path of the file
   * @return {void}
   */
  playFile(path) {
    if (!path) throw "You must specify a file to play!";
    const stream = fs.createReadStream(path);
    this.playStream(stream);
  }
  /**
   * Writes a chunk of data into the ffmpeg process.
   *
   * @param  {object} chunk The datachunk to write.
   * @return {void}
   */
  writeStreamChunk(chunk) {
    if (!chunk) throw "You must pass a chunk to be written into the stream";
    this.ffmpeg.stdin.write(chunk);
  }
  /**
   * Pipe a ReadStream into the ffmpeg process.
   *
   * @param  {ReadableStream} stream The stream to pipe.
   * @return {void}
   */
  playStream(stream) {
    if (!stream) throw "You must specify a stream to play!";
    stream.pipe(this.ffmpeg.stdin);
  }
  /**
   * Kill the ffmpeg instance and close the socket.
   *
   * @return {Promise<void>} A promise resolving when the udp4 socket closed.
   */
  destroy() {
    return new Promise((res, _rej) => {
      this.track = null;
      this.ffmpeg.kill();
      this.socket.close(res);
    });
  }
}

/**
 * @class
 * @augments Media
 * @description An advanced version of the Media class. It also includes media controls like pausing.
 *
 * @property {number} seconds - The amount of seconds passed during playback. Extracted from ffmpeg
 * @property {string} currTimestamp - The current timestamp as given by ffmpeg. "hh:mm:ss"
 */
class MediaPlayer extends Media {
  /**
   * @description Initiates the MediaPlayer instance.
   *
   * @param  {boolean} logs=false Wether or not to print logs to the console or not.
   * @param  {number} port=5030  The port this instance should use.
   * @param  {string} iFormat="" Optional arguments that specify the input format that are passed to ffmpeg
   * @return {MediaPlayer}            The new instance.
   */
  constructor(logs=false, port=5030) {
    super(logs, port, (packet) => {
      if (packet == "FINISHPACKET") return this.finished();
      this.track.writeRtp(packet);
    }, "-f s16le -ar 48000 -ac 2");
    this.isMediaPlayer = true;

    this.emitter = new EventEmitter();

    this.currTime = null;
    this.logs = logs;
    this.started = false;
    this.packets = [];
    this.intervals = [];
    this.lastPacket = null;
    this.paused = false;
    this.ffmpegKilled = false;
    this.ready = true;
    this.volCache = null;

    this.seconds = 0;
    this.currTimestamp = "00:00:00";

    this.volumeTransformer = new prism.VolumeTransformer({ type: "s16le", volume: 1 });
    this.volumeTransformer.pipe(this.ffmpeg.stdin);

    return this;
  }
  on(event, cb) {
    return this.emitter.on(event, cb);
  }
  once(event, cb) {
    return this.emitter.once(event, cb);
  }
  emit(event, data) {
    return this.emitter.emit(event, data);
  }

  /**
   * setReadNative
   * @description Change if ffmpeg should read the input at its native frame rate (-re flag). Set this to `false` if your input data is already at native frame rate to prevent packet loss.
   *
   * @param  {boolean} bool=true true: read at native frame rate; false: process input as fast as possible
   * @return {void}
   */
  setReadNative(bool=true) {
    this.readAtNative = bool;
  }

  static timestampToSeconds(timestamp="00:00:00", ceilMinutes=false) {
    timestamp = timestamp.split(":").map((el, index) => {
      if (index < 2) {
        return parseInt(el);
      } else {
        return ((ceilMinutes) ? Math.ceil(parseFloat(el)) : parseFloat(el));
      }
    });
    const hours = timestamp[0];
    const minutes = timestamp[1];
    const currSeconds = timestamp[2];
    return (hours * 60 * 60) + (minutes * 60) + currSeconds; // convert everything to seconds
  }

  /**
   * @description Saves a data packet temporarily
   *
   * @param  {object} packet The packet to store.
   * @return {void}
   */
  _save(packet) {
    let time = Date.now();
    if (!this.lastPacket) this.lastPacket = time;
    this.intervals.push(time - this.lastPacket);
    this.lastPacket = time + 2;
    this.packets.push(packet);
  }
  /**
   * @description Start writing the data from the temporal storage to the media track. Recursive, will stop when the storage is empty.
   *
   * @return {void}
   */
  _write() {
    if (this.playbackPaused) return this.writing = false;
    if (this.packets.length == 0) { this.paused = false; return this.writing = false;}
    this.writing = true;
    let interval = this.intervals.shift();
    let packet = this.packets.shift();
    setTimeout(() => {
      if (packet == "FINISHPACKET") {
        this.finished();
        return this._write();
      }
      this.writePacket(packet);
      this._write();
    }, interval);
  }
  writePacket(packet) {
    this.volumeTransformer.write(packet);
  }
  /**
   * @description Cleans up this instance. Should be called when the bot is leaving.
   *
   * @param  {boolean} destroy=true Wether or not to replace the mediatrack
   * @param  {boolean} f=true       Wether or not to respawn the ffmpeg instance.
   * @return {void}
   */
  async disconnect(destroy=true, f=true) { // this should be called on leave
    if (destroy) this.track = new MediaStreamTrack({ kind: "audio" }); // clean up the current data and streams
    this.paused = false;
    if (f) {
      this.ready = false;
      // prevent EPIPE errors
      this.originStream.unpipe(this.fpcm.stdin);
      this.volCache = this.volumeTransformer.volume;
      this.volumeTransformer.unpipe(this.ffmpeg.stdin);
      this.volumeTransformer.destroy();

      this.ffmpegKilled = true;
      // childProcess.exitCode indicates the exit code of the process; returns `null` if the process is still running
      if (!this.ffmpeg.exitCode) this.ffmpeg.kill();
      if (!this.fpcm.exitCode) this.fpcm.kill();
    }
    this.currTime = "00:00:00";

    this.packets = [];
    this.intervals = [];
    this.started = false
  }
  /**
   * @description Destroys all streams and frees the port.
   *
   * @return {Promise<void>} A promise that resolves when everything is finished.
   */
  destroy() {
    return Promise.all([
      super.destroy(),
      new Promise((res) => {
        this.packets = [];
        this.intervals = [];
        this.originStream.destroy();
        res();
      })
    ]);
  }
  /**
   * @description Function that is called when the ffmpeg stream finishes.
   *
   * @return {void}
   */
  finished() {
    this.playing = false;
    this.paused = false;
    this.ready = false;
    this.ffmpeg.kill();
    this.disconnect();
    this.emit("finish");
  }
  /**
   * @description Pause the current playback
   *
   * @return {void}
   */
  pause() {
    if (this.playbackPaused) return;
    this.playbackPaused = true;
    this.paused = true; // re-route packets from ffmpeg to the temporary buffers
    this.emit("pause");
  }
  /**
   * @description Resume the current playback.
   *
   * @return {void}
   */
  resume() {
    if (!this.playbackPaused) return;
    this.playbackPaused = false;
    this.emit("start");
    this._write();
  }
  /**
   * @description Set the volume of the current playback
   *
   * @param  {number} v=1 The new volume. 0 = nothing, 0.5 = half, 1 = default; Stay in between 0 and 1 to prevent bad music quality
   * @return {void}
   */
  setVolume(v=1) {
    return this.volumeTransformer.setVolume(v);
  }
  /**
   * @description Stop the playback.
   *
   * @return {Promise<void>} Resolves when all is cleaned up.
   */
  stop() {
    return new Promise(async (res) => {
      this.ffmpegKilled = true;

      // prevent EPIPE errors
      this.originStream.unpipe(this.fpcm.stdin);
      this.volCache = this.volumeTransformer.volume;
      this.volumeTransformer.unpipe(this.ffmpeg.stdin);
      this.volumeTransformer.destroy();

      if (!this.fpcm.exitCode) this.fpcm.kill();
      this.pcm.destroy();
      if (!this.ffmpeg.exitCode) this.ffmpeg.kill();
      this.ready = false;
      await this.sleep(1000);
      this.paused = false;
      this.playbackPaused = false;

      this.packets = [];
      this.intervals = [];
      this.started = false;
      this.track = new MediaStreamTrack({ kind: "audio" });
      this.emit("finish");
      res();
    });
  }
  sleep(ms) {
    return new Promise((res) => {
      setTimeout(res, ms);
    });
  }
  get streamTrack() {
    if (!this.track) this.track = new MediaStreamTrack({ kind: "audio" });
    return this.getMediaTrack();
  }
  set streamTrack(t) {
    console.log("This should not be done.", t);
  }
  set transport(t) {
    this.sendTransport = t;
  }
  get transport() {
    return this.sendTransport;
  }
  processPacket(packet) {
    if (!this.started) {
      this.started = true;
      this.emit("start");
    }
    if (this.paused) {
      return this._save(packet);
    }
    if (packet == "FINISHPACKET") return this.finished();
    this.writePacket(packet);
  };

  /**
   * @description Play an audio read stream to the media track.
   * @override
   *
   * @param  {ReadableStream} stream The stream to play.
   * @return {void}
   */
  async playStream(stream) {
    if (this.sendTransport) this.producer = await this.sendTransport.produce({ track: this.track, appData: { type: "audio" } });
    this.emit("buffer", this.producer);
    this.started = false;
    this.streamFinished = false;
    this.originStream = stream;
    this.originStream.on("end", () => {
      this.streamFinished = true;
    });

    if (!this.ready) {
      this.ffmpeg = require("child_process").spawn(ffmpeg, [ // set up new ffmpeg instance
        ...this.createFfmpegArgs()
      ]);
      //await this.sleep(1000);

      this.volumeTransformer = new prism.VolumeTransformer({ type: "s16le", volume: this.volCache || 1 });
      this.volumeTransformer.pipe(this.ffmpeg.stdin);
      this.ready = true;
    }

    const fpcm = require("child_process").spawn(ffmpeg, [
      ((this.readAtNative) ? "-re" : ""), "-i", "-",
      "-reconnect", "1",
      "-reconnect_streamed", "1",
      "-reconnect_delay_max", "4",
      "-analyzeduration", "0",
      "-loglevel", "0",
      "-f", "s16le",
      "-ar", "48000",
      "-ac", "2",
      "-"
    ]);
    const pcm = fpcm.stdout;
    this.fpcm = fpcm;
    this.pcm = pcm;
    this.ffmpegKilled = false;

    pcm.on("data", (c)=>this.processPacket(c));
    pcm.once("data", () => this.emit("startPlay"));

    // ffmpeg stuff
    this.#setupFmpeg();

    this.originStream.pipe(fpcm.stdin); // start playing
  }
  async #ffmpegFinished() {
    await this.sleep(1000); // prevent bug with no music after 3rd song
    this.processPacket("FINISHPACKET");
    this.originStream.destroy();
    this.currTime = "00:00:00";
  }
  #setupFmpeg() {
    this.seconds = 0;
    this.currTimestamp = "00:00:00";

    this.fpcm.on("exit", async (_c, s) => {
      if (s == "SIGTERM" || this.ffmpegKilled) return this.ffmpegKilled = false; // killed intentionally
      this.#ffmpegFinished();
    });
    this.ffmpeg.stdin.on("error", (e) => {
      if (e.code == "EPIPE") return;
      console.log("Ffmpeg error: ", e);
    });
    this.ffmpeg.stderr.on("data", (chunk) => {
      const output = Buffer.from(chunk).toString();
      if (this.logs) console.log("err", output);

      if (!output.includes("time=")) return;
      var time = output.slice(output.indexOf("time=") + "time=".length, output.indexOf("time=") + "time=".length + 11)
      this.currTimestamp = time;
      time = MediaPlayer.timestampToSeconds(time);
      this.seconds = time;
    });
    this.ffmpeg.stdout.on("data", (chunk) => {
      if (this.logs) console.log("OUT", Buffer.from(chunk.toString()));
    });
    this.ffmpeg.stdout.on("end", () => {
      if (this.logs) console.log("finished");
    });
  }
}

module.exports = { Media, MediaPlayer };