Обработчики потоков данных
Конфигурация обработчиков событий настраивается на вкладке Конфигурация соответствующего Потока данных (руководство)
Настройка непосредственно сценария обработки производится на визуальном движке программирования (руководство)
После внесения изменений в сценарии необходимо произвести его компиляцию соответствующей кнопкой в верхней панели меню.
Изменения в сценарии, после компиляции, подтягиваются в Поток данных автоматически.
Системные функции обработчиков событий
DefaultParser
Категория: Automaton.Cl
Тип функции: Impure
Описание:
Функция принимает на вход структуру, содержащую событие с коллектора логов Monq. Преобразовывает и возвращает валидное событие для записи в БД.
Inputs
Название Тип Описание Параметры In Exec Пин вызова функции Связь Event Struct: RawCollectorLogEvent Принимает исходное событие Связь Outputs
Название Тип Описание Параметры Ok Exec Пин вызова функции, если преобразование выполнено успешно Связь MonitoringEvent Exec Пин вызова функции, в случае получения события мониторинга Связь Failed Exec Пин вызова функции при наличии ошибки Связь Result Struct:OnProcessedLogEvent Возвращает результат обработчика Связь Error String Возвращает сообщение об ошибке обработчика Связь
Исходный код функции DefaultParser
namespace Automaton.Cl
{
public static class DefaultParserGlobalFunc
{
static FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)> ValidateData(string data, string sourceType, out System.Xml.XmlDocument xmlDocument)
{
xmlDocument = null;
switch (sourceType)
{
case "application/json":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Trim();
if (!(data.StartsWith("{") && data.EndsWith("}")) && !(data.StartsWith("[") && data.EndsWith("]")))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source has an incorrect format."));
try
{
System.Text.Json.Nodes.JsonNode.Parse(data);
}
catch (System.Text.Json.JsonException ex)
{
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
case "application/x-ndjson":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Replace("\r", "").Trim();
if (!(data.StartsWith("{") && data.EndsWith("}")))
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source has an incorrect format."));
try
{
var splitedJsons = data.Split("\n");
foreach (var json in splitedJsons)
System.Text.Json.Nodes.JsonNode.Parse(json);
}
catch (System.Text.Json.JsonException ex)
{
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}
return new FuncResult<(ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
case "text/plain":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
case "text/xml":
{
if (string.IsNullOrWhiteSpace(data))
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Event source is empty."));
data = data.Trim();
try
{
var doc = new System.Xml.XmlDocument();
doc.LoadXml(data);
xmlDocument = doc;
}
catch (System.Xml.XmlException ex)
{
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, ex.Message));
}
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Ok", (null, string.Empty));
}
}
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent, string)>("Failed", (null, "Source type has an incorrect format."));
}
public static FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)> DefaultParser(Automaton.Cl.RawCollectorLogEvent Event)
{
var data = Event.source;
var sourceType = Event._sourceType;
var funcResult = ValidateData(data, sourceType, out var xmlDocument);
if (funcResult.OutPinName == "Failed")
return funcResult;
data = data.Replace("\r", "").Trim();
var processedLogEvent = new Automaton.Cl.ProcessedCollectorLogEvent()
{
_userspaceId = Event._userspaceId,
_stream = Event._stream,
_sourceType = Event._sourceType,
_rawId = Event._rawId,
_aggregatedAt = Event._aggregatedAt
};
switch (sourceType)
{
case "application/json":
{
// Проверка события на тип MonitoringEvent.
var node = System.Text.Json.Nodes.JsonNode.Parse(data);
processedLogEvent.source = node;
if (node is System.Text.Json.Nodes.JsonObject jsonObject && jsonObject.ContainsKey("monqStreamControl"))
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("MonitoringEvent", (processedLogEvent, "The event is stream monitoring event."));
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "application/x-ndjson":
{
data = data.Replace("\r", "").Trim();
var jsonArray = $"[{string.Join(",", data.Split("\n"))}]";
var parsed = System.Text.Json.Nodes.JsonNode.Parse(jsonArray);
processedLogEvent.source = parsed;
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "text/xml":
{
var json = Newtonsoft.Json.JsonConvert.SerializeXmlNode(xmlDocument);
processedLogEvent.source = json;
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
case "text/plain":
{
var result = System.Text.Json.JsonEncodedText.Encode(data);
var json = $@"{{""sourceText"": ""{result}""}}";
processedLogEvent.source = json;
return new FuncResult<(Automaton.Cl.ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
default:
{
processedLogEvent.source = data;
return new FuncResult<(ProcessedCollectorLogEvent Result, string Error)>("Ok", (processedLogEvent, string.Empty));
}
}
}
}
}
AddLabels
Категория: Automaton.Cl
Тип функции: Impure
Описание:
Функция принимает на вход структуру
OnProcessedLogEvent
, и массив объектов, который будет добавлен к объекту_labels
, входящему в состав структуры.Inputs
Название Тип Описание Параметры In Exec Пин вызова функции Связь ProcessedEvent Struct:OnProcessedLogEvent Принимает на вход структуру OnProcessedLogEvent Связь Labels Dynamic (array) Массив меток Связь Outputs
Название Тип Описание Параметры Out Exec Пин вызова функции Связь Result Struct:OnProcessedLogEvent Обновленная модель события с добавленными метками. Связь
SendAutomatonEvent
Категория: Automaton.Core
Тип функции: Impure
Описание:
Функция принимает событие и отправляет в очередь RabbitMQ по указанному пользователем ключу события.
Inputs
Название Тип Описание Параметры In Exec Пин вызова функции Связь Value Struct: Any Принимает событие, которое будет отправлено в указанную очередь Связь EventName String Строка для указания ключа Связь/контрол Scenario Struct:ScenarioBasic Системная переменная, содержит метаданные текущего сценария Связь Outputs
Название Тип Описание Параметры Ok Exec Последовательность активна при успешной отправке события Связь Failed Exec Последовательность активна при ошибке отправки события Связь Error String Текст ошибки (при наличии) Связь