RxJS - receiving data in batches of time + max
I'm playing with RxJS for the first time, and I'm trying to create a simple observable that will take both a maximum number of entries and wait for a time interval to pass before passing the data on to subscribers.
This is what I have:
import EventEmitter from "events";
import { fromEvent } from "rxjs";
import { bufferTime, filter, take } from "rxjs/operators";
const ev = new EventEmitter();
const observer = fromEvent(ev, "log").pipe(
  take(10),
  bufferTime(1000),
  filter(logs => !!logs.length)
);
observer.subscribe(data => {
  console.log(`received: ${data.length}`);
});
for (let i = 0; i < 100; i++) {
  ev.emit("log", { hello: "world" });
}
What I expected to happen:
received: 10
to be printed 10x, at 1 second intervals.
What actually happened:
received: 10
was printed once, and then the script exits.
Why I think it's happening
I'm new to RxJS, but from what I can gather, the take() operator emits a 'completed' status after it has taken 10 entries, which prevents any further subscriptions from firing.
How do I make this observable 'recurring' so that it'll a) take a maximum of 10, b) ensure it runs, at most, once every 1000ms, and c) repeat forever?
rxjs rxjs6
Unfortunately I cannot give you a solution for your problem, but you may have a look at bufferCount and bufferTime: learnrxjs.io/operators/transformation/buffercount.html learnrxjs.io/operators/transformation/buffertime.html
– Oleksandr Fedotov
Nov 11 at 12:57
@OleksandrFedotov - thanks. I'm already (attempting to) use bufferTime in my example code, and bufferCount looks like it waits until the buffer is full, which is not what I want. I just want to take a maximum of 10 and run - at most - once per second... then repeat ad infinitum.
– Lee Benson
Nov 11 at 13:09
maybe the expand operator can help here for the recursion: learnrxjs.io/operators/transformation/expand.html
– René Winkler
Nov 11 at 14:52
@RenéWinkler - could you point to any examples of how expand might apply in this case, please?
– Lee Benson
Nov 11 at 15:16
asked Nov 11 at 12:16
Lee Benson
4,74533041
2 Answers
Try inserting a simple tap(d => console.log(JSON.stringify(d))) operator before take(10) and after bufferTime, and you will see what happens.
Basically you have a loop that emits 100 events. From these events you create an Observable which notifies 100 times, once for each object emitted by your loop.
This whole thing happens synchronously in your example.
Then the Observable is transformed by a pipe which does the following:
- first, it takes just the first 10 notifications via the take operator
- then it says "accumulate all the notifications which happen in the first 1 second and emit them in the form of an array"
- considering that the whole emission process is synchronous and that you take the first 10 of them, bufferTime(1000) will emit one array of 10 items; since take(10) completes right after the 10th notification, bufferTime flushes that array on completion rather than waiting out the full second
- nothing else will be emitted by this stream, since take(10) completes it after 10 synchronous notifications, so the filter operator is of no use
- last, in the subscription you receive just 1 notification, i.e. the only one emitted by bufferTime
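That said, what you want can probably be achieved with code along these lines:
import EventEmitter from "events";
import { fromEvent, of } from "rxjs";
import { bufferCount, delay, filter, mergeMap } from "rxjs/operators";
const ev = new EventEmitter();
function simpleObservable(maxEntries: number, waitTime: number) {
  return fromEvent(ev, "log").pipe(
    // slice the stream into arrays of maxEntries events
    bufferCount(maxEntries),
    filter(logs => !!logs.length),
    // concurrency 1: each array is delayed by waitTime, one after the other
    mergeMap(logs => of(logs).pipe(delay(waitTime)), 1),
  );
}
simpleObservable(10, 1000)
  .subscribe(data => {
    console.log(`received: ${data.length}`);
  });
for (let i = 0; i < 100; i++) {
  ev.emit("log", { hello: "world" });
}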
The key here is that first you slice the stream of events into arrays of 10 events each, and this is done via the bufferCount operator.
Then you introduce the asynchronicity using mergeMap with concurrency level set to 1 (which is equivalent to the operator concatMap). You basically transform each array emitted by bufferCount into another Observable via the of function and apply a delay of 1 second to each of the new Observables. Then you concatenate them so that they are emitted one after the other, 1 second apart.
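To make the slicing step concrete, here is a small model of the chunking rule in plain TypeScript. It is only a sketch and not RxJS itself: chunkLikeBufferCount and its sourceCompleted flag are hypothetical names. The assumption being modeled is that bufferCount emits a buffer only when it is full, and flushes a partial buffer only when the source completes, which a stream created with fromEvent does not do on its own.

```typescript
// Plain TypeScript model of bufferCount-style chunking (not RxJS itself).
// `chunkLikeBufferCount` and `sourceCompleted` are hypothetical names.
function chunkLikeBufferCount<T>(
  items: T[],
  size: number,
  sourceCompleted: boolean
): T[][] {
  const emitted: T[][] = [];
  let buffer: T[] = [];
  for (const item of items) {
    buffer.push(item);
    if (buffer.length === size) {
      emitted.push(buffer); // a full buffer is emitted right away
      buffer = [];
    }
  }
  // A partial buffer is flushed only if the source has completed.
  if (sourceCompleted && buffer.length > 0) {
    emitted.push(buffer);
  }
  return emitted;
}

const events = Array.from({ length: 28 }, (_, i) => i);
const whileOpen = chunkLikeBufferCount(events, 10, false).map(b => b.length);
const afterComplete = chunkLikeBufferCount(events, 10, true).map(b => b.length);
console.log(whileOpen.join(", "));     // 10, 10
console.log(afterComplete.join(", ")); // 10, 10, 8
```

With 28 events and an event source that never completes, the last 8 entries stay in the buffer until 2 more arrive.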
UPDATED RESPONSE after @Lee Benson comment
bufferTime can be the answer to your problem.
bufferTime has 3 parameters:
- bufferTimeSpan: specifies the lifespan of the buffer, i.e. the Observable emits the buffer and resets it at each interval specified by bufferTimeSpan
- bufferCreationInterval: to be left null for our purpose
- maxBufferSize: specifies the max size of the buffer - if the size is reached, then the Observable emits the buffer and resets it
So, if we use the first and the last parameter we should be able to reach the desired objectives.
This is the test I have put together
import EventEmitter from "events";
import { fromEvent, merge, of } from "rxjs";
import { bufferTime, delay, filter } from "rxjs/operators";
const ev = new EventEmitter();
// I merge to the event stream created by 'ev' a single notification emitted after 5 seconds,
// just to make the thing a bit more complex
const sourceObs = merge(
  fromEvent(ev, "log"),
  of(1).pipe(delay(5000))
);
function simpleObservable(maxEntries: number, waitTime: number) {
  return sourceObs.pipe(
    // emit when waitTime has elapsed or maxEntries is reached, whichever comes first
    bufferTime(waitTime, null, maxEntries),
    filter(logs => !!logs.length),
  );
}
simpleObservable(10, 2000)
  .subscribe(data => {
    console.log(`received: ${data.length}`);
  });
for (let i = 0; i < 19; i++) {
  ev.emit("log", { hello: "world" });
}
// some other events are fired after 6 seconds, to make the source more complex
setTimeout(() => {
  for (let i = 0; i < 17; i++) {
    ev.emit("log", { hello: "world" });
  }
}, 6000);
Thanks. The issue I have with bufferCount is that it waits until there are 10 items, and doesn't fire if there are fewer. I'm trying to build a queue that will invoke .subscribe() whenever there's at least 1 result and at least 1 second has passed, in batches of a maximum of 10 at a time.
– Lee Benson
Nov 11 at 14:41
If I look at your example, you are emitting 100 events synchronously. What is the expected behavior in this case? Do you want to invoke subscribe 10 times, each time with an array of 10 notifications, at intervals of 1 second? Or do you expect the 10 notifications to be given to subscribe as fast as possible?
– Picci
Nov 11 at 21:01
I have updated my answer
– Picci
Nov 11 at 21:35
Thanks, that's closer, but still not quite there. If we change maxEntries to 4 and leave waitTime at 1000, we get 4x "Received: 4" messages back in the console after 1s, rather than each message buffered at 1s intervals. Another example: Change the loop to emit 28 events instead of 19; we'll get "Received: 10" twice after 1000ms, followed by a final "Received: 8". What should happen is "Received: 10" after 1s, "Received: 10" after 2s, "Received: 8" after 3s.
– Lee Benson
Nov 12 at 10:57
Looking at bufferTime some more, it seems like it emits when either the time has been exhausted, or the buffer size has been reached - whichever is first. Hence in your example, "Received: 10" is printed immediately, followed by a 1000ms pause, then "Received: 9". What I'm trying to achieve is for the buffer time to be awaited always, and then as a secondary criterion, to not exceed maxEntries. Any extra entries should be buffered in the next iteration of bufferTime.
– Lee Benson
Nov 12 at 11:20
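The requirement described in these comments can be pinned down as a small pure function before picking operators. The sketch below is plain TypeScript, not RxJS; planBatches, Batch and the millisecond timestamps are hypothetical, and the rule it encodes is only one reading of the comments: flush once per waitTime, hand over at most maxEntries per flush, and carry the rest over to the next flush.

```typescript
// Hypothetical model of the desired schedule (not an RxJS operator).
interface Batch {
  at: number;   // time of the flush, in ms
  size: number; // number of entries handed to the subscriber
}

function planBatches(arrivals: number[], maxEntries: number, waitTime: number): Batch[] {
  if (maxEntries <= 0 || waitTime <= 0) {
    throw new Error("maxEntries and waitTime must be positive");
  }
  const sorted = arrivals.slice().sort((a, b) => a - b);
  const batches: Batch[] = [];
  let next = 0; // index of the first entry not yet flushed
  for (let tick = waitTime; next < sorted.length; tick += waitTime) {
    let size = 0;
    // Take entries that have already arrived, up to the cap.
    while (next < sorted.length && sorted[next] <= tick && size < maxEntries) {
      next++;
      size++;
    }
    if (size > 0) {
      batches.push({ at: tick, size }); // empty ticks are skipped
    }
  }
  return batches;
}

// 28 entries arriving at t = 0, at most 10 per flush, one flush per second.
const plan = planBatches(Array.from({ length: 28 }, () => 0), 10, 1000);
console.log(plan.map(b => `${b.at}ms: ${b.size}`).join(", "));
// 1000ms: 10, 2000ms: 10, 3000ms: 8
```

A function like this can double as a reference when checking what an operator chain actually emits.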
After reading your last comment, one way to solve the problem could be to combine bufferTime with interval using zip.
Basically the idea is that you can set a rhythm of notifications using interval - for instance you set interval(1000) to have an Observable that emits every second.
Then you can use bufferTime(1000, null, 10) to make sure that you emit an array every 1 second or if your buffer reaches 10 items, whichever comes first.
Now if you zip these 2 Observables you obtain an Observable which emits every second, because of interval(1000), and it also emits whatever comes out of bufferTime(1000, null, 10) in sequence. So, if the source Observable emits more than 10 items in a second, the first 10 will be emitted in the first notification, the others will remain buffered in the Observable and will be emitted in the subsequent notifications in chunks of 10 every second.
The code probably makes this clearer.
import EventEmitter from "events";
import { fromEvent, merge, of, timer, zip } from "rxjs";
import { bufferTime, delay, filter, map } from "rxjs/operators";
const ev = new EventEmitter();
// I merge to the event stream created by 'ev' a single notification emitted after 5 seconds,
// just to make the thing a bit more complex
const sourceObs = merge(
  fromEvent(ev, "log"),
  of(1).pipe(delay(5000))
);
function simpleObservable(maxEntries: number, waitTime: number) {
  // zip pairs each tick of the timer with the next buffer, so buffers are released one per tick
  return zip(
    timer(0, waitTime),
    sourceObs.pipe(
      bufferTime(waitTime, null, maxEntries),
    )
  )
  .pipe(
    filter(logs => !!logs[1].length),
    map(logs => logs[1])
  );
}
const maxEntries = 4;
const waitTime = 1000;
simpleObservable(maxEntries, waitTime)
  .subscribe(data => {
    console.log(`received: ${data.length}`);
  });
for (let i = 0; i < 19; i++) {
  ev.emit("log", { hello: "world " + i + 'A' });
}
// some other events are fired after 8 seconds, to make the source more complex
setTimeout(() => {
  for (let i = 0; i < 17; i++) {
    ev.emit("log", { hello: "world " + i + 'B' });
  }
}, 8000);
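The pacing effect of zip can be checked with a tiny model. zip pairs the n-th value of one stream with the n-th value of the other, so the n-th pair cannot go out before both values exist. The sketch is plain TypeScript with a hypothetical helper, zipEmissionTimes, and the timestamps are an assumed, idealized reading of the first 19 events with maxEntries 4 and waitTime 1000 (four full buffers at once, the remaining 3 entries when the timespan ends); they are not measured output.

```typescript
// Hypothetical model of when zip can emit its n-th pair (not RxJS itself).
// Both inputs are assumed to be non-decreasing timestamps in ms.
function zipEmissionTimes(left: number[], right: number[]): number[] {
  const pairs = Math.min(left.length, right.length);
  const times: number[] = [];
  for (let i = 0; i < pairs; i++) {
    // The pair is complete only once the later of the two values has arrived.
    times.push(Math.max(left[i], right[i]));
  }
  return times;
}

// Assumed timestamps: timer(0, 1000) ticks, and buffers produced by bufferTime.
const tickTimes = [0, 1000, 2000, 3000, 4000];
const bufferTimes = [0, 0, 0, 0, 1000];
const emissionTimes = zipEmissionTimes(tickTimes, bufferTimes);
console.log(emissionTimes.join(", ")); // 0, 1000, 2000, 3000, 4000
```

Note that timer(0, waitTime) produces its first tick immediately, so in this model the first batch goes out at once; interval(waitTime) would produce its first tick only after waitTime.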
Clearly you need to consider that, if the source Observable emits at a higher rate than you can consume notifications, the backlog keeps growing and you may end up with memory problems.
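One way to keep that risk visible is to hold pending entries in an explicit queue with a cap. The class below is a plain TypeScript sketch, not an RxJS operator: BoundedBatchQueue, maxPending and the drop-oldest policy are all assumptions chosen for illustration, and dropping data may not be acceptable in your case.

```typescript
// Hypothetical bounded queue that flushes at most maxEntries per tick.
class BoundedBatchQueue<T> {
  private pending: T[] = [];
  private timer: ReturnType<typeof setInterval> | undefined;
  public dropped = 0; // how many entries were discarded because the queue was full

  constructor(
    private readonly maxEntries: number,
    private readonly maxPending: number,
    private readonly onBatch: (batch: T[]) => void
  ) {}

  push(entry: T): void {
    if (this.pending.length >= this.maxPending) {
      this.pending.shift(); // assumption: drop the oldest entry when full
      this.dropped++;
    }
    this.pending.push(entry);
  }

  // Hand over at most maxEntries; does nothing when the queue is empty.
  tick(): void {
    if (this.pending.length === 0) return;
    this.onBatch(this.pending.splice(0, this.maxEntries));
  }

  // Call tick() once every waitTime milliseconds.
  start(waitTime: number): void {
    this.stop();
    this.timer = setInterval(() => this.tick(), waitTime);
  }

  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }
}

// Driven by hand here instead of start(1000), so the result does not depend on timing.
const sizes: number[] = [];
const queue = new BoundedBatchQueue<number>(10, 25, batch => { sizes.push(batch.length); });
for (let i = 0; i < 28; i++) queue.push(i);
for (let i = 0; i < 4; i++) queue.tick();
console.log(`batches: ${sizes.join(", ")}; dropped: ${queue.dropped}`);
// batches: 10, 10, 5; dropped: 3
```

In real use you would call queue.push from the "log" listener and queue.start(1000) once, then decide whether dropping, blocking or raising an error is the right reaction to a full queue.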
add a comment |
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
up vote
0
down vote
Try to insert a simple tap(d => console.log(JSON.stringify(d))),
operator before take(10)
and after bufferTime
and you see what happens.
Basically you have a loop that emits 100 events. Out of this loop you create an Observable which notifies 100 times the object emitted by you event loop.
This whole thing happens synchronously in your example.
Then the Observable is transformed by a pipe which does the following:
- first takes just the first 10 notifications via the
take
operator - then says "accumulate all the notifications which happen in the first
1 second and emit them in form of an array" - considering that the
whole emission process is synchronous and that you take the first 10
of them, thenbufferTime(1000)
will emit one array of 10 items
after 1 second - nothing else will be emitted by this stream, since
take(10)
completes the source Observable after 10 synchronous notifications,
so thefilter
operator is of no use - last, in the subscription you receive just 1 notification, i.e. the only one emitted by
bufferTime
Said that, probably what you want to achieve can be accomplished by code along these lines
const ev = new EventEmitter();
function simpleObservable(maxEntries: number, waitTime: number) {
return fromEvent(ev, "log").pipe(
bufferCount(maxEntries),
filter(logs => !!logs.length),
mergeMap(logs => of(logs).pipe(delay(waitTime)), 1),
)
}
simpleObservable(10, 1000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 100; i++) {
ev.emit("log", { hello: "world" });
}
The key here is that first you slice the stream of events into arrays of 10 events each, and this is done via the bufferCount
operator.
Then you introduce the asynchronicity using mergeMap
with concurrency level set to 1 (which is equivalent to the operator concatMap
). You basically transform each array emitted by bufferCount
into another Observable via the of
function and apply a delay of 1 second to each of the new Observables. Then you concatenate them so that they are emitted each after the other with the time difference of 1 second.
UPDATED RESPONSE after @Lee Benson comment
bufferTime
can be the answer to your problem.
bufferTime
has 3 parameters:
bufferTimeSpan: specifies the lifespan of the buffer, i.e. the Observable emits the buffer and resets it after at each interval
specified by bufferTimeSpan
bufferCreationInterval: to be leftnull
for our purpose
maxBufferSize: specifies the max size of the buffer - if the size is reached, then the Observable emits the buffer and resets it
So, if we use the first and the last parameter we should be able to reach the desired objectives.
This is the test I have put together
const ev = new EventEmitter();
// I merge to the event stream created by 'ev' a single notification emitted after 5 seconds,
// just to make the thing a bit more complex
const sourceObs = merge(
fromEvent(ev, "log"),
of(1).pipe(delay(5000))
)
function simpleObservable(maxEntries: number, waitTime: number) {
return sourceObs.pipe(
bufferTime(waitTime, null, maxEntries),
filter(logs => !!logs.length),
)
}
simpleObservable(10, 2000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 19; i++) {
ev.emit("log", { hello: "world" });
}
// some other events are fired after 6 seconds, to make the source more complex
setTimeout(() => {
for (let i = 0; i < 17; i++) {
ev.emit("log", { hello: "world" });
}
}, 6000);
thanks. The issue I withbufferCount
is that it's waiting until there's 10 items, and not firing if there's fewer. I'm trying to build a queue that will invoke.subscribe()
whenever there's at least 1 result and at least 1 second has passed, in batches of a maximum of 10 per time.
– Lee Benson
Nov 11 at 14:41
If I look at your example, you are emitting synchronously 100 events. What is the behavior expected in this case? Do you want to invokesubscribe
10 times, each time with an array of 10 notifications, with an interval of 1 seconds each? Do you expect the 10 notifications to be given tosubscribe
as fast as possible?
– Picci
Nov 11 at 21:01
I have updated my answer
– Picci
Nov 11 at 21:35
thanks, that's closer, but still not quite there. If we changemaxEntries
to 4 and leavewaitTime
at 1000, we get 4x "Received: 4" messages back in the console after 1s, rather than each message buffered at 1s intervals. Another example: Change the loop to emit 28 events instead of 19; we'll get "Received: 10" twice after 1000ms, followed by a final "Received: 8". What should happen is "Received: 10" after 1s, "Received: 10" after 2s, "Received: 8" after 3s.
– Lee Benson
Nov 12 at 10:57
Looking atbufferTime
some more, it seems like it emits when either the time has been exhausted, or the buffer has been reached - whichever is first. Hence in your example, "Received: 10" is printed immediately, followed by a 1000ms pause, then "Received: 9". What I'm trying to achieve is for the buffer time to be awaited always, and then as a secondary criteria, to not exceedmaxEntries
. Any extra entries should be buffered in the next iteration of bufferTime.
– Lee Benson
Nov 12 at 11:20
add a comment |
up vote
0
down vote
Try to insert a simple tap(d => console.log(JSON.stringify(d))),
operator before take(10)
and after bufferTime
and you see what happens.
Basically you have a loop that emits 100 events. Out of this loop you create an Observable which notifies 100 times the object emitted by you event loop.
This whole thing happens synchronously in your example.
Then the Observable is transformed by a pipe which does the following:
- first takes just the first 10 notifications via the
take
operator - then says "accumulate all the notifications which happen in the first
1 second and emit them in form of an array" - considering that the
whole emission process is synchronous and that you take the first 10
of them, thenbufferTime(1000)
will emit one array of 10 items
after 1 second - nothing else will be emitted by this stream, since
take(10)
completes the source Observable after 10 synchronous notifications,
so thefilter
operator is of no use - last, in the subscription you receive just 1 notification, i.e. the only one emitted by
bufferTime
Said that, probably what you want to achieve can be accomplished by code along these lines
const ev = new EventEmitter();
function simpleObservable(maxEntries: number, waitTime: number) {
return fromEvent(ev, "log").pipe(
bufferCount(maxEntries),
filter(logs => !!logs.length),
mergeMap(logs => of(logs).pipe(delay(waitTime)), 1),
)
}
simpleObservable(10, 1000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 100; i++) {
ev.emit("log", { hello: "world" });
}
The key here is that first you slice the stream of events into arrays of 10 events each, and this is done via the bufferCount
operator.
Then you introduce the asynchronicity using mergeMap
with concurrency level set to 1 (which is equivalent to the operator concatMap
). You basically transform each array emitted by bufferCount
into another Observable via the of
function and apply a delay of 1 second to each of the new Observables. Then you concatenate them so that they are emitted each after the other with the time difference of 1 second.
UPDATED RESPONSE after @Lee Benson comment
bufferTime
can be the answer to your problem.
bufferTime
has 3 parameters:
bufferTimeSpan: specifies the lifespan of the buffer, i.e. the Observable emits the buffer and resets it after at each interval
specified by bufferTimeSpan
bufferCreationInterval: to be leftnull
for our purpose
maxBufferSize: specifies the max size of the buffer - if the size is reached, then the Observable emits the buffer and resets it
So, if we use the first and the last parameter we should be able to reach the desired objectives.
This is the test I have put together
const ev = new EventEmitter();
// I merge to the event stream created by 'ev' a single notification emitted after 5 seconds,
// just to make the thing a bit more complex
const sourceObs = merge(
fromEvent(ev, "log"),
of(1).pipe(delay(5000))
)
function simpleObservable(maxEntries: number, waitTime: number) {
return sourceObs.pipe(
bufferTime(waitTime, null, maxEntries),
filter(logs => !!logs.length),
)
}
simpleObservable(10, 2000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 19; i++) {
ev.emit("log", { hello: "world" });
}
// some other events are fired after 6 seconds, to make the source more complex
setTimeout(() => {
for (let i = 0; i < 17; i++) {
ev.emit("log", { hello: "world" });
}
}, 6000);
thanks. The issue I withbufferCount
is that it's waiting until there's 10 items, and not firing if there's fewer. I'm trying to build a queue that will invoke.subscribe()
whenever there's at least 1 result and at least 1 second has passed, in batches of a maximum of 10 per time.
– Lee Benson
Nov 11 at 14:41
If I look at your example, you are emitting synchronously 100 events. What is the behavior expected in this case? Do you want to invokesubscribe
10 times, each time with an array of 10 notifications, with an interval of 1 seconds each? Do you expect the 10 notifications to be given tosubscribe
as fast as possible?
– Picci
Nov 11 at 21:01
I have updated my answer
– Picci
Nov 11 at 21:35
thanks, that's closer, but still not quite there. If we changemaxEntries
to 4 and leavewaitTime
at 1000, we get 4x "Received: 4" messages back in the console after 1s, rather than each message buffered at 1s intervals. Another example: Change the loop to emit 28 events instead of 19; we'll get "Received: 10" twice after 1000ms, followed by a final "Received: 8". What should happen is "Received: 10" after 1s, "Received: 10" after 2s, "Received: 8" after 3s.
– Lee Benson
Nov 12 at 10:57
Looking atbufferTime
some more, it seems like it emits when either the time has been exhausted, or the buffer has been reached - whichever is first. Hence in your example, "Received: 10" is printed immediately, followed by a 1000ms pause, then "Received: 9". What I'm trying to achieve is for the buffer time to be awaited always, and then as a secondary criteria, to not exceedmaxEntries
. Any extra entries should be buffered in the next iteration of bufferTime.
– Lee Benson
Nov 12 at 11:20
add a comment |
up vote
0
down vote
up vote
0
down vote
Try to insert a simple tap(d => console.log(JSON.stringify(d))),
operator before take(10)
and after bufferTime
and you see what happens.
Basically you have a loop that emits 100 events. Out of this loop you create an Observable which notifies 100 times the object emitted by you event loop.
This whole thing happens synchronously in your example.
Then the Observable is transformed by a pipe which does the following:
- first takes just the first 10 notifications via the
take
operator - then says "accumulate all the notifications which happen in the first
1 second and emit them in form of an array" - considering that the
whole emission process is synchronous and that you take the first 10
of them, thenbufferTime(1000)
will emit one array of 10 items
after 1 second - nothing else will be emitted by this stream, since
take(10)
completes the source Observable after 10 synchronous notifications,
so thefilter
operator is of no use - last, in the subscription you receive just 1 notification, i.e. the only one emitted by
bufferTime
Said that, probably what you want to achieve can be accomplished by code along these lines
const ev = new EventEmitter();
function simpleObservable(maxEntries: number, waitTime: number) {
return fromEvent(ev, "log").pipe(
bufferCount(maxEntries),
filter(logs => !!logs.length),
mergeMap(logs => of(logs).pipe(delay(waitTime)), 1),
)
}
simpleObservable(10, 1000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 100; i++) {
ev.emit("log", { hello: "world" });
}
The key here is that first you slice the stream of events into arrays of 10 events each, and this is done via the bufferCount
operator.
Then you introduce the asynchronicity using mergeMap
with concurrency level set to 1 (which is equivalent to the operator concatMap
). You basically transform each array emitted by bufferCount
into another Observable via the of
function and apply a delay of 1 second to each of the new Observables. Then you concatenate them so that they are emitted each after the other with the time difference of 1 second.
UPDATED RESPONSE after @Lee Benson comment
bufferTime
can be the answer to your problem.
bufferTime
has 3 parameters:
bufferTimeSpan: specifies the lifespan of the buffer, i.e. the Observable emits the buffer and resets it after at each interval
specified by bufferTimeSpan
bufferCreationInterval: to be leftnull
for our purpose
maxBufferSize: specifies the max size of the buffer - if the size is reached, then the Observable emits the buffer and resets it
So, if we use the first and the last parameter we should be able to reach the desired objectives.
This is the test I have put together
const ev = new EventEmitter();
// I merge to the event stream created by 'ev' a single notification emitted after 5 seconds,
// just to make the thing a bit more complex
const sourceObs = merge(
fromEvent(ev, "log"),
of(1).pipe(delay(5000))
)
function simpleObservable(maxEntries: number, waitTime: number) {
return sourceObs.pipe(
bufferTime(waitTime, null, maxEntries),
filter(logs => !!logs.length),
)
}
simpleObservable(10, 2000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 19; i++) {
ev.emit("log", { hello: "world" });
}
// some other events are fired after 6 seconds, to make the source more complex
setTimeout(() => {
for (let i = 0; i < 17; i++) {
ev.emit("log", { hello: "world" });
}
}, 6000);
Try to insert a simple tap(d => console.log(JSON.stringify(d))),
operator before take(10)
and after bufferTime
and you see what happens.
Basically you have a loop that emits 100 events. Out of this loop you create an Observable which notifies 100 times the object emitted by you event loop.
This whole thing happens synchronously in your example.
Then the Observable is transformed by a pipe which does the following:
- first takes just the first 10 notifications via the
take
operator - then says "accumulate all the notifications which happen in the first
1 second and emit them in form of an array" - considering that the
whole emission process is synchronous and that you take the first 10
of them, thenbufferTime(1000)
will emit one array of 10 items
after 1 second - nothing else will be emitted by this stream, since
take(10)
completes the source Observable after 10 synchronous notifications,
so thefilter
operator is of no use - last, in the subscription you receive just 1 notification, i.e. the only one emitted by
bufferTime
Said that, probably what you want to achieve can be accomplished by code along these lines
const ev = new EventEmitter();
function simpleObservable(maxEntries: number, waitTime: number) {
return fromEvent(ev, "log").pipe(
bufferCount(maxEntries),
filter(logs => !!logs.length),
mergeMap(logs => of(logs).pipe(delay(waitTime)), 1),
)
}
simpleObservable(10, 1000)
.subscribe(data => {
console.log(`received: ${data.length}`);
});
for (let i = 0; i < 100; i++) {
ev.emit("log", { hello: "world" });
}
The key here is that you first slice the stream of events into arrays of 10 events each, and this is done via the bufferCount operator.
Then you introduce the asynchronicity using mergeMap with the concurrency level set to 1 (which is equivalent to the concatMap operator). You basically transform each array emitted by bufferCount into another Observable via the of function and apply a delay of 1 second to each of the new Observables. Then you concatenate them, so that the arrays are emitted one after the other, 1 second apart.
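As a rough illustration of the timing this produces, here is a dependency-free TypeScript model of the pipeline. It is not RxJS code: modelBufferCountWithDelay and ScheduledBatch are hypothetical names made up for this sketch, and it assumes that all events arrive synchronously at time 0, as in the loop above.

```typescript
// Hypothetical model, not part of RxJS: mimics bufferCount(maxEntries) followed by
// a one-at-a-time delay(waitTime), for events that all arrive at time 0.
interface ScheduledBatch<T> {
  at: number; // milliseconds after subscription at which the batch is delivered
  items: T[];
}

function modelBufferCountWithDelay<T>(
  events: T[],
  maxEntries: number,
  waitTime: number
): { batches: ScheduledBatch<T>[]; pending: T[] } {
  if (maxEntries < 1) {
    throw new Error("maxEntries must be at least 1");
  }
  const batches: ScheduledBatch<T>[] = [];
  let start = 0;
  // Only complete groups are released; each one waits for the previous delay to finish.
  while (start + maxEntries <= events.length) {
    batches.push({
      at: (batches.length + 1) * waitTime,
      items: events.slice(start, start + maxEntries),
    });
    start += maxEntries;
  }
  // A trailing partial group stays buffered until more events arrive
  // or the source completes.
  return { batches: batches, pending: events.slice(start) };
}

const hundred: number[] = [];
for (let i = 0; i < 100; i++) {
  hundred.push(i);
}

const full = modelBufferCountWithDelay(hundred, 10, 1000);
console.log(full.batches.length, full.batches[9].at, full.pending.length); // 10 10000 0

const partial = modelBufferCountWithDelay(hundred.slice(0, 19), 10, 1000);
console.log(partial.batches.length, partial.pending.length); // 1 9
```

The second call shows the limitation of grouping by count alone: with 19 events, 9 of them stay pending for as long as the source neither completes nor emits again.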
UPDATED RESPONSE after @Lee Benson's comment
bufferTime can be the answer to your problem.
Besides an optional scheduler, bufferTime has 3 parameters:
- bufferTimeSpan: specifies the lifespan of the buffer, i.e. the Observable emits the buffer and resets it at each interval specified by bufferTimeSpan
- bufferCreationInterval: to be left null for our purpose
- maxBufferSize: specifies the max size of the buffer; if the size is reached, the Observable emits the buffer and resets it
So, if we use the first and the last parameter, we should be able to reach the desired objectives.
This is the test I have put together:
import EventEmitter from "events";
import { fromEvent, merge, of } from "rxjs";
import { bufferTime, delay, filter } from "rxjs/operators";

const ev = new EventEmitter();

// Merge into the event stream created by 'ev' a single notification emitted
// after 5 seconds, just to make the source a bit more complex
const sourceObs = merge(
  fromEvent(ev, "log"),
  of(1).pipe(delay(5000))
);

function simpleObservable(maxEntries: number, waitTime: number) {
  return sourceObs.pipe(
    // emit every waitTime ms, or as soon as maxEntries items are buffered
    bufferTime(waitTime, null, maxEntries),
    // drop the empty arrays emitted when nothing arrived in a window
    filter(logs => !!logs.length),
  );
}

simpleObservable(10, 2000).subscribe(data => {
  console.log(`received: ${data.length}`);
});

for (let i = 0; i < 19; i++) {
  ev.emit("log", { hello: "world" });
}

// some other events are fired after 6 seconds, to make the source more complex
setTimeout(() => {
  for (let i = 0; i < 17; i++) {
    ev.emit("log", { hello: "world" });
  }
}, 6000);
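To make the "whichever comes first" behaviour of bufferTime with maxBufferSize easier to reason about, here is a simplified, dependency-free model in TypeScript. It is only a sketch and not the RxJS implementation: modelBufferTime and Emission are hypothetical names, arrival times are passed in as plain numbers, and the model assumes that the time window restarts after a size-triggered flush.

```typescript
// Hypothetical model, not part of RxJS.
interface Emission {
  at: number;   // time in ms at which a buffer is emitted
  size: number; // number of items in that buffer
}

// One open buffer, flushed either when it holds maxSize items or when
// `span` ms have elapsed since it was opened. Arrival times must be sorted.
function modelBufferTime(arrivals: number[], span: number, maxSize: number): Emission[] {
  const out: Emission[] = [];
  let openedAt = 0; // time the current buffer was opened
  let size = 0;     // items currently in the buffer
  for (const t of arrivals) {
    // Close every window that expired before this arrival.
    while (t >= openedAt + span) {
      if (size > 0) {
        out.push({ at: openedAt + span, size: size }); // empty buffers are dropped, like filter()
      }
      openedAt += span;
      size = 0;
    }
    size++;
    if (size === maxSize) {
      out.push({ at: t, size: size }); // size limit reached: flush right away
      openedAt = t; // assumption: the window restarts after a size-triggered flush
      size = 0;
    }
  }
  if (size > 0) {
    out.push({ at: openedAt + span, size: size });
  }
  return out;
}

// 19 events arriving synchronously at time 0, as in the first loop of the test
const arrivals: number[] = [];
for (let i = 0; i < 19; i++) {
  arrivals.push(0);
}

const emissions = modelBufferTime(arrivals, 2000, 10);
console.log(JSON.stringify(emissions));
// [{"at":0,"size":10},{"at":2000,"size":9}]
```

In this model the first 10 events are released immediately and the remaining 9 only when the 2 second window closes, so a full buffer is never held back until the end of the window.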
edited Nov 11 at 21:35
answered Nov 11 at 14:05
Picci
Thanks. The issue I have with bufferCount is that it waits until there are 10 items, and doesn't fire if there are fewer. I'm trying to build a queue that will invoke .subscribe() whenever there's at least 1 result and at least 1 second has passed, in batches of a maximum of 10 at a time.
– Lee Benson
Nov 11 at 14:41
If I look at your example, you are synchronously emitting 100 events. What is the expected behavior in this case? Do you want to invoke subscribe 10 times, each time with an array of 10 notifications, at intervals of 1 second? Or do you expect the 10 notifications to be given to subscribe as fast as possible?
– Picci
Nov 11 at 21:01
I have updated my answer.
– Picci
Nov 11 at 21:35
Thanks, that's closer, but still not quite there. If we change maxEntries to 4 and leave waitTime at 1000, we get 4x "Received: 4" messages back in the console after 1s, rather than each message buffered at 1s intervals. Another example: change the loop to emit 28 events instead of 19; we'll get "Received: 10" twice after 1000ms, followed by a final "Received: 8". What should happen is "Received: 10" after 1s, "Received: 10" after 2s, "Received: 8" after 3s.
– Lee Benson
Nov 12 at 10:57
Looking at bufferTime some more, it seems like it emits when either the time has been exhausted or the buffer size has been reached, whichever is first. Hence in your example, "Received: 10" is printed immediately, followed by a 1000ms pause, then "Received: 9". What I'm trying to achieve is for the buffer time to always be awaited, and then, as a secondary criterion, to not exceed maxEntries. Any extra entries should be buffered in the next iteration of bufferTime.
– Lee Benson
Nov 12 at 11:20
After reading your last comment, one way to solve the problem could be to combine bufferTime with interval using zip.
The idea is that you can set a rhythm of notifications using interval: for instance, interval(1000) gives you an Observable that emits every second. (The code below uses timer(0, waitTime), which has the same period but emits its first tick immediately.)
Then you can use bufferTime(1000, null, 10) to make sure that an array is emitted every second or as soon as the buffer reaches 10 items, whichever comes first.
Now if you zip these 2 Observables you obtain an Observable which emits at most once every second, because of interval(1000), and which emits whatever comes out of bufferTime(1000, null, 10), in sequence. So, if the source Observable emits more than 10 items in a second, the first 10 will be emitted in the first notification; the others remain buffered inside the zipped Observable and are emitted in the subsequent notifications, in chunks of 10 every second.
The code probably makes this clearer.
import EventEmitter from "events";
import { fromEvent, merge, of, timer, zip } from "rxjs";
import { bufferTime, delay, filter, map } from "rxjs/operators";

const ev = new EventEmitter();

// Merge into the event stream created by 'ev' a single notification emitted
// after 5 seconds, just to make the source a bit more complex
const sourceObs = merge(
  fromEvent(ev, "log"),
  of(1).pipe(delay(5000))
);

function simpleObservable(maxEntries: number, waitTime: number) {
  // zip pairs the n-th tick of the timer with the n-th buffer
  return zip(
    timer(0, waitTime),
    sourceObs.pipe(
      bufferTime(waitTime, null, maxEntries),
    )
  ).pipe(
    // logs is a [tick, buffer] pair: drop empty buffers, keep only the buffer
    filter(logs => !!logs[1].length),
    map(logs => logs[1])
  );
}

const maxEntries = 4;
const waitTime = 1000;

simpleObservable(maxEntries, waitTime).subscribe(data => {
  console.log(`received: ${data.length}`);
});

for (let i = 0; i < 19; i++) {
  ev.emit("log", { hello: "world " + i + "A" });
}

// some other events are fired after 8 seconds, to make the source more complex
setTimeout(() => {
  for (let i = 0; i < 17; i++) {
    ev.emit("log", { hello: "world " + i + "B" });
  }
}, 8000);
Keep in mind that, if the source Observable emits faster than you are able to consume notifications, the backlog keeps growing and you may end up with memory problems.
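The pairing done by zip can be sketched without RxJS. The following TypeScript model is only an illustration: modelZipWithTimer and Emission are hypothetical names, and the list of buffers fed into it is an assumed input (four full buffers of 4 released at time 0 and a last buffer of 3 released when the window closes), not captured output.

```typescript
// Hypothetical model, not part of RxJS.
interface Emission {
  at: number;   // time in ms
  size: number; // number of items in the buffer
}

// Mimics zip(timer(0, waitTime), buffers): the k-th buffer is paired with the
// k-th tick and delivered when the later of the two is available.
function modelZipWithTimer(buffers: Emission[], waitTime: number): Emission[] {
  const out: Emission[] = [];
  for (let k = 0; k < buffers.length; k++) {
    const tickAt = k * waitTime; // timer(0, waitTime) fires at 0, waitTime, 2 * waitTime, ...
    const deliveredAt = Math.max(tickAt, buffers[k].at);
    // Empty buffers still consume a tick, but are filtered out afterwards.
    if (buffers[k].size > 0) {
      out.push({ at: deliveredAt, size: buffers[k].size });
    }
  }
  return out;
}

// Assumed input: 19 synchronous events, maxEntries = 4, waitTime = 1000
const assumedBuffers: Emission[] = [
  { at: 0, size: 4 },
  { at: 0, size: 4 },
  { at: 0, size: 4 },
  { at: 0, size: 4 },
  { at: 1000, size: 3 },
];

const delivered = modelZipWithTimer(assumedBuffers, 1000);
console.log(JSON.stringify(delivered));
// [{"at":0,"size":4},{"at":1000,"size":4},{"at":2000,"size":4},{"at":3000,"size":4},{"at":4000,"size":3}]
```

Under these assumptions the batches are spread 1 second apart, with the first one delivered right away because the timer's first tick fires at 0. Buffers that have been produced but not yet paired with a tick are exactly the backlog that can grow when the source is faster than the consumer.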
answered Nov 12 at 17:52
Picci
Unfortunately I cannot give you a solution for your problem, but you may want to have a look at bufferCount and bufferTime: learnrxjs.io/operators/transformation/buffercount.html learnrxjs.io/operators/transformation/buffertime.html
– Oleksandr Fedotov
Nov 11 at 12:57
@OleksandrFedotov - thanks. I'm already (attempting to) use bufferTime in my example code, and bufferCount looks like it waits until the buffer is full, which is not what I want. I just want to take a maximum of 10 and run, at most, once per second... then repeat ad infinitum.
– Lee Benson
Nov 11 at 13:09
Maybe the expand operator can help here for the recursion: learnrxjs.io/operators/transformation/expand.html
– René Winkler
Nov 11 at 14:52
@RenéWinkler - could you point to any examples of how expand might apply in this case, please?
– Lee Benson
Nov 11 at 15:16