相关文章

一本书里面内容较多, 因此分成了多篇 Post, 可以从此处看到相关文章:

Data

Linear Async Flows

首先回顾一下 JS 原生的一些特性:

  1. Event-driven—The JavaScript engine uses an event loop to constantly monitor a task queue, also known as a callback queue. When a task is detected, the event loop dequeues the task and runs it to completion.
  2. Single-threaded—JavaScript provides a single-threaded model to developers. There are no standard, language-level threading APIs to spawn new threads.
  3. Asynchronous—All modern JavaScript engines use multiple threads (managed by an internal worker pool) so that you can perform nonblocking I/O actions without blocking the main thread.

然后是 NodeJS 的一些实现

额外需要注意的是: JS 原生是单线程, 但是一些 underlying platform (Browser and NodeJS).

Promise

一个简单的 Promise

const someFutureValue = new Promise((resolve, reject) => {
  const value = doSomething()
  if (value === null) {
    reject(new Error("Ooops!"));
  }
  resolve(value);
});

someFutureValue.then(doSomethingElseWithThatValue);

Interoperability for Promise & ADT | Promise 和 ADT 的共通性

Promise 和上方的一个 ADT: Validation 很是相似, 可以看到 Promise 有一些 ADT 的特性

  1. Identity: Promise.resolve('aa').then(identity); and Promise.resolve('aa'); 两者相同, 都最终生成 Promise { 'aa' }
  2. Composition:
Promise.resolve("aabbcc").then(unique).then(join).then(toUpper);
Promise.resolve("aabbcc").then(compose(toUpper, join, unique));

同上两者相同, 都得到 Promise { 'ABC' }

  1. 并且 Promise.then()Promise.catch() 其实也就和 Validation 里面的 Success 和 Failure 里面的逻辑类似

Promise 没有完全符合 ADT 的 Fantasy Land Specification (Chapter 5), 但是确实和 ADT 的行为很相似

Async Iteration | 异步迭代

假设有 A,B,C 三个 Promise 需要按照顺序依次执行.

Traditional Loop

比较传统的使用 for 和 reduce 来进行处理的方法:

const sleep = ms => {
  return new Promise(resolve => setTimeout(resolve, ms))
}
for(let i = 0;i <= 10;i++){
    await sleep(1000);
    console.log(new Date(), i);
}
[delay("a", 500), delay("b", 100), delay("c", 200)].reduce(
  (chain, next) => chain.then(() => next).then(::console.log),
  Promise.resolve()
);

for await……of

MDN: for await……of

使用 for await……of 来处理的方法:

Note: for await……of doesn't work with async iterators that are not async iterables.

var txt = "";
const array = [1,2,3,4,5];
async function test() {
  for await (const p of array) {
      var txt = p + "</br>";
      document.write(txt);
  }
}
test();

需要注意的是 for await……of 需要一个对象拥有一个 function-valued symbol property Symbol.asyncIterator, 因此可以如此设计一个对象用于 for await……of

const LIMIT = 3;

const asyncIterable = {
  [Symbol.asyncIterator]() {    /* 必须要拥有这个属性才能使用 for await……of */
    let i = 0;
    return {
      next() {
        const done = i === LIMIT;
        const value = done ? undefined : i++;
        return Promise.resolve({ value, done });
      },
      return() {
        // This will be reached if the consumer called 'break' or 'return' early in the loop.
        return { done: true };
      }
    };
  }
};

(async () => {
  for await (const num of asyncIterable) {
    console.log(num);
  }
})();
// 0
// 1
// 2
Example

var tasks = [
  ["a", 500],
  ["b", 100],
  ["c", 200],
];

function delay(value, time) {
  return new Promise((resolve) => {
    console.log(value);
    setTimeout(resolve, time, value);
  });
}

function delayedIterator(tasks) {
  return {
    async next() {
      /* 每一次从任务中取出最顶上一个, 需要注意的是这里 task 会被缓存。直到全部取完之返回一个 done:true */
      if (tasks.length) {
        const [value, time] = tasks.shift();
        return await delay(value, time);
      }
      return Promise.resolve({ done: true });
    },
  };
}
var it = delayedIterator(tasks);

/* 多次执行之后直到最后一次会得到 done */
await it.next().then(({ value, done }) => { value; done; }); /* a */
await it.next().then(({ value, done }) => { value; done; }); /* b */
await it.next().then(({ value, done }) => { value; done; }); /* c */
await it.next().then(({ value, done }) => { value; done; }); /* done */


以及一个更简洁简洁的例子:

var tasks = [ ["a", 500], ["b", 100], ["c", 200], ];

function delay(value, time) {
  return new Promise((resolve) => {
    // console.log(value);
    setTimeout(resolve, time, value);
  });
}

const delayedIterator = (task) => () => ({
  async next() {
    if (tasks.length) {
      const [value, time] = tasks.shift();
      await delay(value, time);
      return { done: false, value };
    }
    return Promise.resolve({ done: true });
  },
});

var delayedIterable = { [Symbol.asyncIterator]: delayedIterator(tasks) };

for await (const value of delayedIterable) {
  console.log(value);
}

Stream Programming

Iterables & Iterator Protocol

上一节正好有一个例子可以参考

关于 IterablesIterator Protocol:

  • Iterables 是一个可以被枚举,遍历及循环的对象
  • 一般需要一个 Symbol.iterator 元素来决定遍历的细节.
  • Symbol.iterator 可以是一个简单的函数或者是一个 Generator

对于 Iterator 需要满足一些协议:

  • 一个方法返回至少两个参数: done: booleanvalue: any
    • 用于标记遍历操作是否已经完成以及对应的值
    • 需要注意的是 done 返回 true 的时候 value 会被忽略 (便利完成就就没有必要返回值了)
  • 如果一个遍历器没有返回上面这两个属性, 那么在遍历的时候会抛出一个错误.

Example

class Block {
  index = 0;
  constructor(index, previousHash, data = [], difficulty = 0) {
    this.index = index;
    this.previousHash = previousHash;
    this.data = data;
    this.nonce = 0;
    this.difficulty = difficulty;
    this.timestamp = Date.now();
    this.hash = this.calculateHash();
  } 
  
  //……
  [Symbol.iterator]() {
    return this.data[Symbol.iterator]();
  }
}

for (const transaction of block) { 
  // do something with transaction
}

Strings implementing @@iterator

[Symbol.iterator] 其实是个普遍存在的属性, Array/Map/Set 以及 String 都有这个属性

"Joy of JavaScript"[Symbol.iterator]; // [Function: [Symbol.iterator]]

for(const letter of "Joy of JavaScript") {
  console.log(letter);
}

Generators

需要注意的是: Generator 是一个函数, 并且不能用 Arrow Function 来定义. (也许未来版本 ES 会支持)

function* sayIt() {
  yield "The";
  yield "Joy";
  yield "of";
  yield "JavaScript!";
}



const it = sayIt();
it.next(); // { value: 'The', done: false } 
it.next(); // { value: 'Joy', done: false }
/* …… */


// 当然也可以使用 `for` 循环来遍历

/* 
const it = sayIt();
for(const message of sayIt()) { 
  console.log(message);
}
 */

在 Class 中的形式:

class SomeClass {
  *sayIt() {
    return "The Joy of JavaScript!";
  }
}

Creating iterable objects

先看一个简单版的例子:


const obj = {
  *[Symbol.iterator]() { 
    yield 1
    yield 2
    yield 3
  }
}


/* 其实这里使用 destructuring assignment 执行了三次 yield */
const [a, b, c] = obj

console.log(a,b,c);  // 1,2,3

其核心在于 yield 了多次, 使用 destructuring assignment 一次性将值全部取出来, 下面还有一个稍微复杂一点的例子:

class Validation {
  #val;
  /* …… */
  
  *[Symbol.iterator]() {
    yield this.isFailure ? Failure.of(this.#val) : undefined;
    yield this.isSuccess ? Success.of(this.#val) : undefined;
  }
}


/* 第一个元素和 Success 的情况无关可以忽略 */
const [, right] = Success.of(2);
right.isSuccess; // true
right.get(); // 2

/* 第二个元素 right 和 Failure 的情况无关也可以忽略 */
const [left, /* right */] = Failure.of(new Error("Error occurred!"));
left.isFailure; // true
left.getOrElse(5); // 5

Async Generators

  • 相比普通的 Generator 会 yield 一个值, Async Generator 会 yield 一个 Promise.
  • 后期可以使用 for await……of 来遍历
  • 适用于一个大文件的分段获取, 是一个很简单的例子,用于计算 chunk 的数目, 当然 Async Generator 的潜力很大, 可以进行一些复杂的操作, 比如对每个 chunk 进行 validate 等等
async function* generateBlocksFromFile(file) {  /* 'async function*' 定义一个异步迭代器  */
  try {
    const dataStream = fs.read(file);
    let previousDecodedData = "";
    for await (const chunk of dataStream) {
      previousDecodedData += chunk;
      let separatorIndex;
      while ((separatorIndex = previousDecodedData.indexOf(";")) >= 0) {
        const decodedData = previousDecodedData.slice(0, separatorIndex + 1);
        const blocks = tokenize(";", decodedData)
          .filter((str) => str.length > 0)
          .map((str) => str.replace(";", ""));
        for (const block of blocks) {
          /* 对每个 chunk 进行 yeild */
          yield JSON.parse(block);
        }
      }
    }
    if (previousDecodedData.length > 0) {
      /* 对多种情况进行 yeild */
      yield JSON.parse(previousDecodedData);
    }
  } catch (e) {
    console.error(`Error processing file: ${e.message}`);
    throw e;
  }
}






let result = 0;

/* 然后通过循环就可以逐步执行 Promise */
for await (const block of generateBlocksFromFile("blocks.txt")) {
  console.log("Counting block", block.hash);
  result++;
  previousDecodedData = previousDecodedData.slice(separatorIndex + 1);
}
result; // 3, 最终进行了 3 个部分的 await

EventEmitter

Basic Usage of EventEmitter

EventEmitter 需要通过 Node.js 的 events 模块来使用

const EventEmitter = require('events');

const myEmitter = new EventEmitter();
myEmitter.on("some_event", () => {
  console.log("An event occurred!");
});
myEmitter.emit("some_event");

Extend Array with EventEmitter

const {EventEmitter} = require('events'); 

class PushArray extends Array {
  static EVENT_NAME = "new_value";

  #eventEmitter = new EventEmitter();

  constructor(……values) {
    super(……values);
  }

  push(value) {
    this.#eventEmitter.emit(PushArray.EVENT_NAME, value);
    return super.push(value);
  }

  subscribe({ next }) {
    this.#eventEmitter.on(PushArray.EVENT_NAME, (value) => {
      next(value);
    });
  }

  unsubscribe() {
    this.#eventEmitter.removeAllListeners(PushArray.EVENT_NAME);
  }
}

const pushArray = new PushArray(1, 2, 3);
pushArray.subscribe({
  next(value) {
    console.log("New value:", value); // do something with value
  },
});
pushArray.push(4);
pushArray.push(5);
pushArray.unsubscribe();
pushArray.push(6);

Observable

An Observable object is designed to model a lazy, unidirectional, push-based data source (such as streams).

Observable 暂时没有 JS 的原生实现, 一般通过 RxJS 或者 core-js 进行使用, 它具有一些特性:

  1. Unidirectional Data Flow(Data Propagation): 数据流一定是从 Publisher 到 Subscriber .
  2. Declarative, Lazy Pipeline: 只有当 Subscriber 订阅时才会被创建.

一个 Observable 的大体模板:

const Observer = {
  next(event) {
    // Receives each event in the stream
  },
  error(e) {
    // Triggered when an exception occurs somewhere along the observable
  },
  complete() {
    // Called when there are no more values to emit; not called on error
  },
};

Example - Simple Observable

需要提前安装 core-js: npm install core-js -S

import 'core-js/features/observable/index.js'

function newRandom(min, max) {
  return Math.floor(Math.random() * (max - min)) + min;
}

/* 定义一个 Observable */
const randomNum$ = new Observable((observer) => {
  const _id = setInterval(() => {
    observer.next(newRandom(1, 10));
  }, 1_000);

  return () => {
    clearInterval(_id);
  };
});

const subs = randomNum$.subscribe({
  next(number) {
    console.log("New random number:", number);
  },
  complete() {
    console.log("Stream ended");
  },
});

// some time later…… 
subs.unsubscribe();

Observable Datastream Methods

下面来实现针对 Observable 的一些方法

map() for Observable

import 'core-js/features/observable/index.js'

/**
 * @name curry
 * @see https://github.com/JoyOfJavaScript/joj/blob/7f231147029df787bd9eb510f42354adc7461ac0/src/blockchain/src/util/fp/combinators.js
 */
export const curry = fn => (……args1) =>
  args1.length === fn.length
    ? fn(……args1)
    : (……args2) => {
      const args = [……args1, ……args2]
      return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
    }

/**
 * @name map
 * @description 给 Observable 添加一个 map 的方法 
 * @param fn {Function} map handler
 * @param stream {object (Observable)} 接受一个 Observable
 */
const map = curry( /* 使用 curry 以方便后期的 composition */
  (fn, stream) =>
    /* 返回一个 New Observable */
    new Observable(observer => {
      /* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
      const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
        next(value) {
          try {
            observer.next(fn(value));
          }
          catch (err) {
            observer.error(err);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        }
      });
      return () => subs.unsubscribe();   /* 返回一个 用于 unSubscribe 的匿名 function */
    })
);


  • 这个 map() 返回了一个对 Observable(stream) 进行 subscribe 的另一个新的 Observable(New Observable) 对象
  • 对 New Observable 进行 subscribe 就可以对 stream 里面的所有元素执行迭代
const square = num => num ** 2;

map(
  square,
  Observable.of(1, 2, 3, 4) 
  /* map 会返回一个 New Observable, subscribe 这个 New Observable 就可以 subscribe stream 指向的 Observable */
).subscribe({
  next(number) {
    console.log(number);
  },
  complete() {
    console.log('Stream ended');
  }
});

// Prints 1  4  9  16

同样的最终的目的是使得代码可以连续使用:

const square = num => num ** 2;

const add = curry((x, y) => x + y);

const subs = map(square, map(add(1), Observable.of(1, 2, 3)))
  .subscribe({
    next(number) {
      console.log(number);
    },
    complete() {
      console.log('Stream ended');
    }
  })
// Prints: 4  9  16

如此创造了一个 dataStream:

  1. Observable.of() 得到数据: 1,2,3
  2. add(1) 处理后得到数据: 2,3,4
  3. square 处理后得到数据, 4,9,16
filter() for Observable
const filter = curry(
  (predicate, stream) =>
    new Observable((observer) => {
      const subs = stream.subscribe({
        next(value) {
          if (predicate(value)) {  
            /* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
            observer.next(value);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        },
      });
      return () => subs.unsubscribe();
    })
);

reduce() for Observable
const reduce = curry((accumulator, initialValue, stream) => {
  let result = initialValue ?? {};
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        result = accumulator(result, value);
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.next(result);
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

skip() for Observable
const skip = curry((count, stream) => {
  let skipped = 0;
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        if (skipped++ >= count) {
          observer.next(value);
        }
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

Example - Datastream with Observable

import 'core-js/features/observable/index.js'
const square = num => num ** 2;
const isEven = num => num % 2 === 0;


export const curry = fn => (……args1) =>
  args1.length === fn.length
    ? fn(……args1)
    : (……args2) => {
      const args = [……args1, ……args2]
      return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
    }


const filter = curry(
  (predicate, stream) =>
    new Observable((observer) => {
      const subs = stream.subscribe({
        next(value) {
          if (predicate(value)) {
            /* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
            observer.next(value);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        },
      });
      return () => subs.unsubscribe();
    })
);

const add = curry((x, y) => x + y);

const skip = curry((count, stream) => {
  let skipped = 0;
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        if (skipped++ >= count) {
          observer.next(value);
        }
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

const map = curry( /* 使用 curry 以方便后期的 composition */
  (fn, stream) =>
    /* 返回一个 New Observable */
    new Observable(observer => {
      /* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
      const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
        next(value) {
          try {
            observer.next(fn(value));
          }
          catch (err) {
            observer.error(err);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        }
      });
      return () => subs.unsubscribe();   /* 返回一个 用于 unSubscribe 的匿名 function */
    })
);

const reduce = curry((accumulator, initialValue, stream) => {
  let result = initialValue ?? {};
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        result = accumulator(result, value);
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.next(result);
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

const obs = Observable.of(1, 2, 3, 4);


reduce( /* Step4: reduce with add, 最终得到一个 Observable 里面数据包含 20 */
  add,
  0,
  map(
    square, /* Step3: 平方, 得到 4, 16 */
    filter(
      isEven, /* Step2: 仅仅筛选双数, 保留 2,4 */
      skip(1, obs)   /* Step1: 跳过第一个, 返回一个 Observable 里面包含数据 2,3,4 */
    )
  )
)
  .subscribe({
    next(value) {
      console.log(value);
    },
    complete() {
      // done();
    }
  });

Observable as Mixin

上方的语法还是有一些难懂, 我们希望可以使用 chained statement 来进行简化. 这个时候就可以设计一个 Mixin:

export const ReactiveExtensions = {

  /* 下面使用到了上方定义的几个方法(filter, map, skip, reduce) */
  filter(predicate) {
    return filter(predicate, this);
  },
  map(fn) {
    return map(fn, this);
  },
  skip(count) {
    return skip(count, this);
  },
  reduce(accumulator, initialValue = {}) {
    return reduce(accumulator, initialValue, this);
  },
};


/* 这个地方需要注意,这里扩展了原生的方法, 最好在方法里面添加一些判断,以防止未来原生方法里面出现一些变化, 比如同名方法等 */
Object.assign(Observable.prototype, ReactiveExtensions);



/* Step1: 生成一个 Observable */
Observable.of(1, 2, 3, 4)
  .skip(1)  /* Step2: 调用 Observable 的 skip 方法, 这个方法会返回一个 New Observable 并且在 New Observable 里面对当前调用的这个 Observable 进行 subscribe */
  .filter(isEven) /* Step3: 同上, 新建第二个 New Observable 并且对第一个 Observable 进行 subscriber */
  .map(square) /* Step4: 新建第三个 New Obs 并且对 第二个进行 subscribe */
  .reduce(add, 0) /* 如此循环最终返回一个 Obs, 并且最后进行 subscribe */
  .subscribe({
    next(value) {
      console.log(value);
    },
  });


Representing push streams with generators

上方都是使用 Observable.of() 来创造一个 Observable, 我们可以扩展一个 Observable.from(generator) 的方法通过 Generator 来构造一个 Observable

import 'core-js/features/observable/index.js'

/* node 16 需要额外的库来支持 readable 方法 */
import {Readable} from 'readable-stream'

Object.defineProperty(Observable, 'fromGenerator', {
  value(generator) {
    return new Observable(observer => {
      Readable.from(generator)
        .on('data', (x) => observer.next(x))
        .on('end', (x) => observer.complete(x));
    });
  },
  enumerable: false,
  writable: false,
  configurable: false
});

/* 另外也支持 async 的 generator (async) */
function* words() {
  yield 'The';
  yield 'Joy';
  yield 'of';
  yield 'JavaScript';
}

Observable.fromGenerator(words())
  .subscribe({
    next: (x)=>console.log(x),
  })

Example - Stream Programming

Before

let validBlocks = 0;
const chain = new Blockchain();
let skippedGenesis = false;

/* Step1: 读取一个文件 */
for await (const blockData of generateBlocksFromFile("blocks.txt")) {
  /* Step2: 跳过一些内容 */
  if (!skippedGenesis) {
    skippedGenesis = true;
    continue;
  }
  /* 对 Block 进行一些操作 */
  const block = new Block(
    blockData.index,
    chain.top.hash,
    blockData.data,
    blockData.difficulty
  );
  chain.push(block);
  /* validate */
  if (block.validate().isFailure) {
    continue;
  }
  validBlocks++;
}

After

const chain = new Blockchain();

// 将一些方法抽取出来作为 helper functions
const validateBlock = (block) => block.validate();
const isSuccess = (validation) => validation.isSuccess;
const boolToInt = (bool) => (bool ? 1 : 0);
const addBlockToChain = curry((chain, blockData) => {
  const block = new Block(
    blockData.index,
    chain.top.hash,
    blockData.data,
    blockData.difficulty
  );
  return chain.push(block);
});

// Main Logic: 可见业务逻辑明显变得更简单
Observable.fromGenerator(generateBlocksFromFile("blocks.txt"))
  .skip(1)
  .map(addBlockToChain(chain))
  .map(validateBlock)
  .filter(prop("isSuccess"))
  .map(compose(boolToInt, isSuccess))
  .reduce(add, 0)
  .subscribe({
    next(validBlocks) {
      if (validBlocks === chain.height() - 1) {
        console.log("All blocks are valid!");
      } else {
        console.log("Detected validation error in blocks.txt");
      }
    },
    error(error) {
      console.error(error.message);
    },
    complete() {
      console.log("Done validating all blocks!");
    },
  });

Extra: Streamifying objects

这一节通过设置 Object 的 Symbol.observable 属性以实现返回一个 Observable

import 'core-js/features/observable/index.js'

const Pair = (left, right) => ({
  left,
  right,
  [Symbol.observable]() {
    return new Observable(observer => {
      observer.next(left);
      observer.next(right);
      observer.complete();
    });
  }
});

Observable.from(Pair(20, 30))
  .subscribe({
    next(value) {
      console.log('Pair element: ', value);
    }
  });

class Blockchain {
  blocks = new Map();
  blockPushEmitter = new EventEmitter();

  constructor(genesis = createGenesisBlock()) {
    this.top = genesis;
    this.blocks.set(genesis.hash, genesis);
    this.timestamp = Date.now();
    this.pendingTransactions = [];
  }

  push(newBlock) {
    newBlock.blockchain = this;
    this.blocks.set(newBlock.hash, newBlock);
    this.blockPushEmitter.emit(EVENT_NAME, newBlock);
    this.top = newBlock;
    return this.top;
  }
  //……

/* 返回一个 Obs */
  [Symbol.observable]() {
    return new Observable(observer => {
      for (const block of this) {
        observer.next(block);
      }
      this.blockPushEmitter.on(EVENT_NAME, block => {
        console.log('Emitting a new block: ', block.hash);
        observer.next(block);
      });
    });
  }
}


const chain = new Blockchain();
chain.push(new Block(chain.height() + 1, chain.top.hash, []));
chain.push(new Block(chain.height() + 1, chain.top.hash, []));

const subs = Observable.from(chain)
  .subscribe({
    next(block) {
      console.log('Received block: ', block.hash);
      if (block.validate().isSuccess) {
        console.log('Block is valid');
      }
      else {
        console.log('Block is invalid');
      }
    }
  });

// …… later in time
chain.push(new Block(chain.height() + 1, chain.top.hash, []));
subs.unsubscribe();
chain.height(); // 4

Outputs:

Received block b81e08daa89a92cc4edd995fe704fe2c5e16205eff2fc470d7ace8a1372e7de4
Block is valid
Received block 352f29c2d159437621ab37658c0624e6a7b1aed30ca3e17848bc9be1de036cfd
Block is valid
Received block 93ff8219d77be5110fa61978c0b5f77c6c8ece96dd3bba2dc6c3c4b731a724e7
Block is valid
Emitting a new block: 
07a68467a3a5652f387c1be5b63159e7d1a068517070e3f4b66e5311e44796e4
Received block 07a68467a3a5652f387c1be5b63159e7d1a068517070e3f4b66e5311e44796e4
Block is valid

另外如果 push 一个 invalid 的 Block :

chain.push(new Block(-1, chain.top.hash, []))

会得到以下输出:

Emitting a new block: 
c3cc935840c71aa533c46ed7c3bfc5fc81e55519c7e52e0849afe091423bf5e0
Received block c3cc935840c71aa533c46ed7c3bfc5fc81e55519c7e52e0849afe091423bf5e0
Block is invalid

Dynamic streamification

这里是一个完整的将 Observable 进行动态数据流转化的例子.

obUtils.js: 这个文件里面都是上文提到过的一些 util 方法

export const curry = fn => (……args1) =>
  args1.length === fn.length
    ? fn(……args1)
    : (……args2) => {
      const args = [……args1, ……args2]
      return args.length >= fn.length ? fn(……args) : curry(fn)(……args)
    }


export const filter = curry(
  (predicate, stream) =>
    new Observable((observer) => {
      const subs = stream.subscribe({
        next(value) {
          if (predicate(value)) {
            /* predicate 是 filter 传入的 fn, 如果返回 true 则继续否则跳过 */
            observer.next(value);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        },
      });
      return () => subs.unsubscribe();
    })
);


export const skip = curry((count, stream) => {
  let skipped = 0;
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        if (skipped++ >= count) {
          observer.next(value);
        }
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

export const map = curry( /* 使用 curry 以方便后期的 composition */
  (fn, stream) =>
    /* 返回一个 New Observable */
    new Observable(observer => {
      /* 在 New Observable 里面对 Stream 的 Observable 进行 subscribe 并执行 */
      const subs = stream.subscribe({ /* 对 stream 进行 Subscribe */
        next(value) {
          try {
            observer.next(fn(value));
          }
          catch (err) {
            observer.error(err);
          }
        },
        error(e) {
          observer.error(e);
        },
        complete() {
          observer.complete();
        }
      });
      return () => subs.unsubscribe();   /* 返回一个 用于 unSubscribe 的匿名 function */
    })
);

export const reduce = curry((accumulator, initialValue, stream) => {
  let result = initialValue ?? {};
  return new Observable((observer) => {
    const subs = stream.subscribe({
      next(value) {
        result = accumulator(result, value);
      },
      error(e) {
        observer.error(e);
      },
      complete() {
        observer.next(result);
        observer.complete();
      },
    });
    return () => subs.unsubscribe();
  });
});

export const ReactiveExtensions = {
  /* 下面使用到了上方定义的几个方法(filter, map, skip, reduce) */
  filter(predicate) {
    return filter(predicate, this);
  },
  map(fn) {
    return map(fn, this);
  },
  skip(count) {
    return skip(count, this);
  },
  reduce(accumulator, initialValue = {}) {
    return reduce(accumulator, initialValue, this);
  },
};

ob.js

import EventEmitter from 'events'
import 'core-js/features/observable/index.js'
import { ReactiveExtensions,curry } from'./obUtil.js'

/* 这个地方需要注意,这里扩展了原生的方法, 最好在方法里面添加一些判断,以防止未来原生方法里面出现一些变化, 比如同名方法等 */
Object.assign(Observable.prototype, ReactiveExtensions);

const square = num => {
  console.log('square', num);
  return num ** 2
};
const isEven = num => {
  console.log('isEven', num);
  return num % 2 === 0
};

const ON_EVENT = "on";  /* 自定义的事件名称 */
const END_EVENT = "end";
const LOG_LABEL = `IN-STREAM`;
const LOG_LABEL_INNER = `${LOG_LABEL}:push`;

function implementsPush(obj) {
  return obj
    && Symbol.iterator in Object(obj)
    && typeof obj['push'] === 'function'
    && typeof obj[Symbol.iterator] === 'function';
}

const reactivize = (obj) => {
  implementsPush(obj) ||
    new TypeError("Object does not implement a push protocol");

  const emitter = new EventEmitter();

  /* 
    变量 pushProxy 会被 Object.assign 扩展到返回值里面
    实际上这里有用的就是一个 Proxy 的细节
    因此变量名称不重要
  */
  const pushProxy = new Proxy(obj, {
    get(……args) {
      /*  使用 spread operator 获取所有参数 */
      const [target, key] =
        args; /* 第一个参数是 target obj, 第二个参数是对应的 key */
      if (key === "push") {   /* Step6: 通过 Proxy 监听了 push 事件 */
        const methodRef = target[key];
        return (……capturedArgs) => {
          const result = methodRef.call(
            target,
            ……[capturedArgs]
          ); /* 实际执行一次 Push */
          emitter.emit(ON_EVENT, ……capturedArgs); /* Step7: 此处调用一次 ON_EVENT */
          return result;
        };
      }
      return Reflect.get(……args);
    },
  });

  /* 
    变量 observable 会被 Object.assign 扩展到返回值里面
    实际上这里有用的就是 [Symbol.observable] 这个属性
    因此变量名称也不重要
  */
  const observable = {
    [Symbol.observable]() {
      return new Observable((observer) => {
        console.group(LOG_LABEL);

        /* Step8: 监听 ON_EVENT, 并进行处理, 核心在于 observer.next() */
        emitter.on(ON_EVENT, (newValue) => {
          console.group(LOG_LABEL_INNER);
          console.log("Emitting new value: ", newValue);
          observer.next(newValue);
          console.groupEnd(LOG_LABEL_INNER);
        });
        emitter.on(END_EVENT, () => {
          observer.complete();
        });
        for (const value of obj) {
          observer.next(value);
        }
        return () => {
          console.groupEnd(LOG_LABEL);
          emitter.removeAllListeners(ON_EVENT, END_EVENT);
        };
      });
    },
  };

  return Object.assign(pushProxy, observable);
};


let count = 0;
const arr$ = reactivize([1, 2, 3, 4, 5]);
const subs = Observable.from(arr$)  /* Step1: 得到一个 Obs */
  .filter(isEven)  /* Step2: 筛选偶数 */
  .map(square) /* Step3: 平方 */
  .subscribe({
    next(value) {
      /* Step 4: 对结果进行处理 */
      console.log("Received new value", value);
      count += value;
    },
  });
//…… later in time
arr$.push(6); /* Step 5: push 一个新的元素 */
subs.unsubscribe();  /* Step 6: unsubscribe */
arr$.push(7); /* Step 7: unsubscribe 之后就不会有新的输出了 */


/* 

IN-STREAM
  isEven 1
  isEven 2
  square 2
  Received new value 4
  isEven 3
  isEven 4
  square 4
  Received new value 16
  isEven 5
  IN-STREAM:push
    Emitting new value:  6
    isEven 6
    square 6
    Received new value 36

*/

Summary

  • An Iterator object has the method next, which returns an object with properties value and done. value contains the next element in the iteration, and done is the control switch that stops the iteration process.
  • An async iterator follows the same behavior as a normal iterator except that next returns a Promise with a result of the same shape {value, done}.
  • To build custom enumerable objects, you can implement Symbol.iterator. You can also define Symbol.asyncIterator to enumerate the pieces of your objects asynchronously.
  • Generators are a special type of function that can produce a sequence of values instead of a single value—a factory for iterables. A generator function is identified by an asterisk (*).
  • A generator function returns a Generator object that implements the iterator protocol, which means you can consume it by using the for……of loop.
  • The difference between a normal generator and an async generator is that generated values are wrapped by a Promise. To consume an async generator, you can use the for await……of loop.
  • Streams are sequences of values emitted over time. Anything can become a stream, such as a single value, an array, or a generator function. Anything that is iterable can be modeled as a stream.
  • The new Observable API proposes to make stream-based, reactive programming easier.
  • Observables are push-based, declarative streams. Their programming model is based on publish/subscribe. Observables are agnostic to the type of data in the sequence and to whether the data is synchronous or asynchronous; the programming model is the same.
  • You can create and augment your own observable objects by implementing a function-valued Symbol.observable property.

Extra

我的博客即将同步至腾讯云开发者社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=18vc0foyqr7jc