Size: a a a

ReactiveX - русскоговорящее сообщество

2020 May 19

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
iamthevoid
Проблема в том, что проблема, с вероятностью 99.9% в коде. А если и нет, то без кода диагностировать не представляется возможным, надо понимать, что именно у тебя происходит. Поэтому у тебя выбор либо поделиться, либо закопаться самому. Ну можно заменить либу, конечно, но это слишком радикально.

Кстати, то что ты описал представляется довольно компактным. Зависит от преобразований, кончно, но это пайплайн в несколько строк и пара трансформеров.
Ok, попробую.
Вот начало:
Observable<Object> observable = Observable.just(contentContainer);
       Recipe recipeDynamicParserVideos = Recipe.newInstance(mAppContext, "recipes/ContentsRecipe.json");
       Subscription subscription = observable
               .subscribeOn(Schedulers.newThread())
               .concatMap(contentContainerAsObject -> {
                   return mContentLoader.getLoadContentsObservable(Observable.just(contentContainerAsObject), recipeDynamicParserVideos);
               })
               .onBackpressureBuffer() // This must be right after concatMap.
               .doOnNext(o -> { })
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(
                       objectPair -> {
                       },
                       throwable -> {
                           ErrorHelper.injectErrorFragment(
                                   mNavigator.getActiveActivity(),
                                   ErrorUtils.ERROR_CATEGORY.FEED_ERROR,
                                   (errorDialogFragment, errorButtonType, errorCategory) -> {
                                       if (errorButtonType == ErrorUtils.ERROR_BUTTON_TYPE.EXIT_APP) {
                                           mNavigator.getActiveActivity().finishAffinity();
                                       }
                                   });
                       },
                       () -> {
LocalBroadcastManager.getInstance(mNavigator.getActiveActivity())
                                   .sendBroadcast(new Intent(BROADCAST_DATA_LOADED));
                       });
Тут что то может быть не так?
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
Evgeny Cherkasov
Ok, попробую.
Вот начало:
Observable<Object> observable = Observable.just(contentContainer);
       Recipe recipeDynamicParserVideos = Recipe.newInstance(mAppContext, "recipes/ContentsRecipe.json");
       Subscription subscription = observable
               .subscribeOn(Schedulers.newThread())
               .concatMap(contentContainerAsObject -> {
                   return mContentLoader.getLoadContentsObservable(Observable.just(contentContainerAsObject), recipeDynamicParserVideos);
               })
               .onBackpressureBuffer() // This must be right after concatMap.
               .doOnNext(o -> { })
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(
                       objectPair -> {
                       },
                       throwable -> {
                           ErrorHelper.injectErrorFragment(
                                   mNavigator.getActiveActivity(),
                                   ErrorUtils.ERROR_CATEGORY.FEED_ERROR,
                                   (errorDialogFragment, errorButtonType, errorCategory) -> {
                                       if (errorButtonType == ErrorUtils.ERROR_BUTTON_TYPE.EXIT_APP) {
                                           mNavigator.getActiveActivity().finishAffinity();
                                       }
                                   });
                       },
                       () -> {
LocalBroadcastManager.getInstance(mNavigator.getActiveActivity())
                                   .sendBroadcast(new Intent(BROADCAST_DATA_LOADED));
                       });
Тут что то может быть не так?
покажи mContentLoader.getLoadContentsObservable
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
iamthevoid
покажи mContentLoader.getLoadContentsObservable
public Observable<Object> getLoadContentsObservable(Observable<Object> observable, Recipe recipeDynamicParser) {
       return observable
               .map(contentContainerAsObject -> {
                   // Делаем что-то с contentContainerAsObject
                   return contentContainerAsObject;
               })
               .concatMap(contentContainerAsObject -> {
                   ContentContainer contentContainer = (ContentContainer) contentContainerAsObject;
                       // Loading playlist videos
                       return getPlaylistVideosFeedObservable(contentContainerAsObject);
               })
               // Parse videos feed to Content objects
               .concatMap(objectPair -> {
                   ContentContainer contentContainer = (ContentContainer) objectPair.first;
                   String feed = (String) objectPair.second;
                   String[] params = new String[] { contentContainer.getExtraStringValue(Recipe.KEY_DATA_TYPE_TAG) };

                   if (TextUtils.isEmpty(feed)) {
                       return Observable.just(Pair.create(contentContainer, null));
                   }
                   else {
                       return mDynamicParser
                               .cookRecipeObservable(recipeDynamicParser, feed, null, params)
                               .map(contentAsObject -> {
                                   Content content = (Content) contentAsObject;
                                   if (content != null) {
                                       contentContainer.addContent(content);
                                   }
                                   return Pair.create(contentContainer, contentAsObject);
                               });
                   }
               });
   }
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
Все корректно работает до `mDynamicParser
                               .cookRecipeObservable(recipeDynamicParser, feed, null, params)`.
Он возвращает меньшее число объектов, чем было загружено.
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
Evgeny Cherkasov
Все корректно работает до `mDynamicParser
                               .cookRecipeObservable(recipeDynamicParser, feed, null, params)`.
Он возвращает меньшее число объектов, чем было загружено.
отлично, тогда показывай и его )
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
iamthevoid
отлично, тогда показывай и его )
public Observable<Object> cookRecipeObservable(Recipe recipe, Object input, Bundle bundle,
                                                  String[] params) {

       Observable<Object> dynamicParserObservable = Observable.create(subscriber -> {

           try {
               // Make sure recipe and input is valid.
               checkCookRecipeInput(recipe, input);

               // Parse input into a list of maps for translation
               List<Map<String, Object>> resultList = parseInput(recipe, input.toString(), params);

               // Translate each map in the result list into the model object defined in the
               // recipe. Return each model once it completes translation via subscriber.
               translateMapsToObjects(false, recipe, resultList, new IRecipeCookerCallbacks() {

                   @Override
                   public void onPreRecipeCook(Recipe recipe, Object output, Bundle bundle) {

                   }

                   @Override
                   public void onRecipeCooked(Recipe recipe, Object output, Bundle bundle, boolean
                           done) {

                       if (!subscriber.isUnsubscribed()) {
                           subscriber.onNext(output);
                           if (done) {
                               subscriber.onCompleted();
                           }
                       }
                   }

                   @Override
                   public void onPostRecipeCooked(Recipe recipe, Object output, Bundle bundle) {

                   }

                   @Override
                   public void onRecipeError(Recipe recipe, Exception e, String msg) {

                       if (e instanceof ValueNotFoundException) {
                           Log.e(TAG, "Error during parsing, skipping an item:", e);
                       }
                       else {
                           subscriber.onError(e);
                       }
                   }
               }, bundle);

           }
           catch (Exception e) {
               subscriber.onError(e);
           }

       });

       return dynamicParserObservable;
   }
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
Тут не всё
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
resultLlist содержит 20 элементов.
onRecipeCooked() вызывается для всех. Но на 18-ом subscriber.onNext(output); крашится
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
iamthevoid
Тут не всё
В смысле не все?
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
У меня есть подозрение
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
что у тебя onComplete вызывается раньше, чем отрабатывают все onNext
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
Evgeny Cherkasov
В смысле не все?
ну в observable.create вроде ещё бэкпрешшур хэндлер передаётся, у тебя не вижу этого
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
iamthevoid
что у тебя onComplete вызывается раньше, чем отрабатывают все onNext
Флаг done я проверял, он true только с последним объектом.
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
iamthevoid
ну в observable.create вроде ещё бэкпрешшур хэндлер передаётся, у тебя не вижу этого
Rx либа:
public final static <T> Observable<T> create(OnSubscribe<T> f) {
       return new Observable<T>(hook.onCreate(f));
   }
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
А, это ансейф
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
вроде один параметр
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
Точно, есть там такое
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
А попробуй прямо в этот метод (create ) вторым параметром передать Emitter.BackpressureMode.BUFFER
источник

EC

Evgeny Cherkasov in ReactiveX - русскоговорящее сообщество
iamthevoid
А попробуй прямо в этот метод (create ) вторым параметром передать Emitter.BackpressureMode.BUFFER
Emitter не определяется
источник

i

iamthevoid in ReactiveX - русскоговорящее сообщество
А какая версия либы?
источник