Using QuickOPC with Trill

From OPC Labs Knowledge Base
Jump to navigation Jump to search

Trill is a high-performance one-pass in-memory streaming analytics engine from Microsoft Research. QuickOPC integrates with Microsoft Trill well, both for ingress and egress data. This is thanks to the Reactive Programming Model that is part of QuickOPC. It can be used with OPC Classic (OPC DA and OPC A&E), as well as with OPC UA.

The following simple application demonstrates the easiness of getting OPC data into Microsoft Trill with QuickOPC.

// Shows how to query real-time OPC UA data using Trill. The query looks for event sequences where the previous value is
// less than 42, and the current value is greater than or equal to 42.

using System;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.Generic;
using OpcLabs.EasyOpc.UA.Reactive;

namespace SimpleTrillApplication
{
    public sealed class Program
    {
        public static void Main(string[] args)
        {
            // Define which server we will work with.
            UAEndpointDescriptor endpointDescriptor =
                "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (not in .NET Standard)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

            Console.WriteLine("Creating source observable...");
            // Create a data change observable for specified OPC UA node value.
            IObservable<UAAttributeData<byte>> sourceObservable = UADataChangeNotificationObservable
                .Create<byte>(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10846", 200)
                .Where(eventArgs => eventArgs.Succeeded)    // ignore events that carry failures
                .Select(eventArgs => eventArgs.TypedAttributeData);

            Console.WriteLine("Creating input streamable...");
            // Load the observable as a stream in Trill, injecting a punctuation every second. Because we use
            // FlushPolicy.FlushOnPunctuation, this will also flush the data every second.
            IObservableIngressStreamable<UAAttributeData<byte>> inputStreamable =
                sourceObservable.Select(attributeData => 
                        StreamEvent.CreateStart(attributeData.SourceTimestamp.Ticks, attributeData))
                    .ToStreamable(
                        DisorderPolicy.Drop(),
                        FlushPolicy.FlushOnPunctuation,
                        PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(1).Ticks));

            // We will be building a query that takes a stream of UAAttributeData<byte> events.
            Console.WriteLine("Creating the query...");
            // Query: look for pattern of [value < 42] --> [value >= 42]
            IStreamable<Empty, Tuple<byte, byte>> query = inputStreamable
                .AlterEventDuration(TimeSpan.FromSeconds(10).Ticks)
                .Detect(
                    default(Tuple<byte, byte>), // register to store the value
                    pattern => pattern
                        .SingleElement(e => e.TypedValue < 42, (ts, ev, reg) => new Tuple<byte, byte>(ev.TypedValue, 0))
                        .SingleElement(e => e.TypedValue >= 42, (ts, ev, reg) => new Tuple<byte, byte>(reg.Item1, ev.TypedValue)));

            Console.WriteLine("Running the query for 20 seconds...");
            query.ToStreamEventObservable()
                .Take(TimeSpan.FromSeconds(20))
                .ForEachAsync(streamEvent => Console.WriteLine(streamEvent)).Wait();

            Console.WriteLine("Finished.");
        }



        // Example output:
        //Creating source observable...
        //Creating input streamable...
        //Creating the query...
        //Running the query for 20 seconds...
        //[Punctuation: 637080168490000000]
        //[Interval: 637080168493266740 - 637080168591235679, (17, 76)]
        //[Punctuation: 637080168500000000]
        //[Interval: 637080168509585008 - 637080168607485434, (28, 197)]
        //[Punctuation: 637080168510000000]
        //[Punctuation: 637080168520000000]
        //[Punctuation: 637080168530000000]
        //[Punctuation: 637080168540000000]
        //[Interval: 637080168540136670 - 637080168638105248, (23, 108)]
        //[Punctuation: 637080168550000000]
        //[Interval: 637080168556412772 - 637080168654381015, (20, 157)]
        //[Punctuation: 637080168560000000]
        //[Interval: 637080168560631254 - 637080168658599730, (36, 53)]
        //[Punctuation: 637080168570000000]
        //[Interval: 637080168572974888 - 637080168670943594, (34, 113)]
        //[Punctuation: 637080168580000000]
        //[Interval: 637080168589224991 - 637080168687193550, (25, 176)]
        //[Punctuation: 637080168590000000]
        //[Punctuation: 637080168600000000]
        //[Interval: 637080168605474719 - 637080168703443726, (1, 161)]
        //[Punctuation: 637080168610000000]
        //[Interval: 637080168611568222 - 637080168709537000, (32, 222)]
        //[Punctuation: 637080168620000000]
        //[Punctuation: 637080168630000000]
        //[Punctuation: 637080168640000000]
        //[Interval: 637080168644224600 - 637080168742193693, (24, 87)]
        //[Punctuation: 637080168650000000]
        //[Punctuation: 637080168660000000]
        //[Interval: 637080168660475491 - 637080168758444908, (40, 230)]
        //[Punctuation: 637080168670000000]
        //Finished.
    }
}

This example, and possible some more, ships with QuickOPC 2019.2 and later.