| 1234567891011121314151617181920212223242526272829 |
- import readline from "readline";
- import fs from "fs";
- import * as Rx from "rxjs";
- import { map, pluck, takeUntil } from "rxjs/operators";
/**
 * Shape of one JSON record in the log file, one record per line.
 *
 * NOTE(review): the field set resembles Docker's json-file log driver
 * output — confirm against whatever produces the input file.
 */
interface LogLine {
  /** Logged text; the consumer strips a single trailing newline before printing. */
  log: string;
  /** Name of the originating stream (presumably "stdout"/"stderr" — confirm). */
  stream: string;
  /** Timestamp of the record, kept as the raw string found in the JSON. */
  time: string;
}
// Line-oriented reader over the log file; every emitted line is expected to
// hold exactly one JSON object of the LogLine shape.
const rl = readline.createInterface({
  input: fs.createReadStream("testdata/user.log")
});

// Turn readline's "line" events into an observable of log messages and print
// each one. The stream completes when readline emits "close" (end of input).
Rx.fromEvent<string>(rl, "line")
  .pipe(
    takeUntil(Rx.fromEvent(rl, "close")),
    // JSON.parse returns `any`, so the LogLine shape is asserted, not
    // validated. A malformed line throws here and surfaces through `error`.
    map<string, LogLine>(line => JSON.parse(line)),
    // Plain property access instead of the deprecated `pluck("log")`; a
    // single trailing newline is stripped so console.log does not double it.
    map(entry => entry.log.replace(/\n$/, ""))
  )
  .subscribe({
    next: console.log,
    error: (err: unknown) => {
      console.error(err);
      // An error ends the subscription but not the reader: close it
      // explicitly so the interface is not left open on the file stream.
      rl.close();
    },
    complete: () => rl.close()
  });
|