Rich Hickey wrote:
> Again, I don't see the enormous side effect. Streams form a safe,
> stateful pipeline, you'll generally only call seq on the end of the
> pipe. If you ask for a seq on a stream you are asking for a (lazy)
> reification. That reification and ownership is what makes the pipeline
> safe.
>
> I am working on seq/stream api unification right now, and we will see
> how often we'll be calling seq fns yet subsequently using as a stream.
> Many of those places where seq is called will now call stream instead
> (e.g. sequence fn entry points), and there may be a non-generator-
> capturing function for determining eos.
I understand, but I found this behaviour surprising:
user=> (defn take1 [s]
         (let [i (stream-iter s)
               n (next! i nil)]
           (detach! i)
           n))
(defn touch [s] (seq s) s)
(def s1 (stream (range 10)))
user=> (take1 s1)
0
user=> (take1 s1)
1
user=> (take1 s1)
2
user=> (touch s1)
#<AStream clojure.lang.astr...@19ee8a>
user=> (take1 s1)
3
user=> (take1 s1)
3
user=> (take1 s1)
3
; s1 is stuck on 3 because, once seq has been called, stream-iter returns
; a new iter on a new stream over the canonical seq for s1
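
To make the mechanism concrete, here is a minimal Java sketch of the behaviour described above. It is a toy model, not the real clojure.lang.AStream: the class ToyStream and its methods are hypothetical, and the seq is built eagerly rather than lazily to keep the example short. It only assumes what the transcript shows: once seq has been called, every new iteration starts from the head of the same frozen seq.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class StuckStreamSketch {
    /** Hypothetical toy stream; NOT clojure.lang.AStream. */
    static final class ToyStream {
        private final Iterator<Integer> src;  // stateful generator
        private List<Integer> canonicalSeq;   // null until seq() is first called

        ToyStream(int n) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < n; i++) items.add(i);
            this.src = items.iterator();
        }

        /** Freezes whatever is left in the generator into one canonical seq. */
        List<Integer> seq() {
            if (canonicalSeq == null) {
                List<Integer> rest = new ArrayList<>();
                while (src.hasNext()) rest.add(src.next());
                canonicalSeq = Collections.unmodifiableList(rest);
            }
            return canonicalSeq;
        }

        /** Models take1: start an iteration, read one item, detach. */
        Integer take1() {
            if (canonicalSeq == null) {
                // No seq yet: the iteration consumes the generator itself.
                return src.hasNext() ? src.next() : null;
            }
            // After seq(): each call iterates a fresh view of the frozen seq,
            // so it always sees the same first element.
            Iterator<Integer> fresh = canonicalSeq.iterator();
            return fresh.hasNext() ? fresh.next() : null;
        }
    }

    public static void main(String[] args) {
        ToyStream s1 = new ToyStream(10);
        System.out.println(s1.take1() + " " + s1.take1() + " " + s1.take1());
        s1.seq(); // plays the role of (touch s1)
        System.out.println(s1.take1() + " " + s1.take1() + " " + s1.take1());
    }
}
```

The first three calls advance the generator; after seq() the toy stream keeps handing back the head of the frozen seq, which mirrors the repeated 3 in the transcript.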
With the attached patch, you get:
user=> (defn take1 [s]
         (let [i (stream-iter s)
               n (next! i nil)]
           (detach! i)
           n))
(def s1 (stream (range 10)))
user=> (take1 s1)
0
user=> (take1 s1)
1
user=> (take1 s1)
2
user=> (seq s1)
(3 4 5 6 7 8 9)
user=> (identical? (seq s1) (seq s1))
true
user=> (take1 s1)
3
user=> (take1 s1)
4
user=> (first s1)
5
;; seq lookup and realization don't consume the stream:
user=> (seq s1)
(5 6 7 8 9)
user=> (identical? (seq s1) (seq s1))
true
user=> (first s1)
5
user=> (take1 s1)
5
user=> (take1 s1)
6
I relaxed the constraint saying that "a stream ensures that every call
to seq on a stream will return the same seq" to be "a stream ensures
that every call to seq on a stream will return the same seq as long as
the stream state doesn't change".
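
The relaxed invariant can be illustrated with a small Java sketch. This is a toy model under stated assumptions, not the mechanism of the attached patch: ToyStream, first() and take1() are hypothetical names, the data is held in an eager list, and an exhausted stream yields an empty list rather than nil. It shows only the observable contract: seq() returns the identical object until an item is consumed, and reading the seq does not consume anything.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class RelaxedSeqSketch {
    /** Hypothetical toy stream; NOT clojure.lang.AStream. */
    static final class ToyStream {
        private final List<Integer> items;
        private int cursor = 0;            // the stream state
        private List<Integer> cachedSeq;   // valid only for the current cursor

        ToyStream(int n) {
            List<Integer> tmp = new ArrayList<>();
            for (int i = 0; i < n; i++) tmp.add(i);
            items = Collections.unmodifiableList(tmp);
        }

        /** Same object on every call until the stream state changes. */
        List<Integer> seq() {
            if (cachedSeq == null) {
                cachedSeq = items.subList(cursor, items.size());
            }
            return cachedSeq;
        }

        /** Reads the head through the seq; does not consume. */
        Integer first() {
            List<Integer> s = seq();
            return s.isEmpty() ? null : s.get(0);
        }

        /** Consumes one item and invalidates the cached seq. */
        Integer take1() {
            if (cursor >= items.size()) return null;
            cachedSeq = null;
            return items.get(cursor++);
        }
    }

    public static void main(String[] args) {
        ToyStream s1 = new ToyStream(10);
        s1.take1(); s1.take1(); s1.take1();
        System.out.println(s1.seq());              // remaining items, from 3
        System.out.println(s1.seq() == s1.seq());  // identical while unchanged
        s1.take1(); s1.take1();
        System.out.println(s1.first());            // head after two more takes
        System.out.println(s1.take1());            // first() did not consume it
    }
}
```

What the relaxed version gives up is visible here too: a seq obtained before a take1 and one obtained after it are different objects, so callers can no longer treat "the seq of this stream" as a single stable value.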

What did I lose?

Christophe
Index: C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/AStream.java
===================================================================
--- C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/AStream.java (revision 1222)
+++ C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/AStream.java (working copy)
@@ -16,28 +16,44 @@
static final ISeq NO_SEQ = new Cons(null, null);
- ISeq seq = NO_SEQ;
+ boolean coseq;
+ AStream seqstream = null;
final IFn src;
Cons pushed = null;
Iter iter = null;
public AStream(IFn src) {
+ this(src, false);
+ }
+
+ public AStream(IFn src, boolean coseq) {
this.src = src;
+ this.coseq = coseq;
}
final synchronized public ISeq seq() {
- if (seq == NO_SEQ)
+ if (coseq)
+ try {
+ return (ISeq) src.invoke();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (seqstream == null)
{
iter();
- seq = Seq.create(pushed,src);
+ try {
+ seqstream = RT.stream(Seq.create(pushed,src));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
- return seq;
+ return seqstream.seq();
}
final synchronized public AStream stream() throws Exception {
- if (seq == NO_SEQ)
+ if (seqstream == null)
return this;
- return RT.stream(seq);
+ return seqstream;
}
final synchronized public Iter iter() {
Index: C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/RT.java
===================================================================
--- C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/RT.java (revision 1222)
+++ C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/RT.java (working copy)
@@ -244,6 +244,10 @@
}
static final public IFn EMPTY_GEN = new AFn(){
+ synchronized public Object invoke() throws Exception {
+ return null;
+ }
+
synchronized public Object invoke(Object eos) throws Exception {
return eos;
}
@@ -461,7 +465,7 @@
static public AStream stream(final Object coll) throws Exception{
if(coll == null)
- return new AStream(EMPTY_GEN);
+ return new AStream(EMPTY_GEN, true);
else if(coll instanceof Streamable)
return ((Streamable) coll).stream();
else if(coll instanceof Fn)
Index: C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/ASeq.java
===================================================================
--- C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/ASeq.java (revision 1222)
+++ C:/Documents and Settings/Christophe/workspaces/clojure/clojure-streams/src/jvm/clojure/lang/ASeq.java (working copy)
@@ -177,7 +177,7 @@
}
public AStream stream() throws Exception {
- return new AStream(new Src(this));
+ return new AStream(new Src(this), true);
}
static class Src extends AFn{
@@ -187,6 +187,10 @@
this.s = s;
}
+ public Object invoke() throws Exception {
+ return s;
+ }
+
public Object invoke(Object eos) throws Exception {
if(s != null)
{