1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| from kafka import KafkaProducer, KafkaConsumer from kafka.errors import kafka_errors import traceback import json
class KafkaProducerSDK(object): def __init__(self, hostname) -> None: self.hostname = hostname self.producer = KafkaProducer( bootstrap_servers=[self.hostname], key_serializer=lambda k: json.dumps(k).encode(), value_serializer=lambda v: json.dumps(v).encode() )
def produce(self, message): future = self.producer.send( 'mykafka', key='count_num', value=message, partition=0) print("send {}".format(message)) try: future.get(timeout=10) except kafka_errors: print(traceback.format_exc())
class KafkaConsumerSDK(object): def __init__(self, topic, hostname) -> None: self.topic = topic self.hostname = hostname self.consumer = KafkaConsumer( self.topic, bootstrap_servers=[self.hostname], auto_offset_reset='latest' )
def consume(self): for message in self.consumer: if message: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) else: print("no message")
|