Skip to content

Commit

Permalink
Fix code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yossipapi committed Jul 27, 2022
1 parent 05ee8b8 commit 4ffa01d
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
2 changes: 2 additions & 0 deletions alpha/config/kConfMapNames.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ class kConfMapNames
const DYNAMIC_EMAIL_CONTENTS = 'dynamic_email_contents';
const REDIS = 'redis';
const LIVE_SETTINGS = 'live';
const AVRO_SCHEMA_REGISTRY = 'schema_registry';
const KAFKA = 'kafka';
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public function dispatch(kScope $scope)
return;
}

if(!kConf::hasMap('kafka'))
if(!kConf::hasMap(kConfMapNames::KAFKA))
{
KalturaLog::debug("Kafka configuration file (kafka.ini) wasn't found!");
return;
Expand Down Expand Up @@ -154,7 +154,7 @@ public function dispatch(kScope $scope)
}
}

private function getKafkaPayload($topicName, $msg, $messageFormat)
protected function getKafkaPayload($topicName, $msg, $messageFormat)
{
$kafkaPayload = null;

Expand Down Expand Up @@ -184,7 +184,7 @@ private function getKafkaPayload($topicName, $msg, $messageFormat)
* @throws \FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException
* @throws kCoreException
*/
private function getAvroPayload($topicName, $msg)
protected function getAvroPayload($topicName, $msg)
{
list($schema, $schemaId) = $this->getSchemaInfo($topicName);
if(!($schema && $schemaId))
Expand All @@ -194,12 +194,12 @@ private function getAvroPayload($topicName, $msg)
return $this->buildAvroPayload($schemaId, $schema, $msg);
}

private function getSchemaInfo($subject)
protected function getSchemaInfo($subject)
{
$schemaId = null;
$schema = null;

if(!kConf::hasMap('schemaRegistry'))
if(!kConf::hasMap(kConfMapNames::AVRO_SCHEMA_REGISTRY))
{
throw new kCoreException("schema registry configuration file (schemaRegistry.ini) wasn't found!");
}
Expand Down Expand Up @@ -287,7 +287,7 @@ private function getSchemaInfo($subject)
* @return string
* @throws AvroIOException
*/
private function buildAvroPayload($schemaId, $schema, $msg)
protected function buildAvroPayload($schemaId, $schema, $msg)
{
$io = new \AvroStringIO();
$io->write(pack('C', 0));
Expand All @@ -298,7 +298,7 @@ private function buildAvroPayload($schemaId, $schema, $msg)
return $io->string();
}

private function buildMessageOldValues($oldValues)
protected function buildMessageOldValues($oldValues)
{
$result = array();
foreach ($oldValues as $key => $value)
Expand Down
18 changes: 13 additions & 5 deletions plugins/queue/providers/kafka/KafkaPlugin.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ public static function getPluginName()
*/
public static function getObjectClass($baseClass, $enumValue)
{
if ($baseClass == 'QueueProvider' && $enumValue == self::getKafakaQueueProviderTypeCoreValue(KafkaProviderType::KAFKA)) {
if (!kConf::hasMap('kafka')) {
if ($baseClass == 'QueueProvider' && $enumValue == self::getKafakaQueueProviderTypeCoreValue(KafkaProviderType::KAFKA))
{
if (!kConf::hasMap(kConfMapNames::KAFKA))
{
throw new kCoreException("Kafka configuration file (kafka.ini) wasn't found!");
}

Expand All @@ -41,12 +43,14 @@ public static function getObjectClass($baseClass, $enumValue)
*/
public static function loadObject($baseClass, $enumValue, array $constructorArgs = null)
{
if ($baseClass == 'QueueProvider' && $enumValue == self::getKafakaQueueProviderTypeCoreValue(KafkaProviderType::KAFKA)) {
if (!kConf::hasMap('kafka')) {
if ($baseClass == 'QueueProvider' && $enumValue == self::getKafakaQueueProviderTypeCoreValue(KafkaProviderType::KAFKA))
{
if (!kConf::hasMap(kConfMapNames::KAFKA))
{
throw new kCoreException("Kafka configuration file (kafka.ini) wasn't found!");
}

$kafkaConfig = kConf::getMap('kafka');
$kafkaConfig = kConf::getMap(kConfMapNames::KAFKA);

return new KafkaProvider($kafkaConfig, $constructorArgs);
}
Expand Down Expand Up @@ -82,9 +86,13 @@ public static function dependsOn()
public static function getEnums($baseEnumName = null)
{
if (is_null($baseEnumName))
{
return array('KafkaProviderType');
}
if ($baseEnumName == 'QueueProviderType')
{
return array('KafkaProviderType');
}
return array();
}
}

0 comments on commit 4ffa01d

Please sign in to comment.