import { DataSource } from '@angular/cdk/table';
import { Product } from './product';
import { BehaviorSubject, Observable, of } from 'rxjs';
import { ProductService } from '../services/product.service';
import { CollectionViewer } from '@angular/cdk/collections';
import { catchError, finalize } from 'rxjs/operators';
import { PagingConstant } from '../shared/constants/constants';
import { PaginationResponse } from './pagination-response';

/**
 * CDK table data source that loads pages of products through `ProductService`.
 *
 * The current page of rows is published on `products$` (also returned by
 * `connect`), the in-flight state on `loading$`, and the `total` reported by
 * the latest response is kept in `totalRecords`.
 */
export class ProductDataSource implements DataSource<Product> {

    private productsSubject = new BehaviorSubject<Product[]>([]);
    private loadingSubject = new BehaviorSubject<boolean>(false);

    /**
     * `total` taken from the most recent response; 0 before the first load
     * and after a failed or malformed one.
     */
    public totalRecords = 0;

    /** Emits `true` when a load starts and `false` once it has finished. */
    public loading$ = this.loadingSubject.asObservable();
    /** Emits the rows of the most recently loaded page. */
    public products$ = this.productsSubject.asObservable();

    constructor(private productService: ProductService) { }

    /**
     * Gives the table the stream of rows to render.
     *
     * @param collectionViewer Not used by this implementation.
     * @returns The same stream that is exposed as `products$`.
     */
    connect(collectionViewer: CollectionViewer): Observable<Product[]> {
        return this.products$;
    }

    /**
     * Completes both internal subjects, so nothing is emitted afterwards.
     *
     * @param collectionViewer Not used by this implementation.
     */
    disconnect(collectionViewer: CollectionViewer): void {
        this.productsSubject.complete();
        this.loadingSubject.complete();
    }

    /**
     * Requests one page of products and publishes the result.
     *
     * All arguments are forwarded unchanged to `ProductService.getProducts`.
     * A failed request is not rethrown: it is treated as an empty page, so the
     * rows are cleared and `totalRecords` is reset to 0.
     *
     * @param isIncludedInactive Forwarded to the service (presumably whether
     *        inactive products are included; confirm against the service).
     * @param sortField Field to sort by; defaults to an empty string.
     * @param sortDirection Sort direction; defaults to `'asc'`.
     * @param pageIndex Page to load; defaults to `PagingConstant.pageIndex`.
     * @param pageSize Rows per page; defaults to `PagingConstant.pageSize`.
     */
    getProducts(isIncludedInactive: boolean, sortField = '', sortDirection = 'asc', pageIndex = PagingConstant.pageIndex, pageSize = PagingConstant.pageSize): void {

        this.loadingSubject.next(true);

        this.productService.getProducts(isIncludedInactive, sortField, sortDirection, pageIndex, pageSize).pipe(
            // Fall back to a page-shaped value: a bare array has neither
            // `items` nor `total`, which would push `undefined` into the table.
            catchError(() => of({ items: [] as Product[], total: 0 })),
            finalize(() => this.loadingSubject.next(false))
        )
            .subscribe(response => {
                const page = response as PaginationResponse<Product>;
                // Guard against a null body or a response without a usable list.
                const hasItems = !!page && Array.isArray(page.items);

                this.totalRecords = hasItems && typeof page.total === 'number' ? page.total : 0;
                this.productsSubject.next(hasItems ? (page.items as Product[]) : []);
            });
    }
}
