unable to publish/consume message to/from kafka in testng?

unable to publish/consume message to/from kafka in testng?

James Yu
Hi,

I am trying to publish messages to a Kafka topic from a TestNG test with the following snippet:

Properties props = new Properties();
props.put("bootstrap.servers", "" + details.get("bootstrapServers"));
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);

int nrPartition = (int) producerInfo.get("nrPartition");
for (SomeRecord rec : records) {
    String msg = gson.toJson(rec);
    // hashCode() can be negative, so use floorMod to keep the partition in [0, nrPartition)
    int partition = Math.floorMod(msg.hashCode(), nrPartition);
    producer.send(new ProducerRecord<String, String>("" + producerInfo.get("topic"), partition, "a", "a"));
}
producer.close();

and I get the following exception:

2018-05-13 15:33:58.540 WARN  o.a.kafka.common.network.Selector.pollSelectionKeys[531] - [Producer clientId=producer-1] Unexpected error from localhost/127.0.0.1; closing connection
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = -2062548992)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:130)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:557)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:495)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:424)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)

The same snippet works fine in a simple Java program like this:

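import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
  public static void main(String[] args) throws Exception {
    String topicName = "MyTopic";
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    // Send ten records to partition 0, using the loop index as both key and value
    for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<String, String>(topicName, 0, Integer.toString(i), Integer.toString(i)));
    }
    System.out.println("Message sent successfully");
    producer.close();
  }
}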

Does anyone know how to access a Kafka topic from TestNG?

Thank you.
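
A note on reading the InvalidReceiveException in the trace above: Kafka prefixes every response with a 4-byte big-endian length, so a negative size usually means the first four bytes the client read were not a Kafka length prefix at all. The sketch below re-encodes the reported size back into those four bytes so they can be inspected. The class and method names (ReceiveSizeDecoder, toBytes, toHex) are hypothetical and only for illustration; nothing here touches the Kafka client itself.

```java
import java.nio.ByteBuffer;

public class ReceiveSizeDecoder {

    // Re-encode the reported size as the four big-endian bytes
    // that the client read from the socket as a length prefix.
    static byte[] toBytes(int size) {
        return ByteBuffer.allocate(4).putInt(size).array();
    }

    // Render bytes as space-separated two-digit hex values.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int reported = -2062548992; // size taken from the log line above
        System.out.println(toHex(toBytes(reported))); // prints "85 10 00 00"
    }
}
```

If those bytes do not look like a plausible length, one thing worth checking (an assumption, not something the log confirms) is whether the bootstrapServers value used in the test resolves to the same host:port as the standalone program (localhost:9092), and whether that listener expects the same security protocol as the client.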

--
You received this message because you are subscribed to the Google Groups "testng-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email].
To post to this group, send email to [hidden email].
Visit this group at https://groups.google.com/group/testng-users.
For more options, visit https://groups.google.com/d/optout.
Re: unable to publish/consume message to/from kafka in testng?

Cédric Beust ♔-2
This is not a TestNG question.


-- 
Cédric


On Sun, May 13, 2018 at 12:39 AM, James Yu <[hidden email]> wrote: