// Package stream implements a small channel-based pipeline: a Stream is filled
// from a slice, passed through transformations, and finally drained by a sink.
package stream

import (
	"log"
	"reflect"
	"sort"
)

type (
	// Stream is a channel from which elements can be drained.
	Stream chan interface{}

	// BufferStream is a Stream paired with a Size value. Note that the Size
	// field shadows the Size method promoted from the embedded Stream.
	BufferStream struct {
		Stream
		Size int
	}

	// FilterFunc reports whether an element should be retained.
	FilterFunc func(i interface{}) bool

	// LessFunc reports whether i must be ordered before j; it is used by Sort.
	LessFunc func(i, j interface{}) bool

	// KeyFunc derives a value from an element, e.g. the number added by Sum.
	KeyFunc func(i interface{}) interface{}
)

// NewStream returns an empty, unbuffered Stream.
func NewStream() Stream {
	return make(chan interface{})
}

// From fills s with the elements of source, which must be a slice, and
// returns s. The elements are sent from a new goroutine that closes s once the
// last one has been delivered. From panics if source is not a slice.
func (s Stream) From(source interface{}) Stream {
	v := reflect.ValueOf(source)
	switch k := v.Kind(); k {
	case reflect.Slice:
		go func() {
			defer close(s)
			for i := 0; i < v.Len(); i++ {
				s <- v.Index(i).Interface()
			}
		}()
	default:
		panic("got a non-slice kind source")
	}
	return s
}

// Retain returns a Stream carrying only the elements of s for which f returns
// true. The returned Stream is closed once s is exhausted.
func (s Stream) Retain(f FilterFunc) Stream {
	c := make(chan interface{})
	go func() {
		defer close(c)
		for i := range s {
			if f(i) {
				c <- i
			}
		}
	}()
	return c
}

// Sort drains s completely, orders the elements with lessFunc and returns them
// as a new Stream. It blocks until s is closed. The sort is not stable.
func (s Stream) Sort(lessFunc LessFunc) Stream {
	var cache []interface{}
	for i := range s {
		cache = append(cache, i)
	}
	sort.Slice(cache, func(i, j int) bool {
		return lessFunc(cache[i], cache[j])
	})
	return NewStream().From(cache)
}

// Reverse drains s completely and returns its elements in reverse order as a
// new Stream. It blocks until s is closed.
func (s Stream) Reverse() Stream {
	var items []interface{}
	for item := range s {
		items = append(items, item)
	}
	// Swap the two ends while walking towards the middle.
	for i, j := 0, len(items)-1; i < j; i, j = i+1, j-1 {
		items[i], items[j] = items[j], items[i]
	}
	return NewStream().From(items)
}

// Print is a sink that logs every element of s with log.Println.
func (s Stream) Print() {
	for i := range s {
		log.Println(i)
	}
}

// Size is a sink that drains s and returns the number of elements it carried.
func (s Stream) Size() int {
	count := 0
	for range s {
		count++
	}
	return count
}

// First returns a Stream carrying at most the first n elements of s. A
// negative n is treated as 0.
//
// The returned Stream is closed as soon as n elements have been delivered (or
// s ends, whichever comes first), so consumers never wait for the remainder of
// s. The rest of s is still drained in the background so that whoever is
// producing into s does not block on an abandoned channel.
func (s Stream) First(n int) Stream {
	if n < 0 {
		n = 0 // make panics on a negative channel capacity
	}
	// The buffer holds all n results, so the sends below never block even when
	// nobody is reading the returned Stream.
	c := make(chan interface{}, n)
	go func() {
		if n == 0 {
			close(c)
		}
		sent := 0
		for i := range s {
			if sent >= n {
				continue // quota reached: keep draining, discard the element
			}
			c <- i
			sent++
			if sent == n {
				close(c)
			}
		}
		if sent < n {
			close(c) // s ended before the quota was reached
		}
	}()
	return c
}

// FirstOne is a sink that returns the first element of s. The boolean result
// is false when s carried no element at all. It returns as soon as the first
// element is available; the remainder of s is drained in the background.
func (s Stream) FirstOne() (interface{}, bool) {
	v, ok := <-s.First(1)
	return v, ok
}

// Sum is a sink that applies f to every element of s and adds up the results.
// Signed integers, unsigned integers and floats are all accumulated as
// float64; a result of any other kind is ignored.
func (s Stream) Sum(f KeyFunc) float64 {
	result := 0.0
	for i := range s {
		v := reflect.ValueOf(f(i))
		switch v.Kind() {
		case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
			result += float64(v.Int())
		case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
			result += float64(v.Uint())
		case reflect.Float32, reflect.Float64:
			result += v.Float()
		}
	}
	return result
}
stream
package streamtype ( GroupedRecord struct { Key interface{} Value []interface{} } AggregatedRecord struct { Key interface{} Value interface{} } AggFunc func(elements []interface{}) interface{} GroupedStream chan GroupedRecord AggregatedGroupedStream chan AggregatedRecord)func (s Stream) GroupBy(f KeyFunc) GroupedStream { c := make(chan GroupedRecord) cache := make(map[interface{}][]interface{}) for i := range s { key := f(i) cache[key] = append(cache[key], i) } go func() { defer close(c) for k, v := range cache { c <- GroupedRecord{ Key: k, Value: v, } } }() return c}func (gs GroupedStream) Agg(aggFunc AggFunc) AggregatedGroupedStream { c := make(chan AggregatedRecord) go func() { defer close(c) for gr := range gs { c <- AggregatedRecord{ Key: gr.Key, Value: aggFunc(gr.Value), } } }() return c}func (ags AggregatedGroupedStream) Gather() map[interface{}]interface{} { r := make(map[interface{}]interface{}) for i := range ags { r[i.Key] = i.Value } return r}
stream group
package stream

import "log"

type (
	// JoinedValue is one matched pair: an element of the left stream and an
	// element of the right stream that share the same key.
	JoinedValue struct {
		Left  interface{}
		Right interface{}
	}

	// JoinedRecord holds every matched pair found for a single key.
	JoinedRecord struct {
		Key   interface{}
		Value []JoinedValue
	}

	// JoinedStream is a channel of JoinedRecord values.
	JoinedStream chan JoinedRecord

	// JoinedFilterFunc reports whether a matched pair should be kept.
	JoinedFilterFunc func(left, right interface{}) bool

	// JoinAggFunc combines a matched pair into a single value.
	JoinAggFunc func(left, right interface{}) interface{}
)

// Join performs an inner join of left and right: every element of left is
// paired with every element of right whose rightBy key equals its leftBy key
// (compared with ==). Both streams are drained completely before matching, so
// Join blocks until both are closed. One record is emitted per key that has at
// least one pair, in map iteration order; inside a record the pairs follow the
// order of left, then the order of right.
func Join(left, right Stream, leftBy, rightBy KeyFunc) JoinedStream {
	var leftItems, rightItems []interface{}
	for i := range left {
		leftItems = append(leftItems, i)
	}
	for j := range right {
		rightItems = append(rightItems, j)
	}

	// Evaluate every key exactly once instead of once per (left, right) pair.
	rightKeys := make([]interface{}, len(rightItems))
	for idx, j := range rightItems {
		rightKeys[idx] = rightBy(j)
	}

	matches := make(map[interface{}][]JoinedValue)
	for _, i := range leftItems {
		keyLeft := leftBy(i)
		for idx, j := range rightItems {
			if keyLeft == rightKeys[idx] {
				matches[keyLeft] = append(matches[keyLeft], JoinedValue{
					Left:  i,
					Right: j,
				})
			}
		}
	}

	c := make(chan JoinedRecord)
	go func() {
		defer close(c)
		for k, v := range matches {
			c <- JoinedRecord{
				Key:   k,
				Value: v,
			}
		}
	}()
	return c
}

// Filter keeps, within every record, only the pairs accepted by f. A record
// left without any pair is dropped altogether.
func (js JoinedStream) Filter(f JoinedFilterFunc) JoinedStream {
	c := make(chan JoinedRecord)
	go func() {
		defer close(c)
		for jr := range js {
			var kept []JoinedValue
			for _, jv := range jr.Value {
				if f(jv.Left, jv.Right) {
					kept = append(kept, jv)
				}
			}
			if len(kept) != 0 {
				c <- JoinedRecord{
					Key:   jr.Key,
					Value: kept,
				}
			}
		}
	}()
	return c
}

// Print is a sink that logs every record of js with log.Println.
func (js JoinedStream) Print() {
	for i := range js {
		log.Println(i)
	}
}

// Fold converts a JoinedStream into a GroupedStream: every pair of a record is
// combined into a single value by f, turning (key, []JoinedValue) into
// (key, []interface{}). A record always yields a non-nil Value slice.
func (js JoinedStream) Fold(f JoinAggFunc) GroupedStream {
	c := make(chan GroupedRecord)
	go func() {
		defer close(c)
		for jr := range js {
			folded := make([]interface{}, 0, len(jr.Value))
			for _, jv := range jr.Value {
				folded = append(folded, f(jv.Left, jv.Right))
			}
			c <- GroupedRecord{
				Key:   jr.Key,
				Value: folded,
			}
		}
	}()
	return c
}
stream join