standalone kafkareader no print #184

Open
bronzels opened this issue Dec 27, 2019 · 12 comments

Comments

@bronzels

Hi Chris, this time I am running a standalone kafkareader connected to a remote Kafka cluster. No error is reported, but nothing is printed after I put messages into the "test" topic with kafka-console-consumer.sh. I changed %x to %s, and still nothing is printed.

@bronzels
Author

package main

import (
	"flag"
	"os"
	"path/filepath"
	"strings"

	"github.com/chrislusf/gleam/flow"
	"github.com/chrislusf/gleam/gio"
	"github.com/chrislusf/gleam/plugins/kafka"
)

var (
	brokers = flag.String("brokers", "beta-hbase02:9092,beta-hbase03:9092,beta-hbase04:9092", "a list of comma separated broker:port")
	topic   = flag.String("topic", "test", "the topic name")
	group   = flag.String("group", filepath.Base(os.Args[0]), "the consumer group name")
	timeout = flag.Int("timeout", 30, "the number of seconds for timeout connections")
)

func main() {
	gio.Init()
	flag.Parse()

	brokerList := strings.Split(*brokers, ",")

	k := kafka.New(brokerList, *topic, *group)
	k.TimeoutSeconds = *timeout

	f := flow.New("kafka " + *topic).Read(k).Printlnf("%x")

	f.Run()
}
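
The listing above passes the result of strings.Split straight to kafka.New, so a stray space or trailing comma in the -brokers flag would go through unchecked. A minimal sketch of a stricter parser follows, using only the standard library. parseBrokers is a hypothetical helper name, not part of gleam, and nothing in this thread says a malformed broker list was the cause here.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseBrokers is a hypothetical helper (not a gleam function). It splits a
// comma-separated broker list, trims whitespace, skips empty entries, and
// rejects anything that is not in host:port form.
func parseBrokers(list string) ([]string, error) {
	var out []string
	for _, b := range strings.Split(list, ",") {
		b = strings.TrimSpace(b)
		if b == "" {
			continue // e.g. a trailing comma
		}
		host, port, err := net.SplitHostPort(b)
		if err != nil {
			return nil, fmt.Errorf("broker %q: %w", b, err)
		}
		if host == "" || port == "" {
			return nil, fmt.Errorf("broker %q: host and port are both required", b)
		}
		out = append(out, b)
	}
	if len(out) == 0 {
		return nil, fmt.Errorf("no brokers given")
	}
	return out, nil
}

func main() {
	brokers, err := parseBrokers("beta-hbase02:9092, beta-hbase03:9092,beta-hbase04:9092,")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(brokers), brokers)
}
```

The returned slice could then be handed to kafka.New in place of brokerList.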

@bronzels
Author

k is like this:

@bronzels
Author

k = {*github.com/chrislusf/gleam/plugins/kafka.KafkaSource | 0xc0000dc7d0}
Brokers = {[]string} len:3, cap:3
0 = {string} "beta-hbase02:9092"
1 = {string} "beta-hbase03:9092"
2 = {string} "beta-hbase04:9092"
Group = {string} "___go_build_gocrawler_gleamtest_standalone_kafkareader"
Topic = {string} "test"
TimeoutSeconds = {int} 16
prefix = {string} "test"

@chrislusf
Owner

Try adding some actual work between Read and Printlnf.

@bronzels
Author

I added a Map between Read and Printlnf, but still nothing is printed. I also debugged with a breakpoint on the first line of capitalize, and it is never hit after new messages are sent to the topic.

@bronzels
Author

var (
	// ... flags unchanged from the first listing ...

	// Capitalize is the registered mapper ID for capitalize.
	Capitalize = gio.RegisterMapper(capitalize)
)

func main() {
	gio.Init()
	flag.Parse()

	brokerList := strings.Split(*brokers, ",")

	k := kafka.New(brokerList, *topic, *group)
	k.TimeoutSeconds = *timeout

	f := flow.New("kafka " + *topic).Read(k).Map("capitalize", Capitalize).Printlnf("%x")

	f.Run()
}

// capitalize upper-cases the first field of each row and emits it.
func capitalize(row []interface{}) error {
	line := gio.ToString(row[0])
	gio.Emit(strings.ToUpper(line))
	return nil
}
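
Since the breakpoint in capitalize is never hit, one way to rule out the mapper logic itself is to exercise it without gleam or Kafka. The sketch below is only an approximation: upperFirstField is a hypothetical stand-in for the body of capitalize, it returns the value instead of calling gio.Emit, and it converts the field by hand instead of calling gio.ToString, on the assumption that the payload arrives as either a byte slice or a string.

```go
package main

import (
	"fmt"
	"strings"
)

// upperFirstField is a hypothetical stand-in for capitalize. It converts the
// first field of a row to a string and upper-cases it, with no gleam
// dependency, so it can run in a plain unit test.
func upperFirstField(row []interface{}) (string, error) {
	if len(row) == 0 {
		return "", fmt.Errorf("empty row")
	}
	var line string
	switch v := row[0].(type) {
	case []byte:
		line = string(v) // assumption: the payload may arrive as raw bytes
	case string:
		line = v
	default:
		line = fmt.Sprint(v)
	}
	return strings.ToUpper(line), nil
}

func main() {
	rows := [][]interface{}{{"hello"}, {[]byte("kafka")}, {}}
	for _, row := range rows {
		out, err := upperFirstField(row)
		fmt.Printf("%q %v\n", out, err)
	}
}
```

If this behaves as expected, the missing output is more likely an input-side problem (no rows reaching the Map step) than a bug in the mapper.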

@bronzels
Author

I tried connecting to a topic that does not exist. The topic appears after the gleam app starts, but no activity is detected in the gleam app. Here is the summary printed after the app is stopped:

@bronzels
Author

step:testgleam.list0
output : d0
shard:0 time:306.505µs completed 285
step:testgleam.Read.Map1
input : d0
shard:0 time:306.505µs completed 285
output : d1
shard:0 time:1m16.258485573s processed 0
step:capitalize.Map2
input : d1
shard:0 time:1m16.258495764s processed 0
output : d2
shard:0 time:1m16.258422547s processed 0

@bronzels
Author

BTW, the default number of partitions in the remote Kafka cluster is 12.

@bronzels
Author

Or, if there is a QQ group for gleam discussion, please share the ID.

@chrislusf
Owner

Connect to a real Kafka topic. There are some buffers, so if there is not much traffic, the data stays buffered.
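
The general effect can be illustrated with the standard library alone. This is a sketch of buffering in general, assuming a plain bufio.Writer with a 4096-byte buffer; it does not reproduce gleam's actual internals or buffer sizes, which this thread does not describe. visibleBytes is a hypothetical helper name.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// visibleBytes writes n copies of msg through a bufio.Writer of the given
// size and reports how many bytes have reached the sink before and after an
// explicit Flush.
func visibleBytes(bufSize, n int, msg string) (before, after int) {
	var sink bytes.Buffer
	w := bufio.NewWriterSize(&sink, bufSize)
	for i := 0; i < n; i++ {
		w.WriteString(msg) // error ignored: writes to bytes.Buffer do not fail
	}
	before = sink.Len()
	w.Flush()
	after = sink.Len()
	return before, after
}

func main() {
	// Low traffic: a few small messages never fill the buffer, so nothing is
	// visible downstream until Flush is called.
	b, a := visibleBytes(4096, 3, "hello\n")
	fmt.Println("low traffic: ", b, a)

	// High traffic: the buffer fills and is written out on its own.
	b, a = visibleBytes(4096, 1000, "hello\n")
	fmt.Println("high traffic:", b, a)
}
```

With only a handful of test messages the first case applies, which matches the symptom of a flow that runs without errors but prints nothing.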

@bronzels
Author

Great! After connecting to a real topic in production, it is working now. How can I control the buffer? I would like each new message to be handled by the Map as soon as possible, without buffering.
