from confluent_kafka import Producer, KafkaException
import json
from datetime import datetime
from requests_oauthlib import OAuth2Session
# Kafka producer configuration.
# The broker addresses are listed one per line and joined into the
# comma-separated form used for 'bootstrap.servers'.
_BOOTSTRAP_BROKERS = (
    "kafka01.mqnicrnd5.com:9093",
    "kafka02.mqnicrnd5.com:9093",
    "kafka03.mqnicrnd5.com:9093",
)
conf = {
    "bootstrap.servers": ",".join(_BOOTSTRAP_BROKERS),
    # NOTE(review): 'sasl_plaintext' means SASL without TLS, so OAuth
    # credentials/tokens would travel unencrypted. Confirm whether the
    # brokers on port 9093 actually expect 'sasl_ssl'.
    "security.protocol": "sasl_plaintext",
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.oauthbearer.method": "oidc",
    "sasl.oauthbearer.client.id": "<Client ID>",  # Change it
    "sasl.oauthbearer.client.secret": "<Client Secret>",  # Change it
    "sasl.oauthbearer.token.endpoint.url": "<Token URL>",  # Change it
}
# Create the Kafka producer from the settings in `conf`.
# NOTE(review): OAuth2Session is imported at the top of the file but is not
# referenced in the visible code; presumably the client obtains tokens itself
# via the 'sasl.oauthbearer.*' settings — confirm before removing the import.
producer = Producer(conf)
# Current time (naive datetime.now()) rendered as YYYY-MM-DD HH:MM:SS.
current_timestamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"

# Status entry for one edge device; it becomes the only element of
# "edgeSensorStatus" in the payload below.
_edge_status = {
    "edgeId": "C0001",
    "powerControl": {"mode": "AUTO", "status": "ON"},
    "fanControl": {"mode": "AUTO", "status": "ON"},
    "heaterControl": {"mode": "AUTO", "status": "OFF"},
    "preControl": {"mode": "MANUAL", "status": "OFF"},
    "doorStatus": {"front": "OPEN", "back": "CLOSE"},
    "sensorStatus": 1,
    "temp": 60,
    "humidity": 50,
    "sensorErrCode": 0,
}

# Payload object. Define it as appropriate for your own use.
data_object = {
    "sourceId": "RSU00041",
    "sDSMTimeStamp": current_timestamp,
    "edgeSensorStatus": [_edge_status],
}

# Serialise the payload to a JSON string.
json_data = json.dumps(data_object)
# Send the message.
def _delivery_report(err, msg):
    """Print the final delivery outcome of a produced message.

    Registered as the ``on_delivery`` callback of ``produce()``; the
    producer invokes it while ``flush()`` (or ``poll()``) is running, once
    the message has been acknowledged or delivery has failed.

    Args:
        err: The delivery error, or ``None`` if the message was delivered.
        msg: The message the report refers to (not used here).
    """
    if err is not None:
        print(f'Failed to send message: {err}')
    else:
        print(f'Message sent successfully [sDSMTimeStamp: {current_timestamp}]')


try:
    # produce() only places the message on the local queue; success or
    # failure is reported later through the delivery callback, so nothing
    # is printed as "sent" at this point.
    producer.produce(
        'esi_collect_json',  # Change it
        json_data.encode('utf-8'),
        on_delivery=_delivery_report,
    )
except (BufferError, KafkaException) as e:
    # BufferError is raised when the local producer queue is full; it is
    # not a KafkaException, so it has to be listed explicitly.
    print(f'Failed to send message: {e}')
finally:
    # Wait for outstanding messages; this is also what triggers the
    # delivery callback above.
    producer.flush()