RSP Query Features

From RDF Stream Processing Community Group

On this page we discuss the RSP query features and how an RSP query language can support them. To illustrate these features we use queries based on the "people detected in rooms" example. For each case, we try to describe:

  • What we want to achieve.
  • How it is done now (with current languages).
  • How we would like it to be.

Introduction

Following the [model] as proposed in several works such as [first step towards stream reasoning], we define three classes of operators over RDF streams and RDF graphs (for more information refer also to [[1]]):

  • S2R: Stream to bounded RDF, which inherits the idea of stream-to-relation operators in CQL which produce a relation from a stream.
  • R2R: Bounded RDF to RDF, which inherits the idea of relation-to-relation operators in CQL which produce a relation from one or more other relations.
  • R2S: Bounded RDF to Stream, which inherits the idea of relation-to-stream operators in CQL which produce a stream from a relation.

In these RSP operators the R denotes finite RDF graphs or mappings, as opposed to unbounded sequences of RDF graphs, i.e. streams.

In addition to those operators (which can be thought of as part of an RSP Data Manipulation Language (DML), in SQL terms), there is also the need for a Data Definition Language (DDL) to register a stream, register continuous queries, etc. Of all known RSP languages, only C-SPARQL has DDL primitives, but they are limited to query registration (see the part of the C-SPARQL BNF about REGISTER (QUERY|STREAM) <name> AS).

Running Example

In the following queries that showcase the RSP operators, we use an example based on social sensing, i.e. people detected in rooms over time.

URL of the stream: <http://…/fb>

Sample data on the stream:

Axel isIn RoomA, [2]
Axel isIn RoomA, [3]
Darko isIn RoomA, [3]
Axel isIn RoomA, [4]
Darko isIn RoomA, [4]
Darko isIn RoomA, [5]
Axel isIn RoomB, [6]
Axel isIn RoomB, [7]
Axel isIn RoomB, [8]
Darko isIn RoomB, [8]
Axel isIn RoomB, [9]
Darko isIn RoomB, [9]

S2R Operators (aka Windowing)

Several [types of windows exist]. Those illustrated below are a subset of interest to the RSP community. Please feel free to add more, linking them to use cases.

Sliding Windows

Sliding windows include SLIDE and TUMBLING windows, and can be either time-based or tuple-based.

Alasdair: Any thoughts on supporting windows that are not terminated by the current time, e.g. something like a window from 10 minutes in the past until 5 minutes in the past?

Time based sliding window
Give me the last room where Axel has been in the last 10 minutes, updating results every minute
C-SPARQL

REGISTER QUERY TrackAxelSliding AS
SELECT ?room 
FROM STREAM <http://…/fb> [RANGE 10m STEP 1m]
WHERE {
 :Axel :isIn ?room
}
CQELS

SELECT ?room
WHERE { STREAM <http://…/fb> [RANGE 10m SLIDE 1m] {:Axel :isIn ?room}}

The results will be different depending on the RSP Engine that is used. For example in C-SPARQL the evaluation of the window is performed each time the window closes. In a different approach, CQELS does it when the window content changes. For a more detailed description of these differences refer to: [[2]]

Time based tumbling window
Give me the last room where Axel has been in the last 10 minutes, updating results every 10 minutes
C-SPARQL

REGISTER QUERY TrackAxelTumbling AS
SELECT ?room 
FROM STREAM <http://…/fb> [RANGE 10m TUMBLING]
WHERE {
 :Axel :isIn ?room
}
CQELS

SELECT ?room
WHERE { STREAM <http://…/fb> [RANGE 10m TUMBLING] {:Axel :isIn ?room}}

This is a special case of the general sliding window, when the size of the slide is equal to the window length.
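
To make the relation between the two window types concrete, here is a minimal Python sketch that replays the sample data through time-based windows. It is only an illustration under stated assumptions: timestamps are treated as abstract time units, the first window starts at time 0, each window covers the interval (close - width, close], and results are reported when a window closes. The names time_windows and rooms_of are hypothetical helpers, not part of any RSP engine.

```python
from typing import Iterator, List, Set, Tuple

# (person, room, timestamp) readings from the running example.
# Timestamps are treated as abstract time units (an assumption).
STREAM: List[Tuple[str, str, int]] = [
    ("Axel", "RoomA", 2), ("Axel", "RoomA", 3), ("Darko", "RoomA", 3),
    ("Axel", "RoomA", 4), ("Darko", "RoomA", 4), ("Darko", "RoomA", 5),
    ("Axel", "RoomB", 6), ("Axel", "RoomB", 7), ("Axel", "RoomB", 8),
    ("Darko", "RoomB", 8), ("Axel", "RoomB", 9), ("Darko", "RoomB", 9),
]


def time_windows(stream, width: int, slide: int) -> Iterator[Tuple[int, list]]:
    """Yield (close_time, content) for each window (close - width, close]."""
    last = max(t for _, _, t in stream)
    close = width  # assumption: the first window starts at time 0
    while close - width < last:
        yield close, [r for r in stream if close - width < r[2] <= close]
        close += slide


def rooms_of(person: str, content) -> Set[str]:
    """R2R step: rooms in which `person` was detected inside one window."""
    return {room for who, room, _ in content if who == person}


# Sliding window: width 4, slide 2 (consecutive windows overlap).
sliding = [(c, rooms_of("Axel", w)) for c, w in time_windows(STREAM, 4, 2)]
# Tumbling window: the slide equals the width (no overlap).
tumbling = [(c, rooms_of("Axel", w)) for c, w in time_windows(STREAM, 4, 4)]
```

With the sliding setting, the window closing at time 6 contains Axel in both RoomA and RoomB, whereas no tumbling window of the same width ever does.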

Triple based

In principle, triple-based windows were defined to emulate tuple count windows in CQL-like data stream systems. For example this query returns the last 5 entries of a price stream in CQL:

Select P.price From Prices[Rows 5] as P

For our example, we can get the last 3 people detected in the stream with the following query:

REGISTER QUERY Track3latest AS
SELECT ?who 
FROM STREAM <http://…/fb> [RANGE TRIPLES 3]
WHERE {
 ?who :isIn ?room
}

In this example it works because each triple in the stream is an event in itself. However, in the general case one event requires several triples in the stream to be fully described. For example consider the following measurement observation:

:obs1 a ssn:Observation ;
      ssn:observedBy :sensor1 ;
      ssn:observedProperty :air_temperature ;
      ssn:observedValue 34.5 .

If we ask for the latest 3 triples in the stream, we will not obtain the latest 3 observations, but only 3 triples, each one carrying incomplete information. For this reason, C-SPARQL triple-based windows are of limited use, although they work as specified. Other languages such as SPARQLStream do not support them at all, because of this faulty behavior.

In the RSP model discussed in this W3C Community Group, as we allow representing RDF streams as sequences of graphs, rather than just triples, it should be possible to redefine this operator, with cleaner semantics.
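
The difference between counting triples and counting graphs can be sketched in Python as follows. This is an illustration only: it assumes each observation arrives as one graph of four triples, and all observation values except 34.5 are made up. The helper last_n is hypothetical.

```python
from collections import deque

# Four observations, each described by four triples. Only the value 34.5
# comes from the text; the other values are made up for illustration.
graphs = []
for i, value in enumerate([34.5, 35.0, 35.5, 36.0], start=1):
    obs = f":obs{i}"
    graphs.append([
        (obs, "rdf:type", "ssn:Observation"),
        (obs, "ssn:observedBy", ":sensor1"),
        (obs, "ssn:observedProperty", ":air_temperature"),
        (obs, "ssn:observedValue", value),
    ])

# The same data flattened into a stream of individual triples.
triples = [t for g in graphs for t in g]


def last_n(items, n):
    """Count-based window: keep only the n most recent items."""
    return list(deque(items, maxlen=n))


triple_window = last_n(triples, 3)  # three triples, all about :obs4
graph_window = last_n(graphs, 3)    # three complete observations

subjects = {s for s, _, _ in triple_window}
```

The triple-based window covers only part of the latest observation (its type triple is already lost), while the graph-based window returns the three latest observations in full.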

Partitioned Windows (not supported)

  • Deal with one input stream and several output streams (i.e. the partitions), on which the query is evaluated.
  • Based on knowing the schema, and deciding how to do the partition in a way that simplifies the query. This makes it complicated for RSP.
Example queries (partitioned)
In the examples below, we can partition the input stream by person name and then evaluate the query on each partition:
  • Find, for each person, the time spent until she leaves a room and enters another (time spent in a room).
  • Find, for each person, the time elapsed between leaving room A and entering room B, independently of how many rooms are traversed in between (transition time).
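
As a rough illustration of the first query, the following Python sketch partitions the sample data by person and measures each completed stay. It assumes that a stay lasts from the first detection in a room to the first detection in the next room, and that timestamps are abstract time units. The function names are hypothetical.

```python
from collections import defaultdict

# (person, room, timestamp) readings from the running example.
STREAM = [
    ("Axel", "RoomA", 2), ("Axel", "RoomA", 3), ("Darko", "RoomA", 3),
    ("Axel", "RoomA", 4), ("Darko", "RoomA", 4), ("Darko", "RoomA", 5),
    ("Axel", "RoomB", 6), ("Axel", "RoomB", 7), ("Axel", "RoomB", 8),
    ("Darko", "RoomB", 8), ("Axel", "RoomB", 9), ("Darko", "RoomB", 9),
]


def partition_by_person(stream):
    """Split one input stream into one sub-stream per person."""
    parts = defaultdict(list)
    for person, room, t in stream:
        parts[person].append((room, t))
    return dict(parts)


def completed_stays(readings):
    """Return (room, duration) for every stay that ended with a room change.

    Assumption: a stay runs from the first detection in a room to the
    first detection in the next room.
    """
    stays = []
    current_room, entered = None, None
    for room, t in readings:
        if room != current_room:
            if current_room is not None:
                stays.append((current_room, t - entered))
            current_room, entered = room, t
    return stays


time_in_room = {
    person: completed_stays(readings)
    for person, readings in partition_by_person(STREAM).items()
}
```

The query logic in completed_stays never needs to mention the person, which is the simplification that partitioning is meant to bring.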

Predicate-based window (not supported)

In predicate-based windows, objects qualify to be part of the window once they satisfy a certain query predicate. Similarly, objects expire only when they no longer satisfy that predicate. Predicate-based windows are a generalization of time-based and tuple-count sliding windows, and they need some sort of caching mechanism to be implemented.

Example queries
For each person, continuously report the elapsed time between each two consecutive readings. Only the latest reading of each person needs to be considered. Once the reading of a person is reported, the previous reading expires.
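
The intended behaviour of this query can be sketched in Python, independently of any RSP language. The sketch assumes that the window predicate is "is the latest reading of its person", so the window holds exactly one reading per person. The function name and the output shape are hypothetical.

```python
# (person, room, timestamp) readings from the running example.
STREAM = [
    ("Axel", "RoomA", 2), ("Axel", "RoomA", 3), ("Darko", "RoomA", 3),
    ("Axel", "RoomA", 4), ("Darko", "RoomA", 4), ("Darko", "RoomA", 5),
    ("Axel", "RoomB", 6), ("Axel", "RoomB", 7), ("Axel", "RoomB", 8),
    ("Darko", "RoomB", 8), ("Axel", "RoomB", 9), ("Darko", "RoomB", 9),
]


def elapsed_between_readings(stream):
    """Report (person, elapsed) for each pair of consecutive readings."""
    latest = {}   # the window content: timestamp of the latest reading per person
    reports = []
    for person, _room, t in stream:
        if person in latest:
            reports.append((person, t - latest[person]))
        latest[person] = t  # the previous reading of this person expires
    return reports


reports = elapsed_between_readings(STREAM)
```

The dictionary plays the role of the caching mechanism mentioned above: its size is bounded by the number of people, not by time or by a tuple count.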
C-SPARQL

In C-SPARQL we could in principle simulate predicate-based windows by using a network of queries (Emanuele to provide an example).

CQELS

This requires the association of a stream with a predicate that characterizes the windows. What would this look like?

Relation-to-Relation Operators

Q1: SELECT

Query: Who is where, every 1 minute?

C-SPARQL

REGISTER QUERY WhoIsWhere AS
SELECT ?room ?person
FROM STREAM <http://…/fb> [RANGE 1m STEP 1m] 
WHERE {
 ?person :isIn ?room .
}
CQELS

SELECT ?room ?person
WHERE {
STREAM <http://…/fb> [RANGE 1m]  {?person :isIn ?room}
}

Q2: GROUPBY

Query: Find person who has been to more than 5 different rooms during the past 5 minutes (relies on SPARQL 1.1 aggregation)

C-SPARQL

REGISTER QUERY FastAndFurious AS
SELECT ?person (count(distinct ?room) as ?rooms)
FROM STREAM <S1> [RANGE 5m TUMBLING] 
WHERE { ?person :isIn ?room}
GROUP BY ?person
HAVING  (COUNT(DISTINCT ?room) > 5)
CQELS

SELECT ?person (count(distinct ?room) as ?rooms)
WHERE {STREAM <S1>  [range 5m] { ?person :isIn ?room}}
GROUP BY ?person
HAVING  (COUNT(DISTINCT ?room) > 5)

In a similar way, other aggregates such as SUM, COUNT, MIN and MAX can also be supported.

Q3: CONSTRUCT

Query: Detect two different people in the same room for at least three seconds and create a graph of co-presence

C-SPARQL

REGISTER STREAM WhoIsWithWhom AS
CONSTRUCT {?p1 :isWith ?p2 }
FROM STREAM <s1> [RANGE 3s STEP 1s]
WHERE {
 ?p1 :isIn ?room. ?p2 :isIn ?room. FILTER(?p1!=?p2)} 
CQELS

CONSTRUCT {?p1 :isWith ?p2}
WHERE {
STREAM <s1> [RANGE 3s SLIDE 1s]
{ ?p1 :isIn ?room. ?p2 :isIn ?room. FILTER(?p1!=?p2)}}

This query does not ensure that both people are present in the window for the entire 3 seconds.

EP-SPARQL (ETALIS)

CONSTRUCT {?p1 :isWith ?p2 .}
WHERE     {STREAM <s1> { ?p1 :isIn ?room. } AND {?p2 :isIn ?room. }}
FILTER    (?p1!=?p2 && getDURATION() < "PT3S"^^xsd:duration)

Q4: OPTIONAL

Query: Detect persons who have entered either room1 or room2 in the past 5 minutes

C-SPARQL

REGISTER QUERY Q4 AS
SELECT ?person1 ?person2
FROM STREAM <S1> [RANGE 5m TUMBLING] 
WHERE { 
    OPTIONAL {?person1 :isIn :room1} 
    OPTIONAL{?person2 :isIn :room2} 
    FILTER(bound(?person1)||bound(?person2))
}
CQELS (not supported)

Currently not supported, but if supported it could work in two ways:

  • using two individual sensor streams for room1 and room2 as follows:

SELECT ?person1 ?person2
WHERE { 
STREAM<S1> [RANGE 5M] { 
    OPTIONAL {?person1 :isIn :room1} 
    OPTIONAL{?person2 :isIn :room2} 
    FILTER(bound(?person1)||bound(?person2))
}}
  • using bound filters (currently supported)

SELECT ?person1 ?person2
WHERE { 
   OPTIONAL { STREAM<S1> [RANGE 5M] {?person1 :isIn :room1}} 
   OPTIONAL { STREAM<S2> [RANGE 5M] {?person2 :isIn :room2}} 
   FILTER(bound(?person1)||bound(?person2))}
EP-SPARQL (ETALIS)

SELECT ?person1 ?person2
WHERE  {STREAM <S1> {?person1 :isIn :room1} OR {?person1 :isIn :room2}} 

Q5: FILTER MINUS

Query: Detect persons without a doctorate who entered room1 during the past 5 minutes

C-SPARQL (should be supported)

SELECT ?person 
FROM STREAM <S1> [RANGE 5m TUMBLING]
FROM NAMED <profile>
WHERE {
  { ?person :isIn :room1 }
  FILTER MINUS 
  { GRAPH <profile> {?person :hasDegree :doctorate} }
}

NOTE: It may parse, but it may not give correct results. The problem is that all triples from the windows are merged in the default graph.

CQELS (not supported)

Currently not supported. If it were supported, it could look like:

SELECT ?person 
WHERE {
  STREAM <S1> [RANGE 5m] { ?person :isIn :room1 }
  FILTER MINUS 
  { GRAPH <profile> {?person :hasDegree :doctorate} }
}

What happens if the static part changes during the lifetime of the query?

Q6: SEQ

Query: provide pairs of rooms, e.g. (room1, room2), where Axel and Darko have been together such that they are first in a room and then following each other in another room within 5 minutes.

C-SPARQL

REGISTER QUERY Q6 AS
SELECT DISTINCT ?room_x ?room_y
FROM STREAM <S1> [RANGE 5m STEP 1s]
WHERE { 
GRAPH <http://deri.org/floorplan/>  { ?room_x lv:sameLevelWith ?room_y.}
 :Axel :isIn ?room_x. 
 :Darko :isIn ?room_x. 
 :Axel :isIn ?room_y. 
 :Darko :isIn ?room_y.
 FILTER (timestamp(:Axel :isIn ?room_x) = timestamp(:Darko :isIn ?room_x) && 
         timestamp(:Axel :isIn ?room_y) = timestamp(:Darko :isIn ?room_y) &&  
         timestamp(:Axel :isIn ?room_x) < timestamp(:Darko :isIn ?room_y) 
         )
} 

NOTE: it may not be reactive

CQELS (not supported)

SELECT DISTINCT ?room_x ?room_y
WHERE { 
GRAPH <http://deri.org/floorplan/>  { ?room_x lv:sameLevelWith ?room_y.}
STREAM <http://deri.org/streams/S1> [RANGE 5m SLIDE 1s]{
{:Axel :isIn ?room_x. :Darko :isIn ?room_x.} SEQ { :Axel :isIn ?room_y. :Darko :isIn ?room_y.}
}}
EP-SPARQL (ETALIS)

EP-SPARQL has a native way of expressing the SEQ operator.

SELECT DISTINCT ?room_x ?room_y
WHERE { 
 {:Axel :isIn ?room_x. :Darko :isIn ?room_x.}  
SEQ
 {:Axel :isIn ?room_y. :Darko :isIn ?room_y.}}
 FILTER (getDURATION() < "PT5M"^^xsd:duration)

Alternatively, if it were required that Axel and Darko remain in room_x for exactly the same time (and likewise in room_y), then the query would look like the following one:

SELECT DISTINCT ?room_x ?room_y
WHERE  {STREAM <s1> 
       { :Axel :isIn ?room_x. }
EQUALS
       {:Darko :isIn ?room_x.}
SEQ
       { :Axel :isIn ?room_y. }
EQUALS
       {:Darko :isIn ?room_y.}
}
FILTER (getDURATION() < "PT5M"^^xsd:duration)
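
To show what the SEQ queries above are expected to return on the sample data, here is a small Python sketch. It mirrors the C-SPARQL formulation, where "together" means detected in the same room at the same timestamp. It assumes abstract time units, a strict duration bound, and that the two rooms must differ. The function names are hypothetical.

```python
from collections import defaultdict

# (person, room, timestamp) readings from the running example.
STREAM = [
    ("Axel", "RoomA", 2), ("Axel", "RoomA", 3), ("Darko", "RoomA", 3),
    ("Axel", "RoomA", 4), ("Darko", "RoomA", 4), ("Darko", "RoomA", 5),
    ("Axel", "RoomB", 6), ("Axel", "RoomB", 7), ("Axel", "RoomB", 8),
    ("Darko", "RoomB", 8), ("Axel", "RoomB", 9), ("Darko", "RoomB", 9),
]


def together(stream, a, b):
    """Map each timestamp to the room where both a and b were detected."""
    seen = defaultdict(dict)  # timestamp -> person -> room
    for person, room, t in stream:
        seen[t][person] = room
    return {
        t: rooms[a]
        for t, rooms in seen.items()
        if a in rooms and b in rooms and rooms[a] == rooms[b]
    }


def seq_room_pairs(stream, a, b, within):
    """Pairs (room_x, room_y): together in room_x, then in room_y, within the bound."""
    co_presence = together(stream, a, b)
    pairs = set()
    for t1, room_x in co_presence.items():
        for t2, room_y in co_presence.items():
            # SEQ: strictly later, and the whole pattern shorter than `within`.
            if t1 < t2 and t2 - t1 < within and room_x != room_y:
                pairs.add((room_x, room_y))
    return pairs


pairs = seq_room_pairs(STREAM, "Axel", "Darko", within=5)
```

On the sample data the only match is (RoomA, RoomB), produced by the co-presence at time 4 followed by the one at time 8. Tightening the bound to 4 time units leaves no match.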

Q7: REPETITION

Query: Find at least 3 different pairs of rooms, in which Axel and Darko have been together within 10 minutes, moving from room_x to room_y. This query relies on a SEQ operator and on counting the repetitions of such a sequence.

C-SPARQL

REGISTER QUERY Q7 AS
SELECT (COUNT(DISTINCT *) AS ?nroom)
FROM STREAM <S1> [RANGE 10m STEP 1s]
WHERE { 
GRAPH <http://deri.org/floorplan/>  { ?room_x lv:sameLevelWith ?room_y.}
 :Axel :isIn ?room_x. 
 :Darko :isIn ?room_x. 
 :Axel :isIn ?room_y. 
 :Darko :isIn ?room_y.
 FILTER (timestamp(:Axel :isIn ?room_x) = timestamp(:Darko :isIn ?room_x) && 
         timestamp(:Axel :isIn ?room_y) = timestamp(:Darko :isIn ?room_y) &&  
         timestamp(:Axel :isIn ?room_x) < timestamp(:Darko :isIn ?room_y) 
         )
} 
HAVING (?nroom >= 3) 
CQELS (not supported)

SELECT (COUNT(DISTINCT *) AS ?nroom)
WHERE { 
STREAM <http://deri.org/streams/S1> [RANGE 10m SLIDE 1s] {
{:Axel :isIn ?room_x. :Darko :isIn ?room_x.} SEQ { :Axel :isIn ?room_y. :Darko :isIn ?room_y.}}}
HAVING (?nroom >= 3)

ESPER

Since ESPER has a native way of expressing the SEQ operator, this query might be easier to express in ESPER.

This has been tried in ESPER, using a simplistic TripleEvent object. Notice the use of the every xx -> pattern in the Esper query. For more information and the code leading to this query refer to [[3]].

 // Query 7: Find at least 3 different pairs of rooms, in which Person-a and Person-b have been together
 // within 10 minutes, moving from room_a to room_b.
 // This query relies on a SEQ operator and on counting the repetition of such sequence.
static String query7 = "select * from pattern [ every a=TripleEvent -> every b=TripleEvent -> every d=TripleEvent -> every c=TripleEvent -> every e=TripleEvent -> every f=TripleEvent where timer:within(10 min)]"
+ " where a.subject!=b.subject and a.object=b.object and c.subject!=d.subject and c.object=d.object and e.subject!=f.subject and e.object=f.object "
+ "and a.subject=d.subject and b.subject=c.subject and d.subject=e.subject and c.subject=f.subject "
+ "and a.predicate='isIn' and b.predicate='isIn' and c.predicate='isIn' and d.predicate='isIn' and e.predicate='isIn' and f.predicate='isIn' ";

Refreshing stored data

Combining streaming and stored data is supported in existing RSP engines. However, in many cases the stored data can change during the query lifetime, so it might be important to refresh or update the stored contents at certain points of the evaluation process timeline. Existing RSP languages do not impose or propose any way of explicitly performing these updates. In fact, in some RSP engines the stored data is assumed to be static during query evaluation.

An extension to the core functionality of the query language would be for the user to be able to provide hints as to how often the stored data is updated. These may be interpreted by the query processing engine to indicate how often to refresh the stored data. We will need to think about the granularity of these hints, e.g. by dataset, class, etc.

An example of this, implemented as a query operator, can be found in SNEEql[[4]], using the RESCAN keyword:

SELECT * FROM locations[RESCAN 20 SECONDS];

Although SPARQLStream has been used to rewrite to SNEEql, the RESCAN operator has not been mapped to an equivalent in SPARQLStream.
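
One possible reading of such a refresh hint, on the engine side, is sketched below in Python. This is not how SNEEql or any RSP engine implements RESCAN; it only illustrates the idea of reloading stored data once the hinted interval has elapsed. The class RefreshingDataset, the loader load_profiles and the sample triple are all hypothetical.

```python
from typing import Callable, Optional, Set, Tuple

Triple = Tuple[str, str, str]


class RefreshingDataset:
    """Stored data that is reloaded when the hinted interval has elapsed."""

    def __init__(self, load: Callable[[], Set[Triple]], rescan_every: int):
        self._load = load                  # fetches the current stored data
        self._rescan_every = rescan_every  # the hint, in abstract time units
        self._loaded_at: Optional[int] = None
        self._data: Set[Triple] = set()

    def snapshot(self, now: int) -> Set[Triple]:
        """Return the stored data, reloading it first if it is too old."""
        if self._loaded_at is None or now - self._loaded_at >= self._rescan_every:
            self._data = self._load()
            self._loaded_at = now
        return self._data


loads = []  # records every actual reload, for illustration


def load_profiles() -> Set[Triple]:
    # Hypothetical loader; a real engine would query a store here.
    loads.append(1)
    return {(":Axel", ":hasDegree", ":doctorate")}


profiles = RefreshingDataset(load_profiles, rescan_every=20)
for now in (0, 10, 20, 25):  # four query evaluations
    profiles.snapshot(now)
```

With a hint of 20 time units, the four evaluations trigger two reloads (at times 0 and 20). The granularity question raised above would decide whether one such object exists per dataset, per class, or per some other unit.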

Q9: Fact

Fact is a Complex Event Processing operator, which maintains temporal states (the Facts) of a system.

It differentiates between Events, i.e. things that happen(ed), and Facts, i.e. things that are true for a specified amount of time. A more detailed description can be found in TEF-SPARQL [5].

Assuming the following example:

Axel enter RoomA, [2]
Darko enter RoomA, [3]
Axel leave RoomA, [6]
Axel enter RoomB, [6]
Darko leave RoomA, [8]
Darko enter RoomB, [8]

Each data entry in this stream is an Event: the event “Axel enter RoomA at time 2” is always true, as it actually happened. The Fact that “Axel isIn RoomA” is a temporal state in the system, which is only true for a restricted period of time: Axel is in RoomA only SINCE time 2, UNTIL he leaves the room at time 6.

The FACT operator maintains such temporal states, together with operations such as SINCE (set the beginning time of a valid fact) and TILL (set the ending time of a fact).

E.g., in TEF-SPARQL [6] syntax,

CONSTRUCT FACT UserFact  {?user isIn ?room}
(UNION 
         (SINCE   ?user :enter ?room)
         (TILL  ?user :leave ?room) 
)

The benefits of using Fact operator:

  • It simplifies stream reasoning by creating/updating Facts.
  • It saves the cost of maintaining events between consecutive time windows.

Query: Give the current number of people in each room, every 3 seconds.

TEF-SPARQL

We can store and count the number of Facts (e.g., <Axel isIn RoomA, startTime, endTime>). Facts are created by the “SINCE” clause (when <Axel enter RoomA> arrives, the fact is created with an empty “endTime”) and terminated by the “TILL” clause (when <Axel leave RoomA> arrives, the fact is terminated by updating “endTime”).

// creating facts
CONSTRUCT FACT UserFact  {?user isIn ?room}
(UNION 
         (SINCE   ?user :enter ?room)
         (TILL  ?user :leave ?room) 
) 
// counting facts 
SELECT ?room AS ROOM, ?user as USER (AGGREGATE ?room, COUNT ?user
         WHERE (CURRENT ?user isIn ?room)
         GROUP BY ?room
         EVERY  "PT3S"^^xsd:duration) 
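
The bookkeeping behind the Fact operator can be approximated in plain Python. This is only a sketch of the SINCE/TILL idea, not of the TEF-SPARQL implementation: it assumes events arrive in timestamp order, that time is measured in abstract units, and that a report is produced every period units. FactStore and counts_every are hypothetical names.

```python
from collections import Counter

# (user, action, room, timestamp) events from the example above.
EVENTS = [
    ("Axel", "enter", "RoomA", 2), ("Darko", "enter", "RoomA", 3),
    ("Axel", "leave", "RoomA", 6), ("Axel", "enter", "RoomB", 6),
    ("Darko", "leave", "RoomA", 8), ("Darko", "enter", "RoomB", 8),
]


class FactStore:
    """Keeps 'user isIn room' facts with a start time and an optional end time."""

    def __init__(self):
        self.facts = []  # each fact: [user, room, start, end]; end None = still true

    def on_event(self, user, action, room, t):
        if action == "enter":      # SINCE: open a new fact
            self.facts.append([user, room, t, None])
        elif action == "leave":    # TILL: close the matching open fact
            for fact in self.facts:
                if fact[0] == user and fact[1] == room and fact[3] is None:
                    fact[3] = t

    def current_counts(self):
        """Number of currently true facts per room."""
        return dict(Counter(f[1] for f in self.facts if f[3] is None))


def counts_every(events, period, until):
    """Report the per-room counts at every multiple of `period` up to `until`."""
    store, reports, i = FactStore(), {}, 0
    for tick in range(period, until + 1, period):
        # Consume all events up to and including this tick (time-ordered input).
        while i < len(events) and events[i][3] <= tick:
            store.on_event(*events[i])
            i += 1
        reports[tick] = store.current_counts()
    return reports


reports = counts_every(EVENTS, period=3, until=9)
```

No event has to be copied from one window to the next: the open facts carry the state across report times.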

Esper with fact

We use the operator "Named window" in ESPER to manage Facts.

create window factUser.win:keepall() as (userName String, room String, startTime Long, endTime Long)
// create facts
on UserEvent merge factUser where factUser.room = UserEvent.userRoom and factUser.userName = UserEvent.userName
when matched  and UserEvent.userAction = '<leave>'  
// update the ending time of the fact, and delete
then delete where factUser.userName = UserEvent.userName  
when not matched
// insert a fact with an open end time
then insert  
select UserEvent.userName as userName, UserEvent.userRoom as room, UserEvent.timeStamp as startTime, 0L as endTime	
	
// counting the facts
select count(*), room from factUser group by room output all every 3 sec

SPARQL without Fact

Without the Fact operator, the system needs to create additional streams to maintain system states. In this example, the UserEnterLeftCounts stream carries two kinds of counting events for each room, enterCount and leftCount, and the UseRoomCounts stream derives the current count from them.

REGISTER STREAM UserEnterLeftCounts AS
CONSTRUCT { ?room uc:enterCount ?enterCount ; uc:leftCount ?leftCount . }
FROM STREAM <http://…/fb> [RANGE 3 sec TUMBLING]
WHERE {
          { SELECT ( COUNT(?userEnter) as ?enterCount ) ?room
                    WHERE { ?userEnter :enter ?room } GROUP BY ?room }
          { SELECT ( COUNT(?userLeft) as ?leftCount ) ?room
                    WHERE { ?userLeft :leave ?room } GROUP BY ?room } }

REGISTER STREAM UseRoomCounts AS
CONSTRUCT {?room uc:currentCount ?newCount }
FROM STREAM <http://…/fb> [RANGE 3 sec TUMBLING]
WHERE {
          { SELECT ( ?currentCount + ?enterCount - ?leftCount as ?newCount ) ?room
                    WHERE {?room uc:enterCount ?enterCount ; uc:leftCount ?leftCount ; uc:currentCount ?currentCount . } } }

Esper without Fact

Without using the "named window" operator, the system needs to copy the count events between windows (part 2 below) in order to maintain them.

create schema countEventType (timeStamp Long , userCount Long, room String)
// part1: create count events
insert into countEventType 
select  
           (select count(*)  
           from UserEvent.win:time_batch( 3 sec)  as ue 
           where ce.userRoom = ue.userRoom and ue.userAction = '<leave>'  ) 
           (select count(*)  
           from UserEvent.win:time_batch( 3 sec) as ue 
           where ce.userRoom = ue.userRoom and ue.userAction = '<enter>' ) 
as userCount , ce.userRoom as room ,  (current_timestamp + 3000L) as timeStamp 
from UserEvent.win:time_batch( 3 sec) as ce
group by ce.userRoom  

// part2:  copy count events between each window
insert into countEventType 
select sum(userCount) as userCount, room as room , (current_timestamp + 3000L) as timeStamp 
from countEventType.win:time_batch( 3 sec) group by room  
	
// part3:  aggregate count events
select sum(userCount) as counter, ce.room as room, current_timestamp as currentTime, ce.room 
from countEventType.win:time_batch( 3 sec) as ce 
group by ce.room  

If there are multiple streams involved, the cost of maintaining events could be potentially very high. Facts provide a solution for modelling and maintaining temporal states.

R2S Operators

R2S operators produce a stream out of bounded RDF mappings. This operator is typically used as an output operator, producing a stream after the R2R and S2R operators have been applied. Of the existing RSP engines, only SPARQLStream has provided an implementation for these operators, namely:

  • ISTREAM: only the data that was not in the previous window is added to the output stream.
  • DSTREAM: only the data that was in the previous window but not in the new one is streamed.
  • RSTREAM: all data is added to the stream.

Example:

SELECT ISTREAM ?room ?person
FROM NAMED STREAM <http://…/fb> [1 MINUTES SLIDE 1 MINUTES] 
WHERE {
 ?person :isIn ?room .
}

In this case the room and person will be output only if they were not in the previous window. Notice that C-SPARQL implicitly always RSTREAMs results and CQELS always ISTREAMs results, and neither allows customizing this behavior, as reported in [[7]].
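
The three R2S operators can be read as set operations over the results of two consecutive windows. The Python sketch below follows the definitions listed above; representing a window result as a set of (room, person) mappings is an assumption made for illustration, and the sample mappings are hypothetical.

```python
def rstream(previous: set, current: set) -> set:
    """RSTREAM: everything in the current window result is emitted."""
    return set(current)


def istream(previous: set, current: set) -> set:
    """ISTREAM: only what was not in the previous window result is emitted."""
    return set(current) - set(previous)


def dstream(previous: set, current: set) -> set:
    """DSTREAM: only what was in the previous result but no longer is."""
    return set(previous) - set(current)


# Hypothetical (room, person) results of two consecutive windows.
previous = {("RoomA", "Axel"), ("RoomA", "Darko")}
current = {("RoomB", "Axel"), ("RoomA", "Darko")}

inserted = istream(previous, current)   # Axel newly seen in RoomB
deleted = dstream(previous, current)    # Axel no longer seen in RoomA
first = istream(set(), previous)        # no previous window: everything is new
```

Under this reading the very first evaluation needs no special case, since ISTREAM against an empty previous result emits the whole first result.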

Summary of Query Features

  • Relation-to-Relation Operators: to produce a relation from one or more other relations, i.e. SPARQL 1.1 operators. This should include entailment regimes up to RDFS (see Reasoning).
    • Repetition: Most cases can be done with aggregation
    • Sequence operator
    • Query Form: SELECT, CONSTRUCT
  • Stream-to-Relation Operators (Windowing): to produce a relation from a stream
    • windows: sliding, partitioned and predicate-based
    • windows on one or more streams (even on the same stream)
    • window now (shortcut range of min time the engine can process)
    • window landmark: comes with the graph-based model
    • semantic window: not now (by now outside of query processor)
  • Relation-to-Stream Operators: to produce a stream from a relation
    • istream,rstream,dstream.
    • what about the first result? Shall we send the first result with an rStream and then only iStream and dStream? -> do this at the protocol level? Discussion: to be discussed when we will talk about protocol

AG: That would be the semantics of istream anyway, since there is no previous window to compare with. In general, there should be no special cases for the starting of the query. The semantics of an operator are the semantics of the operator.

  • Create, register a stream (DDL in SQL). Timestamp for created stream is normally the system clock.
  • Querying static RDF Data (including)

Open Issues

Named Windows

The use of named windows can help in dealing with multiple windows on the same stream, for example for outlier detection. Example queries that could be managed with named windows include:
  • Is Axel now in the room where he has been longest in the last hour?
  • Who is now in the room where she spends the most time?
  • Does something happen before something else? (note that this relates to the SEQ operator)

AG: Is a named window not just a view, i.e. a named query? I think the crucial point here is understanding what our base types are, viz stream, window, stored?

Windows specification
  • as a dataset clause (like in C-SPARQL)
  • as a stream graph pattern (like CQELS)