package org.apache.calcite.test;

import com.ctc.wstx.shaded.msv_core.scanner.dtd.DTDParser;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.UnmodifiableIterator;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.calcite.DataContext;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TableFactory;
import org.apache.calcite.schema.TemporalTable;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.type.ExtraSqlTypes;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.test.CalciteAssert;
import org.apache.calcite.util.TestUtil;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.comparator.ComparatorMatcherBuilder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest.class */
public class StreamTest {
    public static final String STREAM_JOINS_SCHEMA_NAME = "STREAM_JOINS";
    private static final String STREAM_JOINS_MODEL = "{\n  version: '1.0',\n  defaultSchema: 'STREAM_JOINS',\n   schemas: [\n     {\n       name: 'STREAM_JOINS',\n       tables: [ {\n         type: 'custom',\n         name: 'ORDERS',\n         stream: {\n           stream: true\n         },\n         factory: '" + OrdersStreamTableFactory.class.getName() + "'\n       },\n       {\n         type: 'custom',\n         name: 'PRODUCTS',\n         factory: '" + ProductsTableFactory.class.getName() + "'\n       }]\n     }]}";
    public static final String STREAM_SCHEMA_NAME = "STREAMS";
    public static final String INFINITE_STREAM_SCHEMA_NAME = "INFINITE_STREAMS";
    public static final String STREAM_MODEL = "{\n  version: '1.0',\n  defaultSchema: 'foodmart',\n   schemas: [\n" + schemaFor(STREAM_SCHEMA_NAME, OrdersStreamTableFactory.class) + ",\n" + schemaFor(INFINITE_STREAM_SCHEMA_NAME, InfiniteOrdersStreamTableFactory.class) + "\n   ]\n}";

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$BaseOrderStreamTable.class */
    private static abstract class BaseOrderStreamTable implements ScannableTable {
        protected final RelProtoDataType protoRowType;

        private BaseOrderStreamTable() {
            this.protoRowType = relDataTypeFactory -> {
                return relDataTypeFactory.builder().add("ROWTIME", SqlTypeName.TIMESTAMP).add(DTDParser.TYPE_ID, SqlTypeName.INTEGER).add("PRODUCT", SqlTypeName.VARCHAR, 10).add("UNITS", SqlTypeName.INTEGER).build();
            };
        }

        @Override // org.apache.calcite.schema.Table
        public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
            return this.protoRowType.apply(relDataTypeFactory);
        }

        @Override // org.apache.calcite.schema.Table
        public Statistic getStatistic() {
            return Statistics.of(100.0d, ImmutableList.of(), RelCollations.createSingleton(0));
        }

        @Override // org.apache.calcite.schema.Table
        public Schema.TableType getJdbcTableType() {
            return Schema.TableType.TABLE;
        }

        @Override // org.apache.calcite.schema.Table
        public boolean isRolledUp(String str) {
            return false;
        }

        @Override // org.apache.calcite.schema.Table
        public boolean rolledUpColumnValidInsideAgg(String str, SqlCall sqlCall, SqlNode sqlNode, CalciteConnectionConfig calciteConnectionConfig) {
            return false;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$InfiniteOrdersStreamTableFactory.class */
    public static class InfiniteOrdersStreamTableFactory implements TableFactory<Table> {
        @Override // org.apache.calcite.schema.TableFactory
        public Table create(SchemaPlus schemaPlus, String str, Map<String, Object> map, RelDataType relDataType) {
            return new InfiniteOrdersTable();
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$InfiniteOrdersTable.class */
    public static class InfiniteOrdersTable extends BaseOrderStreamTable implements StreamableTable {
        public InfiniteOrdersTable() {
            super();
        }

        @Override // org.apache.calcite.schema.ScannableTable
        public Enumerable<Object[]> scan(DataContext dataContext) {
            return Linq4j.asEnumerable(() -> {
                return new Iterator<Object[]>() { // from class: org.apache.calcite.test.StreamTest.InfiniteOrdersTable.1
                    private final String[] items = {"paint", "paper", "brush"};
                    private int counter = 0;

                    @Override // java.util.Iterator
                    public boolean hasNext() {
                        return true;
                    }

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.Iterator
                    public Object[] next() {
                        int i = this.counter;
                        this.counter = i + 1;
                        return new Object[]{Long.valueOf(System.currentTimeMillis()), Integer.valueOf(i), this.items[i % this.items.length], 10};
                    }

                    @Override // java.util.Iterator
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            });
        }

        @Override // org.apache.calcite.schema.StreamableTable
        public Table stream() {
            return this;
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ boolean rolledUpColumnValidInsideAgg(String str, SqlCall sqlCall, SqlNode sqlNode, CalciteConnectionConfig calciteConnectionConfig) {
            return super.rolledUpColumnValidInsideAgg(str, sqlCall, sqlNode, calciteConnectionConfig);
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ boolean isRolledUp(String str) {
            return super.isRolledUp(str);
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ Schema.TableType getJdbcTableType() {
            return super.getJdbcTableType();
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ Statistic getStatistic() {
            return super.getStatistic();
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
            return super.getRowType(relDataTypeFactory);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$OrdersHistoryTable.class */
    public static class OrdersHistoryTable extends BaseOrderStreamTable {
        private final ImmutableList<Object[]> rows;

        public OrdersHistoryTable(ImmutableList<Object[]> immutableList) {
            super();
            this.rows = immutableList;
        }

        @Override // org.apache.calcite.schema.ScannableTable
        public Enumerable<Object[]> scan(DataContext dataContext) {
            return Linq4j.asEnumerable((List) this.rows);
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ boolean rolledUpColumnValidInsideAgg(String str, SqlCall sqlCall, SqlNode sqlNode, CalciteConnectionConfig calciteConnectionConfig) {
            return super.rolledUpColumnValidInsideAgg(str, sqlCall, sqlNode, calciteConnectionConfig);
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ boolean isRolledUp(String str) {
            return super.isRolledUp(str);
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ Schema.TableType getJdbcTableType() {
            return super.getJdbcTableType();
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ Statistic getStatistic() {
            return super.getStatistic();
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
            return super.getRowType(relDataTypeFactory);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$OrdersStreamTableFactory.class */
    public static class OrdersStreamTableFactory implements TableFactory<Table> {
        @Override // org.apache.calcite.schema.TableFactory
        public Table create(SchemaPlus schemaPlus, String str, Map<String, Object> map, RelDataType relDataType) {
            return new OrdersTable(getRowList());
        }

        public static ImmutableList<Object[]> getRowList() {
            return ImmutableList.copyOf(new Object[]{new Object[]{ts(10, 15, 0), 1, "paint", 10}, new Object[]{ts(10, 24, 15), 2, "paper", 5}, new Object[]{ts(10, 24, 45), 3, "brush", 12}, new Object[]{ts(10, 58, 0), 4, "paint", 3}, new Object[]{ts(11, 10, 0), 5, "paint", 3}});
        }

        private static Object ts(int i, int i2, int i3) {
            return Long.valueOf(DateTimeUtils.unixTimestamp(ExtraSqlTypes.GEOMETRY, 2, 15, i, i2, i3));
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$OrdersTable.class */
    public static class OrdersTable extends BaseOrderStreamTable implements StreamableTable {
        private final ImmutableList<Object[]> rows;

        public OrdersTable(ImmutableList<Object[]> immutableList) {
            super();
            this.rows = immutableList;
        }

        @Override // org.apache.calcite.schema.ScannableTable
        public Enumerable<Object[]> scan(DataContext dataContext) {
            return Linq4j.asEnumerable((List) this.rows);
        }

        @Override // org.apache.calcite.schema.StreamableTable
        public Table stream() {
            return new OrdersTable(this.rows);
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public boolean isRolledUp(String str) {
            return false;
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public boolean rolledUpColumnValidInsideAgg(String str, SqlCall sqlCall, SqlNode sqlNode, CalciteConnectionConfig calciteConnectionConfig) {
            return false;
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ Schema.TableType getJdbcTableType() {
            return super.getJdbcTableType();
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ Statistic getStatistic() {
            return super.getStatistic();
        }

        @Override // org.apache.calcite.test.StreamTest.BaseOrderStreamTable, org.apache.calcite.schema.Table
        public /* bridge */ /* synthetic */ RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
            return super.getRowType(relDataTypeFactory);
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$ProductsTable.class */
    public static class ProductsTable implements ScannableTable {
        private final ImmutableList<Object[]> rows;
        private final RelProtoDataType protoRowType = relDataTypeFactory -> {
            return relDataTypeFactory.builder().add(DTDParser.TYPE_ID, SqlTypeName.VARCHAR, 32).add("SUPPLIER", SqlTypeName.INTEGER).build();
        };

        public ProductsTable(ImmutableList<Object[]> immutableList) {
            this.rows = immutableList;
        }

        @Override // org.apache.calcite.schema.ScannableTable
        public Enumerable<Object[]> scan(DataContext dataContext) {
            return Linq4j.asEnumerable((List) this.rows);
        }

        @Override // org.apache.calcite.schema.Table
        public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
            return this.protoRowType.apply(relDataTypeFactory);
        }

        @Override // org.apache.calcite.schema.Table
        public Statistic getStatistic() {
            return Statistics.of(200.0d, ImmutableList.of());
        }

        @Override // org.apache.calcite.schema.Table
        public Schema.TableType getJdbcTableType() {
            return Schema.TableType.TABLE;
        }

        @Override // org.apache.calcite.schema.Table
        public boolean isRolledUp(String str) {
            return false;
        }

        @Override // org.apache.calcite.schema.Table
        public boolean rolledUpColumnValidInsideAgg(String str, SqlCall sqlCall, SqlNode sqlNode, CalciteConnectionConfig calciteConnectionConfig) {
            return false;
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$ProductsTableFactory.class */
    public static class ProductsTableFactory implements TableFactory<Table> {
        @Override // org.apache.calcite.schema.TableFactory
        public Table create(SchemaPlus schemaPlus, String str, Map<String, Object> map, RelDataType relDataType) {
            return new ProductsTable(ImmutableList.copyOf(new Object[]{new Object[]{"paint", 1}, new Object[]{"paper", 0}, new Object[]{"brush", 1}}));
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/calcite-core-1.22.0-tests.jar:org/apache/calcite/test/StreamTest$ProductsTemporalTable.class */
    public static class ProductsTemporalTable implements TemporalTable {
        private final RelProtoDataType protoRowType = relDataTypeFactory -> {
            return relDataTypeFactory.builder().add(DTDParser.TYPE_ID, SqlTypeName.VARCHAR, 32).add("SUPPLIER", SqlTypeName.INTEGER).add("SYS_START", SqlTypeName.TIMESTAMP).add("SYS_END", SqlTypeName.TIMESTAMP).build();
        };

        @Override // org.apache.calcite.schema.TemporalTable
        public String getSysStartFieldName() {
            return "SYS_START";
        }

        @Override // org.apache.calcite.schema.TemporalTable
        public String getSysEndFieldName() {
            return "SYS_END";
        }

        @Override // org.apache.calcite.schema.Table
        public RelDataType getRowType(RelDataTypeFactory relDataTypeFactory) {
            return this.protoRowType.apply(relDataTypeFactory);
        }

        @Override // org.apache.calcite.schema.Table
        public Statistic getStatistic() {
            return Statistics.of(200.0d, ImmutableList.of());
        }

        @Override // org.apache.calcite.schema.Table
        public Schema.TableType getJdbcTableType() {
            return Schema.TableType.TABLE;
        }

        @Override // org.apache.calcite.schema.Table
        public boolean isRolledUp(String str) {
            return false;
        }

        @Override // org.apache.calcite.schema.Table
        public boolean rolledUpColumnValidInsideAgg(String str, SqlCall sqlCall, SqlNode sqlNode, CalciteConnectionConfig calciteConnectionConfig) {
            return false;
        }
    }

    private static String schemaFor(String str, Class<? extends TableFactory> cls) {
        return "     {\n       name: '" + str + "',\n       tables: [ {\n         type: 'custom',\n         name: 'ORDERS',\n         stream: {\n           stream: true\n         },\n         factory: '" + cls.getName() + "'\n       } ]\n     }";
    }

    @Test
    public void testStream() {
        CalciteAssert.model(STREAM_MODEL).withDefaultSchema(STREAM_SCHEMA_NAME).query("select stream * from orders").convertContains("LogicalDelta\n  LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3])\n    LogicalTableScan(table=[[STREAMS, ORDERS]])\n").explainContains("EnumerableInterpreter\n  BindableTableScan(table=[[STREAMS, ORDERS, (STREAM)]])").returns(startsWith("ROWTIME=2015-02-15 10:15:00; ID=1; PRODUCT=paint; UNITS=10", "ROWTIME=2015-02-15 10:24:15; ID=2; PRODUCT=paper; UNITS=5"));
    }

    @Test
    public void testStreamFilterProject() {
        CalciteAssert.model(STREAM_MODEL).withDefaultSchema(STREAM_SCHEMA_NAME).query("select stream product from orders where units > 6").convertContains("LogicalDelta\n  LogicalProject(PRODUCT=[$2])\n    LogicalFilter(condition=[>($3, 6)])\n      LogicalTableScan(table=[[STREAMS, ORDERS]])\n").explainContains("EnumerableCalc(expr#0..3=[{inputs}], expr#4=[6], expr#5=[>($t3, $t4)], PRODUCT=[$t2], $condition=[$t5])\n  EnumerableInterpreter\n    BindableTableScan(table=[[STREAMS, ORDERS, (STREAM)]])").returns(startsWith("PRODUCT=paint", "PRODUCT=brush"));
    }

    @Test
    public void testStreamGroupByHaving() {
        CalciteAssert.model(STREAM_MODEL).withDefaultSchema(STREAM_SCHEMA_NAME).query("select stream floor(rowtime to hour) as rowtime,\n  product, count(*) as c\nfrom orders\ngroup by floor(rowtime to hour), product\nhaving count(*) > 1").convertContains("LogicalDelta\n  LogicalFilter(condition=[>($2, 1)])\n    LogicalAggregate(group=[{0, 1}], C=[COUNT()])\n      LogicalProject(ROWTIME=[FLOOR($0, FLAG(HOUR))], PRODUCT=[$2])\n        LogicalTableScan(table=[[STREAMS, ORDERS]])\n").explainContains("EnumerableCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])\n  EnumerableAggregate(group=[{0, 1}], C=[COUNT()])\n    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[FLAG(HOUR)], expr#5=[FLOOR($t0, $t4)], ROWTIME=[$t5], PRODUCT=[$t2])\n      EnumerableInterpreter\n        BindableTableScan(table=[[STREAMS, ORDERS, (STREAM)]])").returns(startsWith("ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; C=2"));
    }

    @Test
    public void testStreamOrderBy() {
        CalciteAssert.model(STREAM_MODEL).withDefaultSchema(STREAM_SCHEMA_NAME).query("select stream floor(rowtime to hour) as rowtime,\n  product, units\nfrom orders\norder by floor(orders.rowtime to hour), product desc").convertContains("LogicalDelta\n  LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n    LogicalProject(ROWTIME=[FLOOR($0, FLAG(HOUR))], PRODUCT=[$2], UNITS=[$3])\n      LogicalTableScan(table=[[STREAMS, ORDERS]])\n").explainContains("EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n  EnumerableCalc(expr#0..3=[{inputs}], expr#4=[FLAG(HOUR)], expr#5=[FLOOR($t0, $t4)], ROWTIME=[$t5], PRODUCT=[$t2], UNITS=[$t3])\n    EnumerableInterpreter\n      BindableTableScan(table=[[STREAMS, ORDERS, (STREAM)]])").returns(startsWith("ROWTIME=2015-02-15 10:00:00; PRODUCT=paper; UNITS=5", "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=10", "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=3"));
    }

    @Disabled
    @Test
    public void testStreamUnionAllOrderBy() {
        CalciteAssert.model(STREAM_MODEL).withDefaultSchema(STREAM_SCHEMA_NAME).query("select stream *\nfrom (\n  select rowtime, product\n  from orders\n  union all\n  select rowtime, product\n  from orders)\norder by rowtime\n").convertContains("LogicalDelta\n  LogicalSort(sort0=[$0], dir0=[ASC])\n    LogicalProject(ROWTIME=[$0], PRODUCT=[$1])\n      LogicalUnion(all=[true])\n        LogicalProject(ROWTIME=[$0], PRODUCT=[$2])\n          EnumerableTableScan(table=[[STREAMS, ORDERS]])\n        LogicalProject(ROWTIME=[$0], PRODUCT=[$2])\n          EnumerableTableScan(table=[[STREAMS, ORDERS]])\n").explainContains("EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC])\n  EnumerableCalc(expr#0..3=[{inputs}], expr#4=[FLAG(HOUR)], expr#5=[FLOOR($t0, $t4)], ROWTIME=[$t5], PRODUCT=[$t2], UNITS=[$t3])\n    EnumerableInterpreter\n      BindableTableScan(table=[[]])").returns(startsWith("ROWTIME=2015-02-15 10:00:00; PRODUCT=paper; UNITS=5", "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=10", "ROWTIME=2015-02-15 10:00:00; PRODUCT=paint; UNITS=3"));
    }

    @Test
    public void testInfiniteStreamsDoNotBufferInMemory() {
        CalciteAssert.model(STREAM_MODEL).withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME).query("select stream * from orders").limit(100).explainContains("EnumerableInterpreter\n  BindableTableScan(table=[[INFINITE_STREAMS, ORDERS, (STREAM)]])").returnsCount(100);
    }

    @Timeout(10)
    @Test
    public void testStreamCancel() {
        CalciteAssert.model(STREAM_MODEL).withDefaultSchema(INFINITE_STREAM_SCHEMA_NAME).query("select stream * from orders").explainContains("EnumerableInterpreter\n  BindableTableScan(table=[[INFINITE_STREAMS, ORDERS, (STREAM)]])").returns(resultSet -> {
            int i = 0;
            while (resultSet.next()) {
                try {
                    i++;
                    if (i == 5) {
                        new Thread(() -> {
                            try {
                                Thread.sleep(3L);
                                resultSet.getStatement().cancel();
                            } catch (InterruptedException | SQLException e) {
                            }
                        }).start();
                    }
                } catch (SQLException e) {
                    MatcherAssert.assertThat(e.getMessage(), CoreMatchers.is("Statement canceled"));
                }
            }
            Assertions.fail("expected cancel, got end-of-data");
            MatcherAssert.assertThat(Integer.valueOf(i), ComparatorMatcherBuilder.usingNaturalOrdering().greaterThan(5));
        });
    }

    @Test
    public void testStreamToRelationJoin() {
        CalciteAssert.model(STREAM_JOINS_MODEL).withDefaultSchema(STREAM_JOINS_SCHEMA_NAME).query("select stream orders.rowtime as rowtime, orders.id as orderId, products.supplier as supplierId from orders join products on orders.product = products.id").convertContains("LogicalDelta\n  LogicalProject(ROWTIME=[$0], ORDERID=[$1], SUPPLIERID=[$6])\n    LogicalJoin(condition=[=($4, $5)], joinType=[inner])\n      LogicalProject(ROWTIME=[$0], ID=[$1], PRODUCT=[$2], UNITS=[$3], PRODUCT0=[CAST($2):VARCHAR(32) NOT NULL])\n        LogicalTableScan(table=[[STREAM_JOINS, ORDERS]])\n      LogicalTableScan(table=[[STREAM_JOINS, PRODUCTS]])\n").explainContains("EnumerableCalc(expr#0..6=[{inputs}], proj#0..1=[{exprs}], SUPPLIERID=[$t6])\n  EnumerableHashJoin(condition=[=($4, $5)], joinType=[inner])\n    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[CAST($t2):VARCHAR(32) NOT NULL], proj#0..4=[{exprs}])\n      EnumerableInterpreter\n        BindableTableScan(table=[[STREAM_JOINS, ORDERS, (STREAM)]])\n    EnumerableTableScan(table=[[STREAM_JOINS, PRODUCTS]])").returns(startsWith("ROWTIME=2015-02-15 10:15:00; ORDERID=1; SUPPLIERID=1", "ROWTIME=2015-02-15 10:24:15; ORDERID=2; SUPPLIERID=0", "ROWTIME=2015-02-15 10:24:45; ORDERID=3; SUPPLIERID=1"));
    }

    @Disabled
    @Test
    public void testTumbleViaOver() {
        CalciteAssert.model(STREAM_JOINS_MODEL).query("WITH HourlyOrderTotals (rowtime, productId, c, su) AS (\n  SELECT FLOOR(rowtime TO HOUR),\n    productId,\n    COUNT(*),\n    SUM(units)\n  FROM Orders\n  GROUP BY FLOOR(rowtime TO HOUR), productId)\nSELECT STREAM rowtime,\n  productId,\n  SUM(su) OVER w AS su,\n  SUM(c) OVER w AS c\nFROM HourlyTotals\nWINDOW w AS (\n  ORDER BY rowtime\n  PARTITION BY productId\n  RANGE INTERVAL '2' HOUR PRECEDING)\n");
    }

    private Consumer<ResultSet> startsWith(String... strArr) {
        ImmutableList copyOf = ImmutableList.copyOf(strArr);
        return resultSet -> {
            try {
                CalciteAssert.ResultSetFormatter resultSetFormatter = new CalciteAssert.ResultSetFormatter();
                ResultSetMetaData metaData = resultSet.getMetaData();
                UnmodifiableIterator it = copyOf.iterator();
                while (it.hasNext()) {
                    String str = (String) it.next();
                    if (!resultSet.next()) {
                        throw new AssertionError("input ended too soon");
                    }
                    resultSetFormatter.rowToString(resultSet, metaData);
                    MatcherAssert.assertThat(resultSetFormatter.string(), CoreMatchers.equalTo(str));
                }
            } catch (SQLException e) {
                throw TestUtil.rethrow(e);
            }
        };
    }
}
