// BulkDocumentEnricher: an IScript that reads a CSV file and, for every row, builds a query from a
// template and hands it to a RunScriptDocumentUpdater together with a callback (Process) that
// copies the configured CSV columns onto each document.
//
// SetUp(arguments):
//   * Logs and prints the script name and version, then parses the arguments with OptionsParser.
//   * OptionsParser.ParseStandardOptions fills the server URL, index name, thread count and
//     nodes-per-request fields.
//   * --csv           required; path of the CSV file opened in Run.
//   * -q              required; path of a query file, loaded with CommandLineUtils.LoadFileContents.
//   * --dateFormat    optional; format used by CorrectType, defaults to "yyyy-MM-dd HH:mm".
//   * --column-names  required; comma-separated, each name trimmed. A trailing '*' marks a column
//                     whose value is written as a single value rather than appended to an array;
//                     _columnsStripped holds the same names without the '*'.
//   * --dry-run       optional; parsed with bool.Parse. When true, Process returns null.
//   * Builds a Polly retry policy with GlobalRuntimeSettings.NumberOfRetries retries; each retry
//     logs the exception message and decrements RunScriptDocumentUpdater.TotalProcessed.
//   * Missing required options and an unparsable argument list are reported through
//     OptionsParser.InputError.
//
// Run():
//   * Reads all CSV records into a list (header record enabled, BadDataFound handler cleared).
//   * If there is at least one record, ValidateColumnNames checks the first record for every
//     configured column and throws an Exception naming the missing ones.
//   * Processes the records with Parallel.For, capped at _threads, incrementing rowsProcessed
//     with Interlocked.Increment and reporting progress for each row.
//
// TearDown(): logs RunScriptDocumentUpdater.TotalProcessed.
//
// ProcessCsvLine(dict, progress): for each key of the row whose "{key}" placeholder occurs in the
//   query template, derives a replacement string with Regex.Replace, then calls
//   _documentUpdater.Update(replacementQuery, document => Process(document, dict)).
//
// Process(document, fieldValues): for each configured column, converts the CSV value with
//   CorrectType and skips it when the result is null or its string form is empty.
//   * Names ending in '*': the value is stored as a JValue under the stripped name.
//   * Other names: the value is added to the existing JArray (or a new one) unless its string
//     form is already contained in it.
//   Returns null on a dry run; otherwise the document entries whose key is in _columnsStripped.
//
// CorrectType(document): tries, in order, DateTime.ParseExact with _dateFormat and the invariant
//   culture, then long, double, float and bool TryParse on document.ToString(); returns the first
//   successful conversion, or the input unchanged.
//
// NOTE(review): this text appears to have lost its original line breaks and everything between
//   angle brackets: generic type arguments (List, Dictionary, Policy.Handle().Or(), GetRecords(),
//   ToObject>()) and, in ProcessCsvLine, part of the Regex.Replace call and the statements up to
//   the lambda passed before _documentUpdater.Update. Restore the file from version control
//   before changing any code; the missing pieces cannot be recovered from what is shown here.
// NOTE(review): CorrectType swallows every exception from DateTime.ParseExact with an empty
//   catch; DateTime.TryParseExact would avoid using exceptions for the common non-date case.
// NOTE(review): the float.TryParse branch looks unreachable because double.TryParse is tried
//   first on the same text - confirm and remove.
// NOTE(review): a null argument to CorrectType would throw at long.TryParse(document.ToString(),
//   ...), which sits outside the try block - confirm whether CSV values can be null.
// NOTE(review): Process calls CorrectType on the same value several times per column; the result
//   could be computed once.
using System; using System.Collections.Generic; using System.Globalization; using System.IO; using System.Linq; using System.Net; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using CsvHelper; using Newtonsoft.Json; //Do not remove - Required for JObject using Newtonsoft.Json.Linq; using Polly; using Shinydocs.CognitiveToolkit.Core; using Shinydocs.CognitiveToolkit.Core.Tools.ScrollTools.RunScript; using Shinydocs.CognitiveToolkit.Core.Utilities; //Do not remove - Required for JObject using Shinydocs.CognitiveToolkit.Scripting; using Shinydocs.CognitiveToolkit.UI.CommandLine.Progress; #pragma warning disable CS8603 #pragma warning disable CS8602 #pragma warning disable CS8604 #pragma warning disable CS8600 #pragma warning disable CS8601 #pragma warning disable CS8618 public class BulkDocumentEnricher : IScript { private static string ScriptName = "BulkDocumentEnricher"; private static string Version = "2.6.1.0.3"; private readonly ScriptLogger _log = new ScriptLogger(); public string _serverUrl; public string _indexName; public string _csvFile; public int _threads = 1; public string _dateFormat; public string _query; public List _columnDataToAdd; public List _columnsStripped; private int _nodesPerRequest; private RunScriptDocumentUpdater _documentUpdater; private int rowsProcessed = 0; private bool _dry = false; private Policy _retryPolicy; public void SetUp(string[] arguments) { _log.Information(string.Format("{0} version {1}", ScriptName, Version)); Console.WriteLine(string.Format("{0} version {1}", ScriptName, Version)); Dictionary options; if (OptionsParser.TryParse(arguments, out options)) { OptionsParser.ParseStandardOptions(options, out _serverUrl, out _indexName, out _threads, out _nodesPerRequest); if (!options.TryGetValue("--csv", out _csvFile)) OptionsParser.InputError("The path to csv (--csv) is a required parameter"); string queryFile; if (!options.TryGetValue("-q", out queryFile)) 
OptionsParser.InputError("The query file (-q) is a required parameter"); _query = CommandLineUtils.LoadFileContents(queryFile); string dateFormat; if (options.TryGetValue("--dateFormat", out dateFormat)) { _dateFormat = dateFormat; } else { _dateFormat = "yyyy-MM-dd HH:mm"; } string columns; if (!options.TryGetValue("--column-names", out columns)) OptionsParser.InputError("The column names (--column-names) is a required parameter"); _columnDataToAdd = columns.Split(',').Select(s => s.Trim()).ToList(); _columnsStripped = _columnDataToAdd.Select(s => s.TrimEnd(new[] { '*' })).ToList(); _documentUpdater = new RunScriptDocumentUpdater(_serverUrl, _indexName, _columnsStripped, _nodesPerRequest, _threads); string dry; if (options.TryGetValue("--dry-run", out dry)) { _dry = bool.Parse(dry); } _retryPolicy = Policy .Handle() .Or() .Retry( GlobalRuntimeSettings.NumberOfRetries, onRetry: (exception, attempt) => { _log.Error(string.Format("{0} Retrying Attempt #{1}", exception.Message, attempt)); // Ensure TotalProcessed matches actual number of documents processed. Interlocked.Decrement(ref RunScriptDocumentUpdater.TotalProcessed); }); } else { OptionsParser.InputError("Failed to parse arguments"); } } public void Run() { Console.WriteLine("\n"); using (var progress = new CommandLineProgress()) { _documentUpdater.Progress = progress; using (var streamReader = new StreamReader(_csvFile)) { using (var reader = new CsvReader(streamReader, false)) { reader.Configuration.HasHeaderRecord = true; reader.Configuration.BadDataFound = null; var records = reader.GetRecords().ToList(); var total = records.Count; if (total > 0) ValidateColumnNames(records[0] as IDictionary); progress.UpdateProgress(rowsProcessed, total, "Starting ... "); Parallel.For( 0, records.Count, new ParallelOptions{MaxDegreeOfParallelism = _threads}, i => { Interlocked.Increment(ref rowsProcessed); progress.UpdateProgress(rowsProcessed,total,"Processing CSV ... 
"); ProcessCsvLine(records[i] as IDictionary, progress); }); } } } } public void TearDown() { _log.Information(string.Format("Complete. Total items processed : {0}", RunScriptDocumentUpdater.TotalProcessed)); } private void ValidateColumnNames(IDictionary? dict) { var invalidNames = new List(); _columnDataToAdd.ForEach( fieldName => { fieldName = fieldName.TrimEnd('*'); if (!dict.ContainsKey(fieldName)) invalidNames.Add(fieldName); }); if (invalidNames.Count > 0) { var message = string.Format("The following fields could not be found: {0}.\nCheck {1} for expected column names.", string.Join(", ", invalidNames), _csvFile); _log.Error(message); throw new Exception(message); } } private void ProcessCsvLine( IDictionary dict, CommandLineProgress progress) { if (dict != null) { var replacementQuery = _query; foreach (var key in dict.Keys) { var keyString = "{" + key + "}"; if (replacementQuery.Contains(keyString)) { var replacementString = Regex.Replace(dict[key].ToString(), "(? { _documentUpdater.Update(replacementQuery, document => Process(document, dict)); }); } } public Dictionary Process(JObject document, IDictionary fieldValues) { _columnDataToAdd.ForEach( fieldName => { // If we find the single value marker, save this as a string rather than an array if (fieldName.EndsWith("*")) { fieldName = fieldName.Remove(fieldName.Length - 1); // remove the asterisk if (CorrectType(fieldValues[fieldName]) != null && !string.IsNullOrEmpty(CorrectType(fieldValues[fieldName]).ToString())) { document[fieldName] = new JValue(CorrectType(fieldValues[fieldName])); } } else { var field = document[fieldName] as JArray ?? 
new JArray(); if (CorrectType(fieldValues[fieldName]) != null && !string.IsNullOrEmpty(CorrectType(fieldValues[fieldName]).ToString())) { if (!field.ToObject>().Contains(CorrectType(fieldValues[fieldName]).ToString())) { field.Add(CorrectType(fieldValues[fieldName])); document[fieldName] = field; } } } }); if (_dry) return null; return document.ToObject>() .Where(pair => _columnsStripped.Contains(pair.Key)) .ToDictionary(pair => pair.Key, pair => pair.Value); } public object CorrectType(object document) { long typeLong; double typeDouble; float typeFloat; bool typeBool; try { return DateTime.ParseExact(document.ToString(), _dateFormat, CultureInfo.InvariantCulture); } catch {} if (long.TryParse(document.ToString(), out typeLong)) return typeLong; else if (double.TryParse(document.ToString(), out typeDouble)) return typeDouble; else if (float.TryParse(document.ToString(), out typeFloat)) return typeFloat; else if (bool.TryParse(document.ToString(), out typeBool)) return typeBool; else return document; } }